Commit 128e9bb7 authored by groot, committed by jinhai

performance issues (#2438)

* performance issues
Signed-off-by: yhmo <yihua.mo@zilliz.com>

* fix bugs
Signed-off-by: yhmo <yihua.mo@zilliz.com>

* preload collection issue
Signed-off-by: yhmo <yihua.mo@zilliz.com>

* uncomment boring log
Signed-off-by: yhmo <yihua.mo@zilliz.com>

* reduce unittest time
Signed-off-by: yhmo <yihua.mo@zilliz.com>

* reduce metric test time cost
Signed-off-by: yhmo <yihua.mo@zilliz.com>
Parent 1251fbf2
......@@ -71,7 +71,8 @@ class DB {
GetCollectionRowCount(const std::string& collection_id, uint64_t& row_count) = 0;
virtual Status
PreloadCollection(const std::string& collection_id) = 0;
PreloadCollection(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
bool force = false) = 0;
virtual Status
UpdateCollectionFlag(const std::string& collection_id, int64_t flag) = 0;
......@@ -108,10 +109,11 @@ class DB {
Flush() = 0;
virtual Status
Compact(const std::string& collection_id, double threshold = 0.0) = 0;
Compact(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
double threshold = 0.0) = 0;
virtual Status
GetVectorsByID(const std::string& collection_id, const IDNumbers& id_array,
GetVectorsByID(const engine::meta::CollectionSchema& collection, const IDNumbers& id_array,
std::vector<engine::VectorsData>& vectors) = 0;
virtual Status
......
......@@ -115,6 +115,16 @@ DBImpl::Start() {
// LOG_ENGINE_TRACE_ << "DB service start";
initialized_.store(true, std::memory_order_release);
// the server may be closed unexpectedly; these un-merged files need to be merged when the server restarts
// and soft-deleted files need to be deleted when the server restarts
std::set<std::string> merge_collection_ids;
std::vector<meta::CollectionSchema> collection_schema_array;
meta_ptr_->AllCollections(collection_schema_array);
for (auto& schema : collection_schema_array) {
merge_collection_ids.insert(schema.collection_id_);
}
StartMergeTask(merge_collection_ids, true);
// wal
if (options_.wal_enable_) {
auto error_code = DB_ERROR;
......@@ -158,7 +168,9 @@ DBImpl::Start() {
}
// background metric thread
bg_metric_thread_ = std::thread(&DBImpl::BackgroundMetricThread, this);
if (options_.metric_enable_) {
bg_metric_thread_ = std::thread(&DBImpl::BackgroundMetricThread, this);
}
return Status::OK();
}
......@@ -196,8 +208,10 @@ DBImpl::Stop() {
}
// wait metric thread exit
swn_metric_.Notify();
bg_metric_thread_.join();
if (options_.metric_enable_) {
swn_metric_.Notify();
bg_metric_thread_.join();
}
// LOG_ENGINE_TRACE_ << "DB service stop";
return Status::OK();
......@@ -386,7 +400,8 @@ DBImpl::GetCollectionInfo(const std::string& collection_id, std::string& collect
}
Status
DBImpl::PreloadCollection(const std::string& collection_id) {
DBImpl::PreloadCollection(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
bool force) {
if (!initialized_.load(std::memory_order_acquire)) {
return SHUTDOWN_ERROR;
}
......@@ -436,6 +451,12 @@ DBImpl::PreloadCollection(const std::string& collection_id) {
<< " files need to be pre-loaded";
TimeRecorderAuto rc("Pre-load collection:" + collection_id);
for (auto& file : files_array) {
// client broke the connection, no need to continue
if (context && context->IsConnectionBroken()) {
LOG_ENGINE_DEBUG_ << "Client connection broken, stop load collection";
break;
}
EngineType engine_type;
if (file.file_type_ == meta::SegmentSchema::FILE_TYPE::RAW ||
file.file_type_ == meta::SegmentSchema::FILE_TYPE::TO_INDEX ||
......@@ -467,7 +488,7 @@ DBImpl::PreloadCollection(const std::string& collection_id) {
}
size += engine->Size();
if (size > available_size) {
if (!force && size > available_size) {
LOG_ENGINE_DEBUG_ << "Pre-load cancelled since cache is almost full";
return Status(SERVER_CACHE_FULL, "Cache is full");
}
......@@ -919,7 +940,6 @@ DBImpl::Flush(const std::string& collection_id) {
swn_wal_.Notify();
flush_req_swn_.Wait();
}
StartMergeTask();
} else {
LOG_ENGINE_DEBUG_ << "MemTable flush";
InternalFlush(collection_id);
......@@ -946,7 +966,6 @@ DBImpl::Flush() {
swn_wal_.Notify();
flush_req_swn_.Wait();
}
StartMergeTask();
} else {
LOG_ENGINE_DEBUG_ << "MemTable flush";
InternalFlush();
......@@ -958,7 +977,7 @@ DBImpl::Flush() {
}
Status
DBImpl::Compact(const std::string& collection_id, double threshold) {
DBImpl::Compact(const std::shared_ptr<server::Context>& context, const std::string& collection_id, double threshold) {
if (!initialized_.load(std::memory_order_acquire)) {
return SHUTDOWN_ERROR;
}
......@@ -982,7 +1001,9 @@ DBImpl::Compact(const std::string& collection_id, double threshold) {
LOG_ENGINE_DEBUG_ << "Before compacting, wait for build index thread to finish...";
// WaitBuildIndexFinish();
std::vector<meta::CollectionSchema> collection_array;
status = meta_ptr_->ShowPartitions(collection_id, collection_array);
collection_array.push_back(collection_schema);
const std::lock_guard<std::mutex> index_lock(build_index_mutex_);
const std::lock_guard<std::mutex> merge_lock(flush_merge_compact_mutex_);
......@@ -993,7 +1014,7 @@ DBImpl::Compact(const std::string& collection_id, double threshold) {
std::vector<int> file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX,
meta::SegmentSchema::FILE_TYPE::BACKUP};
meta::FilesHolder files_holder;
status = meta_ptr_->FilesByType(collection_id, file_types, files_holder);
status = meta_ptr_->FilesByTypeEx(collection_array, file_types, files_holder);
if (!status.ok()) {
std::string err_msg = "Failed to get files to compact: " + status.message();
LOG_ENGINE_ERROR_ << err_msg;
......@@ -1006,6 +1027,12 @@ DBImpl::Compact(const std::string& collection_id, double threshold) {
// attention: here is a copy, not a reference, since files_holder.UnmarkFile will change the array internally
milvus::engine::meta::SegmentsSchema files_to_compact = files_holder.HoldFiles();
for (auto iter = files_to_compact.begin(); iter != files_to_compact.end();) {
// client broke the connection, no need to continue
if (context && context->IsConnectionBroken()) {
LOG_ENGINE_DEBUG_ << "Client connection broken, stop compact operation";
break;
}
meta::SegmentSchema file = *iter;
iter = files_to_compact.erase(iter);
......@@ -1023,7 +1050,7 @@ DBImpl::Compact(const std::string& collection_id, double threshold) {
meta::SegmentsSchema files_to_update;
if (deleted_docs_size != 0) {
compact_status = CompactFile(collection_id, threshold, file, files_to_update);
compact_status = CompactFile(file, threshold, files_to_update);
if (!compact_status.ok()) {
LOG_ENGINE_ERROR_ << "Compact failed for segment " << file.segment_id_ << ": "
......@@ -1054,9 +1081,8 @@ DBImpl::Compact(const std::string& collection_id, double threshold) {
}
Status
DBImpl::CompactFile(const std::string& collection_id, double threshold, const meta::SegmentSchema& file,
meta::SegmentsSchema& files_to_update) {
LOG_ENGINE_DEBUG_ << "Compacting segment " << file.segment_id_ << " for collection: " << collection_id;
DBImpl::CompactFile(const meta::SegmentSchema& file, double threshold, meta::SegmentsSchema& files_to_update) {
LOG_ENGINE_DEBUG_ << "Compacting segment " << file.segment_id_ << " for collection: " << file.collection_id_;
std::string segment_dir_to_merge;
utils::GetParentPath(file.location_, segment_dir_to_merge);
......@@ -1068,7 +1094,7 @@ DBImpl::CompactFile(const std::string& collection_id, double threshold, const me
auto status = segment_reader_to_merge.LoadDeletedDocs(deleted_docs_ptr);
if (status.ok()) {
auto delete_items = deleted_docs_ptr->GetDeletedDocs();
double delete_rate = (double)delete_items.size() / (double)file.row_count_;
double delete_rate = (double)delete_items.size() / (double)(delete_items.size() + file.row_count_);
if (delete_rate < threshold) {
LOG_ENGINE_DEBUG_ << "Delete rate less than " << threshold << ", no need to compact for"
<< segment_dir_to_merge;
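Note on the denominator change above: the delete rate is now computed against deleted docs plus row_count_ rather than row_count_ alone. The sketch below uses made-up numbers (not taken from this diff) just to show how the two formulas can disagree around a given threshold; compaction proceeds only when the rate reaches the threshold.

#include <cstdio>

int main() {
    // Hypothetical counts for illustration only.
    double deleted = 400.0;    // deleted_docs_ptr->GetDeletedDocs().size()
    double remaining = 600.0;  // file.row_count_
    double threshold = 0.5;

    double old_rate = deleted / remaining;              // formula before this commit: 0.67
    double new_rate = deleted / (deleted + remaining);  // formula after this commit: 0.40

    std::printf("old=%.2f new=%.2f\n", old_rate, new_rate);
    std::printf("compact(old)=%d compact(new)=%d\n", old_rate >= threshold, new_rate >= threshold);
    return 0;
}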
......@@ -1079,8 +1105,7 @@ DBImpl::CompactFile(const std::string& collection_id, double threshold, const me
// Create new collection file
meta::SegmentSchema compacted_file;
compacted_file.collection_id_ = collection_id;
// compacted_file.date_ = date;
compacted_file.collection_id_ = file.collection_id_;
compacted_file.file_type_ = meta::SegmentSchema::NEW_MERGE; // TODO: use NEW_MERGE for now
auto status = meta_ptr_->CreateCollectionFile(compacted_file);
......@@ -1090,7 +1115,6 @@ DBImpl::CompactFile(const std::string& collection_id, double threshold, const me
}
// Compact (merge) file to the newly created collection file
std::string new_segment_dir;
utils::GetParentPath(compacted_file.location_, new_segment_dir);
auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(new_segment_dir);
......@@ -1112,7 +1136,7 @@ DBImpl::CompactFile(const std::string& collection_id, double threshold, const me
return status;
}
// Update compacted file state, if origin file is backup or to_index, set compected file to to_index
// Update compacted file state, if origin file is backup or to_index, set compacted file to to_index
compacted_file.file_size_ = segment_writer_ptr->Size();
compacted_file.row_count_ = segment_writer_ptr->VectorCount();
if ((file.file_type_ == (int32_t)meta::SegmentSchema::BACKUP ||
......@@ -1157,53 +1181,41 @@ DBImpl::CompactFile(const std::string& collection_id, double threshold, const me
}
Status
DBImpl::GetVectorsByID(const std::string& collection_id, const IDNumbers& id_array,
DBImpl::GetVectorsByID(const engine::meta::CollectionSchema& collection, const IDNumbers& id_array,
std::vector<engine::VectorsData>& vectors) {
if (!initialized_.load(std::memory_order_acquire)) {
return SHUTDOWN_ERROR;
}
bool has_collection;
auto status = HasCollection(collection_id, has_collection);
if (!has_collection) {
LOG_ENGINE_ERROR_ << "Collection " << collection_id << " does not exist: ";
return Status(DB_NOT_FOUND, "Collection does not exist");
}
if (!status.ok()) {
return status;
}
meta::FilesHolder files_holder;
std::vector<int> file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX,
meta::SegmentSchema::FILE_TYPE::BACKUP};
status = meta_ptr_->FilesByType(collection_id, file_types, files_holder);
std::vector<meta::CollectionSchema> collection_array;
auto status = meta_ptr_->ShowPartitions(collection.collection_id_, collection_array);
collection_array.push_back(collection);
status = meta_ptr_->FilesByTypeEx(collection_array, file_types, files_holder);
if (!status.ok()) {
std::string err_msg = "Failed to get files for GetVectorsByID: " + status.message();
std::string err_msg = "Failed to get files for GetVectorByID: " + status.message();
LOG_ENGINE_ERROR_ << err_msg;
return status;
}
std::vector<meta::CollectionSchema> partition_array;
status = meta_ptr_->ShowPartitions(collection_id, partition_array);
for (auto& schema : partition_array) {
status = meta_ptr_->FilesByType(schema.collection_id_, file_types, files_holder);
if (!status.ok()) {
std::string err_msg = "Failed to get files for GetVectorByID: " + status.message();
LOG_ENGINE_ERROR_ << err_msg;
return status;
}
}
if (files_holder.HoldFiles().empty()) {
LOG_ENGINE_DEBUG_ << "No files to get vector by id from";
return Status(DB_NOT_FOUND, "Collection is empty");
}
cache::CpuCacheMgr::GetInstance()->PrintInfo();
status = GetVectorsByIdHelper(collection_id, id_array, vectors, files_holder);
status = GetVectorsByIdHelper(id_array, vectors, files_holder);
cache::CpuCacheMgr::GetInstance()->PrintInfo();
if (vectors.empty()) {
std::string msg = "Vectors not found in collection " + collection.collection_id_;
LOG_ENGINE_DEBUG_ << msg;
}
return status;
}
......@@ -1280,8 +1292,8 @@ DBImpl::GetVectorIDs(const std::string& collection_id, const std::string& segmen
}
Status
DBImpl::GetVectorsByIdHelper(const std::string& collection_id, const IDNumbers& id_array,
std::vector<engine::VectorsData>& vectors, meta::FilesHolder& files_holder) {
DBImpl::GetVectorsByIdHelper(const IDNumbers& id_array, std::vector<engine::VectorsData>& vectors,
meta::FilesHolder& files_holder) {
// attention: this is a copy, not a reference, since files_holder.UnMarkFile will change the array internally
milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles();
LOG_ENGINE_DEBUG_ << "Getting vector by id in " << files.size() << " files, id count = " << id_array.size();
......@@ -1298,6 +1310,9 @@ DBImpl::GetVectorsByIdHelper(const std::string& collection_id, const IDNumbers&
IDNumbers temp_ids = id_array;
for (auto& file : files) {
if (temp_ids.empty()) {
break; // all vectors found, no need to continue
}
// Load bloom filter
std::string segment_dir;
engine::utils::GetParentPath(file.location_, segment_dir);
......@@ -1380,11 +1395,6 @@ DBImpl::GetVectorsByIdHelper(const std::string& collection_id, const IDNumbers&
vectors.emplace_back(data);
}
if (vectors.empty()) {
std::string msg = "Vectors not found in collection " + collection_id;
LOG_ENGINE_DEBUG_ << msg;
}
return Status::OK();
}
......@@ -1395,15 +1405,17 @@ DBImpl::CreateIndex(const std::shared_ptr<server::Context>& context, const std::
return SHUTDOWN_ERROR;
}
// serialize memory data
// std::set<std::string> sync_collection_ids;
// auto status = SyncMemData(sync_collection_ids);
// step 1: wait for the merge file thread to finish to avoid duplicate data bug
auto status = Flush();
WaitMergeFileFinish(); // let merge file thread finish
std::set<std::string> merge_collection_ids;
StartMergeTask(merge_collection_ids, true); // start force-merge task
WaitMergeFileFinish(); // let force-merge file thread finish
{
std::unique_lock<std::mutex> lock(build_index_mutex_);
// step 1: check index difference
// step 2: check index difference
CollectionIndex old_index;
status = DescribeIndex(collection_id, old_index);
if (!status.ok()) {
......@@ -1411,7 +1423,7 @@ DBImpl::CreateIndex(const std::shared_ptr<server::Context>& context, const std::
return status;
}
// step 2: update index info
// step 3: update index info
CollectionIndex new_index = index;
new_index.metric_type_ = old_index.metric_type_; // don't change metric type, it was defined by CreateCollection
if (!utils::IsSameIndex(old_index, new_index)) {
......@@ -1422,11 +1434,6 @@ DBImpl::CreateIndex(const std::shared_ptr<server::Context>& context, const std::
}
}
// step 3: wait merge file thread finished to avoid duplicate data bug
WaitMergeFileFinish(); // let merge file thread finish
StartMergeTask(true); // start force-merge task
WaitMergeFileFinish(); // let force-merge file thread finish
// step 4: wait and build index
status = index_failed_checker_.CleanFailedIndexFileOfCollection(collection_id);
status = WaitCollectionIndexRecursively(context, collection_id, index);
......@@ -1451,7 +1458,8 @@ DBImpl::DropIndex(const std::string& collection_id) {
LOG_ENGINE_DEBUG_ << "Drop index for collection: " << collection_id;
auto status = DropCollectionIndexRecursively(collection_id);
StartMergeTask(); // merge small files after drop index
std::set<std::string> merge_collection_ids = {collection_id};
StartMergeTask(merge_collection_ids, true); // merge small files after drop index
return status;
}
......@@ -1493,7 +1501,7 @@ DBImpl::QueryByIDs(const std::shared_ptr<server::Context>& context, const std::s
// get target vectors data
std::vector<milvus::engine::VectorsData> vectors;
status = GetVectorsByID(collection_id, id_array, vectors);
status = GetVectorsByID(collection_schema, id_array, vectors);
if (!status.ok()) {
std::string msg = "Failed to get vector data for collection: " + collection_id;
LOG_ENGINE_ERROR_ << msg;
......@@ -1897,6 +1905,7 @@ DBImpl::HybridQueryAsync(const std::shared_ptr<server::Context>& context, const
void
DBImpl::BackgroundIndexThread() {
SetThreadName("index_thread");
server::SystemInfo::GetInstance().Init();
while (true) {
if (!initialized_.load(std::memory_order_acquire)) {
......@@ -1965,7 +1974,7 @@ DBImpl::StartMetricTask() {
}
void
DBImpl::StartMergeTask(bool force_merge_all) {
DBImpl::StartMergeTask(const std::set<std::string>& merge_collection_ids, bool force_merge_all) {
// LOG_ENGINE_DEBUG_ << "Begin StartMergeTask";
// merge task has been finished?
{
......@@ -1982,21 +1991,9 @@ DBImpl::StartMergeTask(bool force_merge_all) {
{
std::lock_guard<std::mutex> lck(merge_result_mutex_);
if (merge_thread_results_.empty()) {
// collect merge files for all collections(if merge_collection_ids_ is empty) for two reasons:
// 1. other collections may still has un-merged files
// 2. server may be closed unexpected, these un-merge files need to be merged when server restart
if (merge_collection_ids_.empty()) {
std::vector<meta::CollectionSchema> collection_schema_array;
meta_ptr_->AllCollections(collection_schema_array);
for (auto& schema : collection_schema_array) {
merge_collection_ids_.insert(schema.collection_id_);
}
}
// start merge file thread
merge_thread_results_.push_back(
merge_thread_pool_.enqueue(&DBImpl::BackgroundMerge, this, merge_collection_ids_, force_merge_all));
merge_collection_ids_.clear();
merge_thread_pool_.enqueue(&DBImpl::BackgroundMerge, this, merge_collection_ids, force_merge_all));
}
}
......@@ -2124,7 +2121,7 @@ DBImpl::BackgroundMerge(std::set<std::string> collection_ids, bool force_merge_a
}
}
meta_ptr_->Archive();
// meta_ptr_->Archive();
{
uint64_t timeout = (options_.file_cleanup_timeout_ >= 0) ? options_.file_cleanup_timeout_ : 10;
......@@ -2163,7 +2160,7 @@ DBImpl::BackgroundBuildIndex() {
meta::FilesHolder files_holder;
meta_ptr_->FilesToIndex(files_holder);
milvus::engine::meta::SegmentsSchema& to_index_files = files_holder.HoldFiles();
milvus::engine::meta::SegmentsSchema to_index_files = files_holder.HoldFiles();
Status status = index_failed_checker_.IgnoreFailedIndexFiles(to_index_files);
if (!to_index_files.empty()) {
......@@ -2383,7 +2380,7 @@ DBImpl::WaitCollectionIndexRecursively(const std::shared_ptr<server::Context>& c
index_req_swn_.Wait_For(std::chrono::seconds(1));
// client broke the connection, no need to block; check every 1 second
if (context->IsConnectionBroken()) {
if (context && context->IsConnectionBroken()) {
LOG_ENGINE_DEBUG_ << "Client connection broken, build index in background";
break; // just break, not return, continue to update partitions files to to_index
}
......@@ -2490,10 +2487,11 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
wal_mgr_->CollectionFlushed(collection_id, lsn);
}
std::lock_guard<std::mutex> lck(merge_result_mutex_);
std::set<std::string> merge_collection_ids;
for (auto& collection : target_collection_names) {
merge_collection_ids_.insert(collection);
merge_collection_ids.insert(collection);
}
StartMergeTask(merge_collection_ids);
return max_lsn;
};
......@@ -2505,8 +2503,8 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
wal_mgr_->PartitionFlushed(collection_id, partition, lsn);
}
std::lock_guard<std::mutex> lck(merge_result_mutex_);
merge_collection_ids_.insert(target_collection_name);
std::set<std::string> merge_collection_ids = {target_collection_name};
StartMergeTask(merge_collection_ids);
};
Status status;
......@@ -2663,8 +2661,6 @@ DBImpl::InternalFlush(const std::string& collection_id) {
record.type = wal::MXLogType::Flush;
record.collection_id = collection_id;
ExecWalRecord(record);
StartMergeTask();
}
void
......@@ -2747,6 +2743,7 @@ DBImpl::BackgroundFlushThread() {
void
DBImpl::BackgroundMetricThread() {
SetThreadName("metric_thread");
server::SystemInfo::GetInstance().Init();
while (true) {
if (!initialized_.load(std::memory_order_acquire)) {
......
......@@ -78,7 +78,8 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
GetCollectionInfo(const std::string& collection_id, std::string& collection_info) override;
Status
PreloadCollection(const std::string& collection_id) override;
PreloadCollection(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
bool force = false) override;
Status
UpdateCollectionFlag(const std::string& collection_id, int64_t flag) override;
......@@ -119,10 +120,11 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
Flush() override;
Status
Compact(const std::string& collection_id, double threshold = 0.0) override;
Compact(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
double threshold = 0.0) override;
Status
GetVectorsByID(const std::string& collection_id, const IDNumbers& id_array,
GetVectorsByID(const engine::meta::CollectionSchema& collection, const IDNumbers& id_array,
std::vector<engine::VectorsData>& vectors) override;
Status
......@@ -200,8 +202,8 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
ResultIds& result_ids, ResultDistances& result_distances);
Status
GetVectorsByIdHelper(const std::string& collection_id, const IDNumbers& id_array,
std::vector<engine::VectorsData>& vectors, meta::FilesHolder& files_holder);
GetVectorsByIdHelper(const IDNumbers& id_array, std::vector<engine::VectorsData>& vectors,
meta::FilesHolder& files_holder);
void
InternalFlush(const std::string& collection_id = "");
......@@ -228,7 +230,7 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
StartMetricTask();
void
StartMergeTask(bool force_merge_all = false);
StartMergeTask(const std::set<std::string>& merge_collection_ids, bool force_merge_all = false);
void
BackgroundMerge(std::set<std::string> collection_ids, bool force_merge_all);
......@@ -243,13 +245,7 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
BackgroundBuildIndex();
Status
CompactFile(const std::string& collection_id, double threshold, const meta::SegmentSchema& file,
meta::SegmentsSchema& files_to_update);
/*
Status
SyncMemData(std::set<std::string>& sync_collection_ids);
*/
CompactFile(const meta::SegmentSchema& file, double threshold, meta::SegmentsSchema& files_to_update);
Status
GetFilesToBuildIndex(const std::string& collection_id, const std::vector<int>& file_types,
......@@ -355,7 +351,6 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
ThreadPool merge_thread_pool_;
std::mutex merge_result_mutex_;
std::list<std::future<void>> merge_thread_results_;
std::set<std::string> merge_collection_ids_;
ThreadPool index_thread_pool_;
std::mutex index_result_mutex_;
......
......@@ -75,6 +75,8 @@ struct DBOptions {
int64_t auto_flush_interval_ = 1;
int64_t file_cleanup_timeout_ = 10;
bool metric_enable_ = false;
// wal relative configurations
bool wal_enable_ = true;
bool recovery_error_ignore_ = true;
......
......@@ -138,6 +138,10 @@ class Meta {
virtual Status
FilesByType(const std::string& collection_id, const std::vector<int>& file_types, FilesHolder& files_holder) = 0;
virtual Status
FilesByTypeEx(const std::vector<meta::CollectionSchema>& collections, const std::vector<int>& file_types,
FilesHolder& files_holder) = 0;
virtual Status
FilesByID(const std::vector<size_t>& ids, FilesHolder& files_holder) = 0;
......
......@@ -27,6 +27,7 @@
#include <sstream>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include "MetaConsts.h"
......@@ -1637,7 +1638,8 @@ MySQLMetaImpl::FilesToSearch(const std::string& collection_id, FilesHolder& file
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
statement << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, row_count, date"
statement << "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, date,"
<< " engine_type, created_on, updated_time"
<< " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << collection_id;
// End
......@@ -1665,16 +1667,19 @@ MySQLMetaImpl::FilesToSearch(const std::string& collection_id, FilesHolder& file
collection_file.id_ = resRow["id"]; // implicit conversion
resRow["table_id"].to_string(collection_file.collection_id_);
resRow["segment_id"].to_string(collection_file.segment_id_);
collection_file.index_file_size_ = collection_schema.index_file_size_;
collection_file.engine_type_ = resRow["engine_type"];
collection_file.index_params_ = collection_schema.index_params_;
collection_file.metric_type_ = collection_schema.metric_type_;
resRow["file_id"].to_string(collection_file.file_id_);
collection_file.file_type_ = resRow["file_type"];
collection_file.file_size_ = resRow["file_size"];
collection_file.row_count_ = resRow["row_count"];
collection_file.date_ = resRow["date"];
collection_file.engine_type_ = resRow["engine_type"];
collection_file.created_on_ = resRow["created_on"];
collection_file.updated_time_ = resRow["updated_time"];
collection_file.dimension_ = collection_schema.dimension_;
collection_file.index_file_size_ = collection_schema.index_file_size_;
collection_file.index_params_ = collection_schema.index_params_;
collection_file.metric_type_ = collection_schema.metric_type_;
auto status = utils::GetCollectionFilePath(options_, collection_file);
if (!status.ok()) {
......@@ -1711,18 +1716,15 @@ MySQLMetaImpl::FilesToSearchEx(const std::string& root_collection, const std::se
return status;
}
// distribute id array to batchs
const int64_t batch_size = 50;
// distribute id array to batches
const uint64_t batch_size = 50;
std::vector<std::vector<std::string>> id_groups;
std::vector<std::string> temp_group;
int64_t count = 1;
for (auto& id : partition_id_array) {
temp_group.push_back(id);
count++;
if (count >= batch_size) {
if (temp_group.size() >= batch_size) {
id_groups.emplace_back(temp_group);
temp_group.clear();
count = 0;
}
}
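The loop above replaces the old counter-based batching (count started at 1 and was reset to 0, so the first group collected only 49 ids) with a check on temp_group.size(), making every full group hold exactly batch_size entries. Below is a minimal standalone sketch of the corrected size-based batching; MakeBatches is a hypothetical helper name used only for illustration.

#include <cstdint>
#include <string>
#include <vector>

// Group ids into batches of at most batch_size, mirroring the corrected loop above.
std::vector<std::vector<std::string>> MakeBatches(const std::vector<std::string>& ids,
                                                  uint64_t batch_size = 50) {
    std::vector<std::vector<std::string>> id_groups;
    std::vector<std::string> temp_group;
    for (const auto& id : ids) {
        temp_group.push_back(id);
        if (temp_group.size() >= batch_size) {
            id_groups.emplace_back(temp_group);
            temp_group.clear();
        }
    }
    if (!temp_group.empty()) {
        id_groups.emplace_back(temp_group);  // remainder group smaller than batch_size
    }
    return id_groups;
}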
......@@ -1749,9 +1751,9 @@ MySQLMetaImpl::FilesToSearchEx(const std::string& root_collection, const std::se
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
statement
<< "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, row_count, date"
<< " FROM " << META_TABLEFILES << " WHERE table_id in (";
statement << "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, date,"
<< " engine_type, created_on, updated_time"
<< " FROM " << META_TABLEFILES << " WHERE table_id in (";
for (size_t i = 0; i < group.size(); i++) {
statement << mysqlpp::quote << group[i];
if (i != group.size() - 1) {
......@@ -1776,16 +1778,19 @@ MySQLMetaImpl::FilesToSearchEx(const std::string& root_collection, const std::se
collection_file.id_ = resRow["id"]; // implicit conversion
resRow["table_id"].to_string(collection_file.collection_id_);
resRow["segment_id"].to_string(collection_file.segment_id_);
collection_file.index_file_size_ = collection_schema.index_file_size_;
collection_file.engine_type_ = resRow["engine_type"];
collection_file.index_params_ = collection_schema.index_params_;
collection_file.metric_type_ = collection_schema.metric_type_;
resRow["file_id"].to_string(collection_file.file_id_);
collection_file.file_type_ = resRow["file_type"];
collection_file.file_size_ = resRow["file_size"];
collection_file.row_count_ = resRow["row_count"];
collection_file.date_ = resRow["date"];
collection_file.engine_type_ = resRow["engine_type"];
collection_file.created_on_ = resRow["created_on"];
collection_file.updated_time_ = resRow["updated_time"];
collection_file.dimension_ = collection_schema.dimension_;
collection_file.index_file_size_ = collection_schema.index_file_size_;
collection_file.index_params_ = collection_schema.index_params_;
collection_file.metric_type_ = collection_schema.metric_type_;
auto status = utils::GetCollectionFilePath(options_, collection_file);
if (!status.ok()) {
......@@ -1837,8 +1842,8 @@ MySQLMetaImpl::FilesToMerge(const std::string& collection_id, FilesHolder& files
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
statement << "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, date, "
"engine_type, created_on"
statement << "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, date,"
" engine_type, created_on, updated_time"
<< " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << collection_id
<< " AND file_type = " << std::to_string(SegmentSchema::RAW) << " ORDER BY row_count DESC;";
......@@ -1861,14 +1866,17 @@ MySQLMetaImpl::FilesToMerge(const std::string& collection_id, FilesHolder& files
resRow["segment_id"].to_string(collection_file.segment_id_);
resRow["file_id"].to_string(collection_file.file_id_);
collection_file.file_type_ = resRow["file_type"];
collection_file.file_size_ = resRow["file_size"];
collection_file.row_count_ = resRow["row_count"];
collection_file.date_ = resRow["date"];
collection_file.index_file_size_ = collection_schema.index_file_size_;
collection_file.engine_type_ = resRow["engine_type"];
collection_file.index_params_ = collection_schema.index_params_;
collection_file.metric_type_ = collection_schema.metric_type_;
collection_file.created_on_ = resRow["created_on"];
collection_file.updated_time_ = resRow["updated_time"];
collection_file.dimension_ = collection_schema.dimension_;
collection_file.index_file_size_ = collection_schema.index_file_size_;
collection_file.index_params_ = collection_schema.index_params_;
collection_file.metric_type_ = collection_schema.metric_type_;
auto status = utils::GetCollectionFilePath(options_, collection_file);
if (!status.ok()) {
......@@ -1911,12 +1919,12 @@ MySQLMetaImpl::FilesToIndex(FilesHolder& files_holder) {
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
statement << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, "
"row_count, date, created_on"
statement << "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, date,"
<< " engine_type, created_on, updated_time"
<< " FROM " << META_TABLEFILES << " WHERE file_type = " << std::to_string(SegmentSchema::TO_INDEX)
<< ";";
LOG_ENGINE_DEBUG_ << "FilesToIndex: " << statement.str();
// LOG_ENGINE_DEBUG_ << "FilesToIndex: " << statement.str();
res = statement.store();
} // Scoped Connection
......@@ -1929,13 +1937,14 @@ MySQLMetaImpl::FilesToIndex(FilesHolder& files_holder) {
collection_file.id_ = resRow["id"]; // implicit conversion
resRow["table_id"].to_string(collection_file.collection_id_);
resRow["segment_id"].to_string(collection_file.segment_id_);
collection_file.engine_type_ = resRow["engine_type"];
resRow["file_id"].to_string(collection_file.file_id_);
collection_file.file_type_ = resRow["file_type"];
collection_file.file_size_ = resRow["file_size"];
collection_file.row_count_ = resRow["row_count"];
collection_file.date_ = resRow["date"];
collection_file.engine_type_ = resRow["engine_type"];
collection_file.created_on_ = resRow["created_on"];
collection_file.updated_time_ = resRow["updated_time"];
auto groupItr = groups.find(collection_file.collection_id_);
if (groupItr == groups.end()) {
......@@ -2003,10 +2012,10 @@ MySQLMetaImpl::FilesByType(const std::string& collection_id, const std::vector<i
mysqlpp::Query statement = connectionPtr->query();
// since collection_id is a unique column we just need to check whether it exists or not
statement
<< "SELECT id, segment_id, engine_type, file_id, file_type, file_size, row_count, date, created_on"
<< " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << collection_id
<< " AND file_type in (" << types << ");";
statement << "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, date,"
<< " engine_type, created_on, updated_time"
<< " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << collection_id
<< " AND file_type in (" << types << ");";
LOG_ENGINE_DEBUG_ << "FilesByType: " << statement.str();
......@@ -2028,13 +2037,14 @@ MySQLMetaImpl::FilesByType(const std::string& collection_id, const std::vector<i
file_schema.id_ = resRow["id"];
file_schema.collection_id_ = collection_id;
resRow["segment_id"].to_string(file_schema.segment_id_);
file_schema.engine_type_ = resRow["engine_type"];
resRow["file_id"].to_string(file_schema.file_id_);
file_schema.file_type_ = resRow["file_type"];
file_schema.file_size_ = resRow["file_size"];
file_schema.row_count_ = resRow["row_count"];
file_schema.date_ = resRow["date"];
file_schema.engine_type_ = resRow["engine_type"];
file_schema.created_on_ = resRow["created_on"];
file_schema.updated_time_ = resRow["updated_time"];
file_schema.index_file_size_ = collection_schema.index_file_size_;
file_schema.index_params_ = collection_schema.index_params_;
......@@ -2113,6 +2123,167 @@ MySQLMetaImpl::FilesByType(const std::string& collection_id, const std::vector<i
return ret;
}
Status
MySQLMetaImpl::FilesByTypeEx(const std::vector<meta::CollectionSchema>& collections, const std::vector<int>& file_types,
FilesHolder& files_holder) {
try {
server::MetricCollector metric;
// distribute id array to batches
const uint64_t batch_size = 50;
std::vector<std::vector<std::string>> id_groups;
std::vector<std::string> temp_group;
std::unordered_map<std::string, meta::CollectionSchema> map_collections;
for (auto& collection : collections) {
map_collections.insert(std::make_pair(collection.collection_id_, collection));
temp_group.push_back(collection.collection_id_);
if (temp_group.size() >= batch_size) {
id_groups.emplace_back(temp_group);
temp_group.clear();
}
}
if (!temp_group.empty()) {
id_groups.emplace_back(temp_group);
}
// perform query batch by batch
Status ret;
int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0;
int to_index_count = 0, index_count = 0, backup_count = 0;
for (auto group : id_groups) {
mysqlpp::StoreQueryResult res;
{
mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
bool is_null_connection = (connectionPtr == nullptr);
fiu_do_on("MySQLMetaImpl.FilesByType.null_connection", is_null_connection = true);
fiu_do_on("MySQLMetaImpl.FilesByType.throw_exception", throw std::exception(););
if (is_null_connection) {
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
std::string types;
for (auto type : file_types) {
if (!types.empty()) {
types += ",";
}
types += std::to_string(type);
}
// to ensure UpdateCollectionFiles is an atomic operation
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
// since collection_id is a unique column we just need to check whether it exists or not
statement << "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, date,"
<< " engine_type, created_on, updated_time"
<< " FROM " << META_TABLEFILES << " WHERE table_id in (";
for (size_t i = 0; i < group.size(); i++) {
statement << mysqlpp::quote << group[i];
if (i != group.size() - 1) {
statement << ",";
}
}
statement << ") AND file_type in (" << types << ");";
LOG_ENGINE_DEBUG_ << "FilesByType: " << statement.str();
res = statement.store();
} // Scoped Connection
for (auto& resRow : res) {
SegmentSchema file_schema;
file_schema.id_ = resRow["id"]; // implicit conversion
resRow["table_id"].to_string(file_schema.collection_id_);
resRow["segment_id"].to_string(file_schema.segment_id_);
resRow["file_id"].to_string(file_schema.file_id_);
file_schema.file_type_ = resRow["file_type"];
file_schema.file_size_ = resRow["file_size"];
file_schema.row_count_ = resRow["row_count"];
file_schema.date_ = resRow["date"];
file_schema.engine_type_ = resRow["engine_type"];
file_schema.created_on_ = resRow["created_on"];
file_schema.updated_time_ = resRow["updated_time"];
auto& collection_schema = map_collections[file_schema.collection_id_];
file_schema.dimension_ = collection_schema.dimension_;
file_schema.index_file_size_ = collection_schema.index_file_size_;
file_schema.index_params_ = collection_schema.index_params_;
file_schema.metric_type_ = collection_schema.metric_type_;
auto status = utils::GetCollectionFilePath(options_, file_schema);
if (!status.ok()) {
ret = status;
continue;
}
files_holder.MarkFile(file_schema);
int32_t file_type = resRow["file_type"];
switch (file_type) {
case (int)SegmentSchema::RAW:
++raw_count;
break;
case (int)SegmentSchema::NEW:
++new_count;
break;
case (int)SegmentSchema::NEW_MERGE:
++new_merge_count;
break;
case (int)SegmentSchema::NEW_INDEX:
++new_index_count;
break;
case (int)SegmentSchema::TO_INDEX:
++to_index_count;
break;
case (int)SegmentSchema::INDEX:
++index_count;
break;
case (int)SegmentSchema::BACKUP:
++backup_count;
break;
default:
break;
}
}
}
std::string msg = "Get collection files by type.";
for (int file_type : file_types) {
switch (file_type) {
case (int)SegmentSchema::RAW:
msg = msg + " raw files:" + std::to_string(raw_count);
break;
case (int)SegmentSchema::NEW:
msg = msg + " new files:" + std::to_string(new_count);
break;
case (int)SegmentSchema::NEW_MERGE:
msg = msg + " new_merge files:" + std::to_string(new_merge_count);
break;
case (int)SegmentSchema::NEW_INDEX:
msg = msg + " new_index files:" + std::to_string(new_index_count);
break;
case (int)SegmentSchema::TO_INDEX:
msg = msg + " to_index files:" + std::to_string(to_index_count);
break;
case (int)SegmentSchema::INDEX:
msg = msg + " index files:" + std::to_string(index_count);
break;
case (int)SegmentSchema::BACKUP:
msg = msg + " backup files:" + std::to_string(backup_count);
break;
default:
break;
}
}
LOG_ENGINE_DEBUG_ << msg;
return ret;
} catch (std::exception& e) {
return HandleException("Failed to get files by type", e.what());
}
}
Status
MySQLMetaImpl::FilesByID(const std::vector<size_t>& ids, FilesHolder& files_holder) {
if (ids.empty()) {
......@@ -2136,7 +2307,8 @@ MySQLMetaImpl::FilesByID(const std::vector<size_t>& ids, FilesHolder& files_hold
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
mysqlpp::Query statement = connectionPtr->query();
statement << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, row_count, date"
statement << "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, date,"
<< " engine_type, created_on, updated_time"
<< " FROM " << META_TABLEFILES;
std::stringstream idSS;
......@@ -2167,12 +2339,14 @@ MySQLMetaImpl::FilesByID(const std::vector<size_t>& ids, FilesHolder& files_hold
collection_file.id_ = resRow["id"]; // implicit conversion
resRow["table_id"].to_string(collection_file.collection_id_);
resRow["segment_id"].to_string(collection_file.segment_id_);
collection_file.engine_type_ = resRow["engine_type"];
resRow["file_id"].to_string(collection_file.file_id_);
collection_file.file_type_ = resRow["file_type"];
collection_file.file_size_ = resRow["file_size"];
collection_file.row_count_ = resRow["row_count"];
collection_file.date_ = resRow["date"];
collection_file.engine_type_ = resRow["engine_type"];
collection_file.created_on_ = resRow["created_on"];
collection_file.updated_time_ = resRow["updated_time"];
if (collections.find(collection_file.collection_id_) == collections.end()) {
CollectionSchema collection_schema;
......@@ -2390,7 +2564,7 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/)
<< ")"
<< " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";";
LOG_ENGINE_DEBUG_ << "CleanUpFilesWithTTL: " << statement.str();
// LOG_ENGINE_DEBUG_ << "CleanUpFilesWithTTL: " << statement.str();
res = statement.store();
}
......@@ -2481,7 +2655,7 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/)
<< " FROM " << META_TABLES << " WHERE state = " << std::to_string(CollectionSchema::TO_DELETE)
<< ";";
LOG_ENGINE_DEBUG_ << "CleanUpFilesWithTTL: " << statement.str();
// LOG_ENGINE_DEBUG_ << "CleanUpFilesWithTTL: " << statement.str();
mysqlpp::StoreQueryResult res = statement.store();
......@@ -2539,7 +2713,7 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/)
<< " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << collection_id
<< ";";
LOG_ENGINE_DEBUG_ << "CleanUpFilesWithTTL: " << statement.str();
// LOG_ENGINE_DEBUG_ << "CleanUpFilesWithTTL: " << statement.str();
mysqlpp::StoreQueryResult res = statement.store();
......
......@@ -124,6 +124,10 @@ class MySQLMetaImpl : public Meta {
FilesByType(const std::string& collection_id, const std::vector<int>& file_types,
FilesHolder& files_holder) override;
Status
FilesByTypeEx(const std::vector<meta::CollectionSchema>& collections, const std::vector<int>& file_types,
FilesHolder& files_holder) override;
Status
FilesByID(const std::vector<size_t>& ids, FilesHolder& files_holder) override;
......
......@@ -23,6 +23,7 @@
#include <memory>
#include <set>
#include <sstream>
#include <unordered_map>
#include "MetaConsts.h"
#include "db/IDGenerator.h"
......@@ -1076,7 +1077,8 @@ SqliteMetaImpl::FilesToSearch(const std::string& collection_id, FilesHolder& fil
// perform query
auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_);
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_,
&SegmentSchema::created_on_, &SegmentSchema::updated_time_);
auto match_collectionid = c(&SegmentSchema::collection_id_) == collection_id;
......@@ -1104,6 +1106,9 @@ SqliteMetaImpl::FilesToSearch(const std::string& collection_id, FilesHolder& fil
collection_file.row_count_ = std::get<6>(file);
collection_file.date_ = std::get<7>(file);
collection_file.engine_type_ = std::get<8>(file);
collection_file.created_on_ = std::get<9>(file);
collection_file.updated_time_ = std::get<10>(file);
collection_file.dimension_ = collection_schema.dimension_;
collection_file.index_file_size_ = collection_schema.index_file_size_;
collection_file.index_params_ = collection_schema.index_params_;
......@@ -1145,18 +1150,15 @@ SqliteMetaImpl::FilesToSearchEx(const std::string& root_collection,
return status;
}
// distribute id array to batchs
const int64_t batch_size = 50;
// distribute id array to batches
const uint64_t batch_size = 50;
std::vector<std::vector<std::string>> id_groups;
std::vector<std::string> temp_group;
int64_t count = 1;
for (auto& id : partition_id_array) {
temp_group.push_back(id);
count++;
if (count >= batch_size) {
if (temp_group.size() >= batch_size) {
id_groups.emplace_back(temp_group);
temp_group.clear();
count = 0;
}
}
......@@ -1171,7 +1173,8 @@ SqliteMetaImpl::FilesToSearchEx(const std::string& root_collection,
auto select_columns =
columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_);
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_,
&SegmentSchema::created_on_, &SegmentSchema::updated_time_);
auto match_collectionid = in(&SegmentSchema::collection_id_, group);
......@@ -1197,6 +1200,8 @@ SqliteMetaImpl::FilesToSearchEx(const std::string& root_collection,
collection_file.row_count_ = std::get<6>(file);
collection_file.date_ = std::get<7>(file);
collection_file.engine_type_ = std::get<8>(file);
collection_file.created_on_ = std::get<9>(file);
collection_file.updated_time_ = std::get<10>(file);
collection_file.dimension_ = collection_schema.dimension_;
collection_file.index_file_size_ = collection_schema.index_file_size_;
collection_file.index_params_ = collection_schema.index_params_;
......@@ -1239,9 +1244,11 @@ SqliteMetaImpl::FilesToMerge(const std::string& collection_id, FilesHolder& file
}
// get files to merge
auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::created_on_);
auto select_columns =
columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_,
&SegmentSchema::created_on_, &SegmentSchema::updated_time_);
decltype(ConnectorPtr->select(select_columns)) selected;
{
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
......@@ -1268,7 +1275,10 @@ SqliteMetaImpl::FilesToMerge(const std::string& collection_id, FilesHolder& file
collection_file.file_type_ = std::get<4>(file);
collection_file.row_count_ = std::get<6>(file);
collection_file.date_ = std::get<7>(file);
collection_file.created_on_ = std::get<8>(file);
collection_file.engine_type_ = std::get<8>(file);
collection_file.created_on_ = std::get<9>(file);
collection_file.updated_time_ = std::get<10>(file);
collection_file.dimension_ = collection_schema.dimension_;
collection_file.index_file_size_ = collection_schema.index_file_size_;
collection_file.index_params_ = collection_schema.index_params_;
......@@ -1302,10 +1312,11 @@ SqliteMetaImpl::FilesToIndex(FilesHolder& files_holder) {
server::MetricCollector metric;
auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_,
&SegmentSchema::created_on_);
auto select_columns =
columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_,
&SegmentSchema::created_on_, &SegmentSchema::updated_time_);
decltype(ConnectorPtr->select(select_columns)) selected;
{
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
......@@ -1329,6 +1340,7 @@ SqliteMetaImpl::FilesToIndex(FilesHolder& files_holder) {
collection_file.date_ = std::get<7>(file);
collection_file.engine_type_ = std::get<8>(file);
collection_file.created_on_ = std::get<9>(file);
collection_file.updated_time_ = std::get<10>(file);
auto status = utils::GetCollectionFilePath(options_, collection_file);
if (!status.ok()) {
......@@ -1388,7 +1400,8 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id,
auto select_columns =
columns(&SegmentSchema::id_, &SegmentSchema::segment_id_, &SegmentSchema::file_id_,
&SegmentSchema::file_type_, &SegmentSchema::file_size_, &SegmentSchema::row_count_,
&SegmentSchema::date_, &SegmentSchema::engine_type_, &SegmentSchema::created_on_);
&SegmentSchema::date_, &SegmentSchema::engine_type_, &SegmentSchema::created_on_,
&SegmentSchema::updated_time_);
decltype(ConnectorPtr->select(select_columns)) selected;
{
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
......@@ -1413,6 +1426,7 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id,
file_schema.date_ = std::get<6>(file);
file_schema.engine_type_ = std::get<7>(file);
file_schema.created_on_ = std::get<8>(file);
file_schema.updated_time_ = std::get<9>(file);
file_schema.dimension_ = collection_schema.dimension_;
file_schema.index_file_size_ = collection_schema.index_file_size_;
......@@ -1476,6 +1490,146 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id,
return ret;
}
Status
SqliteMetaImpl::FilesByTypeEx(const std::vector<meta::CollectionSchema>& collections,
const std::vector<int>& file_types,
FilesHolder& files_holder) {
if (file_types.empty()) {
return Status(DB_ERROR, "file types array is empty");
}
Status ret = Status::OK();
try {
fiu_do_on("SqliteMetaImpl.FilesByType.throw_exception", throw std::exception());
// distribute id array to batches
const uint64_t batch_size = 50;
std::vector<std::vector<std::string>> id_groups;
std::vector<std::string> temp_group;
std::unordered_map<std::string, meta::CollectionSchema> map_collections;
for (auto& collection : collections) {
map_collections.insert(std::make_pair(collection.collection_id_, collection));
temp_group.push_back(collection.collection_id_);
if (temp_group.size() >= batch_size) {
id_groups.emplace_back(temp_group);
temp_group.clear();
}
}
if (!temp_group.empty()) {
id_groups.emplace_back(temp_group);
}
// perform query batch by batch
Status ret;
int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0;
int to_index_count = 0, index_count = 0, backup_count = 0;
for (auto group : id_groups) {
auto select_columns =
columns(&SegmentSchema::id_,
&SegmentSchema::collection_id_,
&SegmentSchema::segment_id_,
&SegmentSchema::file_id_,
&SegmentSchema::file_type_,
&SegmentSchema::file_size_,
&SegmentSchema::row_count_,
&SegmentSchema::date_,
&SegmentSchema::engine_type_,
&SegmentSchema::created_on_,
&SegmentSchema::updated_time_);
decltype(ConnectorPtr->select(select_columns)) selected;
auto match_collectionid = in(&SegmentSchema::collection_id_, group);
std::vector<int> file_types = {(int)SegmentSchema::RAW, (int)SegmentSchema::TO_INDEX,
(int)SegmentSchema::INDEX};
auto match_type = in(&SegmentSchema::file_type_, file_types);
{
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto filter = where(match_collectionid and match_type);
selected = ConnectorPtr->select(select_columns, filter);
}
for (auto& file : selected) {
SegmentSchema file_schema;
file_schema.id_ = std::get<0>(file);
file_schema.collection_id_ = std::get<1>(file);
file_schema.segment_id_ = std::get<2>(file);
file_schema.file_id_ = std::get<3>(file);
file_schema.file_type_ = std::get<4>(file);
file_schema.file_size_ = std::get<5>(file);
file_schema.row_count_ = std::get<6>(file);
file_schema.date_ = std::get<7>(file);
file_schema.engine_type_ = std::get<8>(file);
file_schema.created_on_ = std::get<9>(file);
file_schema.updated_time_ = std::get<10>(file);
auto& collection_schema = map_collections[file_schema.collection_id_];
file_schema.dimension_ = collection_schema.dimension_;
file_schema.index_file_size_ = collection_schema.index_file_size_;
file_schema.index_params_ = collection_schema.index_params_;
file_schema.metric_type_ = collection_schema.metric_type_;
switch (file_schema.file_type_) {
case (int)SegmentSchema::RAW:++raw_count;
break;
case (int)SegmentSchema::NEW:++new_count;
break;
case (int)SegmentSchema::NEW_MERGE:++new_merge_count;
break;
case (int)SegmentSchema::NEW_INDEX:++new_index_count;
break;
case (int)SegmentSchema::TO_INDEX:++to_index_count;
break;
case (int)SegmentSchema::INDEX:++index_count;
break;
case (int)SegmentSchema::BACKUP:++backup_count;
break;
default:break;
}
auto status = utils::GetCollectionFilePath(options_, file_schema);
if (!status.ok()) {
ret = status;
}
files_holder.MarkFile(file_schema);
}
}
std::string msg = "Get collection files by type.";
for (int file_type : file_types) {
switch (file_type) {
case (int)SegmentSchema::RAW:msg = msg + " raw files:" + std::to_string(raw_count);
break;
case (int)SegmentSchema::NEW:msg = msg + " new files:" + std::to_string(new_count);
break;
case (int)SegmentSchema::NEW_MERGE:
msg = msg + " new_merge files:" + std::to_string(new_merge_count);
break;
case (int)SegmentSchema::NEW_INDEX:
msg = msg + " new_index files:" + std::to_string(new_index_count);
break;
case (int)SegmentSchema::TO_INDEX:
msg = msg + " to_index files:" + std::to_string(to_index_count);
break;
case (int)SegmentSchema::INDEX:msg = msg + " index files:" + std::to_string(index_count);
break;
case (int)SegmentSchema::BACKUP:msg = msg + " backup files:" + std::to_string(backup_count);
break;
default:break;
}
}
LOG_ENGINE_DEBUG_ << msg;
} catch (std::exception& e) {
return HandleException("Encounter exception when check non index files", e.what());
}
return ret;
}
Status
SqliteMetaImpl::FilesByID(const std::vector<size_t>& ids, FilesHolder& files_holder) {
if (ids.empty()) {
......@@ -1486,9 +1640,17 @@ SqliteMetaImpl::FilesByID(const std::vector<size_t>& ids, FilesHolder& files_hol
server::MetricCollector metric;
fiu_do_on("SqliteMetaImpl.FilesByID.throw_exception", throw std::exception());
auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_);
auto select_columns = columns(&SegmentSchema::id_,
&SegmentSchema::collection_id_,
&SegmentSchema::segment_id_,
&SegmentSchema::file_id_,
&SegmentSchema::file_type_,
&SegmentSchema::file_size_,
&SegmentSchema::row_count_,
&SegmentSchema::date_,
&SegmentSchema::engine_type_,
&SegmentSchema::created_on_,
&SegmentSchema::updated_time_);
std::vector<int> file_types = {(int)SegmentSchema::RAW, (int)SegmentSchema::TO_INDEX,
(int)SegmentSchema::INDEX};
......@@ -1518,6 +1680,8 @@ SqliteMetaImpl::FilesByID(const std::vector<size_t>& ids, FilesHolder& files_hol
collection_file.row_count_ = std::get<6>(file);
collection_file.date_ = std::get<7>(file);
collection_file.engine_type_ = std::get<8>(file);
collection_file.created_on_ = std::get<9>(file);
collection_file.updated_time_ = std::get<10>(file);
if (collections.find(collection_file.collection_id_) == collections.end()) {
CollectionSchema collection_schema;
......@@ -1943,6 +2107,9 @@ SqliteMetaImpl::SetGlobalLastLSN(uint64_t lsn) {
try {
server::MetricCollector metric;
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto selected = ConnectorPtr->select(columns(&EnvironmentSchema::global_lsn_));
if (selected.size() == 0) {
EnvironmentSchema env;
......
......@@ -126,6 +126,10 @@ class SqliteMetaImpl : public Meta {
FilesByType(const std::string& collection_id, const std::vector<int>& file_types,
FilesHolder& files_holder) override;
Status
FilesByTypeEx(const std::vector<meta::CollectionSchema>& collections, const std::vector<int>& file_types,
FilesHolder& files_holder) override;
Status
FilesByID(const std::vector<size_t>& ids, FilesHolder& files_holder) override;
......
......@@ -260,8 +260,10 @@ WalManager::GetNextRecord(MXLogRecord& record) {
}
}
LOG_WAL_INFO_ << "record type " << (int32_t)record.type << " collection " << record.collection_id << " lsn "
<< record.lsn;
if (record.type != MXLogType::None) {
LOG_WAL_INFO_ << "record type " << (int32_t)record.type << " collection " << record.collection_id << " lsn "
<< record.lsn;
}
return error_code;
}
......
......@@ -70,6 +70,13 @@ DBWrapper::StartService() {
return s;
}
// metric config
s = config.GetMetricConfigEnableMonitor(opt.metric_enable_);
if (!s.ok()) {
std::cerr << s.ToString() << std::endl;
return s;
}
// cache config
s = config.GetCacheConfigCacheInsertData(opt.insert_cache_immediately_);
if (!s.ok()) {
......@@ -255,7 +262,7 @@ DBWrapper::PreloadCollections(const std::string& preload_collections) {
db_->AllCollections(table_schema_array);
for (auto& schema : table_schema_array) {
auto status = db_->PreloadCollection(schema.collection_id_);
auto status = db_->PreloadCollection(nullptr, schema.collection_id_);
if (!status.ok()) {
return status;
}
......@@ -264,7 +271,7 @@ DBWrapper::PreloadCollections(const std::string& preload_collections) {
std::vector<std::string> collection_names;
StringHelpFunctions::SplitStringByDelimeter(preload_collections, ",", collection_names);
for (auto& name : collection_names) {
auto status = db_->PreloadCollection(name);
auto status = db_->PreloadCollection(nullptr, name);
if (!status.ok()) {
return status;
}
......
......@@ -70,7 +70,7 @@ CompactRequest::OnExecute() {
rc.RecordSection("check validation");
// step 2: check collection existence
status = DBWrapper::DB()->Compact(collection_name_, compact_threshold_);
status = DBWrapper::DB()->Compact(context_, collection_name_, compact_threshold_);
if (!status.ok()) {
return status;
}
......
......@@ -83,7 +83,7 @@ GetVectorsByIDRequest::OnExecute() {
}
// step 2: get vector data, now only support get one id
return DBWrapper::DB()->GetVectorsByID(collection_name_, ids_, vectors_);
return DBWrapper::DB()->GetVectorsByID(collection_schema, ids_, vectors_);
} catch (std::exception& ex) {
return Status(SERVER_UNEXPECTED_ERROR, ex.what());
}
......
......@@ -60,8 +60,9 @@ PreloadCollectionRequest::OnExecute() {
}
}
// step 2: check collection existence
status = DBWrapper::DB()->PreloadCollection(collection_name_);
// step 2: force-load collection data into cache
// load each segment and insert it into the cache even if cache capacity is not enough
status = DBWrapper::DB()->PreloadCollection(context_, collection_name_, true);
fiu_do_on("PreloadCollectionRequest.OnExecute.preload_collection_fail",
status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
fiu_do_on("PreloadCollectionRequest.OnExecute.throw_std_exception", throw std::exception());
......
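Editorial note: the preload call sites in this commit now differ only in what they pass for the new parameters. The request handler above forwards the client context and forces the load, while the startup path in DBWrapper passes nullptr because no client request exists yet. A hedged sketch of both calls; DBPtr, the include paths and the helper name are assumptions, not code from this commit.

#include <memory>
#include <string>
#include "db/DB.h"                    // assumed include path
#include "server/context/Context.h"   // assumed include path for server::Context

milvus::Status
PreloadSketch(const milvus::engine::DBPtr& db,
              const std::shared_ptr<milvus::server::Context>& context,
              const std::string& collection_name) {
    if (context != nullptr) {
        // request-handler path: pass the client context and force every segment into cache
        return db->PreloadCollection(context, collection_name, true);
    }
    // server-startup path (DBWrapper::PreloadCollections): no client context to pass
    return db->PreloadCollection(nullptr, collection_name);
}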
......@@ -32,7 +32,7 @@ namespace {
static const char* COLLECTION_NAME = "test_group";
static constexpr int64_t COLLECTION_DIM = 256;
static constexpr int64_t VECTOR_COUNT = 25000;
static constexpr int64_t VECTOR_COUNT = 5000;
static constexpr int64_t INSERT_LOOP = 100;
static constexpr int64_t SECONDS_EACH_HOUR = 3600;
static constexpr int64_t DAY_SECONDS = 24 * 60 * 60;
......@@ -180,7 +180,7 @@ TEST_F(DBTest, DB_TEST) {
milvus::engine::ResultIds result_ids;
milvus::engine::ResultDistances result_distances;
int k = 10;
std::this_thread::sleep_for(std::chrono::seconds(2));
std::this_thread::sleep_for(std::chrono::seconds(1));
INIT_TIMER;
std::stringstream ss;
......@@ -214,7 +214,7 @@ TEST_F(DBTest, DB_TEST) {
/* LOG(DEBUG) << ss.str(); */
}
ASSERT_TRUE(count >= prev_count);
std::this_thread::sleep_for(std::chrono::seconds(1));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
});
......@@ -236,7 +236,7 @@ TEST_F(DBTest, DB_TEST) {
stat = db_->Flush();
ASSERT_TRUE(stat.ok());
std::this_thread::sleep_for(std::chrono::microseconds(1));
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
search.join();
......@@ -455,34 +455,34 @@ TEST_F(DBTest, PRELOAD_TEST) {
db_->CreateIndex(dummy_context_, COLLECTION_NAME, index); // wait until build index finish
int64_t prev_cache_usage = milvus::cache::CpuCacheMgr::GetInstance()->CacheUsage();
stat = db_->PreloadCollection(COLLECTION_NAME);
stat = db_->PreloadCollection(dummy_context_, COLLECTION_NAME);
ASSERT_TRUE(stat.ok());
int64_t cur_cache_usage = milvus::cache::CpuCacheMgr::GetInstance()->CacheUsage();
ASSERT_TRUE(prev_cache_usage < cur_cache_usage);
FIU_ENABLE_FIU("SqliteMetaImpl.FilesToSearch.throw_exception");
stat = db_->PreloadCollection(COLLECTION_NAME);
stat = db_->PreloadCollection(dummy_context_, COLLECTION_NAME);
ASSERT_FALSE(stat.ok());
fiu_disable("SqliteMetaImpl.FilesToSearch.throw_exception");
// create a partition
stat = db_->CreatePartition(COLLECTION_NAME, "part0", "0");
ASSERT_TRUE(stat.ok());
stat = db_->PreloadCollection(COLLECTION_NAME);
stat = db_->PreloadCollection(dummy_context_, COLLECTION_NAME);
ASSERT_TRUE(stat.ok());
FIU_ENABLE_FIU("DBImpl.PreloadCollection.null_engine");
stat = db_->PreloadCollection(COLLECTION_NAME);
stat = db_->PreloadCollection(dummy_context_, COLLECTION_NAME);
ASSERT_FALSE(stat.ok());
fiu_disable("DBImpl.PreloadCollection.null_engine");
FIU_ENABLE_FIU("DBImpl.PreloadCollection.exceed_cache");
stat = db_->PreloadCollection(COLLECTION_NAME);
stat = db_->PreloadCollection(dummy_context_, COLLECTION_NAME);
ASSERT_FALSE(stat.ok());
fiu_disable("DBImpl.PreloadCollection.exceed_cache");
FIU_ENABLE_FIU("DBImpl.PreloadCollection.engine_throw_exception");
stat = db_->PreloadCollection(COLLECTION_NAME);
stat = db_->PreloadCollection(dummy_context_, COLLECTION_NAME);
ASSERT_FALSE(stat.ok());
fiu_disable("DBImpl.PreloadCollection.engine_throw_exception");
}
......@@ -535,15 +535,15 @@ TEST_F(DBTest, SHUTDOWN_TEST) {
stat = db_->DeleteVectors(collection_info.collection_id_, ids_to_delete);
ASSERT_FALSE(stat.ok());
stat = db_->Compact(collection_info.collection_id_);
stat = db_->Compact(dummy_context_, collection_info.collection_id_);
ASSERT_FALSE(stat.ok());
std::vector<milvus::engine::VectorsData> vectors;
std::vector<int64_t> id_array = {0};
stat = db_->GetVectorsByID(collection_info.collection_id_, id_array, vectors);
stat = db_->GetVectorsByID(collection_info, id_array, vectors);
ASSERT_FALSE(stat.ok());
stat = db_->PreloadCollection(collection_info.collection_id_);
stat = db_->PreloadCollection(dummy_context_, collection_info.collection_id_);
ASSERT_FALSE(stat.ok());
uint64_t row_count = 0;
......@@ -612,7 +612,6 @@ TEST_F(DBTest, BACK_TIMER_THREAD_1) {
ASSERT_EQ(xb.id_array_.size(), nb);
}
std::this_thread::sleep_for(std::chrono::seconds(2));
db_->Stop();
fiu_disable("DBImpl.StartMetricTask.InvalidTotalCache");
fiu_disable("SqliteMetaImpl.FilesToMerge.throw_exception");
......@@ -620,7 +619,6 @@ TEST_F(DBTest, BACK_TIMER_THREAD_1) {
FIU_ENABLE_FIU("DBImpl.StartMetricTask.InvalidTotalCache");
db_->Start();
std::this_thread::sleep_for(std::chrono::seconds(2));
db_->Stop();
fiu_disable("DBImpl.StartMetricTask.InvalidTotalCache");
}
......@@ -644,7 +642,6 @@ TEST_F(DBTest, BACK_TIMER_THREAD_2) {
}
FIU_ENABLE_FIU("SqliteMetaImpl.CreateCollectionFile.throw_exception");
std::this_thread::sleep_for(std::chrono::seconds(2));
db_->Stop();
fiu_disable("SqliteMetaImpl.CreateCollectionFile.throw_exception");
}
......@@ -669,7 +666,6 @@ TEST_F(DBTest, BACK_TIMER_THREAD_3) {
FIU_ENABLE_FIU("DBImpl.MergeFiles.Serialize_ThrowException");
db_->Start();
std::this_thread::sleep_for(std::chrono::seconds(2));
db_->Stop();
fiu_disable("DBImpl.MergeFiles.Serialize_ThrowException");
}
......@@ -694,7 +690,6 @@ TEST_F(DBTest, BACK_TIMER_THREAD_4) {
FIU_ENABLE_FIU("DBImpl.MergeFiles.Serialize_ErrorStatus");
db_->Start();
std::this_thread::sleep_for(std::chrono::seconds(2));
db_->Stop();
fiu_disable("DBImpl.MergeFiles.Serialize_ErrorStatus");
}
......@@ -934,11 +929,9 @@ TEST_F(DBTest2, ARHIVE_DISK_CHECK) {
BuildVectors(nb, i, xb);
db_->InsertVectors(COLLECTION_NAME, "", xb);
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
std::this_thread::sleep_for(std::chrono::seconds(1));
db_->Flush();
db_->Size(size);
LOG(DEBUG) << "size=" << size;
ASSERT_LE(size, 1 * milvus::engine::GB);
......@@ -981,8 +974,6 @@ TEST_F(DBTest2, DELETE_TEST) {
fiu_disable("DBImpl.DropCollectionRecursively.failed");
stat = db_->DropCollection(COLLECTION_NAME);
std::this_thread::sleep_for(std::chrono::seconds(2));
ASSERT_TRUE(stat.ok());
db_->HasCollection(COLLECTION_NAME, has_collection);
......@@ -1183,7 +1174,9 @@ TEST_F(DBTest2, FLUSH_NON_EXISTING_COLLECTION) {
TEST_F(DBTest2, GET_VECTOR_NON_EXISTING_COLLECTION) {
std::vector<milvus::engine::VectorsData> vectors;
std::vector<int64_t> id_array = {0};
auto status = db_->GetVectorsByID("non_existing", id_array, vectors);
milvus::engine::meta::CollectionSchema collection_info;
collection_info.collection_id_ = "non_existing";
auto status = db_->GetVectorsByID(collection_info, id_array, vectors);
ASSERT_FALSE(status.ok());
}
......@@ -1203,7 +1196,7 @@ TEST_F(DBTest2, GET_VECTOR_BY_ID_TEST) {
std::vector<milvus::engine::VectorsData> vectors;
std::vector<int64_t> empty_array;
stat = db_->GetVectorsByID(COLLECTION_NAME, empty_array, vectors);
stat = db_->GetVectorsByID(collection_info, empty_array, vectors);
ASSERT_FALSE(stat.ok());
stat = db_->InsertVectors(collection_info.collection_id_, partition_tag, qxb);
......@@ -1211,7 +1204,7 @@ TEST_F(DBTest2, GET_VECTOR_BY_ID_TEST) {
db_->Flush(collection_info.collection_id_);
stat = db_->GetVectorsByID(COLLECTION_NAME, qxb.id_array_, vectors);
stat = db_->GetVectorsByID(collection_info, qxb.id_array_, vectors);
ASSERT_TRUE(stat.ok());
ASSERT_EQ(vectors.size(), qxb.id_array_.size());
ASSERT_EQ(vectors[0].float_data_.size(), COLLECTION_DIM);
......@@ -1221,7 +1214,7 @@ TEST_F(DBTest2, GET_VECTOR_BY_ID_TEST) {
}
std::vector<int64_t> invalid_array = {-1, -1};
stat = db_->GetVectorsByID(COLLECTION_NAME, empty_array, vectors);
stat = db_->GetVectorsByID(collection_info, empty_array, vectors);
ASSERT_TRUE(stat.ok());
for (auto& vector : vectors) {
ASSERT_EQ(vector.vector_count_, 0);
......@@ -1344,7 +1337,7 @@ TEST_F(DBTest2, SEARCH_WITH_DIFFERENT_INDEX) {
stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index);
ASSERT_TRUE(stat.ok());
stat = db_->PreloadCollection(collection_info.collection_id_);
stat = db_->PreloadCollection(dummy_context_, collection_info.collection_id_);
ASSERT_TRUE(stat.ok());
int topk = 10, nprobe = 10;
......@@ -1369,7 +1362,7 @@ result_distances);
stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index);
ASSERT_TRUE(stat.ok());
stat = db_->PreloadCollection(collection_info.collection_id_);
stat = db_->PreloadCollection(dummy_context_, collection_info.collection_id_);
ASSERT_TRUE(stat.ok());
for (auto id : ids_to_search) {
......
......@@ -72,7 +72,7 @@ TEST_F(MySqlDBTest, DB_TEST) {
milvus::engine::ResultIds result_ids;
milvus::engine::ResultDistances result_distances;
int k = 10;
std::this_thread::sleep_for(std::chrono::seconds(2));
std::this_thread::sleep_for(std::chrono::seconds(1));
INIT_TIMER;
std::stringstream ss;
......@@ -106,7 +106,7 @@ TEST_F(MySqlDBTest, DB_TEST) {
/* LOG(DEBUG) << ss.str(); */
}
ASSERT_TRUE(count >= prev_count);
std::this_thread::sleep_for(std::chrono::seconds(1));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
});
......@@ -128,7 +128,7 @@ TEST_F(MySqlDBTest, DB_TEST) {
stat = db_->Flush();
ASSERT_TRUE(stat.ok());
std::this_thread::sleep_for(std::chrono::microseconds(1));
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
search.join();
......@@ -183,7 +183,6 @@ TEST_F(MySqlDBTest, SEARCH_TEST) {
stat = db_->InsertVectors(COLLECTION_NAME, "", xb);
ASSERT_TRUE(stat.ok());
// sleep(2); // wait until build index finish
stat = db_->Flush();
ASSERT_TRUE(stat.ok());
......@@ -241,10 +240,8 @@ TEST_F(MySqlDBTest, ARHIVE_DISK_CHECK) {
milvus::engine::VectorsData xb;
BuildVectors(nb, i, xb);
db_->InsertVectors(COLLECTION_NAME, "", xb);
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
// std::this_thread::sleep_for(std::chrono::seconds(1));
stat = db_->Flush();
ASSERT_TRUE(stat.ok());
......@@ -288,16 +285,12 @@ TEST_F(MySqlDBTest, DELETE_TEST) {
milvus::engine::VectorsData xb;
BuildVectors(nb, i, xb);
db_->InsertVectors(COLLECTION_NAME, "", xb);
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
stat = db_->Flush();
ASSERT_TRUE(stat.ok());
stat = db_->DropCollection(COLLECTION_NAME);
//// std::cout << "5 sec start" << std::endl;
// std::this_thread::sleep_for(std::chrono::seconds(5));
//// std::cout << "5 sec finish" << std::endl;
ASSERT_TRUE(stat.ok());
//
db_->HasCollection(COLLECTION_NAME, has_collection);
......
......@@ -287,7 +287,7 @@ TEST_F(DeleteTest, delete_before_create_index) {
ASSERT_TRUE(stat.ok());
ASSERT_EQ(collection_info_get.dimension_, COLLECTION_DIM);
int64_t nb = 10000;
int64_t nb = 5000;
milvus::engine::VectorsData xb;
BuildVectors(nb, xb);
......@@ -369,7 +369,7 @@ TEST_F(DeleteTest, delete_with_index) {
ASSERT_TRUE(stat.ok());
ASSERT_EQ(collection_info_get.dimension_, COLLECTION_DIM);
int64_t nb = 10000;
int64_t nb = 5000;
milvus::engine::VectorsData xb;
BuildVectors(nb, xb);
......@@ -451,7 +451,7 @@ TEST_F(DeleteTest, delete_multiple_times_with_index) {
ASSERT_TRUE(stat.ok());
ASSERT_EQ(collection_info_get.dimension_, COLLECTION_DIM);
int64_t nb = 100000;
int64_t nb = 5000;
milvus::engine::VectorsData xb;
BuildVectors(nb, xb);
......@@ -749,7 +749,7 @@ TEST_F(CompactTest, compact_basic) {
ASSERT_TRUE(stat.ok());
ASSERT_EQ(row_count, nb - 2);
stat = db_->Compact(collection_info.collection_id_);
stat = db_->Compact(dummy_context_, collection_info.collection_id_);
ASSERT_TRUE(stat.ok());
const int topk = 1, nprobe = 1;
......@@ -834,7 +834,7 @@ TEST_F(CompactTest, compact_with_index) {
ASSERT_TRUE(stat.ok());
ASSERT_EQ(row_count, nb - ids_to_delete.size());
stat = db_->Compact(collection_info.collection_id_);
stat = db_->Compact(dummy_context_, collection_info.collection_id_);
ASSERT_TRUE(stat.ok());
stat = db_->GetCollectionRowCount(collection_info.collection_id_, row_count);
......@@ -864,6 +864,6 @@ TEST_F(CompactTest, compact_with_index) {
}
TEST_F(CompactTest, compact_non_existing_table) {
auto status = db_->Compact("non_existing_table");
auto status = db_->Compact(dummy_context_, "non_existing_table");
ASSERT_FALSE(status.ok());
}
......@@ -282,7 +282,7 @@ TEST_F(DBTest, COMPACT_TEST) {
stat = db_->Flush();
ASSERT_TRUE(stat.ok());
stat = db_->Compact(collection_info.collection_id_);
stat = db_->Compact(dummy_context_, collection_info.collection_id_);
ASSERT_TRUE(stat.ok());
const int topk = 1, nprobe = 1;
......
......@@ -318,20 +318,15 @@ TEST_F(MemManagerTest2, INSERT_TEST) {
ASSERT_TRUE(stat.ok());
ASSERT_EQ(collection_info_get.dimension_, COLLECTION_DIM);
auto start_time = METRICS_NOW_TIME;
int insert_loop = 20;
int insert_loop = 10;
for (int i = 0; i < insert_loop; ++i) {
int64_t nb = 40960;
int64_t nb = 4096;
milvus::engine::VectorsData xb;
BuildVectors(nb, xb);
milvus::engine::IDNumbers vector_ids;
stat = db_->InsertVectors(GetCollectionName(), "", xb);
ASSERT_TRUE(stat.ok());
}
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
LOG(DEBUG) << "total_time spent in INSERT_TEST (ms) : " << total_time;
}
TEST_F(MemManagerTest2, INSERT_BINARY_TEST) {
......
......@@ -86,7 +86,7 @@ TEST_F(SearchByIdTest, BASIC_TEST) {
ASSERT_TRUE(stat.ok());
ASSERT_EQ(collection_info_get.dimension_, COLLECTION_DIM);
int64_t nb = 100000;
int64_t nb = 10000;
milvus::engine::VectorsData xb;
BuildVectors(nb, xb);
......@@ -185,7 +185,7 @@ TEST_F(SearchByIdTest, WITH_INDEX_TEST) {
ASSERT_TRUE(stat.ok());
ASSERT_EQ(collection_info_get.dimension_, COLLECTION_DIM);
int64_t nb = 10000;
int64_t nb = 5000;
milvus::engine::VectorsData xb;
BuildVectors(nb, xb);
......@@ -246,7 +246,7 @@ TEST_F(SearchByIdTest, WITH_DELETE_TEST) {
ASSERT_TRUE(stat.ok());
ASSERT_EQ(collection_info_get.dimension_, COLLECTION_DIM);
int64_t nb = 100000;
int64_t nb = 10000;
milvus::engine::VectorsData xb;
BuildVectors(nb, xb);
......@@ -315,7 +315,7 @@ TEST_F(GetVectorByIdTest, BASIC_TEST) {
ASSERT_TRUE(stat.ok());
ASSERT_EQ(collection_info_get.dimension_, COLLECTION_DIM);
int64_t nb = 100000;
int64_t nb = 10000;
milvus::engine::VectorsData xb;
BuildVectors(nb, xb);
......@@ -349,7 +349,7 @@ TEST_F(GetVectorByIdTest, BASIC_TEST) {
milvus::engine::ResultDistances result_distances;
std::vector<milvus::engine::VectorsData> vectors;
stat = db_->GetVectorsByID(collection_info.collection_id_, ids_to_search, vectors);
stat = db_->GetVectorsByID(collection_info, ids_to_search, vectors);
ASSERT_TRUE(stat.ok());
stat = db_->Query(dummy_context_, collection_info.collection_id_, tags, topk, json_params, vectors[0], result_ids,
......@@ -369,7 +369,7 @@ TEST_F(GetVectorByIdTest, WITH_INDEX_TEST) {
ASSERT_TRUE(stat.ok());
ASSERT_EQ(collection_info_get.dimension_, COLLECTION_DIM);
int64_t nb = 10000;
int64_t nb = 5000;
milvus::engine::VectorsData xb;
BuildVectors(nb, xb);
......@@ -409,7 +409,7 @@ TEST_F(GetVectorByIdTest, WITH_INDEX_TEST) {
milvus::engine::ResultDistances result_distances;
std::vector<milvus::engine::VectorsData> vectors;
stat = db_->GetVectorsByID(collection_info.collection_id_, ids_to_search, vectors);
stat = db_->GetVectorsByID(collection_info, ids_to_search, vectors);
ASSERT_TRUE(stat.ok());
stat = db_->Query(dummy_context_, collection_info.collection_id_, tags, topk, json_params, vectors[0], result_ids,
......@@ -429,7 +429,7 @@ TEST_F(GetVectorByIdTest, WITH_DELETE_TEST) {
ASSERT_TRUE(stat.ok());
ASSERT_EQ(collection_info_get.dimension_, COLLECTION_DIM);
int64_t nb = 100000;
int64_t nb = 10000;
milvus::engine::VectorsData xb;
BuildVectors(nb, xb);
......@@ -469,7 +469,7 @@ TEST_F(GetVectorByIdTest, WITH_DELETE_TEST) {
milvus::engine::ResultDistances result_distances;
std::vector<milvus::engine::VectorsData> vectors;
stat = db_->GetVectorsByID(collection_info.collection_id_, ids_to_search, vectors);
stat = db_->GetVectorsByID(collection_info, ids_to_search, vectors);
ASSERT_TRUE(stat.ok());
for (auto& vector : vectors) {
ASSERT_EQ(vector.vector_count_, 0);
......@@ -541,7 +541,7 @@ TEST_F(SearchByIdTest, BINARY_TEST) {
milvus::engine::ResultDistances result_distances;
std::vector<milvus::engine::VectorsData> vectors;
stat = db_->GetVectorsByID(collection_info.collection_id_, ids_to_search, vectors);
stat = db_->GetVectorsByID(collection_info, ids_to_search, vectors);
ASSERT_TRUE(stat.ok());
ASSERT_EQ(vectors.size(), ids_to_search.size());
......
......@@ -89,7 +89,7 @@ TEST_F(MetricTest, METRIC_TEST) {
// std::vector<std::string> tags;
// milvus::engine::ResultIds result_ids;
// milvus::engine::ResultDistances result_distances;
std::this_thread::sleep_for(std::chrono::seconds(2));
std::this_thread::sleep_for(std::chrono::seconds(1));
INIT_TIMER;
std::stringstream ss;
......@@ -115,11 +115,11 @@ TEST_F(MetricTest, METRIC_TEST) {
// }
}
ASSERT_TRUE(count >= prev_count);
std::this_thread::sleep_for(std::chrono::seconds(1));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
});
int loop = 10000;
int loop = 100;
for (auto i = 0; i < loop; ++i) {
if (i == 40) {
......@@ -131,7 +131,7 @@ TEST_F(MetricTest, METRIC_TEST) {
db_->InsertVectors(group_name, "", xb);
ASSERT_EQ(xb.id_array_.size(), nb);
}
std::this_thread::sleep_for(std::chrono::microseconds(2000));
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
search.join();
......
......@@ -813,7 +813,7 @@ TEST(UtilTest, ROLLOUTHANDLER_TEST) {
TEST(UtilTest, THREADPOOL_TEST) {
auto thread_pool_ptr = std::make_unique<milvus::ThreadPool>(3);
auto fun = [](int i) {
sleep(1);
std::this_thread::sleep_for(std::chrono::milliseconds(50));
};
for (int i = 0; i < 10; ++i) {
thread_pool_ptr->enqueue(fun, i);
......