DBImpl.cpp 40.2 KB
Newer Older
1
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
J
jinhai 已提交
2
//
3 4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
J
jinhai 已提交
5
//
6 7 8 9 10
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
J
jinhai 已提交
11

S
starlord 已提交
12
#include "db/DBImpl.h"
S
starlord 已提交
13
#include "cache/CpuCacheMgr.h"
14
#include "codecs/Codec.h"
W
Wang XiangYu 已提交
15
#include "config/ServerConfig.h"
16
#include "db/IDGenerator.h"
G
groot 已提交
17 18
#include "db/SnapshotUtils.h"
#include "db/SnapshotVisitor.h"
G
groot 已提交
19
#include "db/merge/MergeManagerFactory.h"
G
groot 已提交
20 21 22 23 24 25 26 27
#include "db/merge/MergeTask.h"
#include "db/snapshot/CompoundOperations.h"
#include "db/snapshot/EventExecutor.h"
#include "db/snapshot/IterateHandler.h"
#include "db/snapshot/OperationExecutor.h"
#include "db/snapshot/ResourceHelper.h"
#include "db/snapshot/ResourceTypes.h"
#include "db/snapshot/Snapshots.h"
28
#include "insert/MemManagerFactory.h"
G
groot 已提交
29
#include "knowhere/index/vector_index/helpers/BuilderSuspend.h"
X
Xiaohai Xu 已提交
30
#include "knowhere/index/vector_index/helpers/FaissIO.h"
G
groot 已提交
31
#include "metrics/Metrics.h"
G
groot 已提交
32
#include "metrics/SystemInfo.h"
G
groot 已提交
33
#include "scheduler/Definition.h"
S
starlord 已提交
34
#include "scheduler/SchedInst.h"
S
starlord 已提交
35
#include "scheduler/job/SearchJob.h"
36 37 38
#include "segment/SegmentReader.h"
#include "segment/SegmentWriter.h"
#include "utils/Exception.h"
G
groot 已提交
39
#include "utils/StringHelpFunctions.h"
S
starlord 已提交
40
#include "utils/TimeRecorder.h"
41
#include "wal/WalDefinations.h"
X
Xu Peng 已提交
42

G
groot 已提交
43 44 45 46
#include <fiu-local.h>
#include <src/scheduler/job/BuildIndexJob.h>
#include <limits>
#include <utility>
47

J
jinhai 已提交
48
namespace milvus {
X
Xu Peng 已提交
49
namespace engine {
X
Xu Peng 已提交
50

G
groot 已提交
51
namespace {
G
groot 已提交
52 53
constexpr uint64_t BACKGROUND_METRIC_INTERVAL = 1;
constexpr uint64_t BACKGROUND_INDEX_INTERVAL = 1;
G
groot 已提交
54
constexpr uint64_t WAIT_BUILD_INDEX_INTERVAL = 5;
G
groot 已提交
55

G
groot 已提交
56
static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milvus server is shutdown!");
S
starlord 已提交
57
}  // namespace
G
groot 已提交
58

G
groot 已提交
59 60 61 62 63
// Guard placed at the top of the public DBImpl APIs: returns SHUTDOWN_ERROR from the enclosing
// function when initialized_ is false. It expands to a bare `if` statement and is written as
// `CHECK_INITIALIZED;` inside member functions that return Status.
#define CHECK_INITIALIZED                                \
    if (!initialized_.load(std::memory_order_acquire)) { \
        return SHUTDOWN_ERROR;                           \
    }

Y
Yu Kun 已提交
64
// Builds the insert-memory manager and the merge manager from `options`, registers this instance
// for changes of "storage.auto_flush_interval", then starts the service through Start().
DBImpl::DBImpl(const DBOptions& options)
    : options_(options),
      initialized_(false),
      merge_thread_pool_(1, 1),
      index_thread_pool_(1, 1) {
    mem_mgr_ = MemManagerFactory::Build(options_);
    merge_mgr_ptr_ = MergeManagerFactory::SSBuild(options_);

    // watch on storage.auto_flush_interval
    ConfigMgr::GetInstance().Attach("storage.auto_flush_interval", this);

    Start();
}

// Unregisters this instance from the "storage.auto_flush_interval" setting, then stops the
// service through Stop().
DBImpl::~DBImpl() {
    ConfigMgr::GetInstance().Detach("storage.auto_flush_interval", this);
    Stop();
}

G
groot 已提交
81 82 83
////////////////////////////////////////////////////////////////////////////////
// External APIs
////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
84 85
// Brings the DB service up: initializes the snapshot subsystem, marks the instance as initialized
// and launches the background worker threads. Calling it while already initialized is a no-op.
// Every visible path returns Status::OK().
Status
DBImpl::Start() {
    if (initialized_.load(std::memory_order_acquire)) {
        return Status::OK();  // already started
    }

    // snapshot: one store is shared by the operation executor, the event executor and Snapshots
    auto store = snapshot::Store::Build(options_.meta_.backend_uri_, options_.meta_.path_,
                                        codec::Codec::instance().GetSuffixSet());
    snapshot::OperationExecutor::Init(store);
    snapshot::OperationExecutor::GetInstance().Start();
    snapshot::EventExecutor::Init(store);
    snapshot::EventExecutor::GetInstance().Start();
    snapshot::Snapshots::GetInstance().Init(store);

    knowhere::enable_faiss_logging();

    // LOG_ENGINE_TRACE_ << "DB service start";
    initialized_.store(true, std::memory_order_release);

    // TODO: merge files

    // for distribute version, some nodes are read only: they get neither the flush thread nor
    // the build-index thread
    const bool writable_node = (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY);
    if (writable_node) {
        bg_flush_thread_ = std::thread(&DBImpl::TimingFlushThread, this);
        bg_index_thread_ = std::thread(&DBImpl::TimingIndexThread, this);
    }

    // background metric thread; the fault-injection point can force the option on
    fiu_do_on("options_metric_enable", options_.metric_enable_ = true);
    if (options_.metric_enable_) {
        bg_metric_thread_ = std::thread(&DBImpl::TimingMetricThread, this);
    }

    return Status::OK();
}

S
starlord 已提交
127 128
// Shuts the DB service down: clears the initialized flag, flushes pending data on writable nodes,
// wakes and joins the background threads, then stops the snapshot executors.
// Calling it while not initialized is a no-op. Every visible path returns Status::OK().
Status
DBImpl::Stop() {
    if (!initialized_.load(std::memory_order_acquire)) {
        return Status::OK();  // nothing to stop
    }

    // From here on the background loops observe the cleared flag and exit.
    initialized_.store(false, std::memory_order_release);

    const bool writable_node = (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY);
    if (writable_node) {
        // flush all without merge
        wal::MXLogRecord flush_record;
        flush_record.type = wal::MXLogType::Flush;
        ExecWalRecord(flush_record);

        // wake the flush thread and wait for it to finish
        swn_flush_.Notify();
        bg_flush_thread_.join();

        WaitMergeFileFinish();

        // wake the build-index thread and wait for it to finish
        swn_index_.Notify();
        bg_index_thread_.join();
    }

    // wait metric thread exit
    if (options_.metric_enable_) {
        swn_metric_.Notify();
        bg_metric_thread_.join();
    }

    snapshot::EventExecutor::GetInstance().Stop();
    snapshot::OperationExecutor::GetInstance().Stop();

    // LOG_ENGINE_TRACE_ << "DB service stop";
    return Status::OK();
}

S
starlord 已提交
164
// Creates a collection from `context`.
// A copy of the context is adjusted before the create operation is pushed:
//   - PARAM_UID_AUTOGEN is set to true when the collection params do not mention it;
//   - a FIELD_UID field (INT64) is added when the schema has none;
//   - the uid field is mapped to a bloom-filter element and a deleted-docs element.
// Returns SHUTDOWN_ERROR when the DB is not initialized, otherwise the result of the operation.
Status
DBImpl::CreateCollection(const snapshot::CreateCollectionContext& context) {
    CHECK_INITIALIZED;

    // work on a copy, the caller's context is const
    snapshot::CreateCollectionContext ctx = context;

    // default id is auto-generated
    auto params = ctx.collection->GetParams();
    if (params.find(PARAM_UID_AUTOGEN) == params.end()) {
        params[PARAM_UID_AUTOGEN] = true;
        ctx.collection->SetParams(params);
    }

    // check uid existence
    snapshot::FieldPtr uid_field;
    for (auto& schema_entry : ctx.fields_schema) {
        if (schema_entry.first->GetName() == FIELD_UID) {
            uid_field = schema_entry.first;
            break;
        }
    }

    // add uid field if not specified
    if (!uid_field) {
        uid_field = std::make_shared<snapshot::Field>(FIELD_UID, 0, DataType::INT64);
    }

    // define uid elements; this assignment replaces whatever was mapped to the uid field before
    auto uid_bloom_filter = std::make_shared<snapshot::FieldElement>(
        0, 0, ELEMENT_BLOOM_FILTER, milvus::engine::FieldElementType::FET_BLOOM_FILTER);
    auto uid_deleted_docs = std::make_shared<snapshot::FieldElement>(
        0, 0, ELEMENT_DELETED_DOCS, milvus::engine::FieldElementType::FET_DELETED_DOCS);
    ctx.fields_schema[uid_field] = {uid_bloom_filter, uid_deleted_docs};

    auto create_op = std::make_shared<snapshot::CreateCollectionOperation>(ctx);
    return create_op->Push();
}

202
// Drops the collection called `name`: erases its in-memory insert buffer, then asks the snapshot
// holder to drop it. Returns SHUTDOWN_ERROR when the DB is not initialized, the snapshot lookup
// error when the collection cannot be resolved, otherwise the result of the drop.
Status
DBImpl::DropCollection(const std::string& name) {
    CHECK_INITIALIZED;

    LOG_ENGINE_DEBUG_ << "Prepare to drop collection " << name;

    auto& snapshots = snapshot::Snapshots::GetInstance();
    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshots.GetSnapshot(ss, name));

    const auto collection_id = ss->GetCollectionId();
    mem_mgr_->EraseMem(collection_id);  // not allow insert

    return snapshots.DropCollection(collection_id, std::numeric_limits<snapshot::LSN_TYPE>::max());
}

S
starlord 已提交
217
// Reports through `has_or_not` whether a snapshot can be obtained for `collection_name`.
// A failed snapshot lookup is not treated as an error: the function returns Status::OK() and sets
// `has_or_not` to false. Only an uninitialized DB yields an error (SHUTDOWN_ERROR).
Status
DBImpl::HasCollection(const std::string& collection_name, bool& has_or_not) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    has_or_not = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name).ok();

    return Status::OK();
}

// Replaces the content of `names` with the collection names reported by the snapshot holder.
// Returns SHUTDOWN_ERROR when the DB is not initialized, otherwise the status of the lookup.
Status
DBImpl::ListCollections(std::vector<std::string>& names) {
    CHECK_INITIALIZED;

    names.clear();  // drop whatever the caller passed in
    auto& snapshots = snapshot::Snapshots::GetInstance();
    return snapshots.GetCollectionNames(names);
}
235

G
groot 已提交
236
// Fetches the collection object and its schema from the current snapshot of `collection_name`.
// `collection` receives the snapshot's collection; `fields_schema` gets one entry per field,
// mapped to that field's elements. Existing entries of `fields_schema` are not cleared.
// Returns SHUTDOWN_ERROR or the snapshot lookup error on failure, Status::OK() otherwise.
Status
DBImpl::GetCollectionInfo(const std::string& collection_name, snapshot::CollectionPtr& collection,
                          snapshot::FieldElementMappings& fields_schema) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    collection = ss->GetCollection();

    auto& field_holders = ss->GetResources<snapshot::Field>();
    for (auto& id_and_field : field_holders) {
        auto& field = id_and_field.second;
        fields_schema[field.Get()] = ss->GetFieldElementsByField(field->GetName());
    }

    return Status::OK();
}

S
starlord 已提交
252
// Fills `collection_stats` with the snapshot information of `collection_name` by delegating to
// GetSnapshotInfo(). Returns SHUTDOWN_ERROR when the DB is not initialized, otherwise the status
// of that call.
Status
DBImpl::GetCollectionStats(const std::string& collection_name, milvus::json& collection_stats) {
    CHECK_INITIALIZED;

    return GetSnapshotInfo(collection_name, collection_stats);
}

S
starlord 已提交
260
// Stores in `row_count` the row count recorded by the collection commit of the current snapshot
// of `collection_name`. Returns SHUTDOWN_ERROR or the snapshot lookup error on failure.
Status
DBImpl::CountEntities(const std::string& collection_name, int64_t& row_count) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    auto&& collection_commit = ss->GetCollectionCommit();
    row_count = collection_commit->GetRowCount();

    return Status::OK();
}

271
// Creates partition `partition_name` inside `collection_name` through a CreatePartitionOperation
// built on the collection's current snapshot (operation LSN is 0).
// Returns SHUTDOWN_ERROR, the snapshot lookup error, the commit error, or the result of Push().
Status
DBImpl::CreatePartition(const std::string& collection_name, const std::string& partition_name) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    snapshot::OperationContext op_context;
    op_context.lsn = 0;
    auto create_op = std::make_shared<snapshot::CreatePartitionOperation>(op_context, ss);

    snapshot::PartitionContext partition_context;
    partition_context.name = partition_name;

    snapshot::PartitionPtr new_partition;  // filled by CommitNewPartition, not used afterwards
    STATUS_CHECK(create_op->CommitNewPartition(partition_context, new_partition));

    return create_op->Push();
}

S
starlord 已提交
290
// Drops partition `partition_name` of `collection_name` by pushing a DropPartitionOperation built
// on the collection's current snapshot.
// Returns SHUTDOWN_ERROR, the snapshot lookup error, or the result of Push().
Status
DBImpl::DropPartition(const std::string& collection_name, const std::string& partition_name) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    // SS TODO: Is below step needed? Or How to implement it?
    /* mem_mgr_->EraseMem(partition_name); */

    snapshot::PartitionContext partition_context;
    partition_context.name = partition_name;

    auto drop_op = std::make_shared<snapshot::DropPartitionOperation>(partition_context, ss);
    return drop_op->Push();
}

306
// Reports through `exist` whether the current snapshot of `collection_name` lists a partition
// whose name equals `partition_tag` (exact string comparison).
// Returns SHUTDOWN_ERROR or the snapshot lookup error on failure, Status::OK() otherwise.
Status
DBImpl::HasPartition(const std::string& collection_name, const std::string& partition_tag, bool& exist) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    // Iterate the call result directly. The former std::move() around the call only blocked copy
    // elision and would have emptied the snapshot's data had the getter returned an lvalue
    // reference.
    exist = false;
    for (const auto& tag : ss->GetPartitionNames()) {
        if (tag == partition_tag) {
            exist = true;
            break;
        }
    }

    return Status::OK();
}
324

G
groot 已提交
325
// Replaces `partition_names` with the partition names of the current snapshot of
// `collection_name`.
// Returns SHUTDOWN_ERROR or the snapshot lookup error on failure, Status::OK() otherwise.
Status
DBImpl::ListPartitions(const std::string& collection_name, std::vector<std::string>& partition_names) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    // No std::move(): a by-value result is already an rvalue, and a reference result must not be
    // moved out of the snapshot.
    partition_names = ss->GetPartitionNames();
    return Status::OK();
}

// Creates (or replaces) the index of `field_name` in `collection_name` and blocks until every
// segment is indexed or the client connection is reported broken.
//   context          - optional request context; only used to detect a broken connection
//   index            - requested index description
// Returns SHUTDOWN_ERROR, the error of reading the current index, or the error of storing the new
// index definition. The statuses of the initial Flush() and of DropIndex() are not checked.
Status
DBImpl::CreateIndex(const std::shared_ptr<server::Context>& context, const std::string& collection_name,
                    const std::string& field_name, const CollectionIndex& index) {
    CHECK_INITIALIZED;

    LOG_ENGINE_DEBUG_ << "Create index for collection: " << collection_name << " field: " << field_name;

    // step 1: wait merge file thread finished to avoid duplicate data bug
    Flush();
    WaitMergeFileFinish();  // let merge file thread finish

    // step 2: compare old index and new index
    CollectionIndex requested_index = index;
    CollectionIndex current_index;
    STATUS_CHECK(GetSnapshotIndex(collection_name, field_name, current_index));

    if (utils::IsSameIndex(current_index, requested_index)) {
        return Status::OK();  // same index
    }

    // step 3: drop old index
    DropIndex(collection_name, field_name);
    WaitMergeFileFinish();  // let merge file thread finish since DropIndex start a merge task

    // step 4: create field element for index
    STATUS_CHECK(SetSnapshotIndex(collection_name, field_name, requested_index));

    // step 5: start background build index thread
    std::vector<std::string> collection_names = {collection_name};
    WaitBuildIndexFinish();
    StartBuildIndexTask(collection_names);

    // step 6: iterate segments need to be build index, wait until all segments are built
    for (;;) {
        SnapshotVisitor ss_visitor(collection_name);
        snapshot::IDS_TYPE pending_segment_ids;
        ss_visitor.SegmentsToIndex(field_name, pending_segment_ids);
        if (pending_segment_ids.empty()) {
            break;  // nothing left to index
        }

        index_req_swn_.Wait_For(std::chrono::seconds(1));

        // client break the connection, no need to block, check every 1 second
        const bool client_gone = context && context->IsConnectionBroken();
        if (client_gone) {
            LOG_ENGINE_DEBUG_ << "Client connection broken, build index in background";
            break;  // just break, not return, continue to update partitions files to to_index
        }
    }

    return Status::OK();
}
391

G
groot 已提交
392
// Removes the index definition of `field_name` in `collection_name`, then starts a merge task for
// that collection. Returns SHUTDOWN_ERROR or the error of deleting the index definition;
// otherwise Status::OK().
Status
DBImpl::DropIndex(const std::string& collection_name, const std::string& field_name) {
    CHECK_INITIALIZED;

    LOG_ENGINE_DEBUG_ << "Drop index for collection: " << collection_name << " field: " << field_name;

    STATUS_CHECK(DeleteSnapshotIndex(collection_name, field_name));

    std::set<std::string> collections_to_merge;
    collections_to_merge.insert(collection_name);
    StartMergeTask(collections_to_merge, true);

    return Status::OK();
}

// Reads the index definition of `field_name` in `collection_name` into `index` by delegating to
// GetSnapshotIndex(). Returns SHUTDOWN_ERROR when the DB is not initialized, otherwise the status
// of that call.
Status
DBImpl::DescribeIndex(const std::string& collection_name, const std::string& field_name, CollectionIndex& index) {
    CHECK_INITIALIZED;

    LOG_ENGINE_DEBUG_ << "Describe index for collection: " << collection_name << " field: " << field_name;

    return GetSnapshotIndex(collection_name, field_name, index);
}

// Inserts the entities of `data_chunk` into partition `partition_name` of `collection_name`.
// Id handling depends on the collection parameter PARAM_UID_AUTOGEN (treated as true when absent):
//   - auto-generated ids: the chunk must not carry a FIELD_UID column; one is generated and
//     stored into the chunk (the caller's chunk is modified);
//   - user ids: the chunk must carry a non-null FIELD_UID column.
// Returns SHUTDOWN_ERROR, DB_ERROR for a null chunk / missing '_id' field / id policy violation,
// DB_NOT_FOUND for an unknown partition, or any error from the snapshot lookup, the id generator
// or the WAL record execution.
Status
DBImpl::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk) {
    CHECK_INITIALIZED;

    if (data_chunk == nullptr) {
        return Status(DB_ERROR, "Null pointer");
    }

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    if (ss->GetPartition(partition_name) == nullptr) {
        return Status(DB_NOT_FOUND, "Fail to get partition " + partition_name);
    }

    if (ss->GetField(FIELD_UID) == nullptr) {
        return Status(DB_ERROR, "Field '_id' not found");
    }

    // id policy of the collection
    auto& params = ss->GetCollection()->GetParams();
    bool auto_increment = true;
    if (params.find(PARAM_UID_AUTOGEN) != params.end()) {
        auto_increment = params[PARAM_UID_AUTOGEN];
    }

    FIXEDX_FIELD_MAP& fixed_fields = data_chunk->fixed_fields_;
    auto uid_iter = fixed_fields.find(engine::FIELD_UID);
    const bool client_supplied_ids = (uid_iter != fixed_fields.end() && uid_iter->second != nullptr);

    if (auto_increment) {
        // id is auto increment, but client provides id, return error
        if (client_supplied_ids) {
            return Status(DB_ERROR, "Field '_id' is auto increment, no need to provide id");
        }

        // generate id
        IDNumbers generated_ids;
        STATUS_CHECK(SafeIDGenerator::GetInstance().GetNextIDNumbers(data_chunk->count_, generated_ids));

        const size_t id_bytes = generated_ids.size() * sizeof(int64_t);
        BinaryDataPtr id_column = std::make_shared<BinaryData>();
        id_column->data_.resize(id_bytes);
        memcpy(id_column->data_.data(), generated_ids.data(), id_bytes);
        fixed_fields[engine::FIELD_UID] = id_column;
    } else if (!client_supplied_ids) {
        // id is not auto increment, but client doesn't provide id, return error
        return Status(DB_ERROR, "Field '_id' is user defined");
    }

    // insert entities: the record's collection_id carries the collection name
    wal::MXLogRecord record;
    record.lsn = 0;
    record.collection_id = collection_name;
    record.partition_tag = partition_name;
    record.data_chunk = data_chunk;
    record.length = data_chunk->count_;
    record.type = wal::MXLogType::Entity;

    STATUS_CHECK(ExecWalRecord(record));

    return Status::OK();
}

S
starlord 已提交
483
// Fetches the entities whose ids are listed in `id_array` from the current snapshot of
// `collection_name`.
//   field_names - fields handed to the segment handler
//   valid_row   - output; reset to id_array.size() entries of false before the handler runs
//                 (NOTE(review): presumably the handler marks the ids it found - confirm)
//   data_chunk  - output; receives the handler's data chunk
// Returns SHUTDOWN_ERROR, the snapshot lookup error, or the handler's status.
Status
DBImpl::GetEntityByID(const std::string& collection_name, const IDNumbers& id_array,
                      const std::vector<std::string>& field_names, std::vector<bool>& valid_row,
                      DataChunkPtr& data_chunk) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    std::string dir_root = options_.meta_.path_;

    // assign(), not resize(): resize() keeps the existing elements, so a vector reused by the
    // caller would carry stale `true` flags into this lookup.
    valid_row.assign(id_array.size(), false);

    auto handler =
        std::make_shared<GetEntityByIdSegmentHandler>(nullptr, ss, dir_root, id_array, field_names, valid_row);
    handler->Iterate();
    STATUS_CHECK(handler->GetStatus());

    data_chunk = handler->data_chunk_;
    return Status::OK();
}

// Deletes the entities listed in `entity_ids` from `collection_name` by executing a Delete WAL
// record. The record refers to the caller's id buffer (no copy is made here).
// Returns SHUTDOWN_ERROR when the DB is not initialized, otherwise the status of the execution.
Status
DBImpl::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids) {
    CHECK_INITIALIZED;

    wal::MXLogRecord delete_record;
    delete_record.lsn = 0;  // need to get from meta ?
    delete_record.type = wal::MXLogType::Delete;
    delete_record.collection_id = collection_name;
    delete_record.ids = entity_ids.data();
    delete_record.length = entity_ids.size();

    return ExecWalRecord(delete_record);
}
G
groot 已提交
519

G
groot 已提交
520
// Executes the search described by `query_ptr` and stores the outcome in `result`.
// Steps: resolve the snapshot of query_ptr->collection_id, collect the segments whose partition
// name matches one of query_ptr->partitions (all segments when that list is empty), run a
// SearchJob over them, and - when field names were requested - load the matching entities.
//   context - not used by the visible code
//   result  - replaced by the job's result when the job produced one, left untouched otherwise
// Returns SHUTDOWN_ERROR, DB_ERROR for a null query or a query without root, SS_ERROR when a
// segment visitor cannot be built, or the error of the snapshot lookup / job / entity fetch.
Status
DBImpl::Query(const server::ContextPtr& context, const query::QueryPtr& query_ptr, engine::QueryResultPtr& result) {
    CHECK_INITIALIZED;

    TimeRecorder rc("DBImpl::Query");

    // A null query object is rejected the same way as a query without root; it used to be
    // dereferenced unconditionally.
    if (query_ptr == nullptr || !query_ptr->root) {
        return Status{DB_ERROR, "BinaryQuery is null"};
    }

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, query_ptr->collection_id));

    /* collect all valid segment */
    std::vector<SegmentVisitor::Ptr> segment_visitors;
    auto exec = [&](const snapshot::Segment::Ptr& segment, snapshot::SegmentIterator* handler) -> Status {
        auto p_id = segment->GetPartitionId();
        auto p_ptr = ss->GetResource<snapshot::Partition>(p_id);
        auto& p_name = p_ptr->GetName();

        /* check partition match pattern */
        bool match = false;
        if (query_ptr->partitions.empty()) {
            match = true;
        } else {
            for (auto& pattern : query_ptr->partitions) {
                if (StringHelpFunctions::IsRegexMatch(p_name, pattern)) {
                    match = true;
                    break;
                }
            }
        }

        if (match) {
            auto visitor = SegmentVisitor::Build(ss, segment->GetID());
            if (!visitor) {
                return Status(milvus::SS_ERROR, "Cannot build segment visitor");
            }
            segment_visitors.push_back(visitor);
        }
        return Status::OK();
    };

    auto segment_iter = std::make_shared<snapshot::SegmentIterator>(ss, exec);
    segment_iter->Iterate();
    STATUS_CHECK(segment_iter->GetStatus());

    LOG_ENGINE_DEBUG_ << LogOut("Engine query begin, segment count: %ld", segment_visitors.size());

    engine::snapshot::IDS_TYPE segment_ids;
    for (auto& sv : segment_visitors) {
        segment_ids.emplace_back(sv->GetSegment()->GetID());
    }

    scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(nullptr, ss, options_, query_ptr, segment_ids);

    cache::CpuCacheMgr::GetInstance().PrintInfo();  // print cache info before query
    /* put search job to scheduler and wait job finish */
    scheduler::JobMgrInst::GetInstance()->Put(job);
    job->WaitFinish();
    cache::CpuCacheMgr::GetInstance().PrintInfo();  // print cache info after query

    if (!job->status().ok()) {
        return job->status();
    }

    if (job->query_result()) {
        result = job->query_result();
    }

    // step 4: get entities by result ids
    // Skipped when there is no result object: the job may finish without one while the caller
    // passed a null `result`, and dereferencing it would crash.
    std::vector<bool> valid_row;
    if (result != nullptr && !query_ptr->field_names.empty()) {
        STATUS_CHECK(GetEntityByID(query_ptr->collection_id, result->result_ids_, query_ptr->field_names, valid_row,
                                   result->data_chunk_));
    }

    // step 5: filter entities by field names
    //    std::vector<engine::AttrsData> filter_attrs;
    //    for (auto attr : result.attrs_) {
    //        AttrsData attrs_data;
    //        attrs_data.attr_type_ = attr.attr_type_;
    //        attrs_data.attr_count_ = attr.attr_count_;
    //        attrs_data.id_array_ = attr.id_array_;
    //        for (auto& name : field_names) {
    //            if (attr.attr_data_.find(name) != attr.attr_data_.end()) {
    //                attrs_data.attr_data_.insert(std::make_pair(name, attr.attr_data_.at(name)));
    //            }
    //        }
    //        filter_attrs.emplace_back(attrs_data);
    //    }

    rc.ElapseFromBegin("Engine query totally cost");

    // tracer.Context()->GetTraceContext()->GetSpan()->Finish();

    return Status::OK();
}

C
Cai Yudong 已提交
619 620 621 622 623 624 625 626
// Lists the ids of the entities stored in segment `segment_id` of `collection_name`, excluding
// the entities recorded as deleted.
//   entity_ids - output; filled with the segment's uids, then stripped of the deleted ones
// Returns SHUTDOWN_ERROR, the snapshot lookup error, SERVER_FILE_NOT_FOUND when the segment
// visitor cannot be built, or the error of loading the uids / deleted docs.
Status
DBImpl::ListIDInSegment(const std::string& collection_name, int64_t segment_id, IDNumbers& entity_ids) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    auto read_visitor = engine::SegmentVisitor::Build(ss, segment_id);
    if (!read_visitor) {
        return Status(SERVER_FILE_NOT_FOUND, "Segment not exist");
    }
    segment::SegmentReaderPtr segment_reader =
        std::make_shared<segment::SegmentReader>(options_.meta_.path_, read_visitor);

    STATUS_CHECK(segment_reader->LoadUids(entity_ids));

    // remove delete id from the id list
    segment::DeletedDocsPtr deleted_docs_ptr;
    STATUS_CHECK(segment_reader->LoadDeletedDocs(deleted_docs_ptr));
    if (deleted_docs_ptr) {
        const std::vector<offset_t>& delete_ids = deleted_docs_ptr->GetDeletedDocs();
        if (!delete_ids.empty()) {
            // The deleted offsets refer to positions in the list as loaded. Erasing them one by
            // one shifts every later position, so the wrong ids would be removed. Mark first,
            // then compact in a single pass. Offsets outside the list are ignored.
            std::vector<bool> is_deleted(entity_ids.size(), false);
            for (auto offset : delete_ids) {
                const auto position = static_cast<size_t>(offset);
                if (position < is_deleted.size()) {
                    is_deleted[position] = true;
                }
            }

            size_t kept = 0;
            for (size_t i = 0; i < entity_ids.size(); ++i) {
                if (!is_deleted[i]) {
                    entity_ids[kept++] = entity_ids[i];
                }
            }
            entity_ids.resize(kept);
        }
    }

    return Status::OK();
}

// Runs a LoadVectorFieldHandler over the current snapshot of `collection_name`.
// `field_names` and `force` are accepted but not used by the visible code.
// Returns SHUTDOWN_ERROR, the snapshot lookup error, or the handler's status.
Status
DBImpl::LoadCollection(const server::ContextPtr& context, const std::string& collection_name,
                       const std::vector<std::string>& field_names, bool force) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    auto load_handler = std::make_shared<LoadVectorFieldHandler>(context, ss);
    load_handler->Iterate();
    return load_handler->GetStatus();
}

// Flushes the buffered data of one collection.
// Returns SHUTDOWN_ERROR when the DB is not initialized, DB_NOT_FOUND when the collection does
// not exist, Status::OK() otherwise (InternalFlush() reports no status).
Status
DBImpl::Flush(const std::string& collection_name) {
    CHECK_INITIALIZED;

    bool has_collection = false;
    STATUS_CHECK(HasCollection(collection_name, has_collection));
    if (!has_collection) {
        LOG_ENGINE_ERROR_ << "Collection to flush does not exist: " << collection_name;
        return Status(DB_NOT_FOUND, "Collection to flush does not exist");
    }

    LOG_ENGINE_DEBUG_ << "Begin flush collection: " << collection_name;
    InternalFlush(collection_name);
    LOG_ENGINE_DEBUG_ << "End flush collection: " << collection_name;

    return Status::OK();
}

// Flushes the buffered data of all collections.
// Returns SHUTDOWN_ERROR when the DB is not initialized, Status::OK() otherwise
// (InternalFlush() reports no status).
Status
DBImpl::Flush() {
    CHECK_INITIALIZED;

    LOG_ENGINE_DEBUG_ << "Begin flush all collections";
    InternalFlush();
    LOG_ENGINE_DEBUG_ << "End flush all collections";

    return Status::OK();
}

// Compacts the segments of `collection_name` whose share of deleted rows reaches `threshold`.
// For every segment of the latest snapshot the deleted docs are loaded; the segment is rewritten
// through a MergeTask when deleted / (rows + deleted) >= threshold.
//   context   - optional request context; the loop stops early when the connection is broken
//   threshold - minimum deleted-row ratio that triggers compaction of a segment
// Holds build_index_mutex_ and flush_merge_compact_mutex_ for the whole call.
// Returns SHUTDOWN_ERROR, DB_NOT_FOUND for an unknown collection, the snapshot lookup error, or
// the status of the last load/merge step executed in the loop (per-segment failures do not stop
// the loop).
Status
DBImpl::Compact(const std::shared_ptr<server::Context>& context, const std::string& collection_name, double threshold) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    LOG_ENGINE_DEBUG_ << "Before compacting, wait for build index thread to finish...";
    const std::lock_guard<std::mutex> index_lock(build_index_mutex_);
    const std::lock_guard<std::mutex> merge_lock(flush_merge_compact_mutex_);

    Status status;
    bool has_collection;
    status = HasCollection(collection_name, has_collection);
    if (!status.ok()) {
        return status;
    }
    if (!has_collection) {
        LOG_ENGINE_ERROR_ << "Collection to compact does not exist: " << collection_name;
        return Status(DB_NOT_FOUND, "Collection to compact does not exist");
    }

    snapshot::ScopedSnapshotT latest_ss;
    status = snapshot::Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
    if (!status.ok()) {
        return status;
    }

    auto& segments = latest_ss->GetResources<snapshot::Segment>();
    for (auto& kv : segments) {
        // client break the connection, no need to continue
        if (context && context->IsConnectionBroken()) {
            LOG_ENGINE_DEBUG_ << "Client connection broken, stop compact operation";
            break;
        }

        snapshot::ID_TYPE segment_id = kv.first;
        auto read_visitor = engine::SegmentVisitor::Build(latest_ss, segment_id);
        if (!read_visitor) {
            // Build() can fail (the other callers in this file check for it); a reader created
            // from a null visitor must not be used, so skip the segment.
            LOG_ENGINE_ERROR_ << "Compact skipped, cannot build visitor for segment " << segment_id;
            continue;
        }
        segment::SegmentReaderPtr segment_reader =
            std::make_shared<segment::SegmentReader>(options_.meta_.path_, read_visitor);

        segment::DeletedDocsPtr deleted_docs;
        status = segment_reader->LoadDeletedDocs(deleted_docs);
        if (!status.ok() || deleted_docs == nullptr) {
            continue;  // no deleted docs, no need to compact
        }

        auto segment_commit = latest_ss->GetSegmentCommitBySegmentId(segment_id);
        auto row_count = segment_commit->GetRowCount();
        if (row_count == 0) {
            continue;
        }

        auto deleted_count = deleted_docs->GetCount();
        if (static_cast<double>(deleted_count) / (row_count + deleted_count) < threshold) {
            continue;  // no need to compact
        }

        snapshot::IDS_TYPE ids = {segment_id};
        MergeTask merge_task(options_, latest_ss, ids);
        status = merge_task.Execute();
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << "Compact failed for segment " << segment_reader->GetSegmentPath() << ": "
                              << status.message();
            continue;  // skip this file and try compact next one
        }
    }

    return status;
}

G
groot 已提交
769 770 771
////////////////////////////////////////////////////////////////////////////////
// Internal APIs
////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
772
void
G
groot 已提交
773 774 775 776 777 778 779 780 781
DBImpl::InternalFlush(const std::string& collection_name) {
    wal::MXLogRecord record;
    record.type = wal::MXLogType::Flush;
    record.collection_id = collection_name;
    ExecWalRecord(record);
}

void
DBImpl::TimingFlushThread() {
G
groot 已提交
782
    SetThreadName("timing_flush");
Y
yu yunfeng 已提交
783
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
784
    while (true) {
785
        if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
786
            LOG_ENGINE_DEBUG_ << "DB background flush thread exit";
G
groot 已提交
787 788
            break;
        }
X
Xu Peng 已提交
789

G
groot 已提交
790 791 792 793 794 795
        InternalFlush();
        if (options_.auto_flush_interval_ > 0) {
            swn_flush_.Wait_For(std::chrono::seconds(options_.auto_flush_interval_));
        } else {
            swn_flush_.Wait();
        }
796 797 798
    }
}

S
starlord 已提交
799 800
void
DBImpl::StartMetricTask() {
G
groot 已提交
801
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(BACKGROUND_METRIC_INTERVAL);
G
groot 已提交
802 803
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance().CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance().CacheCapacity();
S
shengjh 已提交
804 805
    fiu_do_on("DBImpl.StartMetricTask.InvalidTotalCache", cache_total = 0);

J
JinHai-CN 已提交
806 807 808 809 810 811 812
    if (cache_total > 0) {
        double cache_usage_double = cache_usage;
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage_double * 100 / cache_total);
    } else {
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(0);
    }

Y
Yu Kun 已提交
813
    server::Metrics::GetInstance().GpuCacheUsageGaugeSet();
G
groot 已提交
814 815 816 817
    /* SS TODO */
    // uint64_t size;
    // Size(size);
    // server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
G
groot 已提交
818 819 820 821 822
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
S
starlord 已提交
823

K
kun yu 已提交
824
    server::Metrics::GetInstance().CPUCoreUsagePercentSet();
K
kun yu 已提交
825 826
    server::Metrics::GetInstance().GPUTemperature();
    server::Metrics::GetInstance().CPUTemperature();
827
    server::Metrics::GetInstance().PushToGateway();
G
groot 已提交
828 829
}

S
starlord 已提交
830
void
G
groot 已提交
831
DBImpl::TimingMetricThread() {
G
groot 已提交
832
    SetThreadName("timing_metric");
G
groot 已提交
833 834
    server::SystemInfo::GetInstance().Init();
    while (true) {
835
        if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
836
            LOG_ENGINE_DEBUG_ << "DB background metric thread exit";
S
starlord 已提交
837 838
            break;
        }
Z
update  
zhiru 已提交
839

G
groot 已提交
840 841
        swn_metric_.Wait_For(std::chrono::seconds(BACKGROUND_METRIC_INTERVAL));
        StartMetricTask();
Z
update  
zhiru 已提交
842
    }
G
groot 已提交
843
}
X
Xu Peng 已提交
844

S
starlord 已提交
845
void
G
groot 已提交
846
DBImpl::StartBuildIndexTask(const std::vector<std::string>& collection_names) {
S
starlord 已提交
847
    // build index has been finished?
848 849 850 851 852 853 854
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (!index_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
                index_thread_results_.pop_back();
            }
G
groot 已提交
855 856 857
        }
    }

S
starlord 已提交
858
    // add new build index task
859 860 861
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (index_thread_results_.empty()) {
G
groot 已提交
862 863
            index_thread_results_.push_back(
                index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndexTask, this, collection_names));
864
        }
G
groot 已提交
865
    }
X
Xu Peng 已提交
866 867
}

S
starlord 已提交
868
void
G
groot 已提交
869
DBImpl::BackgroundBuildIndexTask(std::vector<std::string> collection_names) {
G
groot 已提交
870 871
    SetThreadName("build_index");

P
peng.xu 已提交
872
    std::unique_lock<std::mutex> lock(build_index_mutex_);
873

G
groot 已提交
874
    for (auto collection_name : collection_names) {
G
groot 已提交
875 876 877 878 879 880
        snapshot::ScopedSnapshotT latest_ss;
        auto status = snapshot::Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
        if (!status.ok()) {
            return;
        }
        SnapshotVisitor ss_visitor(latest_ss);
G
groot 已提交
881

G
groot 已提交
882 883
        snapshot::IDS_TYPE segment_ids;
        ss_visitor.SegmentsToIndex("", segment_ids);
G
groot 已提交
884 885 886
        if (segment_ids.empty()) {
            continue;
        }
T
Tinkerrr 已提交
887

G
groot 已提交
888
        LOG_ENGINE_DEBUG_ << "Create BuildIndexJob for " << segment_ids.size() << " segments of " << collection_name;
G
groot 已提交
889
        cache::CpuCacheMgr::GetInstance().PrintInfo();  // print cache info before build index
G
groot 已提交
890
        scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(latest_ss, options_, segment_ids);
G
groot 已提交
891 892
        scheduler::JobMgrInst::GetInstance()->Put(job);
        job->WaitFinish();
G
groot 已提交
893
        cache::CpuCacheMgr::GetInstance().PrintInfo();  // print cache info after build index
G
groot 已提交
894

G
groot 已提交
895 896 897
        if (!job->status().ok()) {
            LOG_ENGINE_ERROR_ << job->status().message();
            break;
G
groot 已提交
898 899 900 901
        }
    }
}

G
groot 已提交
902 903
void
DBImpl::TimingIndexThread() {
G
groot 已提交
904
    SetThreadName("timing_index");
G
groot 已提交
905 906 907 908 909
    server::SystemInfo::GetInstance().Init();
    while (true) {
        if (!initialized_.load(std::memory_order_acquire)) {
            WaitMergeFileFinish();
            WaitBuildIndexFinish();
G
groot 已提交
910

G
groot 已提交
911 912
            LOG_ENGINE_DEBUG_ << "DB background thread exit";
            break;
G
groot 已提交
913 914
        }

G
groot 已提交
915
        swn_index_.Wait_For(std::chrono::seconds(BACKGROUND_INDEX_INTERVAL));
G
groot 已提交
916

G
groot 已提交
917 918 919 920 921 922
        std::vector<std::string> collection_names;
        snapshot::Snapshots::GetInstance().GetCollectionNames(collection_names);
        WaitMergeFileFinish();
        StartBuildIndexTask(collection_names);
    }
}
G
groot 已提交
923

G
groot 已提交
924 925 926 927 928 929 930 931
// Block until every recorded build-index task has finished.
// index_result_mutex_ is held while waiting.
void
DBImpl::WaitBuildIndexFinish() {
    //    LOG_ENGINE_DEBUG_ << "Begin WaitBuildIndexFinish";
    std::lock_guard<std::mutex> guard(index_result_mutex_);
    for (auto task = index_thread_results_.begin(); task != index_thread_results_.end(); ++task) {
        task->wait();
    }
    //    LOG_ENGINE_DEBUG_ << "End WaitBuildIndexFinish";
}

G
groot 已提交
934 935
// WAL background thread entry point.
// Currently a no-op: the whole implementation below is commented out and kept for reference only.
void
DBImpl::TimingWalThread() {
    //    SetThreadName("wal_thread");
    //    server::SystemInfo::GetInstance().Init();
    //
    //    std::chrono::system_clock::time_point next_auto_flush_time;
    //    auto get_next_auto_flush_time = [&]() {
    //        return std::chrono::system_clock::now() + std::chrono::seconds(options_.auto_flush_interval_);
    //    };
    //    if (options_.auto_flush_interval_ > 0) {
    //        next_auto_flush_time = get_next_auto_flush_time();
    //    }
    //
    //    InternalFlush();
    //    while (true) {
    //        if (options_.auto_flush_interval_ > 0) {
    //            if (std::chrono::system_clock::now() >= next_auto_flush_time) {
    //                InternalFlush();
    //                next_auto_flush_time = get_next_auto_flush_time();
    //            }
    //        }
    //
    //        wal::MXLogRecord record;
    //        auto error_code = wal_mgr_->GetNextRecord(record);
    //        if (error_code != WAL_SUCCESS) {
    //            LOG_ENGINE_ERROR_ << "WAL background GetNextRecord error";
    //            break;
    //        }
    //
    //        if (record.type != wal::MXLogType::None) {
    //            ExecWalRecord(record);
    //            if (record.type == wal::MXLogType::Flush) {
    //                // notify flush request to return
    //                flush_req_swn_.Notify();
    //
    //                // if user flush all manually, update auto flush also
    //                if (record.collection_id.empty() && options_.auto_flush_interval_ > 0) {
    //                    next_auto_flush_time = get_next_auto_flush_time();
    //                }
    //            }
    //
    //        } else {
    //            if (!initialized_.load(std::memory_order_acquire)) {
    //                InternalFlush();
    //                flush_req_swn_.Notify();
    //                // SS TODO
    //                // WaitMergeFileFinish();
    //                // WaitBuildIndexFinish();
    //                LOG_ENGINE_DEBUG_ << "WAL background thread exit";
    //                break;
    //            }
    //
    //            if (options_.auto_flush_interval_ > 0) {
    //                swn_wal_.Wait_Until(next_auto_flush_time);
    //            } else {
    //                swn_wal_.Wait();
    //            }
    //        }
    //    }
}

995 996
// Apply one WAL record to the in-memory manager.
//   Entity: insert record.data_chunk into the partition named by record.partition_tag, then force a
//           flush of all collections if the insert buffer exceeds options_.insert_buffer_size_.
//   Delete: delete the ids carried by the record from the collection.
//   Flush:  flush one collection (record.collection_id not empty) or all collections, then start a
//           background merge for the flushed collections.
// Any other record type is ignored.
// Returns the status of the last executed step; most failures return early.
Status
DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
    auto force_flush_if_mem_full = [&]() -> void {
        if (mem_mgr_->GetCurrentMem() > options_.insert_buffer_size_) {
            LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "insert", 0) << "Insert buffer size exceeds limit. Force flush";
            InternalFlush();
        }
    };

    // Resolve the collection id and partition id an insert record targets.
    // The parameter is named wal_record so it does not shadow the outer 'record'.
    auto get_collection_partition_id = [&](const wal::MXLogRecord& wal_record, int64_t& col_id,
                                           int64_t& part_id) -> Status {
        snapshot::ScopedSnapshotT ss;
        auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, wal_record.collection_id);
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << "Get snapshot fail: " << status.message();
            return status;
        }
        col_id = ss->GetCollectionId();
        snapshot::PartitionPtr part = ss->GetPartition(wal_record.partition_tag);
        if (part == nullptr) {
            // 'status' is OK here because the snapshot lookup succeeded. Returning it would report
            // success and let the caller insert with part_id still 0, so build a real error instead.
            status = Status(DB_ERROR, "Partition '" + wal_record.partition_tag + "' not found in collection '" +
                                          wal_record.collection_id + "'");
            LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << "Get partition fail: " << status.message();
            return status;
        }
        part_id = part->GetID();

        return Status::OK();
    };

    Status status;

    switch (record.type) {
        case wal::MXLogType::Entity: {
            int64_t collection_id = 0, partition_id = 0;
            status = get_collection_partition_id(record, collection_id, partition_id);
            if (!status.ok()) {
                LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << status.message();
                return status;
            }

            status = mem_mgr_->InsertEntities(collection_id, partition_id, record.data_chunk, record.lsn);
            force_flush_if_mem_full();

            // metrics
            milvus::server::CollectInsertMetrics metrics(record.length, status);
            break;
        }

        case wal::MXLogType::Delete: {
            snapshot::ScopedSnapshotT ss;
            status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, record.collection_id);
            if (!status.ok()) {
                LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "delete", 0) << "Get snapshot fail: " << status.message();
                return status;
            }

            // copy the ids out of the record into an owned vector
            std::vector<id_t> delete_ids;
            delete_ids.resize(record.length);
            memcpy(delete_ids.data(), record.ids, record.length * sizeof(id_t));
            status = mem_mgr_->DeleteEntities(ss->GetCollectionId(), delete_ids, record.lsn);
            if (!status.ok()) {
                return status;
            }

            break;
        }

        case wal::MXLogType::Flush: {
            if (!record.collection_id.empty()) {
                // flush one collection
                snapshot::ScopedSnapshotT ss;
                status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, record.collection_id);
                if (!status.ok()) {
                    LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "flush", 0) << "Get snapshot fail: " << status.message();
                    return status;
                }

                {
                    const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
                    int64_t collection_id = ss->GetCollectionId();
                    status = mem_mgr_->Flush(collection_id);
                    if (!status.ok()) {
                        return status;
                    }
                }

                std::set<std::string> flushed_collections;
                flushed_collections.insert(record.collection_id);
                StartMergeTask(flushed_collections);

            } else {
                // flush all collections
                std::set<int64_t> collection_ids;
                {
                    const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
                    // NOTE(review): this status is overwritten by the snapshot lookups below whenever
                    // collection_ids is not empty - confirm whether a flush failure should be returned.
                    status = mem_mgr_->Flush(collection_ids);
                }

                // translate the flushed collection ids to names for the merge task
                std::set<std::string> flushed_collections;
                for (auto id : collection_ids) {
                    snapshot::ScopedSnapshotT ss;
                    status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, id);
                    if (!status.ok()) {
                        LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "flush", 0) << "Get snapshot fail: " << status.message();
                        return status;
                    }

                    flushed_collections.insert(ss->GetName());
                }

                StartMergeTask(flushed_collections);
            }
            break;
        }

        default:
            break;
    }

    return status;
}

void
G
groot 已提交
1117 1118 1119 1120 1121 1122 1123 1124 1125
DBImpl::StartMergeTask(const std::set<std::string>& collection_names, bool force_merge_all) {
    // LOG_ENGINE_DEBUG_ << "Begin StartMergeTask";
    // merge task has been finished?
    {
        std::lock_guard<std::mutex> lck(merge_result_mutex_);
        if (!merge_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (merge_thread_results_.back().wait_for(span) == std::future_status::ready) {
                merge_thread_results_.pop_back();
1126
            }
1127
        }
G
groot 已提交
1128
    }
1129

G
groot 已提交
1130 1131 1132 1133 1134 1135 1136
    // add new merge task
    {
        std::lock_guard<std::mutex> lck(merge_result_mutex_);
        if (merge_thread_results_.empty()) {
            // start merge file thread
            merge_thread_results_.push_back(
                merge_thread_pool_.enqueue(&DBImpl::BackgroundMerge, this, collection_names, force_merge_all));
1137 1138
        }
    }
G
groot 已提交
1139 1140

    // LOG_ENGINE_DEBUG_ << "End StartMergeTask";
1141 1142
}

G
groot 已提交
1143
void
G
groot 已提交
1144
DBImpl::BackgroundMerge(std::set<std::string> collection_names, bool force_merge_all) {
G
groot 已提交
1145
    SetThreadName("merge");
G
groot 已提交
1146 1147 1148 1149

    for (auto& collection_name : collection_names) {
        const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);

G
groot 已提交
1150
        auto status = merge_mgr_ptr_->MergeFiles(collection_name);
G
groot 已提交
1151 1152 1153
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << "Failed to get merge files for collection: " << collection_name
                              << " reason:" << status.message();
G
groot 已提交
1154 1155 1156
        }

        if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
1157
            LOG_ENGINE_DEBUG_ << "Server will shutdown, skip merge action for collection: " << collection_name;
G
groot 已提交
1158 1159 1160 1161 1162
            break;
        }
    }
}

1163
void
G
groot 已提交
1164 1165 1166 1167 1168 1169 1170
DBImpl::WaitMergeFileFinish() {
    //    LOG_ENGINE_DEBUG_ << "Begin WaitMergeFileFinish";
    std::lock_guard<std::mutex> lck(merge_result_mutex_);
    for (auto& iter : merge_thread_results_) {
        iter.wait();
    }
    //    LOG_ENGINE_DEBUG_ << "End WaitMergeFileFinish";
1171 1172
}

1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190
// Increment the live search counter; when it becomes 1, suspend the index builder.
void
DBImpl::SuspendIfFirst() {
    std::lock_guard<std::mutex> guard(suspend_build_mutex_);
    const bool is_first_search = (++live_search_num_ == 1);
    if (!is_first_search) {
        return;
    }

    LOG_ENGINE_TRACE_ << "live_search_num_: " << live_search_num_;
    knowhere::BuilderSuspend();
}

// Decrement the live search counter; when it drops to 0, resume the index builder.
void
DBImpl::ResumeIfLast() {
    std::lock_guard<std::mutex> guard(suspend_build_mutex_);
    const bool is_last_search = (--live_search_num_ == 0);
    if (!is_last_search) {
        return;
    }

    LOG_ENGINE_TRACE_ << "live_search_num_: " << live_search_num_;
    knowhere::BuildResume();
}

C
Cai Yudong 已提交
1191 1192 1193 1194 1195 1196 1197
// Configuration-change callback.
// name: key of the configuration item that changed; only "storage.auto_flush_interval" is handled,
//       in which case the cached interval in options_ is refreshed from the global config.
void
DBImpl::ConfigUpdate(const std::string& name) {
    if (name != "storage.auto_flush_interval") {
        return;
    }

    options_.auto_flush_interval_ = config.storage.auto_flush_interval();
}

S
starlord 已提交
1198 1199
}  // namespace engine
}  // namespace milvus