未验证 提交 d0543d21 编写于 作者: G groot 提交者: GitHub

fix bug (#3078)

* fix bug
Signed-off-by: yhmo <yihua.mo@zilliz.com>
上级 717d0d7b
......@@ -27,7 +27,7 @@
namespace milvus {
namespace codec {
const char* BLOOM_FILTER_POSTFIX = ".bf";
const char* BLOOM_FILTER_POSTFIX = ".blf";
constexpr unsigned int BLOOM_FILTER_CAPACITY = 500000;
constexpr double BLOOM_FILTER_ERROR_RATE = 0.01;
......
......@@ -1234,17 +1234,10 @@ void
DBImpl::BackgroundMerge(std::set<std::string> collection_names, bool force_merge_all) {
// LOG_ENGINE_TRACE_ << " Background merge thread start";
Status status;
for (auto& collection_name : collection_names) {
const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
auto old_strategy = merge_mgr_ptr_->Strategy();
if (force_merge_all) {
merge_mgr_ptr_->UseStrategy(MergeStrategyType::ADAPTIVE);
}
status = merge_mgr_ptr_->MergeFiles(collection_name);
merge_mgr_ptr_->UseStrategy(old_strategy);
auto status = merge_mgr_ptr_->MergeFiles(collection_name);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << "Failed to get merge files for collection: " << collection_name
<< " reason:" << status.message();
......@@ -1255,8 +1248,6 @@ DBImpl::BackgroundMerge(std::set<std::string> collection_names, bool force_merge
break;
}
}
// TODO: cleanup with ttl
}
void
......
......@@ -18,6 +18,7 @@
#include "db/snapshot/Snapshots.h"
#include "segment/Segment.h"
#include <algorithm>
#include <memory>
#include <unordered_map>
#include <utility>
......@@ -152,6 +153,7 @@ GetSnapshotInfo(const std::string& collection_name, milvus::json& json_info) {
STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
size_t total_row_count = 0;
size_t total_data_size = 0;
std::unordered_map<snapshot::ID_TYPE, milvus::json> partitions;
auto partition_names = ss->GetPartitionNames();
......@@ -165,6 +167,8 @@ GetSnapshotInfo(const std::string& collection_name, milvus::json& json_info) {
auto partition_commit = ss->GetPartitionCommitByPartitionId(partition->GetID());
json_partition[JSON_ROW_COUNT] = partition_commit->GetRowCount();
total_row_count += partition_commit->GetRowCount();
json_partition[JSON_DATA_SIZE] = partition_commit->GetSize();
total_data_size += partition_commit->GetSize();
partitions.insert(std::make_pair(partition->GetID(), json_partition));
}
......@@ -172,6 +176,7 @@ GetSnapshotInfo(const std::string& collection_name, milvus::json& json_info) {
snapshot::IDS_TYPE segment_ids;
auto handler = std::make_shared<SegmentsToSearchCollector>(ss, segment_ids);
handler->Iterate();
std::sort(segment_ids.begin(), segment_ids.end());
std::unordered_map<snapshot::ID_TYPE, std::vector<milvus::json>> json_partition_segments;
for (auto id : segment_ids) {
......@@ -225,6 +230,7 @@ GetSnapshotInfo(const std::string& collection_name, milvus::json& json_info) {
}
json_info[JSON_ROW_COUNT] = total_row_count;
json_info[JSON_DATA_SIZE] = total_data_size;
json_info[JSON_PARTITIONS] = json_partitions;
return Status::OK();
......
......@@ -40,16 +40,24 @@
#include "knowhere/index/vector_index/helpers/IndexParameter.h"
#ifdef MILVUS_GPU_VERSION
#include "knowhere/index/vector_index/gpu/GPUIndex.h"
#include "knowhere/index/vector_index/gpu/IndexIVFSQHybrid.h"
#include "knowhere/index/vector_index/gpu/Quantizer.h"
#include "knowhere/index/vector_index/helpers/Cloner.h"
#endif
namespace milvus {
namespace engine {
namespace {
// Build a knowhere StructuredIndexSort<T> over a raw byte buffer, reinterpreting
// the bytes as a contiguous array of T. The element count is derived as
// byte-size / sizeof(T), so the buffer length is assumed to be an exact
// multiple of sizeof(T) — TODO confirm callers guarantee this.
template <typename T>
knowhere::IndexPtr
CreateSortedIndex(std::vector<uint8_t>& raw_data) {
// Number of T elements contained in the buffer (integer division; any
// trailing partial element would be silently ignored).
auto count = raw_data.size() / sizeof(T);
auto index_ptr =
std::make_shared<knowhere::StructuredIndexSort<T>>(count, reinterpret_cast<const T*>(raw_data.data()));
// Upcast to the generic knowhere::Index pointer type expected by callers.
return std::static_pointer_cast<knowhere::Index>(index_ptr);
}
} // namespace
ExecutionEngineImpl::ExecutionEngineImpl(const std::string& dir_root, const SegmentVisitorPtr& segment_visitor)
: gpu_enable_(config.gpu.enable()) {
......@@ -93,31 +101,20 @@ ExecutionEngineImpl::CreateStructuredIndex(const DataType field_type, std::vecto
knowhere::IndexPtr& index_ptr) {
switch (field_type) {
case engine::DataType::INT32: {
auto size = raw_data.size() / sizeof(int32_t);
std::vector<int32_t> int32_data(size, 0);
memcpy(int32_data.data(), raw_data.data(), size);
auto int32_index_ptr = std::make_shared<knowhere::StructuredIndexSort<int32_t>>(
raw_data.size(), reinterpret_cast<const int32_t*>(raw_data.data()));
index_ptr = std::static_pointer_cast<knowhere::Index>(int32_index_ptr);
index_ptr = CreateSortedIndex<int32_t>(raw_data);
break;
}
case engine::DataType::UID:
case engine::DataType::INT64: {
auto int64_index_ptr = std::make_shared<knowhere::StructuredIndexSort<int64_t>>(
raw_data.size(), reinterpret_cast<const int64_t*>(raw_data.data()));
index_ptr = std::static_pointer_cast<knowhere::Index>(int64_index_ptr);
index_ptr = CreateSortedIndex<int64_t>(raw_data);
break;
}
case engine::DataType::FLOAT: {
auto float_index_ptr = std::make_shared<knowhere::StructuredIndexSort<float>>(
raw_data.size(), reinterpret_cast<const float*>(raw_data.data()));
index_ptr = std::static_pointer_cast<knowhere::Index>(float_index_ptr);
index_ptr = CreateSortedIndex<float>(raw_data);
break;
}
case engine::DataType::DOUBLE: {
auto double_index_ptr = std::make_shared<knowhere::StructuredIndexSort<double>>(
raw_data.size(), reinterpret_cast<const double*>(raw_data.data()));
index_ptr = std::static_pointer_cast<knowhere::Index>(double_index_ptr);
index_ptr = CreateSortedIndex<double>(raw_data);
break;
}
default: { return Status(DB_ERROR, "Field is not structured type"); }
......
......@@ -45,14 +45,8 @@ enum class MergeStrategyType {
class MergeManager {
public:
virtual MergeStrategyType
Strategy() const = 0;
virtual Status
UseStrategy(MergeStrategyType type) = 0;
virtual Status
MergeFiles(const std::string& collection_id) = 0;
MergeFiles(const std::string& collection_id, MergeStrategyType type = MergeStrategyType::SIMPLE) = 0;
}; // MergeManager
using MergeManagerPtr = std::shared_ptr<MergeManager>;
......
......@@ -19,7 +19,7 @@ namespace engine {
MergeManagerPtr
MergeManagerFactory::SSBuild(const DBOptions& options) {
return std::make_shared<MergeManagerImpl>(options, MergeStrategyType::SIMPLE);
return std::make_shared<MergeManagerImpl>(options);
}
} // namespace engine
......
......@@ -21,16 +21,14 @@
namespace milvus {
namespace engine {
MergeManagerImpl::MergeManagerImpl(const DBOptions& options, MergeStrategyType type)
: options_(options), strategy_type_(type) {
UseStrategy(type);
MergeManagerImpl::MergeManagerImpl(const DBOptions& options) : options_(options) {
}
Status
MergeManagerImpl::UseStrategy(MergeStrategyType type) {
MergeManagerImpl::CreateStrategy(MergeStrategyType type, MergeStrategyPtr& strategy) {
switch (type) {
case MergeStrategyType::SIMPLE: {
strategy_ = std::make_shared<MergeSimpleStrategy>();
strategy = std::make_shared<MergeSimpleStrategy>();
break;
}
case MergeStrategyType::LAYERED:
......@@ -38,20 +36,19 @@ MergeManagerImpl::UseStrategy(MergeStrategyType type) {
default: {
std::string msg = "Unsupported merge strategy type: " + std::to_string((int32_t)type);
LOG_ENGINE_ERROR_ << msg;
throw Exception(DB_ERROR, msg);
return Status(DB_ERROR, msg);
}
}
strategy_type_ = type;
return Status::OK();
}
Status
MergeManagerImpl::MergeFiles(const std::string& collection_name) {
if (strategy_ == nullptr) {
std::string msg = "No merge strategy specified";
LOG_ENGINE_ERROR_ << msg;
return Status(DB_ERROR, msg);
MergeManagerImpl::MergeFiles(const std::string& collection_name, MergeStrategyType type) {
MergeStrategyPtr strategy;
auto status = CreateStrategy(type, strategy);
if (!status.ok()) {
return status;
}
while (true) {
......@@ -79,7 +76,7 @@ MergeManagerImpl::MergeFiles(const std::string& collection_name) {
}
SegmentGroups segment_groups;
auto status = strategy_->RegroupSegments(latest_ss, part2seg, segment_groups);
auto status = strategy->RegroupSegments(latest_ss, part2seg, segment_groups);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << "Failed to regroup segments for: " << collection_name
<< ", continue to merge all files into one";
......
......@@ -29,24 +29,17 @@ namespace engine {
class MergeManagerImpl : public MergeManager {
public:
MergeManagerImpl(const DBOptions& options, MergeStrategyType type);
MergeStrategyType
Strategy() const override {
return strategy_type_;
}
explicit MergeManagerImpl(const DBOptions& options);
Status
UseStrategy(MergeStrategyType type) override;
MergeFiles(const std::string& collection_name, MergeStrategyType type) override;
private:
Status
MergeFiles(const std::string& collection_name) override;
CreateStrategy(MergeStrategyType type, MergeStrategyPtr& strategy);
private:
DBOptions options_;
MergeStrategyType strategy_type_ = MergeStrategyType::SIMPLE;
MergeStrategyPtr strategy_;
}; // MergeManagerImpl
} // namespace engine
......
......@@ -31,15 +31,5 @@ class Index : public milvus::cache::DataObj {
using IndexPtr = std::shared_ptr<Index>;
// todo: remove from knowhere
// Placeholder cache object that carries only a byte count; presumably used so
// the cache can account for memory without holding real data — confirm against
// milvus::cache::DataObj usage before relying on this.
class ToIndexData : public milvus::cache::DataObj {
public:
// size: number of bytes this placeholder represents.
explicit ToIndexData(int64_t size) : size_(size) {
}
private:
int64_t size_ = 0;  // recorded size in bytes
};
} // namespace knowhere
} // namespace milvus
......@@ -30,6 +30,7 @@
#include "storage/disk/DiskIOReader.h"
#include "storage/disk/DiskIOWriter.h"
#include "storage/disk/DiskOperation.h"
#include "utils/CommonUtil.h"
#include "utils/Log.h"
#include "utils/SignalHandler.h"
#include "utils/TimeRecorder.h"
......@@ -127,6 +128,8 @@ SegmentWriter::WriteField(const std::string& file_path, const engine::FIXED_FIEL
Status
SegmentWriter::WriteFields() {
TimeRecorder recorder("SegmentWriter::WriteFields");
auto& field_visitors_map = segment_visitor_->GetFieldVisitors();
for (auto& iter : field_visitors_map) {
const engine::snapshot::FieldPtr& field = iter.second->GetField();
......@@ -135,9 +138,19 @@ SegmentWriter::WriteFields() {
segment_ptr_->GetFixedFieldData(name, raw_data);
auto element_visitor = iter.second->GetElementVisitor(engine::FieldElementType::FET_RAW);
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, element_visitor->GetFile());
STATUS_CHECK(WriteField(file_path, raw_data));
if (element_visitor && element_visitor->GetFile()) {
auto segment_file = element_visitor->GetFile();
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, segment_file);
STATUS_CHECK(WriteField(file_path, raw_data));
auto file_size = milvus::CommonUtil::GetFileSize(file_path);
segment_file->SetSize(file_size);
recorder.RecordSection("Serialize field raw file");
} else {
return Status(DB_ERROR, "Raw element missed in snapshot");
}
}
return Status::OK();
......@@ -157,23 +170,31 @@ SegmentWriter::WriteBloomFilter() {
auto& field_visitors_map = segment_visitor_->GetFieldVisitors();
auto uid_field_visitor = segment_visitor_->GetFieldVisitor(engine::DEFAULT_UID_NAME);
auto uid_blf_visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_BLOOM_FILTER);
std::string uid_blf_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, uid_blf_visitor->GetFile());
if (uid_blf_visitor && uid_blf_visitor->GetFile()) {
auto segment_file = uid_blf_visitor->GetFile();
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, segment_file);
auto& ss_codec = codec::Codec::instance();
segment::IdBloomFilterPtr bloom_filter_ptr;
ss_codec.GetIdBloomFilterFormat()->Create(fs_ptr_, file_path, bloom_filter_ptr);
int64_t* uids = (int64_t*)(uid_data.data());
int64_t row_count = segment_ptr_->GetRowCount();
for (int64_t i = 0; i < row_count; i++) {
bloom_filter_ptr->Add(uids[i]);
}
segment_ptr_->SetBloomFilter(bloom_filter_ptr);
auto& ss_codec = codec::Codec::instance();
segment::IdBloomFilterPtr bloom_filter_ptr;
ss_codec.GetIdBloomFilterFormat()->Create(fs_ptr_, uid_blf_path, bloom_filter_ptr);
recorder.RecordSection("Initialize bloom filter");
int64_t* uids = (int64_t*)(uid_data.data());
int64_t row_count = segment_ptr_->GetRowCount();
for (int64_t i = 0; i < row_count; i++) {
bloom_filter_ptr->Add(uids[i]);
}
segment_ptr_->SetBloomFilter(bloom_filter_ptr);
STATUS_CHECK(WriteBloomFilter(file_path, segment_ptr_->GetBloomFilter()));
recorder.RecordSection("Initialize bloom filter");
return WriteBloomFilter(uid_blf_path, segment_ptr_->GetBloomFilter());
auto file_size = milvus::CommonUtil::GetFileSize(file_path + codec::IdBloomFilterFormat::FilePostfix());
segment_file->SetSize(file_size);
} else {
return Status(DB_ERROR, "Bloom filter element missed in snapshot");
}
} catch (std::exception& e) {
std::string err_msg = "Failed to write vectors: " + std::string(e.what());
LOG_ENGINE_ERROR_ << err_msg;
......@@ -181,6 +202,8 @@ SegmentWriter::WriteBloomFilter() {
engine::utils::SendExitSignal();
return Status(SERVER_WRITE_ERROR, err_msg);
}
return Status::OK();
}
Status
......@@ -190,12 +213,10 @@ SegmentWriter::WriteBloomFilter(const std::string& file_path, const IdBloomFilte
}
try {
TimeRecorder recorder("SegmentWriter::WriteBloomFilter");
TimeRecorderAuto recorder("SegmentWriter::WriteBloomFilter: " + file_path);
auto& ss_codec = codec::Codec::instance();
ss_codec.GetIdBloomFilterFormat()->Write(fs_ptr_, file_path, id_bloom_filter_ptr);
recorder.RecordSection("Write bloom filter file");
} catch (std::exception& e) {
std::string err_msg = "Failed to write bloom filter: " + std::string(e.what());
LOG_ENGINE_ERROR_ << err_msg;
......@@ -203,6 +224,7 @@ SegmentWriter::WriteBloomFilter(const std::string& file_path, const IdBloomFilte
engine::utils::SendExitSignal();
return Status(SERVER_WRITE_ERROR, err_msg);
}
return Status::OK();
}
......@@ -211,10 +233,20 @@ SegmentWriter::WriteDeletedDocs() {
auto& field_visitors_map = segment_visitor_->GetFieldVisitors();
auto uid_field_visitor = segment_visitor_->GetFieldVisitor(engine::DEFAULT_UID_NAME);
auto del_doc_visitor = uid_field_visitor->GetElementVisitor(engine::FieldElementType::FET_DELETED_DOCS);
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, del_doc_visitor->GetFile());
if (del_doc_visitor && del_doc_visitor->GetFile()) {
auto segment_file = del_doc_visitor->GetFile();
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, segment_file);
STATUS_CHECK(WriteDeletedDocs(file_path, segment_ptr_->GetDeletedDocs()));
return WriteDeletedDocs(file_path, segment_ptr_->GetDeletedDocs());
auto file_size = milvus::CommonUtil::GetFileSize(file_path + codec::DeletedDocsFormat::FilePostfix());
segment_file->SetSize(file_size);
} else {
return Status(DB_ERROR, "Deleted-doc element missed in snapshot");
}
return Status::OK();
}
Status
......@@ -224,7 +256,7 @@ SegmentWriter::WriteDeletedDocs(const std::string& file_path, const DeletedDocsP
}
try {
TimeRecorderAuto recorder("SegmentWriter::WriteDeletedDocs");
TimeRecorderAuto recorder("SegmentWriter::WriteDeletedDocs: " + file_path);
auto& ss_codec = codec::Codec::instance();
ss_codec.GetDeletedDocsFormat()->Write(fs_ptr_, file_path, deleted_docs);
......@@ -260,7 +292,7 @@ SegmentWriter::Merge(const SegmentReaderPtr& segment_reader) {
LOG_ENGINE_DEBUG_ << "Merging from " << segment_reader->GetSegmentPath() << " to " << GetSegmentPath();
TimeRecorder recorder("SegmentWriter::Merge");
TimeRecorderAuto recorder("SegmentWriter::Merge");
// merge deleted docs (Note: this step must before merge raw data)
segment::DeletedDocsPtr src_deleted_docs;
......@@ -305,11 +337,6 @@ SegmentWriter::Merge(const SegmentReaderPtr& segment_reader) {
return Status::OK();
}
size_t
SegmentWriter::Size() {
return 0;
}
size_t
SegmentWriter::RowCount() {
return segment_ptr_->GetRowCount();
......@@ -334,24 +361,36 @@ SegmentWriter::WriteVectorIndex(const std::string& field_name) {
if (field == nullptr) {
return Status(DB_ERROR, "Invalid filed name: " + field_name);
}
auto element_visitor = field->GetElementVisitor(engine::FieldElementType::FET_INDEX);
if (element_visitor == nullptr) {
return Status(DB_ERROR, "Invalid filed name: " + field_name);
}
auto& ss_codec = codec::Codec::instance();
fs_ptr_->operation_ptr_->CreateDirectory();
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, element_visitor->GetFile());
ss_codec.GetVectorIndexFormat()->WriteIndex(fs_ptr_, file_path, index);
element_visitor = field->GetElementVisitor(engine::FieldElementType::FET_COMPRESS_SQ8);
if (element_visitor != nullptr) {
file_path = engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_,
element_visitor->GetFile());
ss_codec.GetVectorIndexFormat()->WriteCompress(fs_ptr_, file_path, index);
// serialize index file
{
auto element_visitor = field->GetElementVisitor(engine::FieldElementType::FET_INDEX);
if (element_visitor && element_visitor->GetFile()) {
auto segment_file = element_visitor->GetFile();
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, segment_file);
ss_codec.GetVectorIndexFormat()->WriteIndex(fs_ptr_, file_path, index);
auto file_size = milvus::CommonUtil::GetFileSize(file_path + codec::VectorIndexFormat::FilePostfix());
segment_file->SetSize(file_size);
}
}
// serialize compress file
{
auto element_visitor = field->GetElementVisitor(engine::FieldElementType::FET_COMPRESS_SQ8);
if (element_visitor && element_visitor->GetFile()) {
auto segment_file = element_visitor->GetFile();
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, segment_file);
ss_codec.GetVectorIndexFormat()->WriteCompress(fs_ptr_, file_path, index);
auto file_size =
milvus::CommonUtil::GetFileSize(file_path + codec::VectorCompressFormat::FilePostfix());
segment_file->SetSize(file_size);
}
}
} catch (std::exception& e) {
std::string err_msg = "Failed to write vector index: " + std::string(e.what());
......@@ -383,20 +422,22 @@ SegmentWriter::WriteStructuredIndex(const std::string& field_name) {
return Status(DB_ERROR, "Invalid filed name: " + field_name);
}
auto element_visitor = field->GetElementVisitor(engine::FieldElementType::FET_INDEX);
if (element_visitor == nullptr) {
return Status(DB_ERROR, "Invalid filed name: " + field_name);
}
auto& ss_codec = codec::Codec::instance();
fs_ptr_->operation_ptr_->CreateDirectory();
engine::FIELD_TYPE field_type;
segment_ptr_->GetFieldType(field_name, field_type);
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, element_visitor->GetFile());
ss_codec.GetStructuredIndexFormat()->Write(fs_ptr_, file_path, field_type, index);
// serialize index file
auto element_visitor = field->GetElementVisitor(engine::FieldElementType::FET_INDEX);
if (element_visitor && element_visitor->GetFile()) {
engine::FIELD_TYPE field_type;
segment_ptr_->GetFieldType(field_name, field_type);
auto segment_file = element_visitor->GetFile();
std::string file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, segment_file);
ss_codec.GetStructuredIndexFormat()->Write(fs_ptr_, file_path, field_type, index);
auto file_size = milvus::CommonUtil::GetFileSize(file_path + codec::StructuredIndexFormat::FilePostfix());
segment_file->SetSize(file_size);
}
} catch (std::exception& e) {
std::string err_msg = "Failed to write vector index: " + std::string(e.what());
LOG_ENGINE_ERROR_ << err_msg;
......
......@@ -54,9 +54,6 @@ class SegmentWriter {
Status
Merge(const SegmentReaderPtr& segment_reader);
size_t
Size();
size_t
RowCount();
......
......@@ -456,8 +456,9 @@ TEST_F(DBTest, MergeTest) {
std::set<std::string> segment_file_paths;
auto sf_executor = [&] (const SegmentFilePtr& segment_file, SegmentFileIterator* handler) -> Status {
std::string res_path = milvus::engine::snapshot::GetResPath<SegmentFile>(root_path, segment_file);
if (boost::filesystem::is_regular_file(res_path) ||
boost::filesystem::is_regular_file(res_path + ".bf")) {
if (boost::filesystem::is_regular_file(res_path)
|| boost::filesystem::is_regular_file(res_path + milvus::codec::IdBloomFilterFormat::FilePostfix())
|| boost::filesystem::is_regular_file(res_path + milvus::codec::DeletedDocsFormat::FilePostfix())) {
segment_file_paths.insert(res_path);
std::cout << res_path << std::endl;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册