提交 c37aa5a5 编写于 作者: G groot

code lint

上级 b0a680f5
......@@ -22,6 +22,7 @@ Please mark all change in change log and use the ticket from JIRA.
- \#458 - Index data is not compatible between 0.5 and 0.6
- \#465 - Server hang caused by searching with nsg index
- \#486 - gpu no usage during index building
- \#497 - CPU-version search performance decreased
- \#504 - The code coverage rate of core/src/scheduler/optimizer is too low
- \#509 - IVF_PQ index build trapped into dead loop caused by invalid params
- \#513 - Unittest DELETE_BY_RANGE sometimes failed
......@@ -31,7 +32,7 @@ Please mark all change in change log and use the ticket from JIRA.
- \#532 - assign value to `table_name` from conftest shell
- \#533 - NSG build failed with MetricType Inner Product
- \#543 - client raise exception in shards when search results is empty
- \#497 - CPU-version search performance decreased
- \#545 - Avoid dead circle of build index thread when error occurs
## Feature
- \#12 - Pure CPU version for Milvus
......
......@@ -41,6 +41,7 @@
#include <iostream>
#include <set>
#include <thread>
#include <utility>
namespace milvus {
namespace engine {
......@@ -51,6 +52,8 @@ constexpr uint64_t METRIC_ACTION_INTERVAL = 1;
constexpr uint64_t COMPACT_ACTION_INTERVAL = 1;
constexpr uint64_t INDEX_ACTION_INTERVAL = 1;
constexpr uint64_t INDEX_FAILED_RETRY_TIME = 1;
static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milsvus server is shutdown!");
void
......@@ -361,6 +364,7 @@ DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
WaitMergeFileFinish();
// step 4: wait and build index
status = CleanFailedIndexFileOfTable(table_id);
status = BuildTableIndexRecursively(table_id, index);
return status;
......@@ -828,22 +832,35 @@ DBImpl::BackgroundBuildIndex() {
std::unique_lock<std::mutex> lock(build_index_mutex_);
meta::TableFilesSchema to_index_files;
meta_ptr_->FilesToIndex(to_index_files);
Status status;
Status status = IgnoreFailedIndexFiles(to_index_files);
if (!to_index_files.empty()) {
scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(meta_ptr_, options_);
// step 2: put build index task to scheduler
std::map<scheduler::BuildIndexJobPtr, scheduler::TableFileSchemaPtr> job2file_map;
for (auto& file : to_index_files) {
scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(meta_ptr_, options_);
scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
job->AddToIndexFiles(file_ptr);
scheduler::JobMgrInst::GetInstance()->Put(job);
job2file_map.insert(std::make_pair(job, file_ptr));
}
scheduler::JobMgrInst::GetInstance()->Put(job);
job->WaitBuildIndexFinish();
if (!job->GetStatus().ok()) {
Status status = job->GetStatus();
ENGINE_LOG_ERROR << "Building index failed: " << status.ToString();
for (auto iter = job2file_map.begin(); iter != job2file_map.end(); ++iter) {
scheduler::BuildIndexJobPtr job = iter->first;
meta::TableFileSchema& file_schema = *(iter->second.get());
job->WaitBuildIndexFinish();
if (!job->GetStatus().ok()) {
Status status = job->GetStatus();
ENGINE_LOG_ERROR << "Building index job " << job->id() << " failed: " << status.ToString();
MarkFailedIndexFile(file_schema);
} else {
MarkSucceedIndexFile(file_schema);
ENGINE_LOG_DEBUG << "Building index job " << job->id() << " succeed.";
}
}
ENGINE_LOG_DEBUG << "Background build index thread finished";
}
// ENGINE_LOG_TRACE << "Background build index thread exit";
......@@ -911,6 +928,7 @@ DBImpl::DropTableRecursively(const std::string& table_id, const meta::DatesT& da
if (dates.empty()) {
status = mem_mgr_->EraseMemVector(table_id); // not allow insert
status = meta_ptr_->DropTable(table_id); // soft delete table
CleanFailedIndexFileOfTable(table_id);
// scheduler will determine when to delete table files
auto nres = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
......@@ -989,6 +1007,8 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex
std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10 * 1000, times * 100)));
GetFilesToBuildIndex(table_id, file_types, table_files);
times++;
IgnoreFailedIndexFiles(table_files);
}
// build index for partition
......@@ -1001,12 +1021,23 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex
}
}
// failed to build index for some files, return error
std::vector<std::string> failed_files;
GetFailedIndexFileOfTable(table_id, failed_files);
if (!failed_files.empty()) {
std::string msg = "Failed to build index for " + std::to_string(failed_files.size()) +
((failed_files.size() == 1) ? " file" : " files") +
", file size is too large or gpu memory is not enough";
return Status(DB_ERROR, msg);
}
return Status::OK();
}
Status
DBImpl::DropTableIndexRecursively(const std::string& table_id) {
ENGINE_LOG_DEBUG << "Drop index for table: " << table_id;
CleanFailedIndexFileOfTable(table_id);
auto status = meta_ptr_->DropTableIndex(table_id);
if (!status.ok()) {
return status;
......@@ -1049,5 +1080,86 @@ DBImpl::GetTableRowCountRecursively(const std::string& table_id, uint64_t& row_c
return Status::OK();
}
// Forget every failed-index record of the given table so that all of its
// files become eligible for index building again.
Status
DBImpl::CleanFailedIndexFileOfTable(const std::string& table_id) {
    std::lock_guard<std::mutex> lock(index_failed_mutex_);

    // Dropping the whole per-table entry resets the failure counters of
    // every file belonging to this table.
    index_failed_files_.erase(table_id);

    return Status::OK();
}
// Collect the ids of all files of `table_id` whose index build has failed
// at least once. `failed_files` is cleared first; it stays empty when the
// table has no recorded failures.
Status
DBImpl::GetFailedIndexFileOfTable(const std::string& table_id, std::vector<std::string>& failed_files) {
    failed_files.clear();

    std::lock_guard<std::mutex> lock(index_failed_mutex_);
    auto table_iter = index_failed_files_.find(table_id);
    if (table_iter != index_failed_files_.end()) {
        // Only the file ids are reported; the failure counts stay internal.
        for (auto& id_and_times : table_iter->second) {
            failed_files.push_back(id_and_times.first);
        }
    }

    return Status::OK();
}
// Record one more index-build failure for the given file.
Status
DBImpl::MarkFailedIndexFile(const meta::TableFileSchema& file) {
    std::lock_guard<std::mutex> lock(index_failed_mutex_);

    // std::map::operator[] value-initializes a missing counter to 0, so the
    // first failure stores 1 and every later failure keeps incrementing —
    // identical to the explicit find/insert dance, in a single statement.
    ++index_failed_files_[file.table_id_][file.file_id_];

    return Status::OK();
}
// Clear the failure record of a file whose index build succeeded, so a past
// transient failure no longer counts against it.
// Fix: also erase the per-table entry once its inner map becomes empty —
// previously empty FileID2FailedTimes maps accumulated in
// index_failed_files_ forever (one dead entry per table that ever failed).
Status
DBImpl::MarkSucceedIndexFile(const meta::TableFileSchema& file) {
    std::lock_guard<std::mutex> lock(index_failed_mutex_);

    auto table_iter = index_failed_files_.find(file.table_id_);
    if (table_iter != index_failed_files_.end()) {
        table_iter->second.erase(file.file_id_);
        if (table_iter->second.empty()) {
            // No failed files left for this table; drop the entry itself.
            index_failed_files_.erase(table_iter);
        }
    }

    return Status::OK();
}
// Remove from `table_files` every file that has already failed index
// building INDEX_FAILED_RETRY_TIME times or more. The list may contain
// files of several tables. Skipping such files prevents the background
// build-index thread from retrying them endlessly (dead loop avoidance).
Status
DBImpl::IgnoreFailedIndexFiles(meta::TableFilesSchema& table_files) {
    std::lock_guard<std::mutex> lock(index_failed_mutex_);

    // True when this file's recorded failure count has reached the retry limit.
    auto exceeded_retry_limit = [this](const meta::TableFileSchema& schema) {
        auto table_iter = index_failed_files_.find(schema.table_id_);
        if (table_iter == index_failed_files_.end()) {
            return false;
        }
        auto file_iter = table_iter->second.find(schema.file_id_);
        return file_iter != table_iter->second.end() && file_iter->second >= INDEX_FAILED_RETRY_TIME;
    };

    for (auto iter = table_files.begin(); iter != table_files.end();) {
        if (exceeded_retry_limit(*iter)) {
            iter = table_files.erase(iter);  // erase returns the next valid iterator
        } else {
            ++iter;
        }
    }

    return Status::OK();
}
} // namespace engine
} // namespace milvus
......@@ -25,6 +25,7 @@
#include <atomic>
#include <condition_variable>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <set>
......@@ -35,8 +36,6 @@
namespace milvus {
namespace engine {
class Env;
namespace meta {
class Meta;
}
......@@ -179,6 +178,21 @@ class DBImpl : public DB {
Status
GetTableRowCountRecursively(const std::string& table_id, uint64_t& row_count);
Status
CleanFailedIndexFileOfTable(const std::string& table_id);
Status
GetFailedIndexFileOfTable(const std::string& table_id, std::vector<std::string>& failed_files);
Status
MarkFailedIndexFile(const meta::TableFileSchema& file);
Status
MarkSucceedIndexFile(const meta::TableFileSchema& file);
Status
IgnoreFailedIndexFiles(meta::TableFilesSchema& table_files);
private:
const DBOptions options_;
......@@ -200,7 +214,11 @@ class DBImpl : public DB {
std::list<std::future<void>> index_thread_results_;
std::mutex build_index_mutex_;
}; // DBImpl
std::mutex index_failed_mutex_;
using FileID2FailedTimes = std::map<std::string, uint64_t>;
using Table2FailedFiles = std::map<std::string, FileID2FailedTimes>;
Table2FailedFiles index_failed_files_; // file id mapping to failed times
}; // DBImpl
} // namespace engine
} // namespace milvus
......@@ -154,7 +154,9 @@ GetTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file
}
std::string msg = "Table file doesn't exist: " + file_path;
ENGINE_LOG_ERROR << msg << " in path: " << options.path_ << " for table: " << table_file.table_id_;
if (table_file.file_size_ > 0) { // no need to pop error for empty file
ENGINE_LOG_ERROR << msg << " in path: " << options.path_ << " for table: " << table_file.table_id_;
}
return Status(DB_ERROR, msg);
}
......
......@@ -1610,10 +1610,35 @@ MySQLMetaImpl::FilesByType(const std::string& table_id, const std::vector<int>&
}
}
ENGINE_LOG_DEBUG << "Table " << table_id << " currently has raw files:" << raw_count
<< " new files:" << new_count << " new_merge files:" << new_merge_count
<< " new_index files:" << new_index_count << " to_index files:" << to_index_count
<< " index files:" << index_count << " backup files:" << backup_count;
// Compose one debug line reporting the count of each requested file type,
// in the order the caller listed them.
// Fix: every branch previously stringified raw_count; each type now reports
// its own counter (new_count, new_merge_count, new_index_count,
// to_index_count, index_count, backup_count).
std::string msg = "Get table files by type. ";
for (int file_type : file_types) {
    switch (file_type) {
        case (int)TableFileSchema::RAW:
            msg = msg + "raw files:" + std::to_string(raw_count);
            break;
        case (int)TableFileSchema::NEW:
            msg = msg + "new files:" + std::to_string(new_count);
            break;
        case (int)TableFileSchema::NEW_MERGE:
            msg = msg + "new_merge files:" + std::to_string(new_merge_count);
            break;
        case (int)TableFileSchema::NEW_INDEX:
            msg = msg + "new_index files:" + std::to_string(new_index_count);
            break;
        case (int)TableFileSchema::TO_INDEX:
            msg = msg + "to_index files:" + std::to_string(to_index_count);
            break;
        case (int)TableFileSchema::INDEX:
            msg = msg + "index files:" + std::to_string(index_count);
            break;
        case (int)TableFileSchema::BACKUP:
            msg = msg + "backup files:" + std::to_string(backup_count);
            break;
        default:
            break;
    }
}
ENGINE_LOG_DEBUG << msg;
}
} catch (std::exception& e) {
return HandleException("GENERAL ERROR WHEN GET FILE BY TYPE", e.what());
......
......@@ -1157,10 +1157,34 @@ SqliteMetaImpl::FilesByType(const std::string& table_id,
table_files.emplace_back(file_schema);
}
ENGINE_LOG_DEBUG << "Table " << table_id << " currently has raw files:" << raw_count
<< " new files:" << new_count << " new_merge files:" << new_merge_count
<< " new_index files:" << new_index_count << " to_index files:" << to_index_count
<< " index files:" << index_count << " backup files:" << backup_count;
// Compose one debug line reporting the count of each requested file type,
// in the order the caller listed them.
// Fix: every branch previously stringified raw_count; each type now reports
// its own counter (new_count, new_merge_count, new_index_count,
// to_index_count, index_count, backup_count).
std::string msg = "Get table files by type. ";
for (int file_type : file_types) {
    switch (file_type) {
        case (int)TableFileSchema::RAW:
            msg = msg + "raw files:" + std::to_string(raw_count);
            break;
        case (int)TableFileSchema::NEW:
            msg = msg + "new files:" + std::to_string(new_count);
            break;
        case (int)TableFileSchema::NEW_MERGE:
            msg = msg + "new_merge files:" + std::to_string(new_merge_count);
            break;
        case (int)TableFileSchema::NEW_INDEX:
            msg = msg + "new_index files:" + std::to_string(new_index_count);
            break;
        case (int)TableFileSchema::TO_INDEX:
            msg = msg + "to_index files:" + std::to_string(to_index_count);
            break;
        case (int)TableFileSchema::INDEX:
            msg = msg + "index files:" + std::to_string(index_count);
            break;
        case (int)TableFileSchema::BACKUP:
            msg = msg + "backup files:" + std::to_string(backup_count);
            break;
        default:
            break;
    }
}
ENGINE_LOG_DEBUG << msg;
}
} catch (std::exception& e) {
return HandleException("Encounter exception when check non index files", e.what());
......
......@@ -229,7 +229,7 @@ CommonUtil::ConvertTime(tm time_struct, time_t& time_integer) {
void
CommonUtil::EraseFromCache(const std::string& item_key) {
if (item_key.empty()) {
SERVER_LOG_ERROR << "Empty key cannot be erased from cache";
// SERVER_LOG_ERROR << "Empty key cannot be erased from cache";
return;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册