DBImpl.cpp 39.3 KB
Newer Older
1
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
J
jinhai 已提交
2
//
3 4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
J
jinhai 已提交
5
//
6 7 8 9 10
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
J
jinhai 已提交
11

S
starlord 已提交
12
#include "db/DBImpl.h"
S
starlord 已提交
13
#include "cache/CpuCacheMgr.h"
14
#include "codecs/Codec.h"
W
Wang XiangYu 已提交
15
#include "config/ServerConfig.h"
16
#include "db/IDGenerator.h"
G
groot 已提交
17 18
#include "db/SnapshotUtils.h"
#include "db/SnapshotVisitor.h"
G
groot 已提交
19
#include "db/merge/MergeManagerFactory.h"
G
groot 已提交
20 21 22 23 24 25 26 27
#include "db/merge/MergeTask.h"
#include "db/snapshot/CompoundOperations.h"
#include "db/snapshot/EventExecutor.h"
#include "db/snapshot/IterateHandler.h"
#include "db/snapshot/OperationExecutor.h"
#include "db/snapshot/ResourceHelper.h"
#include "db/snapshot/ResourceTypes.h"
#include "db/snapshot/Snapshots.h"
28
#include "insert/MemManagerFactory.h"
G
groot 已提交
29
#include "knowhere/index/vector_index/helpers/BuilderSuspend.h"
X
Xiaohai Xu 已提交
30
#include "knowhere/index/vector_index/helpers/FaissIO.h"
G
groot 已提交
31
#include "metrics/Metrics.h"
G
groot 已提交
32
#include "metrics/SystemInfo.h"
G
groot 已提交
33
#include "scheduler/Definition.h"
S
starlord 已提交
34
#include "scheduler/SchedInst.h"
S
starlord 已提交
35
#include "scheduler/job/SearchJob.h"
36 37 38
#include "segment/SegmentReader.h"
#include "segment/SegmentWriter.h"
#include "utils/Exception.h"
G
groot 已提交
39
#include "utils/StringHelpFunctions.h"
S
starlord 已提交
40
#include "utils/TimeRecorder.h"
X
Xu Peng 已提交
41

42
#include <fiu/fiu-local.h>
G
groot 已提交
43
#include <src/scheduler/job/BuildIndexJob.h>
44 45
#include <algorithm>
#include <functional>
G
groot 已提交
46
#include <limits>
47
#include <unordered_set>
G
groot 已提交
48
#include <utility>
49

J
jinhai 已提交
50
namespace milvus {
X
Xu Peng 已提交
51
namespace engine {
X
Xu Peng 已提交
52

G
groot 已提交
53
namespace {
G
groot 已提交
54 55
constexpr uint64_t BACKGROUND_METRIC_INTERVAL = 1;
constexpr uint64_t BACKGROUND_INDEX_INTERVAL = 1;
G
groot 已提交
56
constexpr uint64_t WAIT_BUILD_INDEX_INTERVAL = 5;
G
groot 已提交
57

G
groot 已提交
58
static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milvus server is shutdown!");
S
starlord 已提交
59
}  // namespace
G
groot 已提交
60

G
groot 已提交
61 62 63 64 65
// Early-return guard used at the top of the public DBImpl APIs: when the DB is
// not initialized (never started, or already stopped) the enclosing function
// returns SHUTDOWN_ERROR.
// Wrapped in do { } while (false) so that `CHECK_INITIALIZED;` expands to exactly
// one statement and therefore composes safely with if/else.
#define CHECK_INITIALIZED                                    \
    do {                                                     \
        if (!initialized_.load(std::memory_order_acquire)) { \
            return SHUTDOWN_ERROR;                           \
        }                                                    \
    } while (false)

Y
Yu Kun 已提交
66
DBImpl::DBImpl(const DBOptions& options)
    : options_(options), initialized_(false), merge_thread_pool_(1, 1), index_thread_pool_(1, 1) {
    // Managers are built from the member copy of the options, not the argument.
    mem_mgr_ = MemManagerFactory::Build(options_);
    merge_mgr_ptr_ = MergeManagerFactory::SSBuild(options_);

    // Register this instance as an observer of "storage.auto_flush_interval";
    // the matching Detach() happens in the destructor.
    ConfigMgr::GetInstance().Attach("storage.auto_flush_interval", this);

    // Bring the DB up right away. The call is qualified so DBImpl's own
    // Start() is the one invoked from within the constructor.
    DBImpl::Start();
}

DBImpl::~DBImpl() {
    // Stop receiving config notifications first, then shut the DB down.
    ConfigMgr::GetInstance().Detach("storage.auto_flush_interval", this);

    // Qualified call: run DBImpl's own Stop() from the destructor.
    DBImpl::Stop();
}

G
groot 已提交
83 84 85
////////////////////////////////////////////////////////////////////////////////
// External APIs
////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
86 87
// Starts the snapshot store/executors and the background threads.
// Calling Start() on an already-running DB returns OK without doing anything.
Status
DBImpl::Start() {
    if (initialized_.load(std::memory_order_acquire)) {
        return Status::OK();
    }

    // Snapshot layer: build the store, then initialize and start the executors on it.
    auto store = snapshot::Store::Build(options_.meta_.backend_uri_, options_.meta_.path_,
                                        codec::Codec::instance().GetSuffixSet());
    snapshot::OperationExecutor::Init(store);
    snapshot::OperationExecutor::GetInstance().Start();
    snapshot::EventExecutor::Init(store);
    snapshot::EventExecutor::GetInstance().Start();
    snapshot::Snapshots::GetInstance().Init(store);

    knowhere::enable_faiss_logging();

    // From here on CHECK_INITIALIZED-guarded APIs are allowed to run.
    initialized_.store(true, std::memory_order_release);

    // TODO: merge files

    // In distributed deployments some nodes are read only; those nodes get
    // neither the timing flush thread nor the timing index-building thread.
    const bool writable_node = options_.mode_ != DBOptions::MODE::CLUSTER_READONLY;
    if (writable_node) {
        bg_flush_thread_ = std::thread(&DBImpl::TimingFlushThread, this);
        bg_index_thread_ = std::thread(&DBImpl::TimingIndexThread, this);
    }

    // Metric thread (fault-injection point can force metrics on in tests).
    fiu_do_on("options_metric_enable", options_.metric_enable_ = true);
    if (options_.metric_enable_) {
        bg_metric_thread_ = std::thread(&DBImpl::TimingMetricThread, this);
    }

    return Status::OK();
}

S
starlord 已提交
129 130
// Stops the DB: flushes buffered data, joins the background threads started by
// Start(), and stops the snapshot executors. Stopping a DB that is not running
// returns OK without doing anything.
Status
DBImpl::Stop() {
    if (!initialized_.load(std::memory_order_acquire)) {
        return Status::OK();
    }
    // CHECK_INITIALIZED-guarded APIs return SHUTDOWN_ERROR from this point.
    initialized_.store(false, std::memory_order_release);

    const bool writable_node = options_.mode_ != DBOptions::MODE::CLUSTER_READONLY;
    if (writable_node) {
        // Flush everything that is buffered, without triggering a merge.
        InternalFlush("", false);

        // Wake the flush thread and wait for it to finish.
        swn_flush_.Notify();
        bg_flush_thread_.join();

        WaitMergeFileFinish();

        // Wake the index thread and wait for it to finish.
        swn_index_.Notify();
        bg_index_thread_.join();
    }

    // The metric thread only exists when metrics are enabled (see Start()).
    if (options_.metric_enable_) {
        swn_metric_.Notify();
        bg_metric_thread_.join();
    }

    snapshot::EventExecutor::GetInstance().Stop();
    snapshot::OperationExecutor::GetInstance().Stop();

    return Status::OK();
}

S
starlord 已提交
164
// Creates a collection from `context`.
// - If the collection params do not mention PARAM_UID_AUTOGEN, ids default to auto-generated.
// - A FIELD_UID field is added when the schema does not already contain one, and
//   its elements are set to a bloom filter plus a deleted-docs element.
// Returns SHUTDOWN_ERROR when the DB is not running, DB_ERROR when the context
// carries no collection, otherwise the status of the create operation.
Status
DBImpl::CreateCollection(const snapshot::CreateCollectionContext& context) {
    CHECK_INITIALIZED;

    auto ctx = context;
    // Guard against a missing collection schema instead of dereferencing null below.
    if (ctx.collection == nullptr) {
        return Status(DB_ERROR, "Collection schema is null");
    }

    // default id is auto-generated
    auto params = ctx.collection->GetParams();
    if (params.find(PARAM_UID_AUTOGEN) == params.end()) {
        params[PARAM_UID_AUTOGEN] = true;
    }
    // NOTE(review): ctx.collection is a copy of the caller's pointer, so this also
    // updates the caller's collection object.
    ctx.collection->SetParams(params);

    // check uid existence
    snapshot::FieldPtr uid_field;
    for (auto& pair : ctx.fields_schema) {
        if (pair.first->GetName() == FIELD_UID) {
            uid_field = pair.first;
            break;
        }
    }

    // add uid field if not specified
    if (uid_field == nullptr) {
        uid_field = std::make_shared<snapshot::Field>(FIELD_UID, 0, DataType::INT64);
    }

    // define uid elements (overrides any elements supplied for the uid field)
    auto bloom_filter_element = std::make_shared<snapshot::FieldElement>(
        0, 0, ELEMENT_BLOOM_FILTER, milvus::engine::FieldElementType::FET_BLOOM_FILTER);
    auto delete_doc_element = std::make_shared<snapshot::FieldElement>(
        0, 0, ELEMENT_DELETED_DOCS, milvus::engine::FieldElementType::FET_DELETED_DOCS);
    ctx.fields_schema[uid_field] = {bloom_filter_element, delete_doc_element};

    auto op = std::make_shared<snapshot::CreateCollectionOperation>(ctx);
    return op->Push();
}

202
// Drops `collection_name`: discards its insert buffer in the memory manager,
// then asks the snapshot layer to drop the collection (LSN = max LSN_TYPE).
Status
DBImpl::DropCollection(const std::string& collection_name) {
    CHECK_INITIALIZED;

    LOG_ENGINE_DEBUG_ << "Prepare to drop collection " << collection_name;

    auto& snapshot_registry = snapshot::Snapshots::GetInstance();
    snapshot::ScopedSnapshotT latest;
    STATUS_CHECK(snapshot_registry.GetSnapshot(latest, collection_name));

    const auto target_id = latest->GetCollectionId();
    mem_mgr_->EraseMem(target_id);  // not-yet-flushed inserts are discarded

    return snapshot_registry.DropCollection(target_id, std::numeric_limits<snapshot::LSN_TYPE>::max());
}

S
starlord 已提交
218
// Sets `has_or_not` to whether a snapshot of `collection_name` can be obtained.
// Any lookup failure is reported as "does not exist"; the call itself returns OK
// unless the DB is not running.
Status
DBImpl::HasCollection(const std::string& collection_name, bool& has_or_not) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT probe;
    has_or_not = snapshot::Snapshots::GetInstance().GetSnapshot(probe, collection_name).ok();

    return Status::OK();
}

// Replaces the contents of `names` with the collection names known to the
// snapshot layer.
Status
DBImpl::ListCollections(std::vector<std::string>& names) {
    CHECK_INITIALIZED;

    names.clear();
    auto& snapshot_registry = snapshot::Snapshots::GetInstance();
    return snapshot_registry.GetCollectionNames(names);
}
236

G
groot 已提交
237
// Returns the collection resource and, for every field of the collection, the
// field's elements (written into `fields_schema`, keyed by the field resource).
// Existing entries in `fields_schema` are not cleared first.
Status
DBImpl::GetCollectionInfo(const std::string& collection_name, snapshot::CollectionPtr& collection,
                          snapshot::FieldElementMappings& fields_schema) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT latest;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(latest, collection_name));

    collection = latest->GetCollection();
    for (auto& entry : latest->GetResources<snapshot::Field>()) {
        auto& field_holder = entry.second;
        fields_schema[field_holder.Get()] = latest->GetFieldElementsByField(field_holder->GetName());
    }

    return Status::OK();
}

S
starlord 已提交
253
// Fills `collection_stats` via GetSnapshotInfo; only a failure status from it is
// propagated, success always yields Status::OK().
Status
DBImpl::GetCollectionStats(const std::string& collection_name, milvus::json& collection_stats) {
    CHECK_INITIALIZED;

    auto info_status = GetSnapshotInfo(collection_name, collection_stats);
    if (!info_status.ok()) {
        return info_status;
    }
    return Status::OK();
}

S
starlord 已提交
261
// Writes the row count recorded in the latest collection commit into `row_count`.
Status
DBImpl::CountEntities(const std::string& collection_name, int64_t& row_count) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT latest;
    auto lookup = snapshot::Snapshots::GetInstance().GetSnapshot(latest, collection_name);
    if (!lookup.ok()) {
        return lookup;
    }

    row_count = latest->GetCollectionCommit()->GetRowCount();
    return Status::OK();
}

272
// Creates partition `partition_name` in `collection_name` through a
// CreatePartitionOperation issued with LSN 0.
Status
DBImpl::CreatePartition(const std::string& collection_name, const std::string& partition_name) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT latest;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(latest, collection_name));

    snapshot::OperationContext op_context;
    op_context.lsn = snapshot::LSN_TYPE(0);
    auto create_op = std::make_shared<snapshot::CreatePartitionOperation>(op_context, latest);

    snapshot::PartitionContext new_partition_ctx;
    new_partition_ctx.name = partition_name;
    snapshot::PartitionPtr created_partition;
    STATUS_CHECK(create_op->CommitNewPartition(new_partition_ctx, created_partition));

    return create_op->Push();
}

S
starlord 已提交
291
// Drops partition `partition_name` of `collection_name`. When the partition is
// present in the snapshot, its buffered inserts are discarded first; the drop
// operation is pushed either way.
Status
DBImpl::DropPartition(const std::string& collection_name, const std::string& partition_name) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT latest;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(latest, collection_name));

    auto doomed = latest->GetPartition(partition_name);
    if (doomed != nullptr) {
        mem_mgr_->EraseMem(latest->GetCollectionId(), doomed->GetID());
    }

    snapshot::PartitionContext drop_ctx;
    drop_ctx.name = partition_name;
    auto drop_op = std::make_shared<snapshot::DropPartitionOperation>(drop_ctx, latest);
    return drop_op->Push();
}

310
// Sets `exist` to whether `collection_name` has a partition named `partition_tag`.
// Returns an error only if the DB is not running or the collection snapshot
// cannot be obtained.
Status
DBImpl::HasPartition(const std::string& collection_name, const std::string& partition_tag, bool& exist) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    // Bind by const reference: no copy if the getter returns a reference, and
    // lifetime extension if it returns by value. (The former std::move on the
    // call result either blocked copy elision or silently copied.)
    const auto& partition_tags = ss->GetPartitionNames();
    exist = std::find(partition_tags.begin(), partition_tags.end(), partition_tag) != partition_tags.end();

    return Status::OK();
}
328

G
groot 已提交
329
// Replaces `partition_names` with the partition names of `collection_name`.
Status
DBImpl::ListPartitions(const std::string& collection_name, std::vector<std::string>& partition_names) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    // Plain assignment: a returned temporary is moved from automatically, and a
    // returned reference must be copied, never moved out of the snapshot.
    partition_names = ss->GetPartitionNames();
    return Status::OK();
}

// Creates (or replaces) the index on `field_name` of `collection_name` and blocks
// until all segments needing the index are built. Segments that failed to build
// are skipped.
// Returns early (OK) in three cases:
// - the requested index equals the existing one;
// - the client connection breaks (the build continues in the background);
// - every remaining segment is one that failed to build.
// Returns SHUTDOWN_ERROR if the DB stops while waiting.
Status
DBImpl::CreateIndex(const std::shared_ptr<server::Context>& context, const std::string& collection_name,
                    const std::string& field_name, const CollectionIndex& index) {
    CHECK_INITIALIZED;

    LOG_ENGINE_DEBUG_ << "Create index for collection: " << collection_name << " field: " << field_name;

    // step 1: flush and wait merge file thread finished to avoid duplicate data bug
    STATUS_CHECK(Flush());
    WaitMergeFileFinish();  // let merge file thread finish

    // step 2: compare old index and new index
    CollectionIndex new_index = index;
    CollectionIndex old_index;
    STATUS_CHECK(GetSnapshotIndex(collection_name, field_name, old_index));

    if (utils::IsSameIndex(old_index, new_index)) {
        return Status::OK();  // same index
    }

    // step 3: drop old index. Treated as best-effort: a failure here does not
    // abort index creation (presumably it can fail when no index exists yet — confirm).
    auto drop_status = DropIndex(collection_name, field_name);
    if (!drop_status.ok()) {
        LOG_ENGINE_DEBUG_ << "Drop old index before create index: " << drop_status.message();
    }
    WaitMergeFileFinish();  // let merge file thread finish since DropIndex start a merge task

    // step 4: create field element for index
    STATUS_CHECK(SetSnapshotIndex(collection_name, field_name, new_index));

    // step 5: start background build index thread
    std::vector<std::string> collection_names = {collection_name};
    WaitBuildIndexFinish();
    StartBuildIndexTask(collection_names, true);

    // step 6: iterate segments need to be build index, wait until all segments are built
    while (true) {
        SnapshotVisitor ss_visitor(collection_name);
        snapshot::IDS_TYPE segment_ids;
        ss_visitor.SegmentsToIndex(field_name, segment_ids);
        if (segment_ids.empty()) {
            break;  // all segments build index finished
        }

        snapshot::ScopedSnapshotT ss;
        STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
        IgnoreIndexFailedSegments(ss->GetCollectionId(), segment_ids);
        if (segment_ids.empty()) {
            break;  // some segments failed to build index, and ignored
        }

        index_req_swn_.Wait_For(std::chrono::seconds(1));

        // The DB is being stopped: the background index thread will not finish
        // the remaining segments, so stop waiting instead of looping forever.
        if (!initialized_.load(std::memory_order_acquire)) {
            return SHUTDOWN_ERROR;
        }

        // client break the connection, no need to block, check every 1 second
        if (context && context->IsConnectionBroken()) {
            LOG_ENGINE_DEBUG_ << "Client connection broken, build index in background";
            break;  // just break, not return, continue to update partitions files to to_index
        }
    }

    return Status::OK();
}
402

G
groot 已提交
403
// Removes the index of `field_name` from the snapshot, then starts a merge task
// for the collection.
Status
DBImpl::DropIndex(const std::string& collection_name, const std::string& field_name) {
    CHECK_INITIALIZED;

    LOG_ENGINE_DEBUG_ << "Drop index for collection: " << collection_name << " field: " << field_name;

    STATUS_CHECK(DeleteSnapshotIndex(collection_name, field_name));

    snapshot::ScopedSnapshotT latest;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(latest, collection_name));

    std::set<int64_t> merge_targets;
    merge_targets.insert(latest->GetCollectionId());
    StartMergeTask(merge_targets, true);

    return Status::OK();
}

// Reads the index currently defined on `field_name` into `index`.
Status
DBImpl::DescribeIndex(const std::string& collection_name, const std::string& field_name, CollectionIndex& index) {
    CHECK_INITIALIZED;

    LOG_ENGINE_DEBUG_ << "Describe index for collection: " << collection_name << " field: " << field_name;

    auto lookup = GetSnapshotIndex(collection_name, field_name, index);
    if (!lookup.ok()) {
        return lookup;
    }
    return Status::OK();
}

// Inserts the rows of `data_chunk` into `partition_name` of `collection_name`.
// Validation:
// - the chunk must carry exactly the collection's non-uid fields;
// - the uid field must be absent when ids are auto-generated (PARAM_UID_AUTOGEN,
//   default true) and present otherwise.
// On return, `data_chunk`'s field data has been handed over to the insert buffer
// (swapped out); only its FIELD_UID entry is repopulated, with the generated or
// client-supplied ids. A forced flush happens when the insert buffer exceeds
// options_.insert_buffer_size_.
Status
DBImpl::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk,
               idx_t op_id) {
    CHECK_INITIALIZED;

    if (data_chunk == nullptr) {
        return Status(DB_ERROR, "Null pointer");
    }

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    auto partition = ss->GetPartition(partition_name);
    if (partition == nullptr) {
        return Status(DB_NOT_FOUND, "Fail to get partition " + partition_name);
    }

    auto id_field = ss->GetField(FIELD_UID);
    if (id_field == nullptr) {
        return Status(DB_ERROR, "Field '_id' not found");
    }

    // check field names: ignoring the uid field, the chunk's field set must equal the collection's
    auto field_names = ss->GetFieldNames();
    std::unordered_set<std::string> collection_field_names(field_names.begin(), field_names.end());
    collection_field_names.erase(engine::FIELD_UID);

    std::unordered_set<std::string> chunk_field_names;
    for (auto& pair : data_chunk->fixed_fields_) {
        chunk_field_names.insert(pair.first);
    }
    for (auto& pair : data_chunk->variable_fields_) {
        chunk_field_names.insert(pair.first);
    }
    chunk_field_names.erase(engine::FIELD_UID);

    if (collection_field_names.size() != chunk_field_names.size()) {
        std::string msg = "Collection has " + std::to_string(collection_field_names.size()) +
                          " fields while the insert data has " + std::to_string(chunk_field_names.size()) + " fields";
        return Status(DB_ERROR, msg);
    }
    for (auto& name : chunk_field_names) {
        if (collection_field_names.find(name) == collection_field_names.end()) {
            std::string msg = "The field " + name + " is not defined in collection mapping";
            return Status(DB_ERROR, msg);
        }
    }

    // check id field existence against the collection's auto-id setting
    auto& params = ss->GetCollection()->GetParams();
    bool auto_increment = true;
    if (params.find(PARAM_UID_AUTOGEN) != params.end()) {
        auto_increment = params[PARAM_UID_AUTOGEN];
    }

    FIXEDX_FIELD_MAP& fields = data_chunk->fixed_fields_;
    auto pair = fields.find(engine::FIELD_UID);
    if (auto_increment) {
        // id is auto generated, but client provides id, return error
        if (pair != fields.end() && pair->second != nullptr) {
            return Status(DB_ERROR, "Field '_id' is auto increment, no need to provide id");
        }
    } else {
        // id is not auto generated, but client doesn't provide id, return error
        if (pair == fields.end() || pair->second == nullptr) {
            return Status(DB_ERROR, "Field '_id' is user defined");
        }
    }

    // consume the data chunk: move the field data into a fresh chunk for the insert buffer
    DataChunkPtr consume_chunk = std::make_shared<DataChunk>();
    consume_chunk->count_ = data_chunk->count_;
    consume_chunk->fixed_fields_.swap(data_chunk->fixed_fields_);
    consume_chunk->variable_fields_.swap(data_chunk->variable_fields_);

    // generate id
    if (auto_increment) {
        SafeIDGenerator& id_generator = SafeIDGenerator::GetInstance();
        IDNumbers ids;
        STATUS_CHECK(id_generator.GetNextIDNumbers(consume_chunk->count_, ids));
        BinaryDataPtr id_data = std::make_shared<BinaryData>();
        id_data->data_.resize(ids.size() * sizeof(int64_t));
        memcpy(id_data->data_.data(), ids.data(), ids.size() * sizeof(int64_t));
        consume_chunk->fixed_fields_[engine::FIELD_UID] = id_data;
        data_chunk->fixed_fields_[engine::FIELD_UID] = id_data;  // return generated id to customer;
    } else {
        BinaryDataPtr id_data = std::make_shared<BinaryData>();
        id_data->data_ = consume_chunk->fixed_fields_[engine::FIELD_UID]->data_;
        data_chunk->fixed_fields_[engine::FIELD_UID] = id_data;  // return the id created by client
    }

    // do insert
    int64_t collection_id = ss->GetCollectionId();
    int64_t partition_id = partition->GetID();

    auto status = mem_mgr_->InsertEntities(collection_id, partition_id, consume_chunk, op_id);
    if (!status.ok()) {
        return status;
    }
    if (mem_mgr_->GetCurrentMem() > options_.insert_buffer_size_) {
        // %ld consumes a long from the varargs, so pass 0L (a plain int literal is UB here).
        LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "insert", 0L) << "Insert buffer size exceeds limit. Force flush";
        InternalFlush();
    }

    // metrics
    milvus::server::CollectInsertMetrics metrics(data_chunk->count_, status);

    return Status::OK();
}

S
starlord 已提交
544
// Fetches the requested fields of the entities in `id_array`.
// `valid_row` is grown to id_array.size() (new slots false) and is passed to the
// segment handler, which presumably flags the ids that were found — confirm in
// GetEntityByIdSegmentHandler. The handler's data chunk is returned through
// `data_chunk`.
Status
DBImpl::GetEntityByID(const std::string& collection_name, const IDNumbers& id_array,
                      const std::vector<std::string>& field_names, std::vector<bool>& valid_row,
                      DataChunkPtr& data_chunk) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT latest;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(latest, collection_name));

    const std::string root_path = options_.meta_.path_;
    valid_row.resize(id_array.size(), false);

    auto fetcher =
        std::make_shared<GetEntityByIdSegmentHandler>(nullptr, latest, root_path, id_array, field_names, valid_row);
    fetcher->Iterate();
    STATUS_CHECK(fetcher->GetStatus());

    data_chunk = fetcher->data_chunk_;
    return Status::OK();
}

// Records deletion of `entity_ids` in `collection_name` through the memory
// manager (tagged with `op_id`). A snapshot lookup failure is logged and returned.
Status
DBImpl::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, idx_t op_id) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
    if (!status.ok()) {
        // %ld consumes a long from the varargs, so pass 0L (a plain int literal is UB here).
        LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "delete", 0L) << "Get snapshot fail: " << status.message();
        return status;
    }

    return mem_mgr_->DeleteEntities(ss->GetCollectionId(), entity_ids, op_id);
}
G
groot 已提交
578

G
groot 已提交
579
// Runs `query_ptr` against the collection named by query_ptr->collection_id.
// Steps:
// 1. Collect the segments whose partition name matches any pattern in
//    query_ptr->partitions (all segments when the list is empty).
// 2. Run a SearchJob over them through the scheduler and wait for it.
// 3. If the job produced a result and field names were requested, load those
//    fields for the hit ids into result->data_chunk_.
Status
DBImpl::Query(const server::ContextPtr& context, const query::QueryPtr& query_ptr, engine::QueryResultPtr& result) {
    CHECK_INITIALIZED;

    TimeRecorder rc("DBImpl::Query");

    if (query_ptr == nullptr) {
        return Status{DB_ERROR, "Query is null"};
    }
    if (!query_ptr->root) {
        return Status{DB_ERROR, "BinaryQuery is null"};
    }

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, query_ptr->collection_id));

    /* collect all valid segment */
    std::vector<SegmentVisitor::Ptr> segment_visitors;
    auto exec = [&](const snapshot::Segment::Ptr& segment, snapshot::SegmentIterator* handler) -> Status {
        auto p_id = segment->GetPartitionId();
        auto p_ptr = ss->GetResource<snapshot::Partition>(p_id);
        if (p_ptr == nullptr) {
            // Report the inconsistency instead of dereferencing a null partition.
            return Status(milvus::SS_ERROR, "Cannot find partition of segment");
        }
        auto& p_name = p_ptr->GetName();

        /* check partition match pattern */
        bool match = query_ptr->partitions.empty();
        for (auto& pattern : query_ptr->partitions) {
            if (StringHelpFunctions::IsRegexMatch(p_name, pattern)) {
                match = true;
                break;
            }
        }

        if (match) {
            auto visitor = SegmentVisitor::Build(ss, segment->GetID());
            if (!visitor) {
                return Status(milvus::SS_ERROR, "Cannot build segment visitor");
            }
            segment_visitors.push_back(visitor);
        }
        return Status::OK();
    };

    auto segment_iter = std::make_shared<snapshot::SegmentIterator>(ss, exec);
    segment_iter->Iterate();
    STATUS_CHECK(segment_iter->GetStatus());

    LOG_ENGINE_DEBUG_ << LogOut("Engine query begin, segment count: %ld", segment_visitors.size());

    engine::snapshot::IDS_TYPE segment_ids;
    segment_ids.reserve(segment_visitors.size());
    for (auto& sv : segment_visitors) {
        segment_ids.emplace_back(sv->GetSegment()->GetID());
    }

    scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(nullptr, ss, options_, query_ptr, segment_ids);

    cache::CpuCacheMgr::GetInstance().PrintInfo();  // print cache info before query
    /* put search job to scheduler and wait job finish */
    scheduler::JobMgrInst::GetInstance()->Put(job);
    job->WaitFinish();
    cache::CpuCacheMgr::GetInstance().PrintInfo();  // print cache info after query

    if (!job->status().ok()) {
        return job->status();
    }

    if (job->query_result()) {
        result = job->query_result();
    }

    // get entities by result ids; skipped when there is no result to fill
    if (result != nullptr && !query_ptr->field_names.empty()) {
        std::vector<bool> valid_row;
        STATUS_CHECK(GetEntityByID(query_ptr->collection_id, result->result_ids_, query_ptr->field_names, valid_row,
                                   result->data_chunk_));
    }

    // TODO: filter returned entities by field names (the old AttrsData-based draft was removed)

    rc.ElapseFromBegin("Engine query totally cost");

    return Status::OK();
}

C
Cai Yudong 已提交
678 679 680 681 682 683 684 685
// Loads the uids stored in segment `segment_id` of `collection_name` into
// `entity_ids`, excluding entities whose offsets appear in the segment's
// deleted-docs list. Order of the remaining ids is preserved.
// Returns SERVER_FILE_NOT_FOUND when the segment visitor cannot be built.
Status
DBImpl::ListIDInSegment(const std::string& collection_name, int64_t segment_id, IDNumbers& entity_ids) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT ss;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));

    auto read_visitor = engine::SegmentVisitor::Build(ss, segment_id);
    if (!read_visitor) {
        return Status(SERVER_FILE_NOT_FOUND, "Segment not exist");
    }
    segment::SegmentReaderPtr segment_reader =
        std::make_shared<segment::SegmentReader>(options_.meta_.path_, read_visitor);

    STATUS_CHECK(segment_reader->LoadUids(entity_ids));

    // remove deleted entities from the id list
    segment::DeletedDocsPtr deleted_docs_ptr;
    STATUS_CHECK(segment_reader->LoadDeletedDocs(deleted_docs_ptr));
    if (deleted_docs_ptr) {
        const std::vector<offset_t>& deleted_offsets = deleted_docs_ptr->GetDeletedDocs();
        if (!deleted_offsets.empty()) {
            // Mark, then compact in one pass: O(n + k) instead of one vector::erase per
            // offset. Duplicate offsets no longer remove an extra (wrong) entity, and
            // out-of-range offsets are ignored instead of erasing past the end.
            std::vector<uint8_t> is_deleted(entity_ids.size(), 0);
            for (auto offset : deleted_offsets) {
                if (offset >= 0 && static_cast<size_t>(offset) < is_deleted.size()) {
                    is_deleted[static_cast<size_t>(offset)] = 1;
                }
            }

            size_t kept = 0;
            for (size_t i = 0; i < entity_ids.size(); ++i) {
                if (!is_deleted[i]) {
                    entity_ids[kept++] = entity_ids[i];
                }
            }
            entity_ids.resize(kept);
        }
    }

    return Status::OK();
}

// Loads `field_names` of `collection_name` (the `force` flag is forwarded) by
// iterating a LoadCollectionHandler over the latest snapshot.
// NOTE(review): the caller's `context` is not forwarded; the handler is built
// with nullptr.
Status
DBImpl::LoadCollection(const server::ContextPtr& context, const std::string& collection_name,
                       const std::vector<std::string>& field_names, bool force) {
    CHECK_INITIALIZED;

    snapshot::ScopedSnapshotT latest;
    STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(latest, collection_name));

    auto loader = std::make_shared<LoadCollectionHandler>(nullptr, latest, options_.meta_.path_, field_names, force);
    loader->Iterate();

    auto load_status = loader->GetStatus();
    if (!load_status.ok()) {
        return load_status;
    }
    return Status::OK();
}

// Flushes the buffered data of a single collection.
// Returns DB_NOT_FOUND when the collection does not exist.
Status
DBImpl::Flush(const std::string& collection_name) {
    CHECK_INITIALIZED;

    bool has_collection = false;
    auto status = HasCollection(collection_name, has_collection);
    if (!status.ok()) {
        return status;
    }
    if (!has_collection) {
        LOG_ENGINE_ERROR_ << "Collection to flush does not exist: " << collection_name;
        return Status(DB_NOT_FOUND, "Collection to flush does not exist");
    }

    LOG_ENGINE_DEBUG_ << "Begin flush collection: " << collection_name;
    InternalFlush(collection_name);
    LOG_ENGINE_DEBUG_ << "End flush collection: " << collection_name;

    return status;
}

Status
DBImpl::Flush() {
    const bool running = initialized_.load(std::memory_order_acquire);
    if (!running) {
        return SHUTDOWN_ERROR;
    }

    // InternalFlush with its default arguments flushes every collection.
    LOG_ENGINE_DEBUG_ << "Begin flush all collections";
    InternalFlush();
    LOG_ENGINE_DEBUG_ << "End flush all collections";

    return Status::OK();
}

Status
DBImpl::Compact(const std::shared_ptr<server::Context>& context, const std::string& collection_name, double threshold) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // Hold both the build-index and the flush/merge/compact mutex for the whole compaction.
    LOG_ENGINE_DEBUG_ << "Before compacting, wait for build index thread to finish...";
    const std::lock_guard<std::mutex> index_lock(build_index_mutex_);
    const std::lock_guard<std::mutex> merge_lock(flush_merge_compact_mutex_);

    bool has_collection = false;
    Status status = HasCollection(collection_name, has_collection);
    if (!status.ok()) {
        return status;
    }
    if (!has_collection) {
        LOG_ENGINE_ERROR_ << "Collection to compact does not exist: " << collection_name;
        return Status(DB_NOT_FOUND, "Collection to compact does not exist");
    }

    snapshot::ScopedSnapshotT latest_ss;
    status = snapshot::Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
    if (!status.ok()) {
        return status;
    }

    // Result reported to the caller: OK, or the most recent segment compaction failure.
    // Per-segment statuses are kept in their own variables so that a benign skip (such as a
    // segment without deleted docs) on the last iterated segment is not reported as an error,
    // and a later success does not hide an earlier failure.
    Status compact_status = Status::OK();

    auto& segments = latest_ss->GetResources<snapshot::Segment>();
    for (auto& kv : segments) {
        // client break the connection, no need to continue
        if (context && context->IsConnectionBroken()) {
            LOG_ENGINE_DEBUG_ << "Client connection broken, stop compact operation";
            break;
        }

        snapshot::ID_TYPE segment_id = kv.first;
        auto read_visitor = engine::SegmentVisitor::Build(latest_ss, segment_id);
        segment::SegmentReaderPtr segment_reader =
            std::make_shared<segment::SegmentReader>(options_.meta_.path_, read_visitor);

        segment::DeletedDocsPtr deleted_docs;
        Status load_status = segment_reader->LoadDeletedDocs(deleted_docs);
        if (!load_status.ok() || deleted_docs == nullptr) {
            continue;  // no deleted docs, no need to compact
        }

        auto segment_commit = latest_ss->GetSegmentCommitBySegmentId(segment_id);
        auto row_count = segment_commit->GetRowCount();
        if (row_count == 0) {
            // No rows counted but deleted docs exist (presumably everything was deleted):
            // drop the whole segment instead of rewriting it.
            snapshot::OperationContext drop_seg_context;
            auto seg = latest_ss->GetResource<snapshot::Segment>(segment_id);
            drop_seg_context.prev_segment = seg;
            auto drop_op = std::make_shared<snapshot::DropSegmentOperation>(drop_seg_context, latest_ss);
            Status drop_status = drop_op->Push();
            if (!drop_status.ok()) {
                LOG_ENGINE_ERROR_ << "Compact failed for segment " << segment_reader->GetSegmentPath() << ": "
                                  << drop_status.message();
                compact_status = drop_status;
            }
            continue;
        }

        // Only rewrite segments whose deleted fraction reaches the threshold.
        auto deleted_count = deleted_docs->GetCount();
        if (double(deleted_count) / (row_count + deleted_count) < threshold) {
            continue;  // no need to compact
        }

        snapshot::IDS_TYPE ids = {segment_id};
        MergeTask merge_task(options_, latest_ss, ids);
        Status merge_status = merge_task.Execute();
        if (!merge_status.ok()) {
            LOG_ENGINE_ERROR_ << "Compact failed for segment " << segment_reader->GetSegmentPath() << ": "
                              << merge_status.message();
            compact_status = merge_status;
            continue;  // skip this file and try compact next one
        }
    }

    return compact_status;
}

G
groot 已提交
842 843 844
////////////////////////////////////////////////////////////////////////////////
// Internal APIs
////////////////////////////////////////////////////////////////////////////////
S
starlord 已提交
845
void
G
groot 已提交
846 847
DBImpl::InternalFlush(const std::string& collection_name, bool merge) {
    Status status;
848
    std::set<int64_t> flushed_collection_ids;
G
groot 已提交
849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864
    if (!collection_name.empty()) {
        // flush one collection
        snapshot::ScopedSnapshotT ss;
        status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
        if (!status.ok()) {
            LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "flush", 0) << "Get snapshot fail: " << status.message();
            return;
        }

        {
            const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
            int64_t collection_id = ss->GetCollectionId();
            status = mem_mgr_->Flush(collection_id);
            if (!status.ok()) {
                return;
            }
865
            flushed_collection_ids.insert(collection_id);
G
groot 已提交
866 867 868 869 870
        }
    } else {
        // flush all collections
        {
            const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
871
            status = mem_mgr_->Flush(flushed_collection_ids);
G
groot 已提交
872 873 874 875 876 877 878
            if (!status.ok()) {
                return;
            }
        }
    }

    if (merge) {
879
        StartMergeTask(flushed_collection_ids);
G
groot 已提交
880
    }
G
groot 已提交
881 882 883 884
}

void
DBImpl::TimingFlushThread() {
G
groot 已提交
885
    SetThreadName("timing_flush");
Y
yu yunfeng 已提交
886
    server::SystemInfo::GetInstance().Init();
X
Xu Peng 已提交
887
    while (true) {
888
        if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
889
            LOG_ENGINE_DEBUG_ << "DB background flush thread exit";
G
groot 已提交
890 891
            break;
        }
X
Xu Peng 已提交
892

G
groot 已提交
893 894 895 896 897 898
        InternalFlush();
        if (options_.auto_flush_interval_ > 0) {
            swn_flush_.Wait_For(std::chrono::seconds(options_.auto_flush_interval_));
        } else {
            swn_flush_.Wait();
        }
899 900 901
    }
}

S
starlord 已提交
902 903
void
DBImpl::StartMetricTask() {
G
groot 已提交
904
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(BACKGROUND_METRIC_INTERVAL);
G
groot 已提交
905 906
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance().CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance().CacheCapacity();
S
shengjh 已提交
907 908
    fiu_do_on("DBImpl.StartMetricTask.InvalidTotalCache", cache_total = 0);

J
JinHai-CN 已提交
909 910 911 912 913 914 915
    if (cache_total > 0) {
        double cache_usage_double = cache_usage;
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage_double * 100 / cache_total);
    } else {
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(0);
    }

Y
Yu Kun 已提交
916
    server::Metrics::GetInstance().GpuCacheUsageGaugeSet();
G
groot 已提交
917 918 919 920
    /* SS TODO */
    // uint64_t size;
    // Size(size);
    // server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
G
groot 已提交
921 922 923 924 925
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();
S
starlord 已提交
926

K
kun yu 已提交
927
    server::Metrics::GetInstance().CPUCoreUsagePercentSet();
K
kun yu 已提交
928 929
    server::Metrics::GetInstance().GPUTemperature();
    server::Metrics::GetInstance().CPUTemperature();
930
    server::Metrics::GetInstance().PushToGateway();
G
groot 已提交
931 932
}

S
starlord 已提交
933
void
G
groot 已提交
934
DBImpl::TimingMetricThread() {
G
groot 已提交
935
    SetThreadName("timing_metric");
G
groot 已提交
936 937
    server::SystemInfo::GetInstance().Init();
    while (true) {
938
        if (!initialized_.load(std::memory_order_acquire)) {
G
groot 已提交
939
            LOG_ENGINE_DEBUG_ << "DB background metric thread exit";
S
starlord 已提交
940 941
            break;
        }
Z
update  
zhiru 已提交
942

G
groot 已提交
943 944
        swn_metric_.Wait_For(std::chrono::seconds(BACKGROUND_METRIC_INTERVAL));
        StartMetricTask();
Z
update  
zhiru 已提交
945
    }
G
groot 已提交
946
}
X
Xu Peng 已提交
947

S
starlord 已提交
948
void
949
DBImpl::StartBuildIndexTask(const std::vector<std::string>& collection_names, bool reset_retry_times) {
S
starlord 已提交
950
    // build index has been finished?
951 952 953 954 955 956 957
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (!index_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
                index_thread_results_.pop_back();
            }
G
groot 已提交
958 959 960
        }
    }

S
starlord 已提交
961
    // add new build index task
962 963 964
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (index_thread_results_.empty()) {
965 966 967 968 969
            if (reset_retry_times) {
                std::lock_guard<std::mutex> lock(index_retry_mutex_);
                index_retry_map_.clear();  // reset index retry times
            }

G
groot 已提交
970 971
            index_thread_results_.push_back(
                index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndexTask, this, collection_names));
972
        }
G
groot 已提交
973
    }
X
Xu Peng 已提交
974 975
}

S
starlord 已提交
976
void
G
groot 已提交
977
DBImpl::BackgroundBuildIndexTask(std::vector<std::string> collection_names) {
G
groot 已提交
978 979
    SetThreadName("build_index");

P
peng.xu 已提交
980
    std::unique_lock<std::mutex> lock(build_index_mutex_);
981

C
chen qingxiang 已提交
982
    for (const auto& collection_name : collection_names) {
G
groot 已提交
983 984 985 986 987 988
        snapshot::ScopedSnapshotT latest_ss;
        auto status = snapshot::Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
        if (!status.ok()) {
            return;
        }
        SnapshotVisitor ss_visitor(latest_ss);
G
groot 已提交
989

G
groot 已提交
990 991
        snapshot::IDS_TYPE segment_ids;
        ss_visitor.SegmentsToIndex("", segment_ids);
G
groot 已提交
992 993 994
        if (segment_ids.empty()) {
            continue;
        }
T
Tinkerrr 已提交
995

996 997 998 999 1000 1001 1002 1003
        // check index retry times
        snapshot::ID_TYPE collection_id = latest_ss->GetCollectionId();
        IgnoreIndexFailedSegments(collection_id, segment_ids);
        if (segment_ids.empty()) {
            continue;
        }

        // start build index job
G
groot 已提交
1004
        LOG_ENGINE_DEBUG_ << "Create BuildIndexJob for " << segment_ids.size() << " segments of " << collection_name;
G
groot 已提交
1005
        cache::CpuCacheMgr::GetInstance().PrintInfo();  // print cache info before build index
G
groot 已提交
1006
        scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(latest_ss, options_, segment_ids);
G
groot 已提交
1007 1008
        scheduler::JobMgrInst::GetInstance()->Put(job);
        job->WaitFinish();
G
groot 已提交
1009
        cache::CpuCacheMgr::GetInstance().PrintInfo();  // print cache info after build index
G
groot 已提交
1010

1011 1012 1013 1014
        // record failed segments, avoid build index hang
        snapshot::IDS_TYPE& failed_ids = job->FailedSegments();
        MarkIndexFailedSegments(collection_id, failed_ids);

G
groot 已提交
1015 1016
        if (!job->status().ok()) {
            LOG_ENGINE_ERROR_ << job->status().message();
G
groot 已提交
1017 1018 1019 1020
        }
    }
}

G
groot 已提交
1021 1022
void
DBImpl::TimingIndexThread() {
G
groot 已提交
1023
    SetThreadName("timing_index");
G
groot 已提交
1024 1025 1026 1027 1028
    server::SystemInfo::GetInstance().Init();
    while (true) {
        if (!initialized_.load(std::memory_order_acquire)) {
            WaitMergeFileFinish();
            WaitBuildIndexFinish();
G
groot 已提交
1029

G
groot 已提交
1030 1031
            LOG_ENGINE_DEBUG_ << "DB background thread exit";
            break;
G
groot 已提交
1032 1033
        }

G
groot 已提交
1034
        swn_index_.Wait_For(std::chrono::seconds(BACKGROUND_INDEX_INTERVAL));
G
groot 已提交
1035

G
groot 已提交
1036 1037 1038
        std::vector<std::string> collection_names;
        snapshot::Snapshots::GetInstance().GetCollectionNames(collection_names);
        WaitMergeFileFinish();
1039
        StartBuildIndexTask(collection_names, false);
G
groot 已提交
1040 1041
    }
}
G
groot 已提交
1042

G
groot 已提交
1043 1044 1045 1046 1047 1048 1049 1050
void
DBImpl::WaitBuildIndexFinish() {
    // Block until every tracked build-index result is ready; the results are not removed.
    std::lock_guard<std::mutex> guard(index_result_mutex_);
    for (auto& pending : index_thread_results_) {
        pending.wait();
    }
}

1053
void
1054
DBImpl::StartMergeTask(const std::set<int64_t>& collection_ids, bool force_merge_all) {
G
groot 已提交
1055 1056 1057 1058 1059 1060 1061
    // merge task has been finished?
    {
        std::lock_guard<std::mutex> lck(merge_result_mutex_);
        if (!merge_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (merge_thread_results_.back().wait_for(span) == std::future_status::ready) {
                merge_thread_results_.pop_back();
1062
            }
1063
        }
G
groot 已提交
1064
    }
1065

G
groot 已提交
1066 1067 1068 1069 1070 1071
    // add new merge task
    {
        std::lock_guard<std::mutex> lck(merge_result_mutex_);
        if (merge_thread_results_.empty()) {
            // start merge file thread
            merge_thread_results_.push_back(
1072
                merge_thread_pool_.enqueue(&DBImpl::BackgroundMerge, this, collection_ids, force_merge_all));
1073 1074 1075 1076
        }
    }
}

G
groot 已提交
1077
void
1078
DBImpl::BackgroundMerge(std::set<int64_t> collection_ids, bool force_merge_all) {
G
groot 已提交
1079
    SetThreadName("merge");
G
groot 已提交
1080

1081
    for (auto& collection_id : collection_ids) {
G
groot 已提交
1082 1083
        const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);

G
groot 已提交
1084 1085
        MergeStrategyType type = force_merge_all ? MergeStrategyType::SIMPLE : MergeStrategyType::LAYERED;
        auto status = merge_mgr_ptr_->MergeSegments(collection_id, type);
G
groot 已提交
1086
        if (!status.ok()) {
1087
            LOG_ENGINE_ERROR_ << "Failed to get merge files for collection id: " << collection_id
G
groot 已提交
1088
                              << " reason:" << status.message();
G
groot 已提交
1089 1090 1091
        }

        if (!initialized_.load(std::memory_order_acquire)) {
1092
            LOG_ENGINE_DEBUG_ << "Server will shutdown, skip merge action for collection id: " << collection_id;
G
groot 已提交
1093 1094 1095 1096 1097
            break;
        }
    }
}

1098
void
G
groot 已提交
1099 1100 1101 1102 1103 1104 1105
DBImpl::WaitMergeFileFinish() {
    //    LOG_ENGINE_DEBUG_ << "Begin WaitMergeFileFinish";
    std::lock_guard<std::mutex> lck(merge_result_mutex_);
    for (auto& iter : merge_thread_results_) {
        iter.wait();
    }
    //    LOG_ENGINE_DEBUG_ << "End WaitMergeFileFinish";
1106 1107
}

1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125
void
DBImpl::SuspendIfFirst() {
    // Count a new live search; the 0 -> 1 transition calls knowhere::BuilderSuspend().
    std::lock_guard<std::mutex> guard(suspend_build_mutex_);
    ++live_search_num_;
    if (live_search_num_ != 1) {
        return;
    }
    LOG_ENGINE_TRACE_ << "live_search_num_: " << live_search_num_;
    knowhere::BuilderSuspend();
}

void
DBImpl::ResumeIfLast() {
    // Retire a live search; the 1 -> 0 transition calls knowhere::BuildResume().
    std::lock_guard<std::mutex> guard(suspend_build_mutex_);
    --live_search_num_;
    if (live_search_num_ != 0) {
        return;
    }
    LOG_ENGINE_TRACE_ << "live_search_num_: " << live_search_num_;
    knowhere::BuildResume();
}

C
Cai Yudong 已提交
1126 1127 1128 1129 1130 1131 1132
void
DBImpl::ConfigUpdate(const std::string& name) {
    // Only the auto-flush interval is reloaded here; other keys are ignored.
    if (name != "storage.auto_flush_interval") {
        return;
    }
    options_.auto_flush_interval_ = config.storage.auto_flush_interval();
}

1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154
void
DBImpl::MarkIndexFailedSegments(snapshot::ID_TYPE collection_id, const snapshot::IDS_TYPE& failed_ids) {
    // Bump the per-segment failure counter; operator[] creates value-initialized entries as needed.
    std::lock_guard<std::mutex> guard(index_retry_mutex_);
    SegmentIndexRetryMap& retries = index_retry_map_[collection_id];
    for (const auto& segment_id : failed_ids) {
        ++retries[segment_id];
    }
}

void
DBImpl::IgnoreIndexFailedSegments(snapshot::ID_TYPE collection_id, snapshot::IDS_TYPE& segment_ids) {
    // Remove (in place, preserving order) every segment whose recorded failure count has
    // reached BUILD_INEDX_RETRY_TIMES. Like before, operator[] creates missing counters.
    std::lock_guard<std::mutex> guard(index_retry_mutex_);
    SegmentIndexRetryMap& retries = index_retry_map_[collection_id];
    auto retries_exhausted = [&retries](snapshot::ID_TYPE segment_id) {
        return !(retries[segment_id] < BUILD_INEDX_RETRY_TIMES);
    };
    segment_ids.erase(std::remove_if(segment_ids.begin(), segment_ids.end(), retries_exhausted), segment_ids.end());
}

S
starlord 已提交
1155 1156
}  // namespace engine
}  // namespace milvus