DBImpl.cpp 35.4 KB
Newer Older
1
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
J
jinhai 已提交
2
//
3 4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
J
jinhai 已提交
5
//
6 7 8 9 10
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
J
jinhai 已提交
11

S
starlord 已提交
12
#include "db/DBImpl.h"
S
starlord 已提交
13
#include "cache/CpuCacheMgr.h"
14
#include "codecs/Codec.h"
W
Wang XiangYu 已提交
15
#include "config/ServerConfig.h"
16
#include "db/IDGenerator.h"
G
groot 已提交
17 18
#include "db/SnapshotUtils.h"
#include "db/SnapshotVisitor.h"
G
groot 已提交
19
#include "db/merge/MergeManagerFactory.h"
G
groot 已提交
20 21 22 23 24 25 26 27
#include "db/merge/MergeTask.h"
#include "db/snapshot/CompoundOperations.h"
#include "db/snapshot/EventExecutor.h"
#include "db/snapshot/IterateHandler.h"
#include "db/snapshot/OperationExecutor.h"
#include "db/snapshot/ResourceHelper.h"
#include "db/snapshot/ResourceTypes.h"
#include "db/snapshot/Snapshots.h"
28
#include "insert/MemManagerFactory.h"
G
groot 已提交
29
#include "knowhere/index/vector_index/helpers/BuilderSuspend.h"
X
Xiaohai Xu 已提交
30
#include "knowhere/index/vector_index/helpers/FaissIO.h"
G
groot 已提交
31
#include "metrics/Metrics.h"
G
groot 已提交
32
#include "metrics/SystemInfo.h"
G
groot 已提交
33
#include "scheduler/Definition.h"
S
starlord 已提交
34
#include "scheduler/SchedInst.h"
S
starlord 已提交
35
#include "scheduler/job/SearchJob.h"
36 37 38
#include "segment/SegmentReader.h"
#include "segment/SegmentWriter.h"
#include "utils/Exception.h"
G
groot 已提交
39
#include "utils/StringHelpFunctions.h"
S
starlord 已提交
40
#include "utils/TimeRecorder.h"
X
Xu Peng 已提交
41

42
#include <fiu/fiu-local.h>
G
groot 已提交
43 44 45
#include <src/scheduler/job/BuildIndexJob.h>
#include <limits>
#include <utility>
46

J
jinhai 已提交
47
namespace milvus {
X
Xu Peng 已提交
48
namespace engine {
X
Xu Peng 已提交
49

G
groot 已提交
50
namespace {
G
groot 已提交
51 52
constexpr uint64_t BACKGROUND_METRIC_INTERVAL = 1;
constexpr uint64_t BACKGROUND_INDEX_INTERVAL = 1;
G
groot 已提交
53
constexpr uint64_t WAIT_BUILD_INDEX_INTERVAL = 5;
G
groot 已提交
54

G
groot 已提交
55
static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milvus server is shutdown!");
S
starlord 已提交
56
}  // namespace
G
groot 已提交
57

G
groot 已提交
58 59 60 61 62
#define CHECK_INITIALIZED                                \
    if (!initialized_.load(std::memory_order_acquire)) { \
        return SHUTDOWN_ERROR;                           \
    }

Y
Yu Kun 已提交
63
DBImpl::DBImpl(const DBOptions& options)
64
    : options_(options), initialized_(false), merge_thread_pool_(1, 1), index_thread_pool_(1, 1) {
G
groot 已提交
65 66
    mem_mgr_ = MemManagerFactory::Build(options_);
    merge_mgr_ptr_ = MergeManagerFactory::SSBuild(options_);
67

C
Cai Yudong 已提交
68 69 70
    /* watch on storage.auto_flush_interval */
    ConfigMgr::GetInstance().Attach("storage.auto_flush_interval", this);

C
cqy123456 已提交
71
    DBImpl::Start();
S
starlord 已提交
72 73 74
}

DBImpl::~DBImpl() {
C
Cai Yudong 已提交
75 76
    ConfigMgr::GetInstance().Detach("storage.auto_flush_interval", this);

C
cqy123456 已提交
77
    DBImpl::Stop();
S
starlord 已提交
78 79
}

G
groot 已提交
80 81 82
////////////////////////////////////////////////////////////////////////////////
// External APIs
////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
83 84
Status
DBImpl::Start() {
85
    if (initialized_.load(std::memory_order_acquire)) {
S
starlord 已提交
86 87 88
        return Status::OK();
    }

G
groot 已提交
89
    // snapshot
90 91
    auto store = snapshot::Store::Build(options_.meta_.backend_uri_, options_.meta_.path_,
                                        codec::Codec::instance().GetSuffixSet());
G
groot 已提交
92 93 94 95 96 97
    snapshot::OperationExecutor::Init(store);
    snapshot::OperationExecutor::GetInstance().Start();
    snapshot::EventExecutor::Init(store);
    snapshot::EventExecutor::GetInstance().Start();
    snapshot::Snapshots::GetInstance().Init(store);

X
Xiaohai Xu 已提交
98 99
    knowhere::enable_faiss_logging();

100
    // LOG_ENGINE_TRACE_ << "DB service start";
101
    initialized_.store(true, std::memory_order_release);
S
starlord 已提交
102

G
groot 已提交
103
    // TODO: merge files
G
groot 已提交
104

G
groot 已提交
105 106 107 108
    // for distribute version, some nodes are read only
    if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
        // background flush thread
        bg_flush_thread_ = std::thread(&DBImpl::TimingFlushThread, this);
Z
update  
zhiru 已提交
109
    }
S
starlord 已提交
110

G
groot 已提交
111 112 113
    // for distribute version, some nodes are read only
    if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
        // background build index thread
G
groot 已提交
114
        bg_index_thread_ = std::thread(&DBImpl::TimingIndexThread, this);
G
groot 已提交
115 116 117
    }

    // background metric thread
Y
yukun 已提交
118
    fiu_do_on("options_metric_enable", options_.metric_enable_ = true);
G
groot 已提交
119
    if (options_.metric_enable_) {
G
groot 已提交
120
        bg_metric_thread_ = std::thread(&DBImpl::TimingMetricThread, this);
G
groot 已提交
121
    }
G
groot 已提交
122

S
starlord 已提交
123 124 125
    return Status::OK();
}

S
starlord 已提交
126 127
Status
DBImpl::Stop() {
128
    if (!initialized_.load(std::memory_order_acquire)) {
S
starlord 已提交
129 130
        return Status::OK();
    }
131

132
    initialized_.store(false, std::memory_order_release);
S
starlord 已提交
133

134
    if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
G
groot 已提交
135
        // flush all without merge
G
groot 已提交
136
        InternalFlush("", false);
G
groot 已提交
137 138 139 140

        // wait flush thread finish
        swn_flush_.Notify();
        bg_flush_thread_.join();
S
starlord 已提交
141

142 143
        WaitMergeFileFinish();

G
groot 已提交
144 145
        swn_index_.Notify();
        bg_index_thread_.join();
S
starlord 已提交
146 147
    }

G
groot 已提交
148
    // wait metric thread exit
G
groot 已提交
149 150 151 152
    if (options_.metric_enable_) {
        swn_metric_.Notify();
        bg_metric_thread_.join();
    }
G
groot 已提交
153

G
groot 已提交
154 155 156
    snapshot::EventExecutor::GetInstance().Stop();
    snapshot::OperationExecutor::GetInstance().Stop();

157
    // LOG_ENGINE_TRACE_ << "DB service stop";
S
starlord 已提交
158
    return Status::OK();
X
Xu Peng 已提交
159 160
}

S
starlord 已提交
161
Status
G
groot 已提交
162 163 164 165
DBImpl::CreateCollection(const snapshot::CreateCollectionContext& context) {
    CHECK_INITIALIZED;

    auto ctx = context;
G
groot 已提交
166 167 168 169 170 171 172 173 174 175

    // default id is auto-generated
    auto params = ctx.collection->GetParams();
    if (params.find(PARAM_UID_AUTOGEN) == params.end()) {
        params[PARAM_UID_AUTOGEN] = true;
        ctx.collection->SetParams(params);
    }

    // check uid existence
    snapshot::FieldPtr uid_field;
G
groot 已提交
176
    for (auto& pair : ctx.fields_schema) {
G
groot 已提交
177
        if (pair.first->GetName() == FIELD_UID) {
G
groot 已提交
178
            uid_field = pair.first;
G
groot 已提交
179 180 181
            break;
        }
    }
S
starlord 已提交
182

G
groot 已提交
183
    // add uid field if not specified
G
groot 已提交
184
    if (uid_field == nullptr) {
G
groot 已提交
185
        uid_field = std::make_shared<snapshot::Field>(FIELD_UID, 0, DataType::INT64);
S
starlord 已提交
186 187
    }

G
groot 已提交
188 189
    // define uid elements
    auto bloom_filter_element = std::make_shared<snapshot::FieldElement>(
G
groot 已提交
190
        0, 0, ELEMENT_BLOOM_FILTER, milvus::engine::FieldElementType::FET_BLOOM_FILTER);
G
groot 已提交
191
    auto delete_doc_element = std::make_shared<snapshot::FieldElement>(
G
groot 已提交
192
        0, 0, ELEMENT_DELETED_DOCS, milvus::engine::FieldElementType::FET_DELETED_DOCS);
G
groot 已提交
193 194
    ctx.fields_schema[uid_field] = {bloom_filter_element, delete_doc_element};

G
groot 已提交
195 196
    auto op = std::make_shared<snapshot::CreateCollectionOperation>(ctx);
    return op->Push();
197 198
}

199
Status
G
groot 已提交
200 201 202
DBImpl::DropCollection(const std::string& name) {
    CHECK_INITIALIZED;

C
Cai Yudong 已提交
203
    LOG_ENGINE_DEBUG_ << "Prepare to drop collection " << name;
G
groot 已提交
204 205 206 207 208

    snapshot::ScopedSnapshotT ss;
    auto& snapshots = snapshot::Snapshots::GetInstance();
    STATUS_CHECK(snapshots.GetSnapshot(ss, name));

G
groot 已提交
209
    mem_mgr_->EraseMem(ss->GetCollectionId());  // not allow insert
G
groot 已提交
210 211

    return snapshots.DropCollection(ss->GetCollectionId(), std::numeric_limits<snapshot::LSN_TYPE>::max());
212 213
}

S
starlord 已提交
214
Status
G
groot 已提交
215 216
DBImpl::HasCollection(const std::string& collection_name, bool& has_or_not) {
    CHECK_INITIALIZED;
S
starlord 已提交
217

G
groot 已提交
218 219 220
    snapshot::ScopedSnapshotT ss;
    auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
    has_or_not = status.ok();
221

222
    return Status::OK();
G
groot 已提交
223 224 225
}

Status
C
Cai Yudong 已提交
226
DBImpl::ListCollections(std::vector<std::string>& names) {
G
groot 已提交
227
    CHECK_INITIALIZED;
228

G
groot 已提交
229 230 231
    names.clear();
    return snapshot::Snapshots::GetInstance().GetCollectionNames(names);
}
232

G
groot 已提交
233
Status
C
Cai Yudong 已提交
234
DBImpl::GetCollectionInfo(const std::string& collection_name, snapshot::CollectionPtr& collection,
C
Cai Yudong 已提交
235
                          snapshot::FieldElementMappings& fields_schema) {
G
groot 已提交
236
    CHECK_INITIALIZED;
237

C
Cai Yudong 已提交
238 239
    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
240

C
Cai Yudong 已提交
241 242 243 244 245
    collection = ss->GetCollection();
    auto& fields = ss->GetResources<snapshot::Field>();
    for (auto& kv : fields) {
        fields_schema[kv.second.Get()] = ss->GetFieldElementsByField(kv.second->GetName());
    }
246
    return Status::OK();
G
groot 已提交
247 248
}

S
starlord 已提交
249
Status
G
groot 已提交
250
DBImpl::GetCollectionStats(const std::string& collection_name, milvus::json& collection_stats) {
G
groot 已提交
251 252
    CHECK_INITIALIZED;

G
groot 已提交
253
    STATUS_CHECK(GetSnapshotInfo(collection_name, collection_stats));
G
groot 已提交
254
    return Status::OK();
255 256
}

S
starlord 已提交
257
Status
C
Cai Yudong 已提交
258
DBImpl::CountEntities(const std::string& collection_name, int64_t& row_count) {
G
groot 已提交
259
    CHECK_INITIALIZED;
S
starlord 已提交
260

G
groot 已提交
261 262 263
    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

C
Cai Yudong 已提交
264 265
    row_count = ss->GetCollectionCommit()->GetRowCount();
    return Status::OK();
266 267
}

268
Status
G
groot 已提交
269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284
DBImpl::CreatePartition(const std::string& collection_name, const std::string& partition_name) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    snapshot::LSN_TYPE lsn = 0;
    snapshot::OperationContext context;
    context.lsn = lsn;
    auto op = std::make_shared<snapshot::CreatePartitionOperation>(context, ss);

    snapshot::PartitionContext p_ctx;
    p_ctx.name = partition_name;
    snapshot::PartitionPtr partition;
    STATUS_CHECK(op->CommitNewPartition(p_ctx, partition));
    return op->Push();
285 286
}

S
starlord 已提交
287
Status
G
groot 已提交
288 289
DBImpl::DropPartition(const std::string& collection_name, const std::string& partition_name) {
    CHECK_INITIALIZED;
S
starlord 已提交
290

G
groot 已提交
291 292
    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
293

G
groot 已提交
294
    // SS TODO: Is below step needed? Or How to implement it?
G
groot 已提交
295
    /* mem_mgr_->EraseMem(partition_name); */
296

G
groot 已提交
297 298 299 300
    snapshot::PartitionContext context;
    context.name = partition_name;
    auto op = std::make_shared<snapshot::DropPartitionOperation>(context, ss);
    return op->Push();
G
groot 已提交
301 302
}

303
Status
C
Cai Yudong 已提交
304
DBImpl::HasPartition(const std::string& collection_name, const std::string& partition_tag, bool& exist) {
G
groot 已提交
305
    CHECK_INITIALIZED;
306

G
groot 已提交
307 308
    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
309

C
Cai Yudong 已提交
310 311 312 313 314 315 316 317 318
    auto partition_tags = std::move(ss->GetPartitionNames());
    for (auto& tag : partition_tags) {
        if (tag == partition_tag) {
            exist = true;
            return Status::OK();
        }
    }

    exist = false;
G
groot 已提交
319 320
    return Status::OK();
}
321

G
groot 已提交
322
Status
C
Cai Yudong 已提交
323
DBImpl::ListPartitions(const std::string& collection_name, std::vector<std::string>& partition_names) {
G
groot 已提交
324
    CHECK_INITIALIZED;
325

G
groot 已提交
326 327
    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
328

C
Cai Yudong 已提交
329 330 331 332 333 334 335 336 337
    partition_names = std::move(ss->GetPartitionNames());
    return Status::OK();
}

Status
DBImpl::CreateIndex(const std::shared_ptr<server::Context>& context, const std::string& collection_name,
                    const std::string& field_name, const CollectionIndex& index) {
    CHECK_INITIALIZED;

G
groot 已提交
338 339
    LOG_ENGINE_DEBUG_ << "Create index for collection: " << collection_name << " field: " << field_name;

C
Cai Yudong 已提交
340 341 342 343 344 345 346 347 348 349 350 351 352 353
    // step 1: wait merge file thread finished to avoid duplicate data bug
    auto status = Flush();
    WaitMergeFileFinish();  // let merge file thread finish

    // step 2: compare old index and new index
    CollectionIndex new_index = index;
    CollectionIndex old_index;
    STATUS_CHECK(GetSnapshotIndex(collection_name, field_name, old_index));

    if (utils::IsSameIndex(old_index, new_index)) {
        return Status::OK();  // same index
    }

    // step 3: drop old index
G
groot 已提交
354
    DropIndex(collection_name, field_name);
C
Cai Yudong 已提交
355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382
    WaitMergeFileFinish();  // let merge file thread finish since DropIndex start a merge task

    // step 4: create field element for index
    status = SetSnapshotIndex(collection_name, field_name, new_index);
    if (!status.ok()) {
        return status;
    }

    // step 5: start background build index thread
    std::vector<std::string> collection_names = {collection_name};
    WaitBuildIndexFinish();
    StartBuildIndexTask(collection_names);

    // step 6: iterate segments need to be build index, wait until all segments are built
    while (true) {
        SnapshotVisitor ss_visitor(collection_name);
        snapshot::IDS_TYPE segment_ids;
        ss_visitor.SegmentsToIndex(field_name, segment_ids);
        if (segment_ids.empty()) {
            break;
        }

        index_req_swn_.Wait_For(std::chrono::seconds(1));

        // client break the connection, no need to block, check every 1 second
        if (context && context->IsConnectionBroken()) {
            LOG_ENGINE_DEBUG_ << "Client connection broken, build index in background";
            break;  // just break, not return, continue to update partitions files to to_index
383
        }
G
groot 已提交
384
    }
385

G
groot 已提交
386 387
    return Status::OK();
}
388

G
groot 已提交
389
Status
C
Cai Yudong 已提交
390 391 392
DBImpl::DropIndex(const std::string& collection_name, const std::string& field_name) {
    CHECK_INITIALIZED;

G
groot 已提交
393
    LOG_ENGINE_DEBUG_ << "Drop index for collection: " << collection_name << " field: " << field_name;
C
Cai Yudong 已提交
394 395 396 397 398

    STATUS_CHECK(DeleteSnapshotIndex(collection_name, field_name));

    std::set<std::string> merge_collection_names = {collection_name};
    StartMergeTask(merge_collection_names, true);
G
groot 已提交
399

C
Cai Yudong 已提交
400 401 402 403
    return Status::OK();
}

Status
G
groot 已提交
404
DBImpl::DescribeIndex(const std::string& collection_name, const std::string& field_name, CollectionIndex& index) {
C
Cai Yudong 已提交
405 406
    CHECK_INITIALIZED;

G
groot 已提交
407
    LOG_ENGINE_DEBUG_ << "Describe index for collection: " << collection_name << " field: " << field_name;
C
Cai Yudong 已提交
408

G
groot 已提交
409
    STATUS_CHECK(GetSnapshotIndex(collection_name, field_name, index));
C
Cai Yudong 已提交
410 411 412 413 414

    return Status::OK();
}

Status
G
groot 已提交
415
DBImpl::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk,
G
groot 已提交
416
               idx_t op_id) {
G
groot 已提交
417
    CHECK_INITIALIZED;
418

G
groot 已提交
419 420 421
    if (data_chunk == nullptr) {
        return Status(DB_ERROR, "Null pointer");
    }
422

G
groot 已提交
423 424 425 426 427 428
    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    auto partition_ptr = ss->GetPartition(partition_name);
    if (partition_ptr == nullptr) {
        return Status(DB_NOT_FOUND, "Fail to get partition " + partition_name);
429 430
    }

G
groot 已提交
431
    auto id_field = ss->GetField(FIELD_UID);
G
groot 已提交
432 433 434 435
    if (id_field == nullptr) {
        return Status(DB_ERROR, "Field '_id' not found");
    }

G
groot 已提交
436
    auto& params = ss->GetCollection()->GetParams();
G
groot 已提交
437 438 439 440 441 442
    bool auto_increment = true;
    if (params.find(PARAM_UID_AUTOGEN) != params.end()) {
        auto_increment = params[PARAM_UID_AUTOGEN];
    }

    FIXEDX_FIELD_MAP& fields = data_chunk->fixed_fields_;
G
groot 已提交
443
    auto pair = fields.find(engine::FIELD_UID);
G
groot 已提交
444
    if (auto_increment) {
G
groot 已提交
445
        // id is auto increment, but client provides id, return error
G
groot 已提交
446 447 448
        if (pair != fields.end() && pair->second != nullptr) {
            return Status(DB_ERROR, "Field '_id' is auto increment, no need to provide id");
        }
G
groot 已提交
449 450
    } else {
        // id is not auto increment, but client doesn't provide id, return error
G
groot 已提交
451 452 453 454 455 456 457
        if (pair == fields.end() || pair->second == nullptr) {
            return Status(DB_ERROR, "Field '_id' is user defined");
        }
    }

    // generate id
    if (auto_increment) {
G
groot 已提交
458 459 460
        SafeIDGenerator& id_generator = SafeIDGenerator::GetInstance();
        IDNumbers ids;
        STATUS_CHECK(id_generator.GetNextIDNumbers(data_chunk->count_, ids));
G
groot 已提交
461 462 463
        BinaryDataPtr id_data = std::make_shared<BinaryData>();
        id_data->data_.resize(ids.size() * sizeof(int64_t));
        memcpy(id_data->data_.data(), ids.data(), ids.size() * sizeof(int64_t));
G
groot 已提交
464
        data_chunk->fixed_fields_[engine::FIELD_UID] = id_data;
465 466
    }

G
groot 已提交
467
    // insert entities: collection_name is field id
G
groot 已提交
468 469 470 471 472 473 474 475
    snapshot::PartitionPtr part = ss->GetPartition(partition_name);
    if (part == nullptr) {
        LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << "Get partition fail: " << partition_name;
        return Status(DB_ERROR, "Invalid partiiton name");
    }

    int64_t collection_id = ss->GetCollectionId();
    int64_t partition_id = part->GetID();
G
groot 已提交
476

G
groot 已提交
477
    auto status = mem_mgr_->InsertEntities(collection_id, partition_id, data_chunk, op_id);
G
groot 已提交
478 479 480 481 482 483 484 485 486 487
    if (!status.ok()) {
        return status;
    }
    if (mem_mgr_->GetCurrentMem() > options_.insert_buffer_size_) {
        LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "insert", 0) << "Insert buffer size exceeds limit. Force flush";
        InternalFlush();
    }

    // metrics
    milvus::server::CollectInsertMetrics metrics(data_chunk->count_, status);
488

489 490 491
    return Status::OK();
}

S
starlord 已提交
492
Status
C
Cai Yudong 已提交
493
DBImpl::GetEntityByID(const std::string& collection_name, const IDNumbers& id_array,
494 495
                      const std::vector<std::string>& field_names, std::vector<bool>& valid_row,
                      DataChunkPtr& data_chunk) {
C
Cai Yudong 已提交
496 497 498 499 500 501
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    std::string dir_root = options_.meta_.path_;
502
    valid_row.resize(id_array.size(), false);
503 504
    auto handler =
        std::make_shared<GetEntityByIdSegmentHandler>(nullptr, ss, dir_root, id_array, field_names, valid_row);
C
Cai Yudong 已提交
505 506 507 508 509 510 511 512
    handler->Iterate();
    STATUS_CHECK(handler->GetStatus());

    data_chunk = handler->data_chunk_;
    return Status::OK();
}

Status
G
groot 已提交
513
DBImpl::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, idx_t op_id) {
G
groot 已提交
514
    CHECK_INITIALIZED;
S
starlord 已提交
515

G
groot 已提交
516 517 518 519 520 521
    snapshot::ScopedSnapshotT ss;
    auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
    if (!status.ok()) {
        LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "delete", 0) << "Get snapshot fail: " << status.message();
        return status;
    }
G
groot 已提交
522

G
groot 已提交
523
    status = mem_mgr_->DeleteEntities(ss->GetCollectionId(), entity_ids, op_id);
G
groot 已提交
524 525
    return status;
}
G
groot 已提交
526

G
groot 已提交
527
Status
C
Cai Yudong 已提交
528 529
DBImpl::Query(const server::ContextPtr& context, const query::QueryPtr& query_ptr, engine::QueryResultPtr& result) {
    CHECK_INITIALIZED;
G
groot 已提交
530

G
groot 已提交
531
    TimeRecorder rc("DBImpl::Query");
Y
Yu Kun 已提交
532

Y
yukun 已提交
533 534 535 536
    if (!query_ptr->root) {
        return Status{DB_ERROR, "BinaryQuery is null"};
    }

537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581
    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, query_ptr->collection_id));

    /* collect all valid segment */
    std::vector<SegmentVisitor::Ptr> segment_visitors;
    auto exec = [&](const snapshot::Segment::Ptr& segment, snapshot::SegmentIterator* handler) -> Status {
        auto p_id = segment->GetPartitionId();
        auto p_ptr = ss->GetResource<snapshot::Partition>(p_id);
        auto& p_name = p_ptr->GetName();

        /* check partition match pattern */
        bool match = false;
        if (query_ptr->partitions.empty()) {
            match = true;
        } else {
            for (auto& pattern : query_ptr->partitions) {
                if (StringHelpFunctions::IsRegexMatch(p_name, pattern)) {
                    match = true;
                    break;
                }
            }
        }

        if (match) {
            auto visitor = SegmentVisitor::Build(ss, segment->GetID());
            if (!visitor) {
                return Status(milvus::SS_ERROR, "Cannot build segment visitor");
            }
            segment_visitors.push_back(visitor);
        }
        return Status::OK();
    };

    auto segment_iter = std::make_shared<snapshot::SegmentIterator>(ss, exec);
    segment_iter->Iterate();
    STATUS_CHECK(segment_iter->GetStatus());

    LOG_ENGINE_DEBUG_ << LogOut("Engine query begin, segment count: %ld", segment_visitors.size());

    engine::snapshot::IDS_TYPE segment_ids;
    for (auto& sv : segment_visitors) {
        segment_ids.emplace_back(sv->GetSegment()->GetID());
    }

    scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(nullptr, ss, options_, query_ptr, segment_ids);
582

G
groot 已提交
583
    cache::CpuCacheMgr::GetInstance().PrintInfo();  // print cache info before query
C
Cai Yudong 已提交
584 585 586
    /* put search job to scheduler and wait job finish */
    scheduler::JobMgrInst::GetInstance()->Put(job);
    job->WaitFinish();
G
groot 已提交
587
    cache::CpuCacheMgr::GetInstance().PrintInfo();  // print cache info after query
G
groot 已提交
588

C
Cai Yudong 已提交
589 590
    if (!job->status().ok()) {
        return job->status();
591 592
    }

Y
yukun 已提交
593 594 595
    if (job->query_result()) {
        result = job->query_result();
    }
Y
yukun 已提交
596 597

    // step 4: get entities by result ids
Y
yukun 已提交
598
    std::vector<bool> valid_row;
Y
yukun 已提交
599 600 601 602
    if (!query_ptr->field_names.empty()) {
        STATUS_CHECK(GetEntityByID(query_ptr->collection_id, result->result_ids_, query_ptr->field_names, valid_row,
                                   result->data_chunk_));
    }
Y
yukun 已提交
603 604

    // step 5: filter entities by field names
605 606 607 608 609 610 611 612 613 614 615 616 617
    //    std::vector<engine::AttrsData> filter_attrs;
    //    for (auto attr : result.attrs_) {
    //        AttrsData attrs_data;
    //        attrs_data.attr_type_ = attr.attr_type_;
    //        attrs_data.attr_count_ = attr.attr_count_;
    //        attrs_data.id_array_ = attr.id_array_;
    //        for (auto& name : field_names) {
    //            if (attr.attr_data_.find(name) != attr.attr_data_.end()) {
    //                attrs_data.attr_data_.insert(std::make_pair(name, attr.attr_data_.at(name)));
    //            }
    //        }
    //        filter_attrs.emplace_back(attrs_data);
    //    }
Y
yukun 已提交
618

619 620
    rc.ElapseFromBegin("Engine query totally cost");

G
groot 已提交
621
    // tracer.Context()->GetTraceContext()->GetSpan()->Finish();
622 623 624 625

    return Status::OK();
}

C
Cai Yudong 已提交
626 627 628 629 630 631 632 633
Status
DBImpl::ListIDInSegment(const std::string& collection_name, int64_t segment_id, IDNumbers& entity_ids) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    auto read_visitor = engine::SegmentVisitor::Build(ss, segment_id);
634 635 636
    if (!read_visitor) {
        return Status(SERVER_FILE_NOT_FOUND, "Segment not exist");
    }
C
Cai Yudong 已提交
637 638 639 640 641
    segment::SegmentReaderPtr segment_reader =
        std::make_shared<segment::SegmentReader>(options_.meta_.path_, read_visitor);

    STATUS_CHECK(segment_reader->LoadUids(entity_ids));

G
groot 已提交
642 643 644 645 646 647 648 649 650 651
    // remove delete id from the id list
    segment::DeletedDocsPtr deleted_docs_ptr;
    STATUS_CHECK(segment_reader->LoadDeletedDocs(deleted_docs_ptr));
    if (deleted_docs_ptr) {
        const std::vector<offset_t>& delete_ids = deleted_docs_ptr->GetDeletedDocs();
        for (auto offset : delete_ids) {
            entity_ids.erase(entity_ids.begin() + offset, entity_ids.begin() + offset + 1);
        }
    }

C
Cai Yudong 已提交
652 653 654 655 656 657 658 659 660 661 662
    return Status::OK();
}

Status
DBImpl::LoadCollection(const server::ContextPtr& context, const std::string& collection_name,
                       const std::vector<std::string>& field_names, bool force) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

G
groot 已提交
663
    auto handler = std::make_shared<LoadCollectionHandler>(nullptr, ss, options_.meta_.path_, field_names, force);
C
Cai Yudong 已提交
664
    handler->Iterate();
G
groot 已提交
665
    STATUS_CHECK(handler->GetStatus());
C
Cai Yudong 已提交
666

G
groot 已提交
667
    return Status::OK();
C
Cai Yudong 已提交
668 669 670 671 672 673 674 675 676
}

Status
DBImpl::Flush(const std::string& collection_name) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    Status status;
C
cqy123456 已提交
677
    bool has_collection = false;
C
Cai Yudong 已提交
678 679 680 681 682 683 684 685 686 687
    status = HasCollection(collection_name, has_collection);
    if (!status.ok()) {
        return status;
    }
    if (!has_collection) {
        LOG_ENGINE_ERROR_ << "Collection to flush does not exist: " << collection_name;
        return Status(DB_NOT_FOUND, "Collection to flush does not exist");
    }

    LOG_ENGINE_DEBUG_ << "Begin flush collection: " << collection_name;
G
groot 已提交
688
    InternalFlush(collection_name);
C
Cai Yudong 已提交
689 690 691 692 693 694 695 696 697 698 699 700
    LOG_ENGINE_DEBUG_ << "End flush collection: " << collection_name;

    return status;
}

Status
DBImpl::Flush() {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    LOG_ENGINE_DEBUG_ << "Begin flush all collections";
G
groot 已提交
701
    InternalFlush();
C
Cai Yudong 已提交
702 703
    LOG_ENGINE_DEBUG_ << "End flush all collections";

G
groot 已提交
704
    return Status::OK();
C
Cai Yudong 已提交
705 706 707 708 709 710 711 712 713 714 715 716 717
}

Status
DBImpl::Compact(const std::shared_ptr<server::Context>& context, const std::string& collection_name, double threshold) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    LOG_ENGINE_DEBUG_ << "Before compacting, wait for build index thread to finish...";
    const std::lock_guard<std::mutex> index_lock(build_index_mutex_);
    const std::lock_guard<std::mutex> merge_lock(flush_merge_compact_mutex_);

    Status status;
C
cqy123456 已提交
718
    bool has_collection = false;
C
Cai Yudong 已提交
719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758
    status = HasCollection(collection_name, has_collection);
    if (!status.ok()) {
        return status;
    }
    if (!has_collection) {
        LOG_ENGINE_ERROR_ << "Collection to compact does not exist: " << collection_name;
        return Status(DB_NOT_FOUND, "Collection to compact does not exist");
    }

    snapshot::ScopedSnapshotT latest_ss;
    status = snapshot::Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
    if (!status.ok()) {
        return status;
    }

    auto& segments = latest_ss->GetResources<snapshot::Segment>();
    for (auto& kv : segments) {
        // client break the connection, no need to continue
        if (context && context->IsConnectionBroken()) {
            LOG_ENGINE_DEBUG_ << "Client connection broken, stop compact operation";
            break;
        }

        snapshot::ID_TYPE segment_id = kv.first;
        auto read_visitor = engine::SegmentVisitor::Build(latest_ss, segment_id);
        segment::SegmentReaderPtr segment_reader =
            std::make_shared<segment::SegmentReader>(options_.meta_.path_, read_visitor);

        segment::DeletedDocsPtr deleted_docs;
        status = segment_reader->LoadDeletedDocs(deleted_docs);
        if (!status.ok() || deleted_docs == nullptr) {
            continue;  // no deleted docs, no need to compact
        }

        auto segment_commit = latest_ss->GetSegmentCommitBySegmentId(segment_id);
        auto row_count = segment_commit->GetRowCount();
        if (row_count == 0) {
            continue;
        }

G
groot 已提交
759
        auto deleted_count = deleted_docs->GetCount();
760
        if (double(deleted_count) / (row_count + deleted_count) < threshold) {
C
Cai Yudong 已提交
761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776
            continue;  // no need to compact
        }

        snapshot::IDS_TYPE ids = {segment_id};
        MergeTask merge_task(options_, latest_ss, ids);
        status = merge_task.Execute();
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << "Compact failed for segment " << segment_reader->GetSegmentPath() << ": "
                              << status.message();
            continue;  // skip this file and try compact next one
        }
    }

    return status;
}

G
groot 已提交
777 778 779
////////////////////////////////////////////////////////////////////////////////
// Internal APIs
////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
780
void
G
groot 已提交
781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828
DBImpl::InternalFlush(const std::string& collection_name, bool merge) {
    Status status;
    std::set<std::string> flushed_collections;
    if (!collection_name.empty()) {
        // flush one collection
        snapshot::ScopedSnapshotT ss;
        status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
        if (!status.ok()) {
            LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "flush", 0) << "Get snapshot fail: " << status.message();
            return;
        }

        {
            const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
            int64_t collection_id = ss->GetCollectionId();
            status = mem_mgr_->Flush(collection_id);
            if (!status.ok()) {
                return;
            }
        }

        flushed_collections.insert(collection_name);
    } else {
        // flush all collections
        std::set<int64_t> collection_ids;
        {
            const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
            status = mem_mgr_->Flush(collection_ids);
            if (!status.ok()) {
                return;
            }
        }

        for (auto id : collection_ids) {
            snapshot::ScopedSnapshotT ss;
            status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, id);
            if (!status.ok()) {
                LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "flush", 0) << "Get snapshot fail: " << status.message();
                return;
            }

            flushed_collections.insert(ss->GetName());
        }
    }

    if (merge) {
        StartMergeTask(flushed_collections);
    }
G
groot 已提交
829 830 831 832
}

void
DBImpl::TimingFlushThread() {
G
groot 已提交
833
    SetThreadName("timing_flush");
Y
yu yunfeng 已提交
834
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
835
    while (true) {
836
        if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
837
            LOG_ENGINE_DEBUG_ << "DB background flush thread exit";
G
groot 已提交
838 839
            break;
        }
X
Xu Peng 已提交
840

G
groot 已提交
841 842 843 844 845 846
        InternalFlush();
        if (options_.auto_flush_interval_ > 0) {
            swn_flush_.Wait_For(std::chrono::seconds(options_.auto_flush_interval_));
        } else {
            swn_flush_.Wait();
        }
847 848 849
    }
}

S
starlord 已提交
850 851
void
DBImpl::StartMetricTask() {
G
groot 已提交
852
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(BACKGROUND_METRIC_INTERVAL);
G
groot 已提交
853 854
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance().CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance().CacheCapacity();
S
shengjh 已提交
855 856
    fiu_do_on("DBImpl.StartMetricTask.InvalidTotalCache", cache_total = 0);

J
JinHai-CN 已提交
857 858 859 860 861 862 863
    if (cache_total > 0) {
        double cache_usage_double = cache_usage;
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage_double * 100 / cache_total);
    } else {
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(0);
    }

Y
Yu Kun 已提交
864
    server::Metrics::GetInstance().GpuCacheUsageGaugeSet();
G
groot 已提交
865 866 867 868
    /* SS TODO */
    // uint64_t size;
    // Size(size);
    // server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
G
groot 已提交
869 870 871 872 873
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
S
starlord 已提交
874

K
kun yu 已提交
875
    server::Metrics::GetInstance().CPUCoreUsagePercentSet();
K
kun yu 已提交
876 877
    server::Metrics::GetInstance().GPUTemperature();
    server::Metrics::GetInstance().CPUTemperature();
878
    server::Metrics::GetInstance().PushToGateway();
G
groot 已提交
879 880
}

S
starlord 已提交
881
void
G
groot 已提交
882
DBImpl::TimingMetricThread() {
G
groot 已提交
883
    SetThreadName("timing_metric");
G
groot 已提交
884 885
    server::SystemInfo::GetInstance().Init();
    while (true) {
886
        if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
887
            LOG_ENGINE_DEBUG_ << "DB background metric thread exit";
S
starlord 已提交
888 889
            break;
        }
Z
update  
zhiru 已提交
890

G
groot 已提交
891 892
        swn_metric_.Wait_For(std::chrono::seconds(BACKGROUND_METRIC_INTERVAL));
        StartMetricTask();
Z
update  
zhiru 已提交
893
    }
G
groot 已提交
894
}
X
Xu Peng 已提交
895

S
starlord 已提交
896
void
G
groot 已提交
897
DBImpl::StartBuildIndexTask(const std::vector<std::string>& collection_names) {
S
starlord 已提交
898
    // build index has been finished?
899 900 901 902 903 904 905
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (!index_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
                index_thread_results_.pop_back();
            }
G
groot 已提交
906 907 908
        }
    }

S
starlord 已提交
909
    // add new build index task
910 911 912
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (index_thread_results_.empty()) {
G
groot 已提交
913 914
            index_thread_results_.push_back(
                index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndexTask, this, collection_names));
915
        }
G
groot 已提交
916
    }
X
Xu Peng 已提交
917 918
}

S
starlord 已提交
919
void
G
groot 已提交
920
DBImpl::BackgroundBuildIndexTask(std::vector<std::string> collection_names) {
G
groot 已提交
921 922
    SetThreadName("build_index");

P
peng.xu 已提交
923
    std::unique_lock<std::mutex> lock(build_index_mutex_);
924

G
groot 已提交
925
    for (auto collection_name : collection_names) {
G
groot 已提交
926 927 928 929 930 931
        snapshot::ScopedSnapshotT latest_ss;
        auto status = snapshot::Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
        if (!status.ok()) {
            return;
        }
        SnapshotVisitor ss_visitor(latest_ss);
G
groot 已提交
932

G
groot 已提交
933 934
        snapshot::IDS_TYPE segment_ids;
        ss_visitor.SegmentsToIndex("", segment_ids);
G
groot 已提交
935 936 937
        if (segment_ids.empty()) {
            continue;
        }
T
Tinkerrr 已提交
938

G
groot 已提交
939
        LOG_ENGINE_DEBUG_ << "Create BuildIndexJob for " << segment_ids.size() << " segments of " << collection_name;
G
groot 已提交
940
        cache::CpuCacheMgr::GetInstance().PrintInfo();  // print cache info before build index
G
groot 已提交
941
        scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(latest_ss, options_, segment_ids);
G
groot 已提交
942 943
        scheduler::JobMgrInst::GetInstance()->Put(job);
        job->WaitFinish();
G
groot 已提交
944
        cache::CpuCacheMgr::GetInstance().PrintInfo();  // print cache info after build index
G
groot 已提交
945

G
groot 已提交
946 947 948
        if (!job->status().ok()) {
            LOG_ENGINE_ERROR_ << job->status().message();
            break;
G
groot 已提交
949 950 951 952
        }
    }
}

G
groot 已提交
953 954
void
DBImpl::TimingIndexThread() {
G
groot 已提交
955
    SetThreadName("timing_index");
G
groot 已提交
956 957 958 959 960
    server::SystemInfo::GetInstance().Init();
    while (true) {
        if (!initialized_.load(std::memory_order_acquire)) {
            WaitMergeFileFinish();
            WaitBuildIndexFinish();
G
groot 已提交
961

G
groot 已提交
962 963
            LOG_ENGINE_DEBUG_ << "DB background thread exit";
            break;
G
groot 已提交
964 965
        }

G
groot 已提交
966
        swn_index_.Wait_For(std::chrono::seconds(BACKGROUND_INDEX_INTERVAL));
G
groot 已提交
967

G
groot 已提交
968 969 970 971 972 973
        std::vector<std::string> collection_names;
        snapshot::Snapshots::GetInstance().GetCollectionNames(collection_names);
        WaitMergeFileFinish();
        StartBuildIndexTask(collection_names);
    }
}
G
groot 已提交
974

G
groot 已提交
975 976 977 978 979 980 981 982
void
DBImpl::WaitBuildIndexFinish() {
    //    LOG_ENGINE_DEBUG_ << "Begin WaitBuildIndexFinish";
    std::lock_guard<std::mutex> lck(index_result_mutex_);
    for (auto& iter : index_thread_results_) {
        iter.wait();
    }
    //    LOG_ENGINE_DEBUG_ << "End WaitBuildIndexFinish";
G
groot 已提交
983 984
}

985
void
G
groot 已提交
986 987 988 989 990 991 992 993 994
DBImpl::StartMergeTask(const std::set<std::string>& collection_names, bool force_merge_all) {
    // LOG_ENGINE_DEBUG_ << "Begin StartMergeTask";
    // merge task has been finished?
    {
        std::lock_guard<std::mutex> lck(merge_result_mutex_);
        if (!merge_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (merge_thread_results_.back().wait_for(span) == std::future_status::ready) {
                merge_thread_results_.pop_back();
995
            }
996
        }
G
groot 已提交
997
    }
998

G
groot 已提交
999 1000 1001 1002 1003 1004 1005
    // add new merge task
    {
        std::lock_guard<std::mutex> lck(merge_result_mutex_);
        if (merge_thread_results_.empty()) {
            // start merge file thread
            merge_thread_results_.push_back(
                merge_thread_pool_.enqueue(&DBImpl::BackgroundMerge, this, collection_names, force_merge_all));
1006 1007
        }
    }
G
groot 已提交
1008 1009

    // LOG_ENGINE_DEBUG_ << "End StartMergeTask";
1010 1011
}

G
groot 已提交
1012
void
G
groot 已提交
1013
DBImpl::BackgroundMerge(std::set<std::string> collection_names, bool force_merge_all) {
G
groot 已提交
1014
    SetThreadName("merge");
G
groot 已提交
1015 1016 1017 1018

    for (auto& collection_name : collection_names) {
        const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);

G
groot 已提交
1019
        auto status = merge_mgr_ptr_->MergeFiles(collection_name);
G
groot 已提交
1020 1021 1022
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << "Failed to get merge files for collection: " << collection_name
                              << " reason:" << status.message();
G
groot 已提交
1023 1024 1025
        }

        if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
1026
            LOG_ENGINE_DEBUG_ << "Server will shutdown, skip merge action for collection: " << collection_name;
G
groot 已提交
1027 1028 1029 1030 1031
            break;
        }
    }
}

1032
void
G
groot 已提交
1033 1034 1035 1036 1037 1038 1039
DBImpl::WaitMergeFileFinish() {
    //    LOG_ENGINE_DEBUG_ << "Begin WaitMergeFileFinish";
    std::lock_guard<std::mutex> lck(merge_result_mutex_);
    for (auto& iter : merge_thread_results_) {
        iter.wait();
    }
    //    LOG_ENGINE_DEBUG_ << "End WaitMergeFileFinish";
1040 1041
}

1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059
void
DBImpl::SuspendIfFirst() {
    std::lock_guard<std::mutex> lock(suspend_build_mutex_);
    if (++live_search_num_ == 1) {
        LOG_ENGINE_TRACE_ << "live_search_num_: " << live_search_num_;
        knowhere::BuilderSuspend();
    }
}

void
DBImpl::ResumeIfLast() {
    std::lock_guard<std::mutex> lock(suspend_build_mutex_);
    if (--live_search_num_ == 0) {
        LOG_ENGINE_TRACE_ << "live_search_num_: " << live_search_num_;
        knowhere::BuildResume();
    }
}

C
Cai Yudong 已提交
1060 1061 1062 1063 1064 1065 1066
void
DBImpl::ConfigUpdate(const std::string& name) {
    if (name == "storage.auto_flush_interval") {
        options_.auto_flush_interval_ = config.storage.auto_flush_interval();
    }
}

S
starlord 已提交
1067 1068
}  // namespace engine
}  // namespace milvus