未验证 提交 dd2535e1 编写于 作者: G groot 提交者: GitHub

fix hnswsq index bug (#3467)

* remove unused config
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* add comments
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* fix hnswsq index bug
Signed-off-by: Ngroot <yihua.mo@zilliz.com>

* refine code
Signed-off-by: Ngroot <yihua.mo@zilliz.com>
上级 b4caa81a
......@@ -76,26 +76,10 @@ storage:
# | files in advance before implementing data changes. WAL | | |
# | ensures the atomicity and durability for Milvus operations.| | |
#----------------------+------------------------------------------------------------+------------+-----------------+
# recovery_error_ignore| Whether to ignore logs with errors that happens during WAL | Boolean | false |
# | recovery. If true, when Milvus restarts for recovery and | | |
# | there are errors in WAL log files, log files with errors | | |
# | are ignored. If false, Milvus does not restart when there | | |
# | are errors in WAL log files. | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
# buffer_size | Sum total of the read buffer and the write buffer in Bytes.| String | 256MB |
# | buffer_size must be in range [64MB, 4096MB]. | | |
# | If the value you specified is out of range, Milvus | | |
# | automatically uses the boundary value closest to the | | |
# | specified value. It is recommended you set buffer_size to | | |
# | a value greater than the inserted data size of a single | | |
# | insert operation for better performance. | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
# path | Location of WAL log files. | String | |
#----------------------+------------------------------------------------------------+------------+-----------------+
wal:
enable: true
recovery_error_ignore: false
buffer_size: 256MB
path: @MILVUS_DB_PATH@/wal
#----------------------+------------------------------------------------------------+------------+-----------------+
......
......@@ -160,8 +160,8 @@ VectorIndexFormat::ConstructIndex(const std::string& index_name, knowhere::Binar
}
if (compress_data != nullptr) {
LOG_ENGINE_DEBUG_ << "load index with " << SQ8_DATA << " " << compress_data->size;
index_data.Append(SQ8_DATA, compress_data);
LOG_ENGINE_DEBUG_ << "load index with " << QUANTIZATION_DATA << " " << compress_data->size;
index_data.Append(QUANTIZATION_DATA, compress_data);
length += compress_data->size;
}
......@@ -217,7 +217,7 @@ VectorIndexFormat::WriteCompress(const storage::FSHandlerPtr& fs_ptr, const std:
auto binaryset = index->Serialize(knowhere::Config());
auto sq8_data = binaryset.Erase(SQ8_DATA);
auto sq8_data = binaryset.Erase(QUANTIZATION_DATA);
if (sq8_data != nullptr) {
auto& ss_codec = codec::Codec::instance();
ss_codec.GetVectorCompressFormat()->Write(fs_ptr, file_path, sq8_data);
......
......@@ -21,16 +21,16 @@ constexpr int64_t MB = 1LL << 20;
constexpr int64_t GB = 1LL << 30;
constexpr int64_t TB = 1LL << 40;
constexpr int64_t MAX_MEM_SEGMENT_SIZE = 128 * MB;
constexpr int64_t MAX_MEM_SEGMENT_SIZE = 128 * MB; // max data size of one segment in insert buffer
constexpr int64_t MAX_NAME_LENGTH = 255;
constexpr int64_t MAX_DIMENSION = 32768;
constexpr int32_t MAX_SEGMENT_ROW_COUNT = 4 * 1024 * 1024;
constexpr int64_t DEFAULT_SEGMENT_ROW_COUNT = 100000; // default row count per segment when creating collection
constexpr int64_t MAX_INSERT_DATA_SIZE = 256 * MB;
constexpr int64_t MAX_WAL_FILE_SIZE = 256 * MB;
constexpr int64_t MAX_NAME_LENGTH = 255; // max string length for collection/partition/field name
constexpr int64_t MAX_DIMENSION = 32768; // max dimension of vector field
constexpr int32_t MAX_SEGMENT_ROW_COUNT = 4 * 1024 * 1024; // max row count of one segment
constexpr int64_t DEFAULT_SEGMENT_ROW_COUNT = 512 * 1024; // default row count per segment when creating collection
constexpr int64_t MAX_INSERT_DATA_SIZE = 256 * MB; // max data size in one insert action
constexpr int64_t MAX_WAL_FILE_SIZE = 256 * MB; // max file size of wal file
constexpr int64_t BUILD_INEDX_RETRY_TIMES = 3;
constexpr int64_t BUILD_INEDX_RETRY_TIMES = 3; // retry times if build index failed
} // namespace engine
} // namespace milvus
......@@ -61,10 +61,10 @@ SetSnapshotIndex(const std::string& collection_name, const std::string& field_na
json[engine::PARAM_INDEX_EXTRA_PARAMS] = index_info.extra_params_;
index_element->SetParams(json);
if (index_info.index_name_ == knowhere::IndexEnum::INDEX_RHNSWSQ) {
if (utils::RequireCompressFile(index_info.index_type_)) {
auto compress_element =
std::make_shared<snapshot::FieldElement>(ss->GetCollectionId(), field->GetID(), ELEMENT_INDEX_COMPRESS,
milvus::engine::FieldElementType::FET_COMPRESS_SQ8);
milvus::engine::FieldElementType::FET_COMPRESS);
ss_context.new_field_elements.push_back(compress_element);
}
}
......@@ -135,7 +135,7 @@ DeleteSnapshotIndex(const std::string& collection_name, const std::string& field
std::vector<snapshot::FieldElementPtr> elements = ss->GetFieldElementsByField(name);
for (auto& element : elements) {
if (element->GetFEtype() == engine::FieldElementType::FET_INDEX ||
element->GetFEtype() == engine::FieldElementType::FET_COMPRESS_SQ8) {
element->GetFEtype() == engine::FieldElementType::FET_COMPRESS) {
snapshot::OperationContext context;
context.stale_field_elements.push_back(element);
auto op = std::make_shared<snapshot::DropAllIndexOperation>(context, ss);
......
......@@ -85,7 +85,7 @@ enum class FieldElementType {
FET_BLOOM_FILTER = 2,
FET_DELETED_DOCS = 3,
FET_INDEX = 4,
FET_COMPRESS_SQ8 = 5,
FET_COMPRESS = 5,
};
///////////////////////////////////////////////////////////////////////////////////////////////////
......
......@@ -25,8 +25,11 @@
#include "db/Types.h"
#ifdef MILVUS_GPU_VERSION
#include "cache/GpuCacheMgr.h"
#endif
#include "config/ServerConfig.h"
//#include "storage/s3/S3ClientWrapper.h"
#include "knowhere/index/vector_index/helpers/IndexParameter.h"
......@@ -169,6 +172,17 @@ GetSizeOfChunk(const engine::DataChunkPtr& chunk) {
return total_size;
}
bool
RequireRawFile(const std::string& index_type) {
return index_type == knowhere::IndexEnum::INDEX_FAISS_IVFFLAT || index_type == knowhere::IndexEnum::INDEX_NSG ||
index_type == knowhere::IndexEnum::INDEX_HNSW;
}
bool
RequireCompressFile(const std::string& index_type) {
return index_type == knowhere::IndexEnum::INDEX_RHNSWSQ;
}
} // namespace utils
} // namespace engine
} // namespace milvus
......@@ -58,6 +58,12 @@ GetIDFromChunk(const engine::DataChunkPtr& chunk, engine::IDNumbers& ids);
int64_t
GetSizeOfChunk(const engine::DataChunkPtr& chunk);
bool
RequireRawFile(const std::string& index_type);
bool
RequireCompressFile(const std::string& index_type);
} // namespace utils
} // namespace engine
} // namespace milvus
......@@ -727,9 +727,8 @@ ExecutionEngineImpl::CreateSnapshotIndexFile(AddSegmentFileOperation& operation,
}
// create snapshot compress file
std::string index_name = index_element->GetName();
if (index_name == knowhere::IndexEnum::INDEX_RHNSWSQ) {
auto compress_visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_COMPRESS_SQ8);
if (utils::RequireCompressFile(index_info.index_type_)) {
auto compress_visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_COMPRESS);
if (compress_visitor == nullptr) {
return Status(DB_ERROR,
"Could not build index: compress element not exist"); // something wrong in CreateIndex
......
......@@ -27,7 +27,7 @@ namespace knowhere {
#define INDEX_DATA "INDEX_DATA"
#define RAW_DATA "RAW_DATA"
#define SQ8_DATA "SQ8_DATA"
#define QUANTIZATION_DATA "QUANTIZATION_DATA"
class VecIndex : public Index {
public:
......
......@@ -25,11 +25,11 @@
#include "codecs/Codec.h"
#include "db/SnapshotUtils.h"
#include "db/Types.h"
#include "db/Utils.h"
#include "db/snapshot/ResourceHelper.h"
#include "knowhere/index/vector_index/VecIndex.h"
#include "knowhere/index/vector_index/VecIndexFactory.h"
#include "knowhere/index/vector_index/adapter/VectorAdapter.h"
#include "knowhere/index/vector_index/helpers/IndexParameter.h"
#include "storage/disk/DiskIOReader.h"
#include "storage/disk/DiskIOWriter.h"
#include "storage/disk/DiskOperation.h"
......@@ -342,8 +342,7 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex
// for some kinds index(IVF), read raw file
auto index_type = index_visitor->GetElement()->GetTypeName();
if (index_type == knowhere::IndexEnum::INDEX_FAISS_IVFFLAT || index_type == knowhere::IndexEnum::INDEX_NSG ||
index_type == knowhere::IndexEnum::INDEX_HNSW) {
if (engine::utils::RequireRawFile(index_type)) {
engine::BinaryDataPtr fixed_data;
auto status = segment_ptr_->GetFixedFieldData(field_name, fixed_data);
if (status.ok()) {
......@@ -355,9 +354,9 @@ SegmentReader::LoadVectorIndex(const std::string& field_name, knowhere::VecIndex
}
}
// for some kinds index(SQ8), read compress file
if (index_type == knowhere::IndexEnum::INDEX_RHNSWSQ) {
if (auto visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_COMPRESS_SQ8)) {
// for some kinds index(RHNSWSQ), read compress file
if (engine::utils::RequireCompressFile(index_type)) {
if (auto visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_COMPRESS)) {
auto file_path =
engine::snapshot::GetResPath<engine::snapshot::SegmentFile>(dir_collections_, visitor->GetFile());
ss_codec.GetVectorIndexFormat()->ReadCompress(fs_ptr_, file_path, compress_data);
......
......@@ -401,7 +401,7 @@ SegmentWriter::WriteVectorIndex(const std::string& field_name) {
// serialize compress file
{
auto element_visitor = field->GetElementVisitor(engine::FieldElementType::FET_COMPRESS_SQ8);
auto element_visitor = field->GetElementVisitor(engine::FieldElementType::FET_COMPRESS);
if (element_visitor && element_visitor->GetFile()) {
auto segment_file = element_visitor->GetFile();
std::string file_path =
......
......@@ -64,7 +64,7 @@ DescribeIndexReq::OnExecute() {
}
}
json_params_[engine::PARAM_INDEX_TYPE] = index.index_name_;
json_params_[engine::PARAM_INDEX_TYPE] = index.index_type_;
json_params_[engine::PARAM_INDEX_METRIC_TYPE] = index.metric_name_;
json_params_[engine::PARAM_INDEX_EXTRA_PARAMS] = index.extra_params_;
} catch (std::exception& ex) {
......
......@@ -32,7 +32,7 @@ constexpr int64_t NQ = 5;
constexpr int64_t TOP_K = 10;
constexpr int64_t NPROBE = 32;
constexpr int64_t SEARCH_TARGET = BATCH_ENTITY_COUNT / 2; // change this value, result is different
constexpr int64_t ADD_ENTITY_LOOP = 1;
constexpr int64_t ADD_ENTITY_LOOP = 10;
constexpr milvus::IndexType INDEX_TYPE = milvus::IndexType::IVFFLAT;
constexpr int32_t NLIST = 16384;
const char* PARTITION_TAG = "part";
......
......@@ -13,7 +13,7 @@ from utils import *
nb = 1
dim = 128
collection_id = "create_collection"
default_segment_row_count = 100000
default_segment_row_count = 512 * 1024
drop_collection_interval_time = 3
segment_row_count = 5000
default_fields = gen_default_fields()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册