// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

#include "db/DBImpl.h"

#include <assert.h>
#include <fiu-local.h>

#include <algorithm>
#include <boost/filesystem.hpp>
#include <chrono>
#include <cstring>
#include <functional>
#include <iostream>
#include <limits>
#include <queue>
#include <set>
#include <thread>
#include <unordered_map>
#include <utility>

#include "Utils.h"
#include "cache/CpuCacheMgr.h"
#include "cache/GpuCacheMgr.h"
#include "db/IDGenerator.h"
#include "engine/EngineFactory.h"
#include "index/thirdparty/faiss/utils/distances.h"
#include "insert/MemManagerFactory.h"
#include "meta/MetaConsts.h"
#include "meta/MetaFactory.h"
#include "meta/SqliteMetaImpl.h"
#include "metrics/Metrics.h"
#include "scheduler/Definition.h"
#include "scheduler/SchedInst.h"
#include "scheduler/job/BuildIndexJob.h"
#include "scheduler/job/DeleteJob.h"
#include "scheduler/job/SearchJob.h"
#include "segment/SegmentReader.h"
#include "segment/SegmentWriter.h"
#include "utils/Exception.h"
#include "utils/Log.h"
#include "utils/StringHelpFunctions.h"
#include "utils/TimeRecorder.h"
#include "utils/ValidationUtil.h"
#include "wal/WalDefinations.h"

#include "search/TaskInst.h"

namespace milvus {
namespace engine {

namespace {
constexpr uint64_t BACKGROUND_METRIC_INTERVAL = 1;
constexpr uint64_t BACKGROUND_INDEX_INTERVAL = 1;
constexpr uint64_t WAIT_BUILD_INDEX_INTERVAL = 5;

static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milvus server is shutdown!");

}  // namespace

DBImpl::DBImpl(const DBOptions& options)
    : options_(options), initialized_(false), merge_thread_pool_(1, 1), index_thread_pool_(1, 1) {
    meta_ptr_ = MetaFactory::Build(options.meta_, options.mode_);
    mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);

    if (options_.wal_enable_) {
        wal::MXLogConfiguration mxlog_config;
        mxlog_config.recovery_error_ignore = options_.recovery_error_ignore_;
        // 2 buffers in the WAL
        mxlog_config.buffer_size = options_.buffer_size_ / 2;
        mxlog_config.mxlog_path = options_.mxlog_path_;
        wal_mgr_ = std::make_shared<wal::WalManager>(mxlog_config);
    }

    SetIdentity("DBImpl");
    AddCacheInsertDataListener();
    AddUseBlasThresholdListener();

    Start();
}

DBImpl::~DBImpl() {
    Stop();
}

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// external api
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
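// Start() returns immediately if the service is already running. Otherwise it marks the DB as
// initialized, replays any pending WAL records (when WAL is enabled), and launches the background
// WAL/flush, index and metric threads; read-only nodes in cluster mode only start the metric thread.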
Status
DBImpl::Start() {
    if (initialized_.load(std::memory_order_acquire)) {
        return Status::OK();
    }

    // LOG_ENGINE_TRACE_ << "DB service start";
    initialized_.store(true, std::memory_order_release);

    // wal
    if (options_.wal_enable_) {
        auto error_code = DB_ERROR;
        if (wal_mgr_ != nullptr) {
            error_code = wal_mgr_->Init(meta_ptr_);
        }
        if (error_code != WAL_SUCCESS) {
            throw Exception(error_code, "Wal init error!");
        }

        // recovery
        while (1) {
            wal::MXLogRecord record;
            auto error_code = wal_mgr_->GetNextRecovery(record);
            if (error_code != WAL_SUCCESS) {
                throw Exception(error_code, "Wal recovery error!");
            }
            if (record.type == wal::MXLogType::None) {
                break;
            }

            ExecWalRecord(record);
        }

        // for distribute version, some nodes are read only
        if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
            // background wal thread
            bg_wal_thread_ = std::thread(&DBImpl::BackgroundWalThread, this);
        }
    } else {
        // for distribute version, some nodes are read only
        if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
            // background flush thread
            bg_flush_thread_ = std::thread(&DBImpl::BackgroundFlushThread, this);
        }
    }

    // for distribute version, some nodes are read only
    if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
        // background build index thread
        bg_index_thread_ = std::thread(&DBImpl::BackgroundIndexThread, this);
    }

    // background metric thread
    bg_metric_thread_ = std::thread(&DBImpl::BackgroundMetricThread, this);

    return Status::OK();
}

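// Stop() mirrors Start(): it notifies and joins the background threads, flushes remaining memory
// data when WAL is disabled, waits for merge tasks and cleans up shadow files; the metric thread
// is joined last, on read-only nodes as well.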
Status
DBImpl::Stop() {
    if (!initialized_.load(std::memory_order_acquire)) {
        return Status::OK();
    }

    initialized_.store(false, std::memory_order_release);

    if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
        if (options_.wal_enable_) {
            // wait wal thread finish
            swn_wal_.Notify();
            bg_wal_thread_.join();
        } else {
            // flush all without merge
            wal::MXLogRecord record;
            record.type = wal::MXLogType::Flush;
            ExecWalRecord(record);

            // wait flush thread finish
            swn_flush_.Notify();
            bg_flush_thread_.join();
        }

        WaitMergeFileFinish();

        swn_index_.Notify();
        bg_index_thread_.join();

        meta_ptr_->CleanUpShadowFiles();
    }

    // wait metric thread exit
    swn_metric_.Notify();
    bg_metric_thread_.join();

    // LOG_ENGINE_TRACE_ << "DB service stop";
    return Status::OK();
}

Status
DBImpl::DropAll() {
    return meta_ptr_->DropAll();
}

Status
DBImpl::CreateCollection(meta::CollectionSchema& collection_schema) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    meta::CollectionSchema temp_schema = collection_schema;
    temp_schema.index_file_size_ *= MB;  // store as MB
    if (options_.wal_enable_) {
        temp_schema.flush_lsn_ = wal_mgr_->CreateCollection(collection_schema.collection_id_);
    }

    return meta_ptr_->CreateCollection(temp_schema);
}

Status
DBImpl::CreateHybridCollection(meta::CollectionSchema& collection_schema, meta::hybrid::FieldsSchema& fields_schema) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    meta::CollectionSchema temp_schema = collection_schema;
    if (options_.wal_enable_) {
        // TODO(yukun): wal_mgr_->CreateHybridCollection()
    }

    return meta_ptr_->CreateHybridCollection(temp_schema, fields_schema);
}

Status
DBImpl::DescribeHybridCollection(meta::CollectionSchema& collection_schema,
                                 milvus::engine::meta::hybrid::FieldsSchema& fields_schema) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    auto stat = meta_ptr_->DescribeHybridCollection(collection_schema, fields_schema);
    return stat;
}

Status
DBImpl::DropCollection(const std::string& collection_id) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    if (options_.wal_enable_) {
        wal_mgr_->DropCollection(collection_id);
    }

    return DropCollectionRecursively(collection_id);
}

Status
DBImpl::DescribeCollection(meta::CollectionSchema& collection_schema) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    auto stat = meta_ptr_->DescribeCollection(collection_schema);
    collection_schema.index_file_size_ /= MB;  // return as MB
    return stat;
}

Status
DBImpl::HasCollection(const std::string& collection_id, bool& has_or_not) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    return meta_ptr_->HasCollection(collection_id, has_or_not);
}

Status
DBImpl::HasNativeCollection(const std::string& collection_id, bool& has_or_not_) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    engine::meta::CollectionSchema collection_schema;
    collection_schema.collection_id_ = collection_id;
    auto status = DescribeCollection(collection_schema);
    if (!status.ok()) {
        has_or_not_ = false;
        return status;
    } else {
        if (!collection_schema.owner_collection_.empty()) {
            has_or_not_ = false;
            return Status(DB_NOT_FOUND, "");
        }

        has_or_not_ = true;
        return Status::OK();
    }
}

Status
DBImpl::AllCollections(std::vector<meta::CollectionSchema>& collection_schema_array) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    std::vector<meta::CollectionSchema> all_collections;
    auto status = meta_ptr_->AllCollections(all_collections);

    // only return real collections, dont return partition collections
    collection_schema_array.clear();
    for (auto& schema : all_collections) {
        if (schema.owner_collection_.empty()) {
            collection_schema_array.push_back(schema);
        }
    }

    return status;
}

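// GetCollectionInfo() reports per-partition segment statistics (segment id, row count, index name
// and data size) for the collection itself and for every partition collection that belongs to it.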
Status
DBImpl::GetCollectionInfo(const std::string& collection_id, CollectionInfo& collection_info) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // step1: get all partition ids
    std::vector<std::pair<std::string, std::string>> name2tag = {{collection_id, milvus::engine::DEFAULT_PARTITON_TAG}};
    std::vector<meta::CollectionSchema> partition_array;
    auto status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    for (auto& schema : partition_array) {
        name2tag.push_back(std::make_pair(schema.collection_id_, schema.partition_tag_));
    }

    // step2: get native collection info
    std::vector<int> file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX,
                                meta::SegmentSchema::FILE_TYPE::INDEX};

    static std::map<int32_t, std::string> index_type_name = {
        {(int32_t)engine::EngineType::FAISS_IDMAP, "IDMAP"},
        {(int32_t)engine::EngineType::FAISS_IVFFLAT, "IVFFLAT"},
        {(int32_t)engine::EngineType::FAISS_IVFSQ8, "IVFSQ8"},
        {(int32_t)engine::EngineType::NSG_MIX, "NSG"},
        {(int32_t)engine::EngineType::ANNOY, "ANNOY"},
        {(int32_t)engine::EngineType::FAISS_IVFSQ8H, "IVFSQ8H"},
        {(int32_t)engine::EngineType::FAISS_PQ, "PQ"},
        {(int32_t)engine::EngineType::SPTAG_KDT, "KDT"},
        {(int32_t)engine::EngineType::SPTAG_BKT, "BKT"},
        {(int32_t)engine::EngineType::FAISS_BIN_IDMAP, "IDMAP"},
        {(int32_t)engine::EngineType::FAISS_BIN_IVFFLAT, "IVFFLAT"},
    };

    for (auto& name_tag : name2tag) {
        meta::SegmentsSchema collection_files;
        status = meta_ptr_->FilesByType(name_tag.first, file_types, collection_files);
        if (!status.ok()) {
            std::string err_msg = "Failed to get collection info: " + status.ToString();
            LOG_ENGINE_ERROR_ << err_msg;
            return Status(DB_ERROR, err_msg);
        }

        std::vector<SegmentStat> segments_stat;
        for (auto& file : collection_files) {
            SegmentStat seg_stat;
            seg_stat.name_ = file.segment_id_;
            seg_stat.row_count_ = (int64_t)file.row_count_;
            seg_stat.index_name_ = index_type_name[file.engine_type_];
            seg_stat.data_size_ = (int64_t)file.file_size_;
            segments_stat.emplace_back(seg_stat);
        }

        PartitionStat partition_stat;
        if (name_tag.first == collection_id) {
            partition_stat.tag_ = milvus::engine::DEFAULT_PARTITON_TAG;
        } else {
            partition_stat.tag_ = name_tag.second;
        }

        partition_stat.segments_stat_.swap(segments_stat);
        collection_info.partitions_stat_.emplace_back(partition_stat);
    }

    return Status::OK();
}

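// PreloadCollection() loads the collection's files (and its partitions' files) into the CPU cache
// one by one, stopping with SERVER_CACHE_FULL once the accumulated size exceeds the remaining
// cache capacity.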
Status
DBImpl::PreloadCollection(const std::string& collection_id) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // step 1: get all collection files from parent collection
    meta::SegmentsSchema files_array;
    auto status = GetFilesToSearch(collection_id, files_array);
    if (!status.ok()) {
        return status;
    }

    // step 2: get files from partition collections
    std::vector<meta::CollectionSchema> partition_array;
    status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    for (auto& schema : partition_array) {
        status = GetFilesToSearch(schema.collection_id_, files_array);
    }

    int64_t size = 0;
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t available_size = cache_total - cache_usage;

    // step 3: load file one by one
    LOG_ENGINE_DEBUG_ << "Begin pre-load collection:" + collection_id + ", totally " << files_array.size()
                      << " files need to be pre-loaded";
    TimeRecorderAuto rc("Pre-load collection:" + collection_id);
    for (auto& file : files_array) {
        EngineType engine_type;
        if (file.file_type_ == meta::SegmentSchema::FILE_TYPE::RAW ||
            file.file_type_ == meta::SegmentSchema::FILE_TYPE::TO_INDEX ||
            file.file_type_ == meta::SegmentSchema::FILE_TYPE::BACKUP) {
            engine_type =
                utils::IsBinaryMetricType(file.metric_type_) ? EngineType::FAISS_BIN_IDMAP : EngineType::FAISS_IDMAP;
        } else {
            engine_type = (EngineType)file.engine_type_;
        }

        auto json = milvus::json::parse(file.index_params_);
        ExecutionEnginePtr engine =
            EngineFactory::Build(file.dimension_, file.location_, engine_type, (MetricType)file.metric_type_, json);
        fiu_do_on("DBImpl.PreloadCollection.null_engine", engine = nullptr);
        if (engine == nullptr) {
            LOG_ENGINE_ERROR_ << "Invalid engine type";
            return Status(DB_ERROR, "Invalid engine type");
        }

        fiu_do_on("DBImpl.PreloadCollection.exceed_cache", size = available_size + 1);

        try {
            fiu_do_on("DBImpl.PreloadCollection.engine_throw_exception", throw std::exception());
            std::string msg = "Pre-loaded file: " + file.file_id_ + " size: " + std::to_string(file.file_size_);
            TimeRecorderAuto rc_1(msg);
            engine->Load(true);

            size += engine->Size();
            if (size > available_size) {
                LOG_ENGINE_DEBUG_ << "Pre-load cancelled since cache is almost full";
                return Status(SERVER_CACHE_FULL, "Cache is full");
            }
        } catch (std::exception& ex) {
            std::string msg = "Pre-load collection encounter exception: " + std::string(ex.what());
            LOG_ENGINE_ERROR_ << msg;
            return Status(DB_ERROR, msg);
        }
    }

    return Status::OK();
}

Status
DBImpl::UpdateCollectionFlag(const std::string& collection_id, int64_t flag) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    return meta_ptr_->UpdateCollectionFlag(collection_id, flag);
}

Status
DBImpl::GetCollectionRowCount(const std::string& collection_id, uint64_t& row_count) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    return GetCollectionRowCountRecursively(collection_id, row_count);
}

Status
DBImpl::CreatePartition(const std::string& collection_id, const std::string& partition_name,
                        const std::string& partition_tag) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    uint64_t lsn = 0;
    meta_ptr_->GetCollectionFlushLSN(collection_id, lsn);
    return meta_ptr_->CreatePartition(collection_id, partition_name, partition_tag, lsn);
}

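// DropPartition() first erases the partition's in-memory data so no further inserts are buffered,
// soft-deletes the partition in meta, then hands the file cleanup to a scheduler DeleteJob and
// blocks until that job finishes.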
Status
DBImpl::DropPartition(const std::string& partition_name) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    mem_mgr_->EraseMemVector(partition_name);                // not allow insert
    auto status = meta_ptr_->DropPartition(partition_name);  // soft delete collection
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << status.message();
        return status;
    }

    // scheduler will determine when to delete collection files
    auto nres = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
    scheduler::DeleteJobPtr job = std::make_shared<scheduler::DeleteJob>(partition_name, meta_ptr_, nres);
    scheduler::JobMgrInst::GetInstance()->Put(job);
    job->WaitAndDelete();

    return Status::OK();
}

Status
DBImpl::DropPartitionByTag(const std::string& collection_id, const std::string& partition_tag) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    std::string partition_name;
    auto status = meta_ptr_->GetPartitionName(collection_id, partition_tag, partition_name);
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << status.message();
        return status;
    }

    return DropPartition(partition_name);
}

Status
DBImpl::ShowPartitions(const std::string& collection_id, std::vector<meta::CollectionSchema>& partition_schema_array) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    return meta_ptr_->ShowPartitions(collection_id, partition_schema_array);
}

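// InsertVectors() assigns IDs when the caller did not provide any, then either appends the data to
// the WAL (picked up by the background WAL thread) or, with WAL disabled, applies an equivalent
// MXLogRecord directly through ExecWalRecord().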
Status
DBImpl::InsertVectors(const std::string& collection_id, const std::string& partition_tag, VectorsData& vectors) {
    //    LOG_ENGINE_DEBUG_ << "Insert " << n << " vectors to cache";
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // insert vectors into target collection
    // (zhiru): generate ids
    if (vectors.id_array_.empty()) {
        SafeIDGenerator& id_generator = SafeIDGenerator::GetInstance();
        Status status = id_generator.GetNextIDNumbers(vectors.vector_count_, vectors.id_array_);
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] Get next id number fail: %s", "insert", 0, status.message().c_str());
            return status;
        }
    }

    Status status;
    if (options_.wal_enable_) {
        std::string target_collection_name;
        status = GetPartitionByTag(collection_id, partition_tag, target_collection_name);
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] Get partition fail: %s", "insert", 0, status.message().c_str());
            return status;
        }

        if (!vectors.float_data_.empty()) {
            wal_mgr_->Insert(collection_id, partition_tag, vectors.id_array_, vectors.float_data_);
        } else if (!vectors.binary_data_.empty()) {
            wal_mgr_->Insert(collection_id, partition_tag, vectors.id_array_, vectors.binary_data_);
        }
        swn_wal_.Notify();
    } else {
        wal::MXLogRecord record;
        record.lsn = 0;  // need to get from meta ?
        record.collection_id = collection_id;
        record.partition_tag = partition_tag;
        record.ids = vectors.id_array_.data();
        record.length = vectors.vector_count_;
        if (vectors.binary_data_.empty()) {
            record.type = wal::MXLogType::InsertVector;
            record.data = vectors.float_data_.data();
            record.data_size = vectors.float_data_.size() * sizeof(float);
        } else {
            record.type = wal::MXLogType::InsertBinary;
            record.ids = vectors.id_array_.data();
            record.length = vectors.vector_count_;
            record.data = vectors.binary_data_.data();
            record.data_size = vectors.binary_data_.size() * sizeof(uint8_t);
        }

        status = ExecWalRecord(record);
    }

    return status;
}

Status
DBImpl::InsertEntities(const std::string& collection_id, const std::string& partition_tag,
                       const std::vector<std::string>& field_names, Entity& entity,
                       std::unordered_map<std::string, meta::hybrid::DataType>& attr_types) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // Generate id
    if (entity.id_array_.empty()) {
        SafeIDGenerator& id_generator = SafeIDGenerator::GetInstance();
        Status status = id_generator.GetNextIDNumbers(entity.entity_count_, entity.id_array_);
        if (!status.ok()) {
            return status;
        }
    }

    Status status;
    // insert entities: collection_name is field id
    wal::MXLogRecord record;
    record.lsn = 0;
    record.collection_id = collection_id;
    record.partition_tag = partition_tag;
    record.ids = entity.id_array_.data();
    record.length = entity.entity_count_;

    auto vector_it = entity.vector_data_.begin();
    if (vector_it->second.binary_data_.empty()) {
        record.type = wal::MXLogType::Entity;
        record.data = vector_it->second.float_data_.data();
        record.data_size = vector_it->second.float_data_.size() * sizeof(float);
    } else {
        //        record.type = wal::MXLogType::InsertBinary;
        //        record.data = entities.vector_data_[0].binary_data_.data();
        //        record.length = entities.vector_data_[0].binary_data_.size() * sizeof(uint8_t);
    }

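    // The attribute buffer stores integers as int64 and floating-point values as double; the switch
    // below narrows each field to its declared type before attaching it to the WAL record.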
    uint64_t offset = 0;
    for (auto field_name : field_names) {
        switch (attr_types.at(field_name)) {
            case meta::hybrid::DataType::INT8: {
                std::vector<uint8_t> data;
                data.resize(entity.entity_count_ * sizeof(int8_t));

                std::vector<int64_t> attr_value(entity.entity_count_, 0);
                memcpy(attr_value.data(), entity.attr_value_.data() + offset, entity.entity_count_ * sizeof(int64_t));
                offset += entity.entity_count_ * sizeof(int64_t);

                std::vector<int8_t> raw_value(entity.entity_count_, 0);
                for (uint64_t i = 0; i < entity.entity_count_; ++i) {
                    raw_value[i] = attr_value[i];
                }

                memcpy(data.data(), raw_value.data(), entity.entity_count_ * sizeof(int8_t));
                record.attr_data.insert(std::make_pair(field_name, data));

                record.attr_nbytes.insert(std::make_pair(field_name, sizeof(int8_t)));
                record.attr_data_size.insert(std::make_pair(field_name, entity.entity_count_ * sizeof(int8_t)));
                break;
            }
            case meta::hybrid::DataType::INT16: {
                std::vector<uint8_t> data;
                data.resize(entity.entity_count_ * sizeof(int16_t));

                std::vector<int64_t> attr_value(entity.entity_count_, 0);
                memcpy(attr_value.data(), entity.attr_value_.data() + offset, entity.entity_count_ * sizeof(int64_t));
                offset += entity.entity_count_ * sizeof(int64_t);

                std::vector<int16_t> raw_value(entity.entity_count_, 0);
                for (uint64_t i = 0; i < entity.entity_count_; ++i) {
                    raw_value[i] = attr_value[i];
                }

                memcpy(data.data(), raw_value.data(), entity.entity_count_ * sizeof(int16_t));
                record.attr_data.insert(std::make_pair(field_name, data));

                record.attr_nbytes.insert(std::make_pair(field_name, sizeof(int16_t)));
                record.attr_data_size.insert(std::make_pair(field_name, entity.entity_count_ * sizeof(int16_t)));
666 667 668 669 670 671
                break;
            }
            case meta::hybrid::DataType::INT32: {
                std::vector<uint8_t> data;
                data.resize(entity.entity_count_ * sizeof(int32_t));

                std::vector<int64_t> attr_value(entity.entity_count_, 0);
                memcpy(attr_value.data(), entity.attr_value_.data() + offset, entity.entity_count_ * sizeof(int64_t));
                offset += entity.entity_count_ * sizeof(int64_t);

                std::vector<int32_t> raw_value(entity.entity_count_, 0);
                for (uint64_t i = 0; i < entity.entity_count_; ++i) {
                    raw_value[i] = attr_value[i];
                }

                memcpy(data.data(), raw_value.data(), entity.entity_count_ * sizeof(int32_t));
                record.attr_data.insert(std::make_pair(field_name, data));

                record.attr_nbytes.insert(std::make_pair(field_name, sizeof(int32_t)));
                record.attr_data_size.insert(std::make_pair(field_name, entity.entity_count_ * sizeof(int32_t)));
686 687 688 689 690
                break;
            }
            case meta::hybrid::DataType::INT64: {
                std::vector<uint8_t> data;
                data.resize(entity.entity_count_ * sizeof(int64_t));
                memcpy(data.data(), entity.attr_value_.data() + offset, entity.entity_count_ * sizeof(int64_t));
                record.attr_data.insert(std::make_pair(field_name, data));

                record.attr_nbytes.insert(std::make_pair(field_name, sizeof(int64_t)));
                record.attr_data_size.insert(std::make_pair(field_name, entity.entity_count_ * sizeof(int64_t)));
                offset += entity.entity_count_ * sizeof(int64_t);
                break;
            }
            case meta::hybrid::DataType::FLOAT: {
                std::vector<uint8_t> data;
                data.resize(entity.entity_count_ * sizeof(float));

                std::vector<double> attr_value(entity.entity_count_, 0);
                memcpy(attr_value.data(), entity.attr_value_.data() + offset, entity.entity_count_ * sizeof(double));
                offset += entity.entity_count_ * sizeof(double);

                std::vector<float> raw_value(entity.entity_count_, 0);
                for (uint64_t i = 0; i < entity.entity_count_; ++i) {
                    raw_value[i] = attr_value[i];
                }

                memcpy(data.data(), raw_value.data(), entity.entity_count_ * sizeof(float));
                record.attr_data.insert(std::make_pair(field_name, data));

                record.attr_nbytes.insert(std::make_pair(field_name, sizeof(float)));
                record.attr_data_size.insert(std::make_pair(field_name, entity.entity_count_ * sizeof(float)));
                break;
            }
            case meta::hybrid::DataType::DOUBLE: {
                std::vector<uint8_t> data;
                data.resize(entity.entity_count_ * sizeof(double));
                memcpy(data.data(), entity.attr_value_.data() + offset, entity.entity_count_ * sizeof(double));
                record.attr_data.insert(std::make_pair(field_name, data));

                record.attr_nbytes.insert(std::make_pair(field_name, sizeof(double)));
                record.attr_data_size.insert(std::make_pair(field_name, entity.entity_count_ * sizeof(double)));
                offset += entity.entity_count_ * sizeof(double);
                break;
            }
            default:
                break;
        }
    }

    status = ExecWalRecord(record);
    return status;
}

Status
DBImpl::DeleteVector(const std::string& collection_id, IDNumber vector_id) {
    IDNumbers ids;
    ids.push_back(vector_id);
    return DeleteVectors(collection_id, ids);
}

Status
DBImpl::DeleteVectors(const std::string& collection_id, IDNumbers vector_ids) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    Status status;
    if (options_.wal_enable_) {
        wal_mgr_->DeleteById(collection_id, vector_ids);
        swn_wal_.Notify();
    } else {
        wal::MXLogRecord record;
        record.lsn = 0;  // need to get from meta ?
        record.type = wal::MXLogType::Delete;
        record.collection_id = collection_id;
        record.ids = vector_ids.data();
        record.length = vector_ids.size();

        status = ExecWalRecord(record);
    }

    return status;
}

Status
DBImpl::Flush(const std::string& collection_id) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    Status status;
    bool has_collection;
    status = HasCollection(collection_id, has_collection);
    if (!status.ok()) {
        return status;
    }
    if (!has_collection) {
        LOG_ENGINE_ERROR_ << "Collection to flush does not exist: " << collection_id;
        return Status(DB_NOT_FOUND, "Collection to flush does not exist");
    }

    LOG_ENGINE_DEBUG_ << "Begin flush collection: " << collection_id;

    if (options_.wal_enable_) {
        LOG_ENGINE_DEBUG_ << "WAL flush";
        auto lsn = wal_mgr_->Flush(collection_id);
        if (lsn != 0) {
            swn_wal_.Notify();
            flush_req_swn_.Wait();
        }

    } else {
        LOG_ENGINE_DEBUG_ << "MemTable flush";
        InternalFlush(collection_id);
    }

    LOG_ENGINE_DEBUG_ << "End flush collection: " << collection_id;

    return status;
}

Status
DBImpl::Flush() {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    LOG_ENGINE_DEBUG_ << "Begin flush all collections";

    Status status;
    if (options_.wal_enable_) {
        LOG_ENGINE_DEBUG_ << "WAL flush";
        auto lsn = wal_mgr_->Flush();
        if (lsn != 0) {
            swn_wal_.Notify();
            flush_req_swn_.Wait();
        }
    } else {
        LOG_ENGINE_DEBUG_ << "MemTable flush";
        InternalFlush();
    }

    LOG_ENGINE_DEBUG_ << "End flush all collections";

    return status;
}

Status
DBImpl::Compact(const std::string& collection_id) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    engine::meta::CollectionSchema collection_schema;
    collection_schema.collection_id_ = collection_id;
    auto status = DescribeCollection(collection_schema);
    if (!status.ok()) {
        if (status.code() == DB_NOT_FOUND) {
            LOG_ENGINE_ERROR_ << "Collection to compact does not exist: " << collection_id;
            return Status(DB_NOT_FOUND, "Collection to compact does not exist");
        } else {
            return status;
        }
    } else {
        if (!collection_schema.owner_collection_.empty()) {
            LOG_ENGINE_ERROR_ << "Collection to compact does not exist: " << collection_id;
            return Status(DB_NOT_FOUND, "Collection to compact does not exist");
        }
    }

    LOG_ENGINE_DEBUG_ << "Before compacting, wait for build index thread to finish...";

    // WaitBuildIndexFinish();

    const std::lock_guard<std::mutex> index_lock(build_index_mutex_);
    const std::lock_guard<std::mutex> merge_lock(flush_merge_compact_mutex_);

    LOG_ENGINE_DEBUG_ << "Compacting collection: " << collection_id;

    // Get files to compact from meta.
    std::vector<int> file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX,
                                meta::SegmentSchema::FILE_TYPE::BACKUP};
    meta::SegmentsSchema files_to_compact;
    status = meta_ptr_->FilesByType(collection_id, file_types, files_to_compact);
    if (!status.ok()) {
        std::string err_msg = "Failed to get files to compact: " + status.message();
        LOG_ENGINE_ERROR_ << err_msg;
        return Status(DB_ERROR, err_msg);
    }

    LOG_ENGINE_DEBUG_ << "Found " << files_to_compact.size() << " segment to compact";

    OngoingFileChecker::GetInstance().MarkOngoingFiles(files_to_compact);

    Status compact_status;
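    // Compact segment by segment: only segments carrying a non-empty deleted-docs list are
    // rewritten through CompactFile(); the others are unmarked and skipped.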
    for (auto iter = files_to_compact.begin(); iter != files_to_compact.end();) {
        meta::SegmentSchema file = *iter;
        iter = files_to_compact.erase(iter);

        // Check if the segment needs compacting
        std::string segment_dir;
        utils::GetParentPath(file.location_, segment_dir);

        segment::SegmentReader segment_reader(segment_dir);
        size_t deleted_docs_size;
        status = segment_reader.ReadDeletedDocsSize(deleted_docs_size);
        if (!status.ok()) {
            OngoingFileChecker::GetInstance().UnmarkOngoingFile(file);
            continue;  // skip this file and try compact next one
        }

        meta::SegmentsSchema files_to_update;
        if (deleted_docs_size != 0) {
            compact_status = CompactFile(collection_id, file, files_to_update);

            if (!compact_status.ok()) {
                LOG_ENGINE_ERROR_ << "Compact failed for segment " << file.segment_id_ << ": "
                                  << compact_status.message();
                OngoingFileChecker::GetInstance().UnmarkOngoingFile(file);
                continue;  // skip this file and try compact next one
            }
        } else {
            OngoingFileChecker::GetInstance().UnmarkOngoingFile(file);
            LOG_ENGINE_DEBUG_ << "Segment " << file.segment_id_ << " has no deleted data. No need to compact";
            continue;  // skip this file and try compact next one
        }

        LOG_ENGINE_DEBUG_ << "Updating meta after compaction...";
        status = meta_ptr_->UpdateCollectionFiles(files_to_update);
        OngoingFileChecker::GetInstance().UnmarkOngoingFile(file);
        if (!status.ok()) {
            compact_status = status;
            break;  // meta error, could not go on
        }
    }

    OngoingFileChecker::GetInstance().UnmarkOngoingFiles(files_to_compact);

    if (compact_status.ok()) {
        LOG_ENGINE_DEBUG_ << "Finished compacting collection: " << collection_id;
    }

    return compact_status;
}

Status
DBImpl::CompactFile(const std::string& collection_id, const meta::SegmentSchema& file,
                    meta::SegmentsSchema& files_to_update) {
    LOG_ENGINE_DEBUG_ << "Compacting segment " << file.segment_id_ << " for collection: " << collection_id;

    // Create new collection file
    meta::SegmentSchema compacted_file;
    compacted_file.collection_id_ = collection_id;
    // compacted_file.date_ = date;
    compacted_file.file_type_ = meta::SegmentSchema::NEW_MERGE;  // TODO: use NEW_MERGE for now
    Status status = meta_ptr_->CreateCollectionFile(compacted_file);

    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to create collection file: " << status.message();
        return status;
    }

    // Compact (merge) file to the newly created collection file

    std::string new_segment_dir;
    utils::GetParentPath(compacted_file.location_, new_segment_dir);
    auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(new_segment_dir);

    std::string segment_dir_to_merge;
    utils::GetParentPath(file.location_, segment_dir_to_merge);

    LOG_ENGINE_DEBUG_ << "Compacting begin...";
    segment_writer_ptr->Merge(segment_dir_to_merge, compacted_file.file_id_);

    // Serialize
    LOG_ENGINE_DEBUG_ << "Serializing compacted segment...";
    status = segment_writer_ptr->Serialize();
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to serialize compacted segment: " << status.message();
        compacted_file.file_type_ = meta::SegmentSchema::TO_DELETE;
        auto mark_status = meta_ptr_->UpdateCollectionFile(compacted_file);
        if (mark_status.ok()) {
            LOG_ENGINE_DEBUG_ << "Mark file: " << compacted_file.file_id_ << " to to_delete";
        }
        return status;
    }

    // Update collection files state
    // if index type isn't IDMAP, set file type to TO_INDEX if file size exceed index_file_size
    // else set file type to RAW, no need to build index
    if (!utils::IsRawIndexType(compacted_file.engine_type_)) {
        compacted_file.file_type_ = (segment_writer_ptr->Size() >= compacted_file.index_file_size_)
                                        ? meta::SegmentSchema::TO_INDEX
                                        : meta::SegmentSchema::RAW;
    } else {
        compacted_file.file_type_ = meta::SegmentSchema::RAW;
    }
    compacted_file.file_size_ = segment_writer_ptr->Size();
    compacted_file.row_count_ = segment_writer_ptr->VectorCount();

    if (compacted_file.row_count_ == 0) {
        LOG_ENGINE_DEBUG_ << "Compacted segment is empty. Mark it as TO_DELETE";
        compacted_file.file_type_ = meta::SegmentSchema::TO_DELETE;
    }

    files_to_update.emplace_back(compacted_file);

    // Set all files in segment to TO_DELETE
    auto& segment_id = file.segment_id_;
    meta::SegmentsSchema segment_files;
    status = meta_ptr_->GetCollectionFilesBySegmentId(segment_id, segment_files);
    if (!status.ok()) {
        return status;
    }
    for (auto& f : segment_files) {
        f.file_type_ = meta::SegmentSchema::FILE_TYPE::TO_DELETE;
        files_to_update.emplace_back(f);
    }

    LOG_ENGINE_DEBUG_ << "Compacted segment " << compacted_file.segment_id_ << " from "
                      << std::to_string(file.file_size_) << " bytes to " << std::to_string(compacted_file.file_size_)
                      << " bytes";

    if (options_.insert_cache_immediately_) {
        segment_writer_ptr->Cache();
    }

    return status;
}

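// GetVectorByID() collects the RAW/TO_INDEX/BACKUP files of the collection and all of its
// partitions, marks them as in-use so cleanup will not remove them mid-read, then scans them
// through GetVectorByIdHelper().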
Status
DBImpl::GetVectorByID(const std::string& collection_id, const IDNumber& vector_id, VectorsData& vector) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    bool has_collection;
    auto status = HasCollection(collection_id, has_collection);
    if (!has_collection) {
        LOG_ENGINE_ERROR_ << "Collection " << collection_id << " does not exist: ";
        return Status(DB_NOT_FOUND, "Collection does not exist");
    }
    if (!status.ok()) {
        return status;
    }

    meta::SegmentsSchema files_to_query;

    std::vector<int> file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX,
                                meta::SegmentSchema::FILE_TYPE::BACKUP};
    status = meta_ptr_->FilesByType(collection_id, file_types, files_to_query);
    if (!status.ok()) {
        std::string err_msg = "Failed to get files for GetVectorByID: " + status.message();
        LOG_ENGINE_ERROR_ << err_msg;
        return status;
    }

    OngoingFileChecker::GetInstance().MarkOngoingFiles(files_to_query);

    std::vector<meta::CollectionSchema> partition_array;
    status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    for (auto& schema : partition_array) {
        meta::SegmentsSchema files;
        status = meta_ptr_->FilesByType(schema.collection_id_, file_types, files);
        if (!status.ok()) {
            std::string err_msg = "Failed to get files for GetVectorByID: " + status.message();
            LOG_ENGINE_ERROR_ << err_msg;
            return status;
        }

        OngoingFileChecker::GetInstance().MarkOngoingFiles(files);
        files_to_query.insert(files_to_query.end(), std::make_move_iterator(files.begin()),
                              std::make_move_iterator(files.end()));
    }

    if (files_to_query.empty()) {
        LOG_ENGINE_DEBUG_ << "No files to get vector by id from";
        return Status(DB_NOT_FOUND, "Collection is empty");
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();

    status = GetVectorByIdHelper(collection_id, vector_id, vector, files_to_query);

    OngoingFileChecker::GetInstance().UnmarkOngoingFiles(files_to_query);
    cache::CpuCacheMgr::GetInstance()->PrintInfo();

    return status;
}

Status
DBImpl::GetVectorIDs(const std::string& collection_id, const std::string& segment_id, IDNumbers& vector_ids) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // step 1: check collection existence
    bool has_collection;
    auto status = HasCollection(collection_id, has_collection);
    if (!has_collection) {
        LOG_ENGINE_ERROR_ << "Collection " << collection_id << " does not exist: ";
        return Status(DB_NOT_FOUND, "Collection does not exist");
    }
    if (!status.ok()) {
        return status;
    }

    //  step 2: find segment
    meta::SegmentsSchema collection_files;
    status = meta_ptr_->GetCollectionFilesBySegmentId(segment_id, collection_files);
    if (!status.ok()) {
        return status;
    }

    if (collection_files.empty()) {
        return Status(DB_NOT_FOUND, "Segment does not exist");
    }

    // check the segment is belong to this collection
    if (collection_files[0].collection_id_ != collection_id) {
        // the segment could be in a partition under this collection
        meta::CollectionSchema collection_schema;
        collection_schema.collection_id_ = collection_files[0].collection_id_;
        status = DescribeCollection(collection_schema);
        if (collection_schema.owner_collection_ != collection_id) {
            return Status(DB_NOT_FOUND, "Segment does not belong to this collection");
        }
    }

    // step 3: load segment ids and delete offset
    std::string segment_dir;
    engine::utils::GetParentPath(collection_files[0].location_, segment_dir);
    segment::SegmentReader segment_reader(segment_dir);

    std::vector<segment::doc_id_t> uids;
    status = segment_reader.LoadUids(uids);
    if (!status.ok()) {
        return status;
    }

    segment::DeletedDocsPtr deleted_docs_ptr;
    status = segment_reader.LoadDeletedDocs(deleted_docs_ptr);
    if (!status.ok()) {
        return status;
    }

    // step 4: construct id array
    // avoid duplicate offset and erase from max offset to min offset
    auto& deleted_offset = deleted_docs_ptr->GetDeletedDocs();
    std::set<segment::offset_t, std::greater<segment::offset_t>> ordered_offset;
    for (segment::offset_t offset : deleted_offset) {
        ordered_offset.insert(offset);
    }
    for (segment::offset_t offset : ordered_offset) {
        uids.erase(uids.begin() + offset);
    }
    vector_ids.swap(uids);

    return status;
}

Status
DBImpl::GetVectorByIdHelper(const std::string& collection_id, IDNumber vector_id, VectorsData& vector,
                            const meta::SegmentsSchema& files) {
    LOG_ENGINE_DEBUG_ << "Getting vector by id in " << files.size() << " files, id = " << vector_id;

    vector.vector_count_ = 0;
    vector.float_data_.clear();
    vector.binary_data_.clear();

    for (auto& file : files) {
        // Load bloom filter
        std::string segment_dir;
        engine::utils::GetParentPath(file.location_, segment_dir);
        segment::SegmentReader segment_reader(segment_dir);
        segment::IdBloomFilterPtr id_bloom_filter_ptr;
        segment_reader.LoadBloomFilter(id_bloom_filter_ptr);

        // Check if the id is present in bloom filter.
        if (id_bloom_filter_ptr->Check(vector_id)) {
            // Load uids and check if the id is indeed present. If yes, find its offset.
            std::vector<segment::doc_id_t> uids;
            auto status = segment_reader.LoadUids(uids);
            if (!status.ok()) {
                return status;
            }

            auto found = std::find(uids.begin(), uids.end(), vector_id);
            if (found != uids.end()) {
                auto offset = std::distance(uids.begin(), found);

                // Check whether the id has been deleted
                segment::DeletedDocsPtr deleted_docs_ptr;
                status = segment_reader.LoadDeletedDocs(deleted_docs_ptr);
                if (!status.ok()) {
                    LOG_ENGINE_ERROR_ << status.message();
                    return status;
                }
                auto& deleted_docs = deleted_docs_ptr->GetDeletedDocs();

                auto deleted = std::find(deleted_docs.begin(), deleted_docs.end(), offset);
                if (deleted == deleted_docs.end()) {
                    // Load raw vector
                    bool is_binary = utils::IsBinaryMetricType(file.metric_type_);
                    size_t single_vector_bytes = is_binary ? file.dimension_ / 8 : file.dimension_ * sizeof(float);
                    std::vector<uint8_t> raw_vector;
                    status = segment_reader.LoadVectors(offset * single_vector_bytes, single_vector_bytes, raw_vector);
                    if (!status.ok()) {
                        LOG_ENGINE_ERROR_ << status.message();
                        return status;
                    }

                    vector.vector_count_ = 1;
                    if (is_binary) {
                        vector.binary_data_ = std::move(raw_vector);
                    } else {
                        std::vector<float> float_vector;
                        float_vector.resize(file.dimension_);
                        memcpy(float_vector.data(), raw_vector.data(), single_vector_bytes);
                        vector.float_data_ = std::move(float_vector);
                    }
                    return Status::OK();
                }
            }
        } else {
            continue;
        }
    }

    if (vector.binary_data_.empty() && vector.float_data_.empty()) {
        std::string msg = "Vector with id " + std::to_string(vector_id) + " not found in collection " + collection_id;
        LOG_ENGINE_DEBUG_ << msg;
    }

    return Status::OK();
}

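// CreateIndex() flushes buffered data first, updates the index definition for the collection and
// its partitions under build_index_mutex_, waits for merge tasks to finish, and finally blocks
// until the index build completes.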
Status
DBImpl::CreateIndex(const std::string& collection_id, const CollectionIndex& index) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // serialize memory data
    //    std::set<std::string> sync_collection_ids;
    //    auto status = SyncMemData(sync_collection_ids);
    auto status = Flush();

    {
        std::unique_lock<std::mutex> lock(build_index_mutex_);

        // step 1: check index difference
        CollectionIndex old_index;
        status = DescribeIndex(collection_id, old_index);
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << "Failed to get collection index info for collection: " << collection_id;
            return status;
        }

        // step 2: update index info
        CollectionIndex new_index = index;
        new_index.metric_type_ = old_index.metric_type_;  // don't change metric type, it was defined by CreateCollection
        if (!utils::IsSameIndex(old_index, new_index)) {
            status = UpdateCollectionIndexRecursively(collection_id, new_index);
            if (!status.ok()) {
                return status;
            }
        }
    }

    // step 3: let merge file thread finish
    // to avoid duplicate data bug
    WaitMergeFileFinish();

    // step 4: wait and build index
    status = index_failed_checker_.CleanFailedIndexFileOfCollection(collection_id);
    status = WaitCollectionIndexRecursively(collection_id, index);

    return status;
}

Status
DBImpl::DescribeIndex(const std::string& collection_id, CollectionIndex& index) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    return meta_ptr_->DescribeCollectionIndex(collection_id, index);
}

Status
DBImpl::DropIndex(const std::string& collection_id) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    LOG_ENGINE_DEBUG_ << "Drop index for collection: " << collection_id;
    return DropCollectionIndexRecursively(collection_id);
}

Status
DBImpl::QueryByID(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
                  const std::vector<std::string>& partition_tags, uint64_t k, const milvus::json& extra_params,
                  IDNumber vector_id, ResultIds& result_ids, ResultDistances& result_distances) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    VectorsData vectors_data = VectorsData();
    vectors_data.id_array_.emplace_back(vector_id);
    vectors_data.vector_count_ = 1;
1297
    Status result =
J
Jin Hai 已提交
1298
        Query(context, collection_id, partition_tags, k, extra_params, vectors_data, result_ids, result_distances);
Y
yu yunfeng 已提交
1299
    return result;
X
Xu Peng 已提交
1300 1301
}

1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370
Status
DBImpl::HybridQuery(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
                    const std::vector<std::string>& partition_tags,
                    context::HybridSearchContextPtr hybrid_search_context, query::GeneralQueryPtr general_query,
                    std::unordered_map<std::string, engine::meta::hybrid::DataType>& attr_type, uint64_t& nq,
                    ResultIds& result_ids, ResultDistances& result_distances) {
    auto query_ctx = context->Child("Query");

    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    Status status;
    std::vector<size_t> ids;
    meta::SegmentsSchema files_array;

    if (partition_tags.empty()) {
        // no partition tag specified, means search in whole collection
        // get all collection files from parent collection
        status = GetFilesToSearch(collection_id, files_array);
        if (!status.ok()) {
            return status;
        }

        std::vector<meta::CollectionSchema> partition_array;
        status = meta_ptr_->ShowPartitions(collection_id, partition_array);
        if (!status.ok()) {
            return status;
        }
        for (auto& schema : partition_array) {
            status = GetFilesToSearch(schema.collection_id_, files_array);
            if (!status.ok()) {
                return Status(DB_ERROR, "GetFilesToSearch failed in HybridQuery");
            }
        }

        if (files_array.empty()) {
            return Status::OK();
        }
    } else {
        // get files from specified partitions
        std::set<std::string> partition_name_array;
        status = GetPartitionsByTags(collection_id, partition_tags, partition_name_array);
        if (!status.ok()) {
            return status;  // didn't match any partition
        }

        for (auto& partition_name : partition_name_array) {
            status = GetFilesToSearch(partition_name, files_array);
            if (!status.ok()) {
                return Status(DB_ERROR, "GetFilesToSearch failed in HybridQuery");
            }
        }

        if (files_array.empty()) {
            return Status::OK();
        }
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
    status = HybridQueryAsync(query_ctx, collection_id, files_array, hybrid_search_context, general_query, attr_type,
                              nq, result_ids, result_distances);
    if (!status.ok()) {
        return status;
    }
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query

    query_ctx->GetTraceContext()->GetSpan()->Finish();

    return status;
}
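
// Query: collects the searchable segment files of the collection, or only of the partitions whose tags
// match, then delegates to QueryAsync which schedules a SearchJob over those files.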

Status
DBImpl::Query(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
              const std::vector<std::string>& partition_tags, uint64_t k, const milvus::json& extra_params,
              const VectorsData& vectors, ResultIds& result_ids, ResultDistances& result_distances) {
    milvus::server::ContextChild tracer(context, "Query");

    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    Status status;
    meta::SegmentsSchema files_array;

    if (partition_tags.empty()) {
        // no partition tag specified, means search in whole collection
        // get all collection files from parent collection
        status = GetFilesToSearch(collection_id, files_array);
        if (!status.ok()) {
            return status;
        }

        std::vector<meta::CollectionSchema> partition_array;
        status = meta_ptr_->ShowPartitions(collection_id, partition_array);
        for (auto& schema : partition_array) {
            status = GetFilesToSearch(schema.collection_id_, files_array);
        }

        if (files_array.empty()) {
            return Status::OK();
        }
    } else {
        // get files from specified partitions
        std::set<std::string> partition_name_array;
        status = GetPartitionsByTags(collection_id, partition_tags, partition_name_array);
        if (!status.ok()) {
            return status;  // didn't match any partition.
        }

        for (auto& partition_name : partition_name_array) {
            status = GetFilesToSearch(partition_name, files_array);
        }

        if (files_array.empty()) {
            return Status::OK();
        }
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
    status = QueryAsync(tracer.Context(), files_array, k, extra_params, vectors, result_ids, result_distances);
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query

    return status;
}

Status
DBImpl::QueryByFileID(const std::shared_ptr<server::Context>& context, const std::vector<std::string>& file_ids,
                      uint64_t k, const milvus::json& extra_params, const VectorsData& vectors, ResultIds& result_ids,
                      ResultDistances& result_distances) {
    milvus::server::ContextChild tracer(context, "Query by file id");

    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    // get specified files
    std::vector<size_t> ids;
    for (auto& id : file_ids) {
        std::string::size_type sz;
        ids.push_back(std::stoul(id, &sz));
    }

    meta::SegmentsSchema search_files;
    auto status = meta_ptr_->FilesByID(ids, search_files);
    if (!status.ok()) {
        return status;
    }

    fiu_do_on("DBImpl.QueryByFileID.empty_files_array", search_files.clear());
    if (search_files.empty()) {
        return Status(DB_ERROR, "Invalid file id");
    }

    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info before query
    status = QueryAsync(tracer.Context(), search_files, k, extra_params, vectors, result_ids, result_distances);
    cache::CpuCacheMgr::GetInstance()->PrintInfo();  // print cache info after query

    return status;
}

Status
DBImpl::Size(uint64_t& result) {
    if (!initialized_.load(std::memory_order_acquire)) {
        return SHUTDOWN_ERROR;
    }

    return meta_ptr_->Size(result);
}

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// internal methods
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
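// QueryAsync: marks the candidate files as in-use, wraps them into a scheduler::SearchJob, waits for
// the scheduler to finish, then copies the result ids/distances out of the job.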
Status
DBImpl::QueryAsync(const std::shared_ptr<server::Context>& context, const meta::SegmentsSchema& files, uint64_t k,
                   const milvus::json& extra_params, const VectorsData& vectors, ResultIds& result_ids,
                   ResultDistances& result_distances) {
    milvus::server::ContextChild tracer(context, "Query Async");
    server::CollectQueryMetrics metrics(vectors.vector_count_);

    if (files.size() > milvus::scheduler::TASK_TABLE_MAX_COUNT) {
        std::string msg =
            "Search files count exceed scheduler limit: " + std::to_string(milvus::scheduler::TASK_TABLE_MAX_COUNT);
        LOG_ENGINE_ERROR_ << msg;
        return Status(DB_ERROR, msg);
    }

    TimeRecorder rc("");

    // step 1: construct search job
    auto status = OngoingFileChecker::GetInstance().MarkOngoingFiles(files);

    LOG_ENGINE_DEBUG_ << LogOut("Engine query begin, index file count: %ld", files.size());
    scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(tracer.Context(), k, extra_params, vectors);
    for (auto& file : files) {
        scheduler::SegmentSchemaPtr file_ptr = std::make_shared<meta::SegmentSchema>(file);
        job->AddIndexFile(file_ptr);
    }

    // step 2: put search job to scheduler and wait result
    scheduler::JobMgrInst::GetInstance()->Put(job);
    job->WaitResult();

    status = OngoingFileChecker::GetInstance().UnmarkOngoingFiles(files);
    if (!job->GetStatus().ok()) {
        return job->GetStatus();
    }

    // step 3: construct results
    result_ids = job->GetResultIds();
    result_distances = job->GetResultDistances();
    rc.ElapseFromBegin("Engine query totally cost");

    return Status::OK();
}

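// HybridQueryAsync: same scheduling path as QueryAsync, but the SearchJob is built from a GeneralQuery
// tree plus the attribute type map instead of a plain VectorsData payload; nq is read back from the job.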
Status
DBImpl::HybridQueryAsync(const std::shared_ptr<server::Context>& context, const std::string& table_id,
                         const meta::SegmentsSchema& files, context::HybridSearchContextPtr hybrid_search_context,
                         query::GeneralQueryPtr general_query,
                         std::unordered_map<std::string, engine::meta::hybrid::DataType>& attr_type, uint64_t& nq,
                         ResultIds& result_ids, ResultDistances& result_distances) {
    auto query_async_ctx = context->Child("Query Async");

#if 0
    // Construct tasks
    for (auto file : files) {
        std::unordered_map<std::string, engine::DataType> types;
        auto it = attr_type.begin();
        for (; it != attr_type.end(); it++) {
            types.insert(std::make_pair(it->first, (engine::DataType)it->second));
        }

        auto file_ptr = std::make_shared<meta::TableFileSchema>(file);
        search::TaskPtr
            task = std::make_shared<search::Task>(context, file_ptr, general_query, types, hybrid_search_context);
        search::TaskInst::GetInstance().load_queue().push(task);
        search::TaskInst::GetInstance().load_cv().notify_one();
        hybrid_search_context->tasks_.emplace_back(task);
    }

#endif

    //#if 0
    TimeRecorder rc("");

    // step 1: construct search job
    auto status = OngoingFileChecker::GetInstance().MarkOngoingFiles(files);

    VectorsData vectors;

    LOG_ENGINE_DEBUG_ << LogOut("Engine query begin, index file count: %ld", files.size());
    scheduler::SearchJobPtr job =
        std::make_shared<scheduler::SearchJob>(query_async_ctx, general_query, attr_type, vectors);
    for (auto& file : files) {
        scheduler::SegmentSchemaPtr file_ptr = std::make_shared<meta::SegmentSchema>(file);
        job->AddIndexFile(file_ptr);
    }

    // step 2: put search job to scheduler and wait result
    scheduler::JobMgrInst::GetInstance()->Put(job);
    job->WaitResult();

    status = OngoingFileChecker::GetInstance().UnmarkOngoingFiles(files);
    if (!job->GetStatus().ok()) {
        return job->GetStatus();
    }

    // step 3: construct results
    nq = job->vector_count();
    result_ids = job->GetResultIds();
    result_distances = job->GetResultDistances();
    rc.ElapseFromBegin("Engine query totally cost");

    query_async_ctx->GetTraceContext()->GetSpan()->Finish();
    //#endif

    return Status::OK();
}

void
DBImpl::BackgroundIndexThread() {
    server::SystemInfo::GetInstance().Init();
    while (true) {
        if (!initialized_.load(std::memory_order_acquire)) {
            WaitMergeFileFinish();
            WaitBuildIndexFinish();

            LOG_ENGINE_DEBUG_ << "DB background thread exit";
            break;
        }

        swn_index_.Wait_For(std::chrono::seconds(BACKGROUND_INDEX_INTERVAL));

        WaitMergeFileFinish();
        StartBuildIndexTask();
    }
}

void
DBImpl::WaitMergeFileFinish() {
    //    LOG_ENGINE_DEBUG_ << "Begin WaitMergeFileFinish";
    std::lock_guard<std::mutex> lck(merge_result_mutex_);
    for (auto& iter : merge_thread_results_) {
        iter.wait();
    }
    //    LOG_ENGINE_DEBUG_ << "End WaitMergeFileFinish";
}

void
DBImpl::WaitBuildIndexFinish() {
    //    LOG_ENGINE_DEBUG_ << "Begin WaitBuildIndexFinish";
    std::lock_guard<std::mutex> lck(index_result_mutex_);
    for (auto& iter : index_thread_results_) {
        iter.wait();
    }
    //    LOG_ENGINE_DEBUG_ << "End WaitBuildIndexFinish";
}

void
DBImpl::StartMetricTask() {
    server::Metrics::GetInstance().KeepingAliveCounterIncrement(BACKGROUND_METRIC_INTERVAL);
    int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
    int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
    fiu_do_on("DBImpl.StartMetricTask.InvalidTotalCache", cache_total = 0);

    if (cache_total > 0) {
        double cache_usage_double = cache_usage;
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(cache_usage_double * 100 / cache_total);
    } else {
        server::Metrics::GetInstance().CpuCacheUsageGaugeSet(0);
    }

    server::Metrics::GetInstance().GpuCacheUsageGaugeSet();
    uint64_t size;
    Size(size);
    server::Metrics::GetInstance().DataFileSizeGaugeSet(size);
    server::Metrics::GetInstance().CPUUsagePercentSet();
    server::Metrics::GetInstance().RAMUsagePercentSet();
    server::Metrics::GetInstance().GPUPercentGaugeSet();
    server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
    server::Metrics::GetInstance().OctetsSet();

    server::Metrics::GetInstance().CPUCoreUsagePercentSet();
    server::Metrics::GetInstance().GPUTemperature();
    server::Metrics::GetInstance().CPUTemperature();
    server::Metrics::GetInstance().PushToGateway();
}
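
// StartMergeTask / StartBuildIndexTask: each keeps at most one pending future in its result vector,
// so a new background task is enqueued only after the previous one has completed.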

void
DBImpl::StartMergeTask() {
    // LOG_ENGINE_DEBUG_ << "Begin StartMergeTask";
    // merge task has been finished?
    {
        std::lock_guard<std::mutex> lck(merge_result_mutex_);
        if (!merge_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (merge_thread_results_.back().wait_for(span) == std::future_status::ready) {
                merge_thread_results_.pop_back();
            }
        }
    }

    // add new merge task
    {
        std::lock_guard<std::mutex> lck(merge_result_mutex_);
        if (merge_thread_results_.empty()) {
            // collect merge files for all collections (if merge_collection_ids_ is empty) for two reasons:
            // 1. other collections may still have un-merged files
            // 2. the server may have been closed unexpectedly; those un-merged files need to be merged when the server restarts
            if (merge_collection_ids_.empty()) {
                std::vector<meta::CollectionSchema> collection_schema_array;
                meta_ptr_->AllCollections(collection_schema_array);
                for (auto& schema : collection_schema_array) {
                    merge_collection_ids_.insert(schema.collection_id_);
                }
            }

            // start merge file thread
            merge_thread_results_.push_back(
                merge_thread_pool_.enqueue(&DBImpl::BackgroundMerge, this, merge_collection_ids_));
            merge_collection_ids_.clear();
        }
    }

    // LOG_ENGINE_DEBUG_ << "End StartMergeTask";
}

Status
DBImpl::MergeFiles(const std::string& collection_id, const meta::SegmentsSchema& files) {
    // const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);

    LOG_ENGINE_DEBUG_ << "Merge files for collection: " << collection_id;

    // step 1: create collection file
    meta::SegmentSchema collection_file;
    collection_file.collection_id_ = collection_id;
    collection_file.file_type_ = meta::SegmentSchema::NEW_MERGE;
    Status status = meta_ptr_->CreateCollectionFile(collection_file);

    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to create collection: " << status.ToString();
        return status;
    }

    // step 2: merge files
    /*
    ExecutionEnginePtr index =
        EngineFactory::Build(collection_file.dimension_, collection_file.location_,
    (EngineType)collection_file.engine_type_, (MetricType)collection_file.metric_type_, collection_file.nlist_);
*/
    meta::SegmentsSchema updated;

    std::string new_segment_dir;
    utils::GetParentPath(collection_file.location_, new_segment_dir);
    auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(new_segment_dir);

    for (auto& file : files) {
        server::CollectMergeFilesMetrics metrics;
        std::string segment_dir_to_merge;
        utils::GetParentPath(file.location_, segment_dir_to_merge);
        segment_writer_ptr->Merge(segment_dir_to_merge, collection_file.file_id_);
        auto file_schema = file;
        file_schema.file_type_ = meta::SegmentSchema::TO_DELETE;
        updated.push_back(file_schema);
        auto size = segment_writer_ptr->Size();
        if (size >= file_schema.index_file_size_) {
            break;
        }
    }

    // step 3: serialize to disk
    try {
        status = segment_writer_ptr->Serialize();
        fiu_do_on("DBImpl.MergeFiles.Serialize_ThrowException", throw std::exception());
        fiu_do_on("DBImpl.MergeFiles.Serialize_ErrorStatus", status = Status(DB_ERROR, ""));
    } catch (std::exception& ex) {
        std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
        LOG_ENGINE_ERROR_ << msg;
        status = Status(DB_ERROR, msg);
    }

    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to persist merged segment: " << new_segment_dir << ". Error: " << status.message();

        // if failed to serialize merge file to disk
        // typical error: out of disk space, out of memory or permission denied
        collection_file.file_type_ = meta::SegmentSchema::TO_DELETE;
        status = meta_ptr_->UpdateCollectionFile(collection_file);
        LOG_ENGINE_DEBUG_ << "Failed to update file to index, mark file: " << collection_file.file_id_
                          << " to to_delete";

        return status;
    }

    // step 4: update collection files state
    // if index type isn't IDMAP, set file type to TO_INDEX if file size exceeds index_file_size
    // else set file type to RAW, no need to build index
    if (!utils::IsRawIndexType(collection_file.engine_type_)) {
        collection_file.file_type_ = (segment_writer_ptr->Size() >= collection_file.index_file_size_)
                                         ? meta::SegmentSchema::TO_INDEX
                                         : meta::SegmentSchema::RAW;
    } else {
        collection_file.file_type_ = meta::SegmentSchema::RAW;
    }
    collection_file.file_size_ = segment_writer_ptr->Size();
    collection_file.row_count_ = segment_writer_ptr->VectorCount();
    updated.push_back(collection_file);
    status = meta_ptr_->UpdateCollectionFiles(updated);
    LOG_ENGINE_DEBUG_ << "New merged segment " << collection_file.segment_id_ << " of size "
                      << segment_writer_ptr->Size() << " bytes";

    if (options_.insert_cache_immediately_) {
        segment_writer_ptr->Cache();
    }

    return status;
}

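// MergeHybridFiles: same flow as MergeFiles above, except the target file is created through
// meta_ptr_->CreateHybridCollectionFile for hybrid (vector + attribute) collections.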
Status
DBImpl::MergeHybridFiles(const std::string& collection_id, const milvus::engine::meta::SegmentsSchema& files) {
    // const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);

    LOG_ENGINE_DEBUG_ << "Merge files for collection: " << collection_id;

    // step 1: create table file
    meta::SegmentSchema table_file;
    table_file.collection_id_ = collection_id;
    table_file.file_type_ = meta::SegmentSchema::NEW_MERGE;
    Status status = meta_ptr_->CreateHybridCollectionFile(table_file);

    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to create collection: " << status.ToString();
        return status;
    }

    // step 2: merge files
    /*
    ExecutionEnginePtr index =
        EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_,
                             (MetricType)table_file.metric_type_, table_file.nlist_);
*/
    meta::SegmentsSchema updated;

    std::string new_segment_dir;
    utils::GetParentPath(table_file.location_, new_segment_dir);
    auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(new_segment_dir);

    for (auto& file : files) {
        server::CollectMergeFilesMetrics metrics;
        std::string segment_dir_to_merge;
        utils::GetParentPath(file.location_, segment_dir_to_merge);
        segment_writer_ptr->Merge(segment_dir_to_merge, table_file.file_id_);
        auto file_schema = file;
        file_schema.file_type_ = meta::SegmentSchema::TO_DELETE;
        updated.push_back(file_schema);
        auto size = segment_writer_ptr->Size();
        if (size >= file_schema.index_file_size_) {
            break;
        }
    }

    // step 3: serialize to disk
    try {
        status = segment_writer_ptr->Serialize();
        fiu_do_on("DBImpl.MergeFiles.Serialize_ThrowException", throw std::exception());
        fiu_do_on("DBImpl.MergeFiles.Serialize_ErrorStatus", status = Status(DB_ERROR, ""));
    } catch (std::exception& ex) {
        std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
        LOG_ENGINE_ERROR_ << msg;
        status = Status(DB_ERROR, msg);
    }

    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to persist merged segment: " << new_segment_dir << ". Error: " << status.message();

        // if failed to serialize merge file to disk
        // typical error: out of disk space, out of memory or permission denied
        table_file.file_type_ = meta::SegmentSchema::TO_DELETE;
        status = meta_ptr_->UpdateCollectionFile(table_file);
        LOG_ENGINE_DEBUG_ << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

        return status;
    }

    // step 4: update table files state
    // if index type isn't IDMAP, set file type to TO_INDEX if file size exceed index_file_size
    // else set file type to RAW, no need to build index
    if (!utils::IsRawIndexType(table_file.engine_type_)) {
        table_file.file_type_ = (segment_writer_ptr->Size() >= table_file.index_file_size_)
                                    ? meta::SegmentSchema::TO_INDEX
                                    : meta::SegmentSchema::RAW;
    } else {
        table_file.file_type_ = meta::SegmentSchema::RAW;
    }
    table_file.file_size_ = segment_writer_ptr->Size();
    table_file.row_count_ = segment_writer_ptr->VectorCount();
    updated.push_back(table_file);
    status = meta_ptr_->UpdateCollectionFiles(updated);
    LOG_ENGINE_DEBUG_ << "New merged segment " << table_file.segment_id_ << " of size " << segment_writer_ptr->Size()
                      << " bytes";

    if (options_.insert_cache_immediately_) {
        segment_writer_ptr->Cache();
    }

    return status;
}

Status
DBImpl::BackgroundMergeFiles(const std::string& collection_id) {
    const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);

    meta::SegmentsSchema raw_files;
    auto status = meta_ptr_->FilesToMerge(collection_id, raw_files);
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to get merge files for collection: " << collection_id;
        return status;
    }

    if (raw_files.size() < options_.merge_trigger_number_) {
        LOG_ENGINE_TRACE_ << "Files number not greater equal than merge trigger number, skip merge action";
        return Status::OK();
    }

    status = OngoingFileChecker::GetInstance().MarkOngoingFiles(raw_files);
    MergeFiles(collection_id, raw_files);
    status = OngoingFileChecker::GetInstance().UnmarkOngoingFiles(raw_files);

    if (!initialized_.load(std::memory_order_acquire)) {
        LOG_ENGINE_DEBUG_ << "Server will shutdown, skip merge action for collection: " << collection_id;
    }

    return Status::OK();
}

void
DBImpl::BackgroundMerge(std::set<std::string> collection_ids) {
    // LOG_ENGINE_TRACE_ << " Background merge thread start";

    Status status;
    for (auto& collection_id : collection_ids) {
        status = BackgroundMergeFiles(collection_id);
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << "Merge files for collection " << collection_id << " failed: " << status.ToString();
        }

        if (!initialized_.load(std::memory_order_acquire)) {
            LOG_ENGINE_DEBUG_ << "Server will shutdown, skip merge action";
            break;
        }
    }

    meta_ptr_->Archive();

    {
        uint64_t ttl = 10 * meta::SECOND;  // default: file will be hard-deleted few seconds after soft-deleted
        if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) {
            ttl = meta::HOUR;
        }

        meta_ptr_->CleanUpFilesWithTTL(ttl);
    }

    // LOG_ENGINE_TRACE_ << " Background merge thread exit";
}

void
DBImpl::StartBuildIndexTask() {
    // build index has been finished?
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (!index_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
                index_thread_results_.pop_back();
            }
        }
    }

    // add new build index task
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (index_thread_results_.empty()) {
            index_thread_results_.push_back(index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
        }
    }
}
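
// BackgroundBuildIndex: files that repeatedly fail to build are remembered by index_failed_checker_
// and skipped on later passes, so one bad segment cannot block indexing of the others.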

void
DBImpl::BackgroundBuildIndex() {
    std::unique_lock<std::mutex> lock(build_index_mutex_);
    meta::SegmentsSchema to_index_files;
    meta_ptr_->FilesToIndex(to_index_files);
    Status status = index_failed_checker_.IgnoreFailedIndexFiles(to_index_files);

    if (!to_index_files.empty()) {
        LOG_ENGINE_DEBUG_ << "Background build index thread begin";
        status = OngoingFileChecker::GetInstance().MarkOngoingFiles(to_index_files);

        // step 2: put build index task to scheduler
        std::vector<std::pair<scheduler::BuildIndexJobPtr, scheduler::SegmentSchemaPtr>> job2file_map;
        for (auto& file : to_index_files) {
            scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(meta_ptr_, options_);
            scheduler::SegmentSchemaPtr file_ptr = std::make_shared<meta::SegmentSchema>(file);
            job->AddToIndexFiles(file_ptr);
            scheduler::JobMgrInst::GetInstance()->Put(job);
            job2file_map.push_back(std::make_pair(job, file_ptr));
        }

        // step 3: wait build index finished and mark failed files
        for (auto iter = job2file_map.begin(); iter != job2file_map.end(); ++iter) {
            scheduler::BuildIndexJobPtr job = iter->first;
            meta::SegmentSchema& file_schema = *(iter->second.get());
            job->WaitBuildIndexFinish();
            if (!job->GetStatus().ok()) {
                Status status = job->GetStatus();
                LOG_ENGINE_ERROR_ << "Building index job " << job->id() << " failed: " << status.ToString();

                index_failed_checker_.MarkFailedIndexFile(file_schema, status.message());
            } else {
                LOG_ENGINE_DEBUG_ << "Building index job " << job->id() << " succeed.";

                index_failed_checker_.MarkSucceedIndexFile(file_schema);
            }
            status = OngoingFileChecker::GetInstance().UnmarkOngoingFile(file_schema);
        }

        LOG_ENGINE_DEBUG_ << "Background build index thread finished";
        index_req_swn_.Notify();  // notify CreateIndex check circle
    }
}

Status
DBImpl::GetFilesToBuildIndex(const std::string& collection_id, const std::vector<int>& file_types,
                             meta::SegmentsSchema& files) {
    files.clear();
    auto status = meta_ptr_->FilesByType(collection_id, file_types, files);

    // only build index for files whose row count is greater than a certain threshold
    for (auto it = files.begin(); it != files.end();) {
        if ((*it).file_type_ == static_cast<int>(meta::SegmentSchema::RAW) &&
            (*it).row_count_ < meta::BUILD_INDEX_THRESHOLD) {
            it = files.erase(it);
        } else {
            ++it;
        }
    }

    return Status::OK();
}

Status
DBImpl::GetFilesToSearch(const std::string& collection_id, meta::SegmentsSchema& files) {
    LOG_ENGINE_DEBUG_ << "Collect files from collection: " << collection_id;

    meta::SegmentsSchema search_files;
    auto status = meta_ptr_->FilesToSearch(collection_id, search_files);
    if (!status.ok()) {
        return status;
    }

    for (auto& file : search_files) {
        files.push_back(file);
    }
    return Status::OK();
}

Status
DBImpl::GetPartitionByTag(const std::string& collection_id, const std::string& partition_tag,
                          std::string& partition_name) {
    Status status;

    if (partition_tag.empty()) {
        partition_name = collection_id;

    } else {
        // trim side-blank of tag, only compare valid characters
        // for example: " ab cd " is treated as "ab cd"
        std::string valid_tag = partition_tag;
        server::StringHelpFunctions::TrimStringBlank(valid_tag);

        if (valid_tag == milvus::engine::DEFAULT_PARTITON_TAG) {
            partition_name = collection_id;
            return status;
        }

        status = meta_ptr_->GetPartitionName(collection_id, partition_tag, partition_name);
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << status.message();
        }
    }

    return status;
}
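
// Partition tag semantics: tags are compared after trimming surrounding blanks, DEFAULT_PARTITON_TAG
// maps to the parent collection itself, and GetPartitionsByTags additionally treats each tag as a
// regular expression via StringHelpFunctions::IsRegexMatch.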

Status
DBImpl::GetPartitionsByTags(const std::string& collection_id, const std::vector<std::string>& partition_tags,
                            std::set<std::string>& partition_name_array) {
    std::vector<meta::CollectionSchema> partition_array;
    auto status = meta_ptr_->ShowPartitions(collection_id, partition_array);

    for (auto& tag : partition_tags) {
        // trim side-blank of tag, only compare valid characters
        // for example: " ab cd " is treated as "ab cd"
        std::string valid_tag = tag;
        server::StringHelpFunctions::TrimStringBlank(valid_tag);

        if (valid_tag == milvus::engine::DEFAULT_PARTITON_TAG) {
            partition_name_array.insert(collection_id);
            return status;
        }

        for (auto& schema : partition_array) {
            if (server::StringHelpFunctions::IsRegexMatch(schema.partition_tag_, valid_tag)) {
                partition_name_array.insert(schema.collection_id_);
            }
        }
    }

    if (partition_name_array.empty()) {
        return Status(PARTITION_NOT_FOUND, "Cannot find the specified partitions");
    }

    return Status::OK();
}

Status
DBImpl::DropCollectionRecursively(const std::string& collection_id) {
    // dropping only part of the collection files (e.g. by date) is not supported currently
    LOG_ENGINE_DEBUG_ << "Prepare to delete collection " << collection_id;

    Status status;
    if (options_.wal_enable_) {
        wal_mgr_->DropCollection(collection_id);
    }

    status = mem_mgr_->EraseMemVector(collection_id);   // not allow insert
    status = meta_ptr_->DropCollection(collection_id);  // soft delete collection
    index_failed_checker_.CleanFailedIndexFileOfCollection(collection_id);

    // scheduler will determine when to delete collection files
    auto nres = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
    scheduler::DeleteJobPtr job = std::make_shared<scheduler::DeleteJob>(collection_id, meta_ptr_, nres);
    scheduler::JobMgrInst::GetInstance()->Put(job);
    job->WaitAndDelete();

    std::vector<meta::CollectionSchema> partition_array;
    status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    for (auto& schema : partition_array) {
        status = DropCollectionRecursively(schema.collection_id_);
        fiu_do_on("DBImpl.DropCollectionRecursively.failed", status = Status(DB_ERROR, ""));
        if (!status.ok()) {
            return status;
        }
    }

    return Status::OK();
}

Status
DBImpl::UpdateCollectionIndexRecursively(const std::string& collection_id, const CollectionIndex& index) {
    DropIndex(collection_id);

    auto status = meta_ptr_->UpdateCollectionIndex(collection_id, index);
    fiu_do_on("DBImpl.UpdateCollectionIndexRecursively.fail_update_collection_index",
              status = Status(DB_META_TRANSACTION_FAILED, ""));
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to update collection index info for collection: " << collection_id;
        return status;
    }

    std::vector<meta::CollectionSchema> partition_array;
    status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    for (auto& schema : partition_array) {
        status = UpdateCollectionIndexRecursively(schema.collection_id_, index);
        if (!status.ok()) {
            return status;
        }
    }

    return Status::OK();
}
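
// WaitCollectionIndexRecursively: polls the meta store every WAIT_BUILD_INDEX_INTERVAL seconds (woken
// early by index_req_swn_) until no un-indexed files remain, then recurses into partitions and finally
// reports any files that index_failed_checker_ recorded as permanently failed.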

Status
DBImpl::WaitCollectionIndexRecursively(const std::string& collection_id, const CollectionIndex& index) {
    // for IDMAP type, only wait all NEW file converted to RAW file
    // for other type, wait NEW/RAW/NEW_MERGE/NEW_INDEX/TO_INDEX files converted to INDEX files
    std::vector<int> file_types;
    if (utils::IsRawIndexType(index.engine_type_)) {
        file_types = {
            static_cast<int32_t>(meta::SegmentSchema::NEW),
            static_cast<int32_t>(meta::SegmentSchema::NEW_MERGE),
        };
    } else {
        file_types = {
            static_cast<int32_t>(meta::SegmentSchema::RAW),       static_cast<int32_t>(meta::SegmentSchema::NEW),
            static_cast<int32_t>(meta::SegmentSchema::NEW_MERGE), static_cast<int32_t>(meta::SegmentSchema::NEW_INDEX),
            static_cast<int32_t>(meta::SegmentSchema::TO_INDEX),
        };
    }

    // get files to build index
    meta::SegmentsSchema collection_files;
    auto status = GetFilesToBuildIndex(collection_id, file_types, collection_files);
    int times = 1;

    while (!collection_files.empty()) {
        LOG_ENGINE_DEBUG_ << "Non index files detected! Will build index " << times;
        if (!utils::IsRawIndexType(index.engine_type_)) {
            status = meta_ptr_->UpdateCollectionFilesToIndex(collection_id);
        }

        index_req_swn_.Wait_For(std::chrono::seconds(WAIT_BUILD_INDEX_INTERVAL));
        GetFilesToBuildIndex(collection_id, file_types, collection_files);
        ++times;

        index_failed_checker_.IgnoreFailedIndexFiles(collection_files);
    }

    // build index for partition
    std::vector<meta::CollectionSchema> partition_array;
    status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    for (auto& schema : partition_array) {
        status = WaitCollectionIndexRecursively(schema.collection_id_, index);
        fiu_do_on("DBImpl.WaitCollectionIndexRecursively.fail_build_collection_Index_for_partition",
                  status = Status(DB_ERROR, ""));
        if (!status.ok()) {
            return status;
        }
    }

    // failed to build index for some files, return error
    std::string err_msg;
    index_failed_checker_.GetErrMsgForCollection(collection_id, err_msg);
    fiu_do_on("DBImpl.WaitCollectionIndexRecursively.not_empty_err_msg", err_msg.append("fiu"));
    if (!err_msg.empty()) {
        return Status(DB_ERROR, err_msg);
    }

    return Status::OK();
}

Status
DBImpl::DropCollectionIndexRecursively(const std::string& collection_id) {
    LOG_ENGINE_DEBUG_ << "Drop index for collection: " << collection_id;
    index_failed_checker_.CleanFailedIndexFileOfCollection(collection_id);
    auto status = meta_ptr_->DropCollectionIndex(collection_id);
    if (!status.ok()) {
        return status;
    }

    // drop partition index
    std::vector<meta::CollectionSchema> partition_array;
    status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    for (auto& schema : partition_array) {
        status = DropCollectionIndexRecursively(schema.collection_id_);
        fiu_do_on("DBImpl.DropCollectionIndexRecursively.fail_drop_collection_Index_for_partition",
                  status = Status(DB_ERROR, ""));
        if (!status.ok()) {
            return status;
        }
    }

    return Status::OK();
}

Status
DBImpl::GetCollectionRowCountRecursively(const std::string& collection_id, uint64_t& row_count) {
    row_count = 0;
    auto status = meta_ptr_->Count(collection_id, row_count);
    if (!status.ok()) {
        return status;
    }

    // get partition row count
    std::vector<meta::CollectionSchema> partition_array;
    status = meta_ptr_->ShowPartitions(collection_id, partition_array);
    for (auto& schema : partition_array) {
        uint64_t partition_row_count = 0;
        status = GetCollectionRowCountRecursively(schema.collection_id_, partition_row_count);
        fiu_do_on("DBImpl.GetCollectionRowCountRecursively.fail_get_collection_rowcount_for_partition",
                  status = Status(DB_ERROR, ""));
        if (!status.ok()) {
            return status;
        }

        row_count += partition_row_count;
    }

    return Status::OK();
}
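
// ExecWalRecord: applies one WAL record to the mem manager. The collections_flushed helper reports
// flushed collections back to the WAL manager and queues them for background merge.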

Status
DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
    fiu_return_on("DBImpl.ExexWalRecord.return", Status(););

    auto collections_flushed = [&](const std::set<std::string>& collection_ids) -> uint64_t {
        if (collection_ids.empty()) {
            return 0;
        }

        uint64_t max_lsn = 0;
        if (options_.wal_enable_) {
            for (auto& collection : collection_ids) {
                uint64_t lsn = 0;
                meta_ptr_->GetCollectionFlushLSN(collection, lsn);
                wal_mgr_->CollectionFlushed(collection, lsn);
                if (lsn > max_lsn) {
                    max_lsn = lsn;
                }
            }
        }

        std::lock_guard<std::mutex> lck(merge_result_mutex_);
        for (auto& collection : collection_ids) {
            merge_collection_ids_.insert(collection);
        }
        return max_lsn;
    };

    Status status;

    switch (record.type) {
        case wal::MXLogType::Entity: {
            std::string target_collection_name;
            status = GetPartitionByTag(record.collection_id, record.partition_tag, target_collection_name);
            if (!status.ok()) {
                LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << "Get partition fail: " << status.message();
                return status;
            }

            std::set<std::string> flushed_collections;
            status = mem_mgr_->InsertEntities(target_collection_name, record.length, record.ids,
                                              (record.data_size / record.length / sizeof(float)),
                                              (const float*)record.data, record.attr_nbytes, record.attr_data_size,
                                              record.attr_data, record.lsn, flushed_collections);
            collections_flushed(flushed_collections);

            milvus::server::CollectInsertMetrics metrics(record.length, status);
            break;
        }
        case wal::MXLogType::InsertBinary: {
            std::string target_collection_name;
            status = GetPartitionByTag(record.collection_id, record.partition_tag, target_collection_name);
            if (!status.ok()) {
                LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << "Get partition fail: " << status.message();
                return status;
            }

            std::set<std::string> flushed_collections;
            status = mem_mgr_->InsertVectors(target_collection_name, record.length, record.ids,
                                             (record.data_size / record.length / sizeof(uint8_t)),
                                             (const u_int8_t*)record.data, record.lsn, flushed_collections);
            // run collections_flushed even if the insert status is not ok
            collections_flushed(flushed_collections);

            // metrics
            milvus::server::CollectInsertMetrics metrics(record.length, status);
            break;
        }

        case wal::MXLogType::InsertVector: {
            std::string target_collection_name;
            status = GetPartitionByTag(record.collection_id, record.partition_tag, target_collection_name);
            if (!status.ok()) {
                LOG_WAL_ERROR_ << LogOut("[%s][%ld] ", "insert", 0) << "Get partition fail: " << status.message();
                return status;
            }

            std::set<std::string> flushed_collections;
            status = mem_mgr_->InsertVectors(target_collection_name, record.length, record.ids,
                                             (record.data_size / record.length / sizeof(float)),
                                             (const float*)record.data, record.lsn, flushed_collections);
            // run collections_flushed even if the insert status is not ok
            collections_flushed(flushed_collections);

            // metrics
            milvus::server::CollectInsertMetrics metrics(record.length, status);
            break;
        }

        case wal::MXLogType::Delete: {
            std::vector<meta::CollectionSchema> partition_array;
            status = meta_ptr_->ShowPartitions(record.collection_id, partition_array);
            if (!status.ok()) {
                return status;
            }

            std::vector<std::string> collection_ids{record.collection_id};
            for (auto& partition : partition_array) {
                auto& partition_collection_id = partition.collection_id_;
                collection_ids.emplace_back(partition_collection_id);
            }

            if (record.length == 1) {
                for (auto& collection_id : collection_ids) {
                    status = mem_mgr_->DeleteVector(collection_id, *record.ids, record.lsn);
                    if (!status.ok()) {
                        return status;
                    }
                }
            } else {
                for (auto& collection_id : collection_ids) {
                    status = mem_mgr_->DeleteVectors(collection_id, record.length, record.ids, record.lsn);
                    if (!status.ok()) {
                        return status;
                    }
                }
            }
            break;
        }

        case wal::MXLogType::Flush: {
            if (!record.collection_id.empty()) {
                // flush one collection
                std::vector<meta::CollectionSchema> partition_array;
                status = meta_ptr_->ShowPartitions(record.collection_id, partition_array);
                if (!status.ok()) {
                    return status;
                }

                std::vector<std::string> collection_ids{record.collection_id};
                for (auto& partition : partition_array) {
                    auto& partition_collection_id = partition.collection_id_;
                    collection_ids.emplace_back(partition_collection_id);
                }

                std::set<std::string> flushed_collections;
                for (auto& collection_id : collection_ids) {
                    const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
                    status = mem_mgr_->Flush(collection_id);
                    if (!status.ok()) {
                        break;
                    }
                    flushed_collections.insert(collection_id);
                }

                collections_flushed(flushed_collections);

            } else {
                // flush all collections
                std::set<std::string> collection_ids;
                {
                    const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
                    status = mem_mgr_->Flush(collection_ids);
                }

                uint64_t lsn = collections_flushed(collection_ids);
                if (options_.wal_enable_) {
                    wal_mgr_->RemoveOldFiles(lsn);
                }
            }
            break;
        }
    }

    return status;
}

void
DBImpl::InternalFlush(const std::string& collection_id) {
    wal::MXLogRecord record;
    record.type = wal::MXLogType::Flush;
    record.collection_id = collection_id;
    ExecWalRecord(record);

    StartMergeTask();
}
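
// BackgroundWalThread: drains WAL records and applies them via ExecWalRecord; it also drives the
// periodic auto-flush (options_.auto_flush_interval_) and performs a final flush before exiting when
// the DB shuts down.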

void
DBImpl::BackgroundWalThread() {
    SetThreadName("wal_thread");
    server::SystemInfo::GetInstance().Init();

    std::chrono::system_clock::time_point next_auto_flush_time;
    auto get_next_auto_flush_time = [&]() {
        return std::chrono::system_clock::now() + std::chrono::seconds(options_.auto_flush_interval_);
    };
    if (options_.auto_flush_interval_ > 0) {
        next_auto_flush_time = get_next_auto_flush_time();
    }

    while (true) {
        if (options_.auto_flush_interval_ > 0) {
            if (std::chrono::system_clock::now() >= next_auto_flush_time) {
                InternalFlush();
                next_auto_flush_time = get_next_auto_flush_time();
            }
        }

        wal::MXLogRecord record;
        auto error_code = wal_mgr_->GetNextRecord(record);
        if (error_code != WAL_SUCCESS) {
            LOG_ENGINE_ERROR_ << "WAL background GetNextRecord error";
            break;
        }

        if (record.type != wal::MXLogType::None) {
            ExecWalRecord(record);
            if (record.type == wal::MXLogType::Flush) {
                // notify flush request to return
                flush_req_swn_.Notify();

                // if user flush all manually, update auto flush also
                if (record.collection_id.empty() && options_.auto_flush_interval_ > 0) {
                    next_auto_flush_time = get_next_auto_flush_time();
                }
            }

        } else {
            if (!initialized_.load(std::memory_order_acquire)) {
                InternalFlush();
                flush_req_swn_.Notify();
                WaitMergeFileFinish();
                WaitBuildIndexFinish();
                LOG_ENGINE_DEBUG_ << "WAL background thread exit";
                break;
            }

            if (options_.auto_flush_interval_ > 0) {
                swn_wal_.Wait_Until(next_auto_flush_time);
            } else {
                swn_wal_.Wait();
            }
        }
    }
}

void
DBImpl::BackgroundFlushThread() {
    SetThreadName("flush_thread");
    server::SystemInfo::GetInstance().Init();
    while (true) {
        if (!initialized_.load(std::memory_order_acquire)) {
            LOG_ENGINE_DEBUG_ << "DB background flush thread exit";
            break;
        }

        InternalFlush();
        if (options_.auto_flush_interval_ > 0) {
            swn_flush_.Wait_For(std::chrono::seconds(options_.auto_flush_interval_));
        } else {
            swn_flush_.Wait();
        }
    }
}

void
DBImpl::BackgroundMetricThread() {
    server::SystemInfo::GetInstance().Init();
    while (true) {
        if (!initialized_.load(std::memory_order_acquire)) {
            LOG_ENGINE_DEBUG_ << "DB background metric thread exit";
            break;
        }

        swn_metric_.Wait_For(std::chrono::seconds(BACKGROUND_METRIC_INTERVAL));
        StartMetricTask();
    }
}

void
DBImpl::OnCacheInsertDataChanged(bool value) {
    options_.insert_cache_immediately_ = value;
}

void
DBImpl::OnUseBlasThresholdChanged(int64_t threshold) {
    faiss::distance_compute_blas_threshold = threshold;
}

}  // namespace engine
}  // namespace milvus