DBImpl.cpp 44.9 KB
Newer Older
1
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
J
jinhai 已提交
2
//
3 4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
J
jinhai 已提交
5
//
6 7 8 9 10
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
J
jinhai 已提交
11

S
starlord 已提交
12
#include "db/DBImpl.h"
S
starlord 已提交
13
#include "cache/CpuCacheMgr.h"
14
#include "codecs/Codec.h"
W
Wang XiangYu 已提交
15
#include "config/ServerConfig.h"
16
#include "db/IDGenerator.h"
G
groot 已提交
17 18
#include "db/SnapshotUtils.h"
#include "db/SnapshotVisitor.h"
G
groot 已提交
19
#include "db/merge/MergeManagerFactory.h"
G
groot 已提交
20 21 22 23 24 25 26 27
#include "db/merge/MergeTask.h"
#include "db/snapshot/CompoundOperations.h"
#include "db/snapshot/EventExecutor.h"
#include "db/snapshot/IterateHandler.h"
#include "db/snapshot/OperationExecutor.h"
#include "db/snapshot/ResourceHelper.h"
#include "db/snapshot/ResourceTypes.h"
#include "db/snapshot/Snapshots.h"
28
#include "insert/MemManagerFactory.h"
G
groot 已提交
29
#include "knowhere/index/vector_index/helpers/BuilderSuspend.h"
G
groot 已提交
30
#include "metrics/Metrics.h"
G
groot 已提交
31
#include "metrics/SystemInfo.h"
G
groot 已提交
32
#include "scheduler/Definition.h"
S
starlord 已提交
33
#include "scheduler/SchedInst.h"
S
starlord 已提交
34
#include "scheduler/job/SearchJob.h"
35 36 37
#include "segment/SegmentReader.h"
#include "segment/SegmentWriter.h"
#include "utils/Exception.h"
G
groot 已提交
38
#include "utils/StringHelpFunctions.h"
S
starlord 已提交
39
#include "utils/TimeRecorder.h"
40
#include "wal/WalDefinations.h"
X
Xu Peng 已提交
41

G
groot 已提交
42 43 44 45
#include <fiu-local.h>
#include <src/scheduler/job/BuildIndexJob.h>
#include <limits>
#include <utility>
46

J
jinhai 已提交
47
namespace milvus {
X
Xu Peng 已提交
48
namespace engine {
X
Xu Peng 已提交
49

G
groot 已提交
50
namespace {
G
groot 已提交
51 52
constexpr uint64_t BACKGROUND_METRIC_INTERVAL = 1;
constexpr uint64_t BACKGROUND_INDEX_INTERVAL = 1;
G
groot 已提交
53
constexpr uint64_t WAIT_BUILD_INDEX_INTERVAL = 5;
G
groot 已提交
54

G
groot 已提交
55
static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milvus server is shutdown!");
S
starlord 已提交
56
}  // namespace
G
groot 已提交
57

G
groot 已提交
58 59 60 61 62
// Guard placed at the top of the public APIs: returns SHUTDOWN_ERROR from the
// enclosing function when the engine has not been started (or was stopped).
// Wrapped in do { } while (false) so that `CHECK_INITIALIZED;` is one statement
// and stays safe inside an unbraced if/else.
#define CHECK_INITIALIZED                                    \
    do {                                                     \
        if (!initialized_.load(std::memory_order_acquire)) { \
            return SHUTDOWN_ERROR;                           \
        }                                                    \
    } while (false)

Y
Yu Kun 已提交
63
// Constructs the engine from `options`: builds the memory manager and the merge
// manager, registers this object as a listener of "storage.auto_flush_interval"
// and finally calls Start().
DBImpl::DBImpl(const DBOptions& options)
    : options_(options), initialized_(false), merge_thread_pool_(1, 1), index_thread_pool_(1, 1) {
    mem_mgr_ = MemManagerFactory::Build(options_);
    merge_mgr_ptr_ = MergeManagerFactory::SSBuild(options_);

    // WAL manager creation is currently disabled. The former code filled a
    // wal::MXLogConfiguration (recovery_error_ignore, buffer_size = options_.buffer_size_ / 2
    // for the two WAL buffers, mxlog_path) and created wal_mgr_ from it when
    // options_.wal_enable_ was set.

    // Get notified when storage.auto_flush_interval changes.
    ConfigMgr::GetInstance().Attach("storage.auto_flush_interval", this);

    // NOTE(review): the Status returned by Start() is not inspected here.
    Start();
}

// Unregisters the config listener installed by the constructor, then stops the service.
DBImpl::~DBImpl() {
    // Stop watching storage.auto_flush_interval before shutting down.
    ConfigMgr::GetInstance().Detach("storage.auto_flush_interval", this);
    Stop();
}

G
groot 已提交
89 90 91
////////////////////////////////////////////////////////////////////////////////
// External APIs
////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
92 93
// Starts the DB service. Does nothing when the service is already initialized.
//
// Builds the snapshot store, starts the snapshot operation/event executors,
// marks the engine initialized and launches the background threads.
// Returns SERVER_NOT_IMPLEMENT when WAL is enabled, Status::OK() otherwise.
Status
DBImpl::Start() {
    const bool already_running = initialized_.load(std::memory_order_acquire);
    if (already_running) {
        return Status::OK();
    }

    // snapshot subsystem
    auto store = snapshot::Store::Build(options_.meta_.backend_uri_, options_.meta_.path_,
                                        codec::Codec::instance().GetSuffixSet());
    snapshot::OperationExecutor::Init(store);
    snapshot::OperationExecutor::GetInstance().Start();
    snapshot::EventExecutor::Init(store);
    snapshot::EventExecutor::GetInstance().Start();
    snapshot::Snapshots::GetInstance().Init(store);

    // LOG_ENGINE_TRACE_ << "DB service start";
    initialized_.store(true, std::memory_order_release);

    // TODO: merge files

    // WAL is not supported yet. The disabled implementation initialised wal_mgr_,
    // replayed every recovery record through ExecWalRecord() and, unless the node
    // was CLUSTER_READONLY, started the background WAL thread.
    // NOTE(review): this returns after initialized_ was already set to true and
    // before any background thread is launched.
    if (options_.wal_enable_) {
        return Status(SERVER_NOT_IMPLEMENT, "Wal not implemented");
    }

    // for distribute version, some nodes are read only
    if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
        // background flush thread
        bg_flush_thread_ = std::thread(&DBImpl::TimingFlushThread, this);
        // background build index thread
        bg_index_thread_ = std::thread(&DBImpl::TimingIndexThread, this);
    }

    // background metric thread (the fail point can force it on)
    fiu_do_on("options_metric_enable", options_.metric_enable_ = true);
    if (options_.metric_enable_) {
        bg_metric_thread_ = std::thread(&DBImpl::TimingMetricThread, this);
    }

    return Status::OK();
}

S
starlord 已提交
164 165
// Stops the DB service. Does nothing when the service is not initialized.
//
// Clears the initialized flag, flushes buffered data (non-WAL mode), waits for
// merge tasks, wakes and joins the background threads and finally stops the
// snapshot executors. Always returns Status::OK().
//
// Every join is guarded by joinable(): Start() can return early (WAL enabled)
// after setting initialized_ but before launching any thread, and joining a
// thread that was never started throws std::system_error -- fatal when Stop()
// is reached from the destructor.
Status
DBImpl::Stop() {
    if (!initialized_.load(std::memory_order_acquire)) {
        return Status::OK();
    }

    initialized_.store(false, std::memory_order_release);

    if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
        if (options_.wal_enable_) {
            // WAL path disabled; it used to notify swn_wal_ and join bg_wal_thread_.
        } else {
            // flush all without merge
            wal::MXLogRecord record;
            record.type = wal::MXLogType::Flush;
            ExecWalRecord(record);

            // wake the flush thread so it notices the cleared flag, then wait for it
            swn_flush_.Notify();
            if (bg_flush_thread_.joinable()) {
                bg_flush_thread_.join();
            }
        }

        WaitMergeFileFinish();

        // wake the index thread and wait for it to exit
        swn_index_.Notify();
        if (bg_index_thread_.joinable()) {
            bg_index_thread_.join();
        }
    }

    // wait metric thread exit; keyed on the thread itself rather than on
    // options_.metric_enable_, which may have changed since Start()
    if (bg_metric_thread_.joinable()) {
        swn_metric_.Notify();
        bg_metric_thread_.join();
    }

    snapshot::EventExecutor::GetInstance().Stop();
    snapshot::OperationExecutor::GetInstance().Stop();

    // LOG_ENGINE_TRACE_ << "DB service stop";
    return Status::OK();
}

S
starlord 已提交
207
// Creates a collection from `context`.
//
// The context is copied; when none of its fields has type UID, a default UID
// field with a bloom-filter element and a deleted-docs element is added to the
// copy before the create operation is pushed. Returns the status of the push.
Status
DBImpl::CreateCollection(const snapshot::CreateCollectionContext& context) {
    CHECK_INITIALIZED;

    auto ctx = context;

    // check uid existence/validation: stop at the first field typed as UID
    auto field_it = ctx.fields_schema.begin();
    while (field_it != ctx.fields_schema.end() && !(field_it->first->GetFtype() == meta::DataType::UID)) {
        ++field_it;
    }
    const bool uid_missing = (field_it == ctx.fields_schema.end());

    // add uid field if not specified
    if (uid_missing) {
        auto uid_field = std::make_shared<snapshot::Field>(DEFAULT_UID_NAME, 0, milvus::engine::FieldType::UID);
        auto bloom_filter_element = std::make_shared<snapshot::FieldElement>(
            0, 0, DEFAULT_BLOOM_FILTER_NAME, milvus::engine::FieldElementType::FET_BLOOM_FILTER);
        auto delete_doc_element = std::make_shared<snapshot::FieldElement>(
            0, 0, DEFAULT_DELETED_DOCS_NAME, milvus::engine::FieldElementType::FET_DELETED_DOCS);

        ctx.fields_schema[uid_field] = {bloom_filter_element, delete_doc_element};
    }

    // WAL path disabled: ctx.lsn = wal_mgr_->CreateCollection(context.collection->GetName());

    auto create_op = std::make_shared<snapshot::CreateCollectionOperation>(ctx);
    return create_op->Push();
}

239
// Drops the collection called `name`.
//
// Fails with the snapshot lookup status when the collection cannot be resolved.
// Otherwise erases the collection from the memory manager and asks the snapshot
// manager to drop it, returning that call's status.
Status
DBImpl::DropCollection(const std::string& name) {
    CHECK_INITIALIZED;

    LOG_ENGINE_DEBUG_ << "Prepare to drop collection " << name;

    auto& snapshot_mgr = snapshot::Snapshots::GetInstance();
    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot_mgr.GetSnapshot(ss, name));

    // SS TODO: WAL path is not wired in yet
    /* wal_mgr_->DropCollection(ss->GetCollectionId()); */

    mem_mgr_->EraseMemVector(ss->GetCollectionId());  // not allow insert

    return snapshot_mgr.DropCollection(ss->GetCollectionId(), std::numeric_limits<snapshot::LSN_TYPE>::max());
}

S
starlord 已提交
259
// Reports through `has_or_not` whether a snapshot can be obtained for
// `collection_name`.
//
// The status of the snapshot lookup is returned as-is, so the result is not OK
// whenever `has_or_not` is set to false.
Status
DBImpl::HasCollection(const std::string& collection_name, bool& has_or_not) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    const auto lookup_status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);

    has_or_not = lookup_status.ok();
    return lookup_status;
}

// Replaces the content of `names` with the collection names reported by the
// snapshot manager and returns that call's status.
Status
DBImpl::ListCollections(std::vector<std::string>& names) {
    CHECK_INITIALIZED;

    // Drop whatever the caller passed in before the list is filled.
    names.clear();

    auto& snapshot_mgr = snapshot::Snapshots::GetInstance();
    return snapshot_mgr.GetCollectionNames(names);
}
277

G
groot 已提交
278
// Fetches the collection object and its field schema for `collection_name`.
//
// `collection` receives the snapshot's collection; `fields_schema` gets one
// entry per field, mapped to the field elements of that field. Existing entries
// of `fields_schema` are not cleared.
Status
DBImpl::GetCollectionInfo(const std::string& collection_name, snapshot::CollectionPtr& collection,
                          snapshot::CollectionMappings& fields_schema) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    collection = ss->GetCollection();

    auto& field_map = ss->GetResources<snapshot::Field>();
    for (auto& entry : field_map) {
        auto& field = entry.second;
        fields_schema[field.Get()] = ss->GetFieldElementsByField(field->GetName());
    }

    return Status::OK();
}

S
starlord 已提交
294
// Writes the JSON produced by GetSnapshotInfo() for `collection_name` into
// `collection_stats` as a serialized string.
Status
DBImpl::GetCollectionStats(const std::string& collection_name, std::string& collection_stats) {
    CHECK_INITIALIZED;

    nlohmann::json stats_json;
    STATUS_CHECK(GetSnapshotInfo(collection_name, stats_json));

    collection_stats = stats_json.dump();

    return Status::OK();
}

S
starlord 已提交
305
// Stores in `row_count` the row count recorded in the collection commit of the
// current snapshot of `collection_name`.
Status
DBImpl::CountEntities(const std::string& collection_name, int64_t& row_count) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    auto collection_commit = ss->GetCollectionCommit();
    row_count = collection_commit->GetRowCount();

    return Status::OK();
}

316
// Creates partition `partition_name` inside collection `collection_name`.
//
// Commits a new partition on a CreatePartitionOperation built from the current
// snapshot and returns the status of pushing that operation.
Status
DBImpl::CreatePartition(const std::string& collection_name, const std::string& partition_name) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    // SS TODO: with WAL enabled the lsn should come from
    /* lsn = wal_mgr_->CreatePartition(collection_name, partition_tag); */
    const snapshot::LSN_TYPE lsn = 0;

    snapshot::PartitionContext partition_ctx;
    partition_ctx.name = partition_name;

    snapshot::OperationContext op_ctx;
    op_ctx.lsn = lsn;
    auto create_op = std::make_shared<snapshot::CreatePartitionOperation>(op_ctx, ss);

    snapshot::PartitionPtr new_partition;
    STATUS_CHECK(create_op->CommitNewPartition(partition_ctx, new_partition));
    return create_op->Push();
}

S
starlord 已提交
340
// Drops partition `partition_name` of collection `collection_name` by pushing a
// DropPartitionOperation on the current snapshot; returns the push status.
Status
DBImpl::DropPartition(const std::string& collection_name, const std::string& partition_name) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    // SS TODO: Is below step needed? Or How to implement it?
    /* mem_mgr_->EraseMemVector(partition_name); */

    snapshot::PartitionContext partition_ctx;
    partition_ctx.name = partition_name;

    auto drop_op = std::make_shared<snapshot::DropPartitionOperation>(partition_ctx, ss);
    return drop_op->Push();
}

356
// Sets `exist` to true when `partition_tag` equals one of the partition names of
// collection `collection_name`, false otherwise.
//
// `exist` is left untouched when the engine is shut down or the snapshot lookup
// fails; the corresponding error status is returned in that case.
Status
DBImpl::HasPartition(const std::string& collection_name, const std::string& partition_tag, bool& exist) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    // Bind the result directly: wrapping a call result in std::move gains nothing.
    const auto& known_partitions = ss->GetPartitionNames();

    exist = false;
    for (const auto& name : known_partitions) {
        if (name == partition_tag) {
            exist = true;
            break;
        }
    }

    return Status::OK();
}
374

G
groot 已提交
375
// Stores the partition names of collection `collection_name` in `partition_names`.
Status
DBImpl::ListPartitions(const std::string& collection_name, std::vector<std::string>& partition_names) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    // No std::move needed: a by-value result is already moved into the target.
    partition_names = ss->GetPartitionNames();
    return Status::OK();
}

// Creates `index` on field `field_name` of collection `collection_name` and
// blocks until every segment is indexed or the client connection breaks.
//
// Returns OK immediately when the requested index equals the existing one.
// Errors from reading the old index or storing the new one are returned.
Status
DBImpl::CreateIndex(const std::shared_ptr<server::Context>& context, const std::string& collection_name,
                    const std::string& field_name, const CollectionIndex& index) {
    CHECK_INITIALIZED;

    // step 1: wait merge file thread finished to avoid duplicate data bug
    // NOTE(review): the flush status is not checked (unchanged behaviour) --
    // confirm whether a failed flush should abort the index creation.
    auto status = Flush();
    WaitMergeFileFinish();  // let merge file thread finish

    // step 2: compare old index and new index
    CollectionIndex new_index = index;
    CollectionIndex old_index;
    STATUS_CHECK(GetSnapshotIndex(collection_name, field_name, old_index));

    if (utils::IsSameIndex(old_index, new_index)) {
        return Status::OK();  // same index
    }

    // step 3: drop the old index of THIS field only. The collection-wide
    // overload would also delete the indexes of every other field.
    // The result is ignored, as before.
    DropIndex(collection_name, field_name);
    WaitMergeFileFinish();  // let merge file thread finish since DropIndex start a merge task

    // step 4: create field element for index
    status = SetSnapshotIndex(collection_name, field_name, new_index);
    if (!status.ok()) {
        return status;
    }

    // step 5: start background build index thread
    std::vector<std::string> collection_names = {collection_name};
    WaitBuildIndexFinish();
    StartBuildIndexTask(collection_names);

    // step 6: iterate segments need to be build index, wait until all segments are built
    while (true) {
        SnapshotVisitor ss_visitor(collection_name);
        snapshot::IDS_TYPE segment_ids;
        ss_visitor.SegmentsToIndex(field_name, segment_ids);
        if (segment_ids.empty()) {
            break;
        }

        index_req_swn_.Wait_For(std::chrono::seconds(1));

        // client break the connection, no need to block, check every 1 second
        if (context && context->IsConnectionBroken()) {
            LOG_ENGINE_DEBUG_ << "Client connection broken, build index in background";
            break;  // stop waiting; the build task started in step 5 is not cancelled here
        }
    }

    return Status::OK();
}
439

G
groot 已提交
440
// Drops the index of field `field_name` in collection `collection_name`, then
// starts a merge task for that collection.
Status
DBImpl::DropIndex(const std::string& collection_name, const std::string& field_name) {
    CHECK_INITIALIZED;

    LOG_ENGINE_DEBUG_ << "Drop index for collection: " << collection_name;

    STATUS_CHECK(DeleteSnapshotIndex(collection_name, field_name));

    // Kick off a merge for the affected collection.
    // NOTE(review): the meaning of the `true` argument is defined by StartMergeTask -- confirm there.
    std::set<std::string> collections_to_merge;
    collections_to_merge.insert(collection_name);
    StartMergeTask(collections_to_merge, true);

    return Status::OK();
}

// Drops the index of every field of collection `collection_name`, then starts a
// merge task for that collection.
//
// Stops at, and returns, the first failure reported by DeleteSnapshotIndex().
Status
DBImpl::DropIndex(const std::string& collection_name) {
    CHECK_INITIALIZED;

    LOG_ENGINE_DEBUG_ << "Drop index for collection: " << collection_name;

    // Collect the field names in a nested scope so the snapshot is released
    // before the indexes are deleted.
    std::vector<std::string> field_names;
    {
        snapshot::ScopedSnapshotT ss;
        STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
        field_names = ss->GetFieldNames();
    }

    for (const auto& field_name : field_names) {
        STATUS_CHECK(DeleteSnapshotIndex(collection_name, field_name));
    }

    std::set<std::string> merge_collection_names = {collection_name};
    StartMergeTask(merge_collection_names, true);
    return Status::OK();
}

// Inserts `data_chunk` into partition `partition_name` of collection
// `collection_name`.
//
// Returns DB_ERROR for a null chunk, DB_NOT_FOUND for an unknown partition and
// SERVER_NOT_IMPLEMENT when WAL is enabled. When the chunk carries no
// DEFAULT_UID_NAME column, ids are generated and stored under that name.
Status
DBImpl::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk) {
    CHECK_INITIALIZED;

    if (data_chunk == nullptr) {
        return Status(DB_ERROR, "Null pointer");
    }

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    if (ss->GetPartition(partition_name) == nullptr) {
        return Status(DB_NOT_FOUND, "Fail to get partition " + partition_name);
    }

    // Generate id
    auto& fixed_fields = data_chunk->fixed_fields_;
    if (fixed_fields.find(engine::DEFAULT_UID_NAME) == fixed_fields.end()) {
        IDNumbers generated_ids;
        STATUS_CHECK(SafeIDGenerator::GetInstance().GetNextIDNumbers(data_chunk->count_, generated_ids));

        // Copy the generated ids into the chunk as raw int64 bytes.
        const auto id_bytes = generated_ids.size() * sizeof(int64_t);
        FIXED_FIELD_DATA& uid_column = fixed_fields[engine::DEFAULT_UID_NAME];
        uid_column.resize(id_bytes);
        memcpy(uid_column.data(), generated_ids.data(), id_bytes);
    }

    // WAL path disabled; it used to hand the entities to wal_mgr_->InsertEntities()
    // (binary or float vector data) and notify swn_wal_.
    if (options_.wal_enable_) {
        return Status(SERVER_NOT_IMPLEMENT, "Wal not implemented");
    }

    // insert entities: collection_name is field id
    wal::MXLogRecord insert_record;
    insert_record.type = wal::MXLogType::Entity;
    insert_record.lsn = 0;
    insert_record.collection_id = collection_name;
    insert_record.partition_tag = partition_name;
    insert_record.data_chunk = data_chunk;
    insert_record.length = data_chunk->count_;

    STATUS_CHECK(ExecWalRecord(insert_record));

    return Status::OK();
}

S
starlord 已提交
531
// Looks up the entities listed in `id_array` within collection `collection_name`.
//
// Runs a GetEntityByIdSegmentHandler over the current snapshot and stores the
// handler's data chunk in `data_chunk`. `field_names` and `valid_row` are passed
// on to the handler.
// NOTE(review): `valid_row` presumably flags which requested ids were found -- confirm in the handler.
Status
DBImpl::GetEntityByID(const std::string& collection_name, const IDNumbers& id_array,
                      const std::vector<std::string>& field_names, std::vector<bool>& valid_row,
                      DataChunkPtr& data_chunk) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    std::string meta_root = options_.meta_.path_;
    auto entity_handler =
        std::make_shared<GetEntityByIdSegmentHandler>(nullptr, ss, meta_root, id_array, field_names, valid_row);

    entity_handler->Iterate();
    STATUS_CHECK(entity_handler->GetStatus());

    data_chunk = entity_handler->data_chunk_;
    return Status::OK();
}

// Deletes the entities identified by `entity_ids` from collection `collection_name`.
//
// Returns SERVER_NOT_IMPLEMENT when WAL is enabled; otherwise returns the status
// of executing a Delete record.
Status
DBImpl::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers entity_ids) {
    CHECK_INITIALIZED;

    // WAL path disabled; it used to call wal_mgr_->DeleteById() and notify swn_wal_.
    if (options_.wal_enable_) {
        return Status(SERVER_NOT_IMPLEMENT, "Wal not implemented");
    }

    wal::MXLogRecord delete_record;
    delete_record.type = wal::MXLogType::Delete;
    delete_record.lsn = 0;  // need to get from meta ?
    delete_record.collection_id = collection_name;
    // The record only points into entity_ids; it does not copy the ids.
    delete_record.ids = entity_ids.data();
    delete_record.length = entity_ids.size();

    return ExecWalRecord(delete_record);
}
G
groot 已提交
572

G
groot 已提交
573
// Executes `query_ptr` by submitting a SearchJob to the scheduler and waiting
// for it to finish. Returns the job's status when it failed, OK otherwise.
//
// NOTE(review): neither `context` nor `result` is used in the current body, so
// `result` is not filled in here -- confirm where the job output is delivered.
//
// A disabled legacy implementation used to live here. It
//   1. collected a SegmentVisitor for every segment whose partition name matched
//      the requested partition patterns,
//   2. ran a search job over those visitors and copied row count, ids and
//      distances into the result,
//   3. (step 4) fetched the entities for the result ids through GetEntityByID(),
//   4. (step 5) filtered the returned attributes by the requested field names.
Status
DBImpl::Query(const server::ContextPtr& context, const query::QueryPtr& query_ptr, engine::QueryResultPtr& result) {
    CHECK_INITIALIZED;

    TimeRecorder rc("DBImpl::Query");

    scheduler::SearchJobPtr search_job = std::make_shared<scheduler::SearchJob>(nullptr, options_, query_ptr);

    /* put search job to scheduler and wait job finish */
    scheduler::JobMgrInst::GetInstance()->Put(search_job);
    search_job->WaitFinish();

    auto job_status = search_job->status();
    if (!job_status.ok()) {
        return job_status;
    }

    rc.ElapseFromBegin("Engine query totally cost");

    // tracer.Context()->GetTraceContext()->GetSpan()->Finish();

    return Status::OK();
}

C
Cai Yudong 已提交
673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845
// Loads into `entity_ids` the uids stored in segment `segment_id` of collection
// `collection_name`.
//
// Returns DB_NOT_FOUND when no segment visitor can be built for `segment_id`,
// instead of handing a null visitor to the segment reader.
Status
DBImpl::ListIDInSegment(const std::string& collection_name, int64_t segment_id, IDNumbers& entity_ids) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    auto read_visitor = engine::SegmentVisitor::Build(ss, segment_id);
    if (read_visitor == nullptr) {
        return Status(DB_NOT_FOUND, "Fail to build segment visitor for segment " + std::to_string(segment_id));
    }

    segment::SegmentReaderPtr segment_reader =
        std::make_shared<segment::SegmentReader>(options_.meta_.path_, read_visitor);

    STATUS_CHECK(segment_reader->LoadUids(entity_ids));

    return Status::OK();
}

// Runs a LoadVectorFieldHandler over the current snapshot of `collection_name`
// and returns the handler's status.
// NOTE(review): `field_names` and `force` are not used by the current body.
Status
DBImpl::LoadCollection(const server::ContextPtr& context, const std::string& collection_name,
                       const std::vector<std::string>& field_names, bool force) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    auto loader = std::make_shared<LoadVectorFieldHandler>(context, ss);
    loader->Iterate();
    return loader->GetStatus();
}

// Flushes the buffered data of collection `collection_name`.
//
// Returns the HasCollection() failure when the lookup fails, DB_NOT_FOUND when
// the collection is reported missing and SERVER_NOT_IMPLEMENT when WAL is
// enabled. Otherwise the flush runs through InternalFlush() and the (OK) lookup
// status is returned.
Status
DBImpl::Flush(const std::string& collection_name) {
    CHECK_INITIALIZED;

    bool has_collection = false;
    Status status = HasCollection(collection_name, has_collection);
    if (!status.ok()) {
        return status;
    }
    if (!has_collection) {
        LOG_ENGINE_ERROR_ << "Collection to flush does not exist: " << collection_name;
        return Status(DB_NOT_FOUND, "Collection to flush does not exist");
    }

    LOG_ENGINE_DEBUG_ << "Begin flush collection: " << collection_name;

    // WAL path disabled; it used to request a flush lsn from wal_mgr_, wake the
    // WAL thread and wait for it, or start a merge task when nothing was flushed.
    if (options_.wal_enable_) {
        return Status(SERVER_NOT_IMPLEMENT, "Wal not implemented");
    }

    LOG_ENGINE_DEBUG_ << "MemTable flush";
    InternalFlush(collection_name);

    LOG_ENGINE_DEBUG_ << "End flush collection: " << collection_name;

    return status;
}

// Flushes the buffered data of all collections.
//
// Returns SERVER_NOT_IMPLEMENT when WAL is enabled; otherwise runs
// InternalFlush() and returns a default-constructed Status.
Status
DBImpl::Flush() {
    CHECK_INITIALIZED;

    LOG_ENGINE_DEBUG_ << "Begin flush all collections";

    // Fail point that forces the non-WAL path.
    fiu_do_on("options_wal_enable_false", options_.wal_enable_ = false);

    // WAL path disabled; it used to request a flush lsn from wal_mgr_, wake the
    // WAL thread and wait for it, or start a merge task when nothing was flushed.
    if (options_.wal_enable_) {
        return Status(SERVER_NOT_IMPLEMENT, "Wal not implemented");
    }

    LOG_ENGINE_DEBUG_ << "MemTable flush";
    InternalFlush();

    LOG_ENGINE_DEBUG_ << "End flush all collections";

    return Status();
}

// Compacts the segments of collection `collection_name` whose share of deleted
// rows reaches `threshold`.
//
// Holds build_index_mutex_ and flush_merge_compact_mutex_ for the whole call.
// Each qualifying segment is rewritten through a MergeTask. A failed merge is
// logged and the loop goes on; the loop stops early when the client connection
// is broken.
//
// NOTE(review): the returned status is whatever the last processed segment left
// in `status` (including a LoadDeletedDocs failure) -- confirm that is intended.
Status
DBImpl::Compact(const std::shared_ptr<server::Context>& context, const std::string& collection_name, double threshold) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    LOG_ENGINE_DEBUG_ << "Before compacting, wait for build index thread to finish...";
    const std::lock_guard<std::mutex> index_lock(build_index_mutex_);
    const std::lock_guard<std::mutex> merge_lock(flush_merge_compact_mutex_);

    Status status;
    bool has_collection = false;
    status = HasCollection(collection_name, has_collection);
    if (!status.ok()) {
        return status;
    }
    if (!has_collection) {
        LOG_ENGINE_ERROR_ << "Collection to compact does not exist: " << collection_name;
        return Status(DB_NOT_FOUND, "Collection to compact does not exist");
    }

    snapshot::ScopedSnapshotT latest_ss;
    status = snapshot::Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
    if (!status.ok()) {
        return status;
    }

    auto& segments = latest_ss->GetResources<snapshot::Segment>();
    for (auto& kv : segments) {
        // client break the connection, no need to continue
        if (context && context->IsConnectionBroken()) {
            LOG_ENGINE_DEBUG_ << "Client connection broken, stop compact operation";
            break;
        }

        snapshot::ID_TYPE segment_id = kv.first;
        auto read_visitor = engine::SegmentVisitor::Build(latest_ss, segment_id);
        segment::SegmentReaderPtr segment_reader =
            std::make_shared<segment::SegmentReader>(options_.meta_.path_, read_visitor);

        segment::DeletedDocsPtr deleted_docs;
        status = segment_reader->LoadDeletedDocs(deleted_docs);
        if (!status.ok() || deleted_docs == nullptr) {
            continue;  // no deleted docs, no need to compact
        }

        auto segment_commit = latest_ss->GetSegmentCommitBySegmentId(segment_id);
        auto row_count = segment_commit->GetRowCount();
        if (row_count == 0) {
            continue;
        }

        // Compute the ratio in floating point. Dividing two integers would
        // truncate the fraction to 0, so no positive threshold could ever be met.
        auto deleted_count = deleted_docs->GetSize();
        const double deleted_ratio = static_cast<double>(deleted_count) / (row_count + deleted_count);
        if (deleted_ratio < threshold) {
            continue;  // no need to compact
        }

        snapshot::IDS_TYPE ids = {segment_id};
        MergeTask merge_task(options_, latest_ss, ids);
        status = merge_task.Execute();
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << "Compact failed for segment " << segment_reader->GetSegmentPath() << ": "
                              << status.message();
            continue;  // skip this file and try compact next one
        }
    }

    return status;
}

G
groot 已提交
846 847 848
////////////////////////////////////////////////////////////////////////////////
// Internal APIs
////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
849
void
G
groot 已提交
850 851 852 853 854 855 856 857 858 859
DBImpl::InternalFlush(const std::string& collection_name) {
    wal::MXLogRecord record;
    record.type = wal::MXLogType::Flush;
    record.collection_id = collection_name;
    ExecWalRecord(record);
}

void
DBImpl::TimingFlushThread() {
    SetThreadName("flush_thread");
Y
yu yunfeng 已提交
860
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
861
    while (true) {
862
        if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
863
            LOG_ENGINE_DEBUG_ << "DB background flush thread exit";
G
groot 已提交
864 865
            break;
        }
X
Xu Peng 已提交
866

G
groot 已提交
867 868 869 870 871 872
        InternalFlush();
        if (options_.auto_flush_interval_ > 0) {
            swn_flush_.Wait_For(std::chrono::seconds(options_.auto_flush_interval_));
        } else {
            swn_flush_.Wait();
        }
873 874 875
    }
}

S
starlord 已提交
876 877
void
DBImpl::StartMetricTask() {
G
groot 已提交
878
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(BACKGROUND_METRIC_INTERVAL);
G
groot 已提交
879 880
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
S
shengjh 已提交
881 882
    fiu_do_on("DBImpl.StartMetricTask.InvalidTotalCache", cache_total = 0);

J
JinHai-CN 已提交
883 884 885 886 887 888 889
    if (cache_total > 0) {
        double cache_usage_double = cache_usage;
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage_double * 100 / cache_total);
    } else {
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(0);
    }

Y
Yu Kun 已提交
890
    server::Metrics::GetInstance().GpuCacheUsageGaugeSet();
G
groot 已提交
891 892 893 894
    /* SS TODO */
    // uint64_t size;
    // Size(size);
    // server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
G
groot 已提交
895 896 897 898 899
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
S
starlord 已提交
900

K
kun yu 已提交
901
    server::Metrics::GetInstance().CPUCoreUsagePercentSet();
K
kun yu 已提交
902 903
    server::Metrics::GetInstance().GPUTemperature();
    server::Metrics::GetInstance().CPUTemperature();
904
    server::Metrics::GetInstance().PushToGateway();
G
groot 已提交
905 906
}

S
starlord 已提交
907
void
G
groot 已提交
908 909 910 911
DBImpl::TimingMetricThread() {
    SetThreadName("metric_thread");
    server::SystemInfo::GetInstance().Init();
    while (true) {
912
        if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
913
            LOG_ENGINE_DEBUG_ << "DB background metric thread exit";
S
starlord 已提交
914 915
            break;
        }
Z
update  
zhiru 已提交
916

G
groot 已提交
917 918
        swn_metric_.Wait_For(std::chrono::seconds(BACKGROUND_METRIC_INTERVAL));
        StartMetricTask();
Z
update  
zhiru 已提交
919
    }
G
groot 已提交
920
}
X
Xu Peng 已提交
921

S
starlord 已提交
922
void
G
groot 已提交
923
DBImpl::StartBuildIndexTask(const std::vector<std::string>& collection_names) {
S
starlord 已提交
924
    // build index has been finished?
925 926 927 928 929 930 931
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (!index_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
                index_thread_results_.pop_back();
            }
G
groot 已提交
932 933 934
        }
    }

S
starlord 已提交
935
    // add new build index task
936 937 938
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (index_thread_results_.empty()) {
G
groot 已提交
939 940
            index_thread_results_.push_back(
                index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndexTask, this, collection_names));
941
        }
G
groot 已提交
942
    }
X
Xu Peng 已提交
943 944
}

S
starlord 已提交
945
void
G
groot 已提交
946
DBImpl::BackgroundBuildIndexTask(std::vector<std::string> collection_names) {
P
peng.xu 已提交
947
    std::unique_lock<std::mutex> lock(build_index_mutex_);
948

G
groot 已提交
949
    for (auto collection_name : collection_names) {
G
groot 已提交
950 951 952 953 954 955
        snapshot::ScopedSnapshotT latest_ss;
        auto status = snapshot::Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
        if (!status.ok()) {
            return;
        }
        SnapshotVisitor ss_visitor(latest_ss);
G
groot 已提交
956

G
groot 已提交
957 958
        snapshot::IDS_TYPE segment_ids;
        ss_visitor.SegmentsToIndex("", segment_ids);
G
groot 已提交
959 960 961
        if (segment_ids.empty()) {
            continue;
        }
T
Tinkerrr 已提交
962

G
groot 已提交
963
        scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(latest_ss, options_, segment_ids);
G
groot 已提交
964

G
groot 已提交
965 966
        scheduler::JobMgrInst::GetInstance()->Put(job);
        job->WaitFinish();
G
groot 已提交
967

G
groot 已提交
968 969 970
        if (!job->status().ok()) {
            LOG_ENGINE_ERROR_ << job->status().message();
            break;
G
groot 已提交
971 972 973 974
        }
    }
}

G
groot 已提交
975 976 977 978 979 980 981 982
void
DBImpl::TimingIndexThread() {
    SetThreadName("index_thread");
    server::SystemInfo::GetInstance().Init();
    while (true) {
        if (!initialized_.load(std::memory_order_acquire)) {
            WaitMergeFileFinish();
            WaitBuildIndexFinish();
G
groot 已提交
983

G
groot 已提交
984 985
            LOG_ENGINE_DEBUG_ << "DB background thread exit";
            break;
G
groot 已提交
986 987
        }

G
groot 已提交
988
        swn_index_.Wait_For(std::chrono::seconds(BACKGROUND_INDEX_INTERVAL));
G
groot 已提交
989

G
groot 已提交
990 991 992 993 994 995
        std::vector<std::string> collection_names;
        snapshot::Snapshots::GetInstance().GetCollectionNames(collection_names);
        WaitMergeFileFinish();
        StartBuildIndexTask(collection_names);
    }
}
G
groot 已提交
996

G
groot 已提交
997 998 999 1000 1001 1002 1003 1004
void
DBImpl::WaitBuildIndexFinish() {
    // Block until every tracked build-index future has completed.
    // index_result_mutex_ stays locked for the duration of the wait.
    std::lock_guard<std::mutex> guard(index_result_mutex_);
    for (auto it = index_thread_results_.begin(); it != index_thread_results_.end(); ++it) {
        it->wait();
    }
}

G
groot 已提交
1007 1008
void
DBImpl::TimingWalThread() {
    // This function currently does nothing: the WAL background loop below is
    // commented out and is kept only as a reference.
    //
    //    SetThreadName("wal_thread");
    //    server::SystemInfo::GetInstance().Init();
    //
    //    std::chrono::system_clock::time_point next_auto_flush_time;
    //    auto get_next_auto_flush_time = [&]() {
    //        return std::chrono::system_clock::now() + std::chrono::seconds(options_.auto_flush_interval_);
    //    };
    //    if (options_.auto_flush_interval_ > 0) {
    //        next_auto_flush_time = get_next_auto_flush_time();
    //    }
    //
    //    InternalFlush();
    //    while (true) {
    //        if (options_.auto_flush_interval_ > 0) {
    //            if (std::chrono::system_clock::now() >= next_auto_flush_time) {
    //                InternalFlush();
    //                next_auto_flush_time = get_next_auto_flush_time();
    //            }
    //        }
    //
    //        wal::MXLogRecord record;
    //        auto error_code = wal_mgr_->GetNextRecord(record);
    //        if (error_code != WAL_SUCCESS) {
    //            LOG_ENGINE_ERROR_ << "WAL background GetNextRecord error";
    //            break;
    //        }
    //
    //        if (record.type != wal::MXLogType::None) {
    //            ExecWalRecord(record);
    //            if (record.type == wal::MXLogType::Flush) {
    //                // notify flush request to return
    //                flush_req_swn_.Notify();
    //
    //                // if user flush all manually, update auto flush also
    //                if (record.collection_id.empty() && options_.auto_flush_interval_ > 0) {
    //                    next_auto_flush_time = get_next_auto_flush_time();
    //                }
    //            }
    //
    //        } else {
    //            if (!initialized_.load(std::memory_order_acquire)) {
    //                InternalFlush();
    //                flush_req_swn_.Notify();
    //                // SS TODO
    //                // WaitMergeFileFinish();
    //                // WaitBuildIndexFinish();
    //                LOG_ENGINE_DEBUG_ << "WAL background thread exit";
    //                break;
    //            }
    //
    //            if (options_.auto_flush_interval_ > 0) {
    //                swn_wal_.Wait_Until(next_auto_flush_time);
    //            } else {
    //                swn_wal_.Wait();
    //            }
    //        }
    //    }
}

1068 1069
// Applies one record to the in-memory manager.
//   Entity: inserts record.data_chunk into the collection/partition named by the record,
//           then forces a flush if the insert buffer limit is exceeded.
//   Delete: deletes one id (record.length == 1) or a batch of ids.
//   Flush:  flushes one collection (record.collection_id set) or all collections
//           (record.collection_id empty), then schedules a background merge.
// Any other record type is ignored and an OK status is returned.
// Returns the status of the first failing step, or of the last executed operation.
Status
DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
    // Post-flush hook: schedules a background merge for the flushed collections.
    // The returned LSN is always 0 while the WAL bookkeeping below stays disabled.
    auto collections_flushed = [&](const std::string& collection_name,
                                   const std::set<std::string>& target_collection_names) -> uint64_t {
        uint64_t max_lsn = 0;
        if (options_.wal_enable_ && !target_collection_names.empty()) {
            //            uint64_t lsn = 0;
            //            for (auto& collection_name : target_collection_names) {
            //                snapshot::ScopedSnapshotT ss;
            //                snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
            //                lsn = ss->GetMaxLsn();
            //                if (lsn > max_lsn) {
            //                    max_lsn = lsn;
            //                }
            //            }
            //            wal_mgr_->CollectionFlushed(collection_name, lsn);
        }

        StartMergeTask(target_collection_names);
        return max_lsn;
    };

    // Flushes everything when the memory manager holds more than the configured insert buffer size.
    auto force_flush_if_mem_full = [&]() -> void {
        if (mem_mgr_->GetCurrentMem() > options_.insert_buffer_size_) {
            LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "insert", 0) << "Insert buffer size exceeds limit. Force flush";
            InternalFlush();
        }
    };

    // Resolves the collection id and partition id addressed by an insert record.
    auto get_collection_partition_id = [&](const wal::MXLogRecord& rec, int64_t& col_id, int64_t& part_id) -> Status {
        snapshot::ScopedSnapshotT ss;
        auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, rec.collection_id);
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << "Get snapshot fail: " << status.message();
            return status;
        }
        col_id = ss->GetCollectionId();

        snapshot::PartitionPtr part = ss->GetPartition(rec.partition_tag);
        if (part == nullptr) {
            // The snapshot status is OK at this point, so returning it would report success and
            // let the caller insert with an unset partition id. Report an explicit error instead.
            LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << "Get partition fail: " << rec.partition_tag;
            return Status(DB_NOT_FOUND, "Partition does not exist");
        }
        part_id = part->GetID();

        return Status::OK();
    };

    Status status;

    switch (record.type) {
        case wal::MXLogType::Entity: {
            int64_t collection_id = 0;
            int64_t partition_id = 0;
            status = get_collection_partition_id(record, collection_id, partition_id);
            if (!status.ok()) {
                LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << status.message();
                return status;
            }

            status = mem_mgr_->InsertEntities(collection_id, partition_id, record.data_chunk, record.lsn);
            force_flush_if_mem_full();

            // metrics
            milvus::server::CollectInsertMetrics metrics(record.length, status);
            break;
        }

        case wal::MXLogType::Delete: {
            snapshot::ScopedSnapshotT ss;
            status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, record.collection_id);
            if (!status.ok()) {
                LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "delete", 0) << "Get snapshot fail: " << status.message();
                return status;
            }

            // A single id goes through the single-entity call, anything else through the batch call.
            // Either way the resulting status is what this function returns.
            if (record.length == 1) {
                status = mem_mgr_->DeleteEntity(ss->GetCollectionId(), *record.ids, record.lsn);
            } else {
                status = mem_mgr_->DeleteEntities(ss->GetCollectionId(), record.length, record.ids, record.lsn);
            }
            break;
        }

        case wal::MXLogType::Flush: {
            if (!record.collection_id.empty()) {
                // flush one collection
                snapshot::ScopedSnapshotT ss;
                status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, record.collection_id);
                if (!status.ok()) {
                    LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "flush", 0) << "Get snapshot fail: " << status.message();
                    return status;
                }

                {
                    // Hold the lock only around the flush itself, as the flush-all branch does.
                    const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
                    const int64_t collection_id = ss->GetCollectionId();
                    status = mem_mgr_->Flush(collection_id);
                    if (!status.ok()) {
                        return status;
                    }
                }

                // Report the flushed collection so that a merge is actually scheduled for it;
                // an empty set would make the merge task a no-op.
                std::set<std::string> flushed_collections;
                flushed_collections.insert(record.collection_id);
                collections_flushed(record.collection_id, flushed_collections);

            } else {
                // flush all collections
                std::set<int64_t> collection_ids;
                {
                    const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
                    status = mem_mgr_->Flush(collection_ids);
                }
                if (!status.ok()) {
                    // Do not let the snapshot lookups below overwrite a flush failure.
                    return status;
                }

                // Translate the flushed collection ids back to names for the merge task.
                std::set<std::string> flushed_collections;
                for (auto id : collection_ids) {
                    snapshot::ScopedSnapshotT ss;
                    status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, id);
                    if (!status.ok()) {
                        LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "flush", 0) << "Get snapshot fail: " << status.message();
                        return status;
                    }

                    flushed_collections.insert(ss->GetName());
                }

                // The LSN returned here is only needed by the currently disabled WAL cleanup:
                //     wal_mgr_->RemoveOldFiles(lsn);
                collections_flushed("", flushed_collections);
            }
            break;
        }

        default:
            break;
    }

    return status;
}

void
G
groot 已提交
1217 1218 1219 1220 1221 1222 1223 1224 1225
DBImpl::StartMergeTask(const std::set<std::string>& collection_names, bool force_merge_all) {
    // LOG_ENGINE_DEBUG_ << "Begin StartMergeTask";
    // merge task has been finished?
    {
        std::lock_guard<std::mutex> lck(merge_result_mutex_);
        if (!merge_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (merge_thread_results_.back().wait_for(span) == std::future_status::ready) {
                merge_thread_results_.pop_back();
1226
            }
1227
        }
G
groot 已提交
1228
    }
1229

G
groot 已提交
1230 1231 1232 1233 1234 1235 1236
    // add new merge task
    {
        std::lock_guard<std::mutex> lck(merge_result_mutex_);
        if (merge_thread_results_.empty()) {
            // start merge file thread
            merge_thread_results_.push_back(
                merge_thread_pool_.enqueue(&DBImpl::BackgroundMerge, this, collection_names, force_merge_all));
1237 1238
        }
    }
G
groot 已提交
1239 1240

    // LOG_ENGINE_DEBUG_ << "End StartMergeTask";
1241 1242
}

G
groot 已提交
1243
void
G
groot 已提交
1244 1245 1246 1247 1248 1249 1250 1251 1252 1253
DBImpl::BackgroundMerge(std::set<std::string> collection_names, bool force_merge_all) {
    // LOG_ENGINE_TRACE_ << " Background merge thread start";

    Status status;
    for (auto& collection_name : collection_names) {
        const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);

        auto old_strategy = merge_mgr_ptr_->Strategy();
        if (force_merge_all) {
            merge_mgr_ptr_->UseStrategy(MergeStrategyType::ADAPTIVE);
G
groot 已提交
1254 1255
        }

1256
        status = merge_mgr_ptr_->MergeFiles(collection_name);
G
groot 已提交
1257 1258 1259 1260
        merge_mgr_ptr_->UseStrategy(old_strategy);
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << "Failed to get merge files for collection: " << collection_name
                              << " reason:" << status.message();
G
groot 已提交
1261 1262 1263
        }

        if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
1264
            LOG_ENGINE_DEBUG_ << "Server will shutdown, skip merge action for collection: " << collection_name;
G
groot 已提交
1265 1266 1267
            break;
        }
    }
G
groot 已提交
1268 1269

    // TODO: cleanup with ttl
G
groot 已提交
1270 1271
}

1272
void
G
groot 已提交
1273 1274 1275 1276 1277 1278 1279
DBImpl::WaitMergeFileFinish() {
    //    LOG_ENGINE_DEBUG_ << "Begin WaitMergeFileFinish";
    std::lock_guard<std::mutex> lck(merge_result_mutex_);
    for (auto& iter : merge_thread_results_) {
        iter.wait();
    }
    //    LOG_ENGINE_DEBUG_ << "End WaitMergeFileFinish";
1280 1281
}

1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299
void
DBImpl::SuspendIfFirst() {
    // Count one more live search; only the transition to exactly one suspends the builder.
    std::lock_guard<std::mutex> guard(suspend_build_mutex_);
    if (++live_search_num_ != 1) {
        return;
    }

    LOG_ENGINE_TRACE_ << "live_search_num_: " << live_search_num_;
    knowhere::BuilderSuspend();
}

void
DBImpl::ResumeIfLast() {
    // Count one live search less; only the transition to zero resumes the builder.
    std::lock_guard<std::mutex> guard(suspend_build_mutex_);
    if (--live_search_num_ != 0) {
        return;
    }

    LOG_ENGINE_TRACE_ << "live_search_num_: " << live_search_num_;
    knowhere::BuildResume();
}

C
Cai Yudong 已提交
1300 1301 1302 1303 1304 1305 1306
void
DBImpl::ConfigUpdate(const std::string& name) {
    // Only the auto-flush interval is refreshed here; any other config name is ignored.
    if (name != "storage.auto_flush_interval") {
        return;
    }

    options_.auto_flush_interval_ = config.storage.auto_flush_interval();
}

S
starlord 已提交
1307 1308
}  // namespace engine
}  // namespace milvus