Unverified commit 6c826c13, authored by groot, committed by GitHub

#1648 The cache cannot be used all when the type is binary (#1648)

* #1648 The cache cannot be used all when the type is binary
Signed-off-by: groot <yihua.mo@zilliz.com>

* #1646 The cache cannot be used all when the type is binary
Signed-off-by: groot <yihua.mo@zilliz.com>

* #1646 The cache cannot be used all when the type is binary
Signed-off-by: groot <yihua.mo@zilliz.com>
Parent 1b60e3bc
......@@ -8,6 +8,7 @@ Please mark all change in change log and use the issue from GitHub
- \#1301 Data in WAL may be accidentally inserted into a new table with the same name.
- \#1634 Fix search demo bug in HTTP doc
- \#1635 Vectors can be returned by searching after vectors deleted if `cache_insert_data` set true
- \#1648 The cache cannot be used all when the type is binary
## Feature
- \#1603 BinaryFlat add 2 Metric: Substructure and Superstructure
......
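Editor's note on the root cause (not part of the patch): before this commit, VecIndex::Size() fell back to Count() * Dimension() * sizeof(float) whenever no explicit size had been recorded. Binary vectors occupy one bit per dimension, so their cache footprint was over-estimated by roughly a factor of 32 and the cache reported itself full far earlier than it should. A minimal, self-contained sketch of the two footprints, for illustration only:

#include <cstddef>

// Float vectors store 4 bytes per dimension.
size_t FloatVectorBytes(size_t count, size_t dim) {
    return count * dim * sizeof(float);
}

// Binary vectors store 1 bit per dimension, i.e. dim / 8 bytes per vector,
// which is what the corrected accounting charges.
size_t BinaryVectorBytes(size_t count, size_t dim) {
    return count * dim / 8;
}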
......@@ -381,22 +381,23 @@ DBImpl::PreloadTable(const std::string& table_id) {
return Status(DB_ERROR, "Invalid engine type");
}
size += engine->PhysicalSize();
fiu_do_on("DBImpl.PreloadTable.exceed_cache", size = available_size + 1);
if (size > available_size) {
ENGINE_LOG_DEBUG << "Pre-load cancelled since cache is almost full";
return Status(SERVER_CACHE_FULL, "Cache is full");
} else {
try {
fiu_do_on("DBImpl.PreloadTable.engine_throw_exception", throw std::exception());
std::string msg = "Pre-loaded file: " + file.file_id_ + " size: " + std::to_string(file.file_size_);
TimeRecorderAuto rc_1(msg);
engine->Load(true);
} catch (std::exception& ex) {
std::string msg = "Pre-load table encounter exception: " + std::string(ex.what());
ENGINE_LOG_ERROR << msg;
return Status(DB_ERROR, msg);
try {
fiu_do_on("DBImpl.PreloadTable.engine_throw_exception", throw std::exception());
std::string msg = "Pre-loaded file: " + file.file_id_ + " size: " + std::to_string(file.file_size_);
TimeRecorderAuto rc_1(msg);
engine->Load(true);
size += engine->Size();
if (size > available_size) {
ENGINE_LOG_DEBUG << "Pre-load cancelled since cache is almost full";
return Status(SERVER_CACHE_FULL, "Cache is full");
}
} catch (std::exception& ex) {
std::string msg = "Pre-load table encounter exception: " + std::string(ex.what());
ENGINE_LOG_ERROR << msg;
return Status(DB_ERROR, msg);
}
}
......
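In the DBImpl::PreloadTable() hunk above, the capacity check moves from before the load (where only the on-disk PhysicalSize() was known) to after engine->Load(true), so the running total now reflects the actual in-memory footprint reported by engine->Size(). A simplified, self-contained sketch of that load-then-check flow, using hypothetical stand-in types rather than the real engine classes:

#include <cstdint>
#include <vector>

// Hypothetical stand-in for ExecutionEngine; only the calls used below.
struct FakeEngine {
    int64_t memory_size = 0;          // what Size() would report after Load()
    void Load() { /* read the segment file into memory */ }
    int64_t Size() const { return memory_size; }
};

// Load files one by one and stop once the accumulated in-memory size
// exceeds the cache capacity, mirroring the new PreloadTable() logic.
bool PreloadAll(std::vector<FakeEngine>& engines, int64_t available_size) {
    int64_t size = 0;
    for (auto& engine : engines) {
        engine.Load();
        size += engine.Size();        // in-memory footprint, not file size on disk
        if (size > available_size) {
            return false;             // cache is (almost) full
        }
    }
    return true;
}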
......@@ -60,14 +60,11 @@ class ExecutionEngine {
virtual size_t
Count() const = 0;
virtual size_t
Size() const = 0;
virtual size_t
Dimension() const = 0;
virtual size_t
PhysicalSize() const = 0;
Size() const = 0;
virtual Status
Serialize() = 0;
......
......@@ -330,15 +330,6 @@ ExecutionEngineImpl::Count() const {
return index_->Count();
}
size_t
ExecutionEngineImpl::Size() const {
if (IsBinaryIndexType(index_->GetType())) {
return (size_t)(Count() * Dimension() / 8);
} else {
return (size_t)(Count() * Dimension()) * sizeof(float);
}
}
size_t
ExecutionEngineImpl::Dimension() const {
if (index_ == nullptr) {
......@@ -349,8 +340,12 @@ ExecutionEngineImpl::Dimension() const {
}
size_t
ExecutionEngineImpl::PhysicalSize() const {
return server::CommonUtil::GetFileSize(location_);
ExecutionEngineImpl::Size() const {
if (index_ == nullptr) {
ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, return size 0";
return 0;
}
return index_->Size();
}
Status
......@@ -359,7 +354,7 @@ ExecutionEngineImpl::Serialize() {
// here we reset index size by file size,
// since some index type(such as SQ8) data size become smaller after serialized
index_->set_size(PhysicalSize());
index_->set_size(server::CommonUtil::GetFileSize(location_));
ENGINE_LOG_DEBUG << "Finish serialize index file: " << location_ << " size: " << index_->Size();
if (index_->Size() == 0) {
......@@ -370,36 +365,6 @@ ExecutionEngineImpl::Serialize() {
return status;
}
/*
Status
ExecutionEngineImpl::Load(bool to_cache) {
index_ = std::static_pointer_cast<VecIndex>(cache::CpuCacheMgr::GetInstance()->GetIndex(location_));
bool already_in_cache = (index_ != nullptr);
if (!already_in_cache) {
try {
double physical_size = PhysicalSize();
server::CollectExecutionEngineMetrics metrics(physical_size);
index_ = read_index(location_);
if (index_ == nullptr) {
std::string msg = "Failed to load index from " + location_;
ENGINE_LOG_ERROR << msg;
return Status(DB_ERROR, msg);
} else {
ENGINE_LOG_DEBUG << "Disk io from: " << location_;
}
} catch (std::exception& e) {
ENGINE_LOG_ERROR << e.what();
return Status(DB_ERROR, e.what());
}
}
if (!already_in_cache && to_cache) {
Cache();
}
return Status::OK();
}
*/
Status
ExecutionEngineImpl::Load(bool to_cache) {
// TODO(zhiru): refactor
......@@ -460,10 +425,6 @@ ExecutionEngineImpl::Load(bool to_cache) {
status = std::static_pointer_cast<BFIndex>(index_)->AddWithoutIds(vectors->GetCount(),
float_vectors.data(), Config());
status = std::static_pointer_cast<BFIndex>(index_)->SetBlacklist(concurrent_bitset_ptr);
int64_t index_size = vectors->GetCount() * dim_ * sizeof(float);
int64_t bitset_size = vectors->GetCount() / 8;
index_->set_size(index_size + bitset_size);
} else if (index_type_ == EngineType::FAISS_BIN_IDMAP) {
ec = std::static_pointer_cast<BinBFIndex>(index_)->Build(conf);
if (ec != KNOWHERE_SUCCESS) {
......@@ -472,11 +433,12 @@ ExecutionEngineImpl::Load(bool to_cache) {
status = std::static_pointer_cast<BinBFIndex>(index_)->AddWithoutIds(vectors->GetCount(),
vectors_data.data(), Config());
status = std::static_pointer_cast<BinBFIndex>(index_)->SetBlacklist(concurrent_bitset_ptr);
int64_t index_size = vectors->GetCount() * dim_ * sizeof(uint8_t);
int64_t bitset_size = vectors->GetCount() / 8;
index_->set_size(index_size + bitset_size);
}
int64_t index_size = vectors->Size(); // vector data size + vector ids size
int64_t bitset_size = vectors->GetCount(); // delete list size
index_->set_size(index_size + bitset_size);
if (!status.ok()) {
return status;
}
......@@ -485,8 +447,8 @@ ExecutionEngineImpl::Load(bool to_cache) {
} else {
try {
double physical_size = PhysicalSize();
server::CollectExecutionEngineMetrics metrics(physical_size);
// size_t physical_size = PhysicalSize();
// server::CollectExecutionEngineMetrics metrics((double)physical_size);
index_ = read_index(location_);
if (index_ == nullptr) {
......@@ -518,6 +480,10 @@ ExecutionEngineImpl::Load(bool to_cache) {
index_->SetUids(uids);
ENGINE_LOG_DEBUG << "set uids " << index_->GetUids().size() << " for index " << location_;
int64_t index_size = index_->Size(); // vector data size + vector ids size
int64_t bitset_size = index_->Count(); // delete list size
index_->set_size(index_size + bitset_size);
ENGINE_LOG_DEBUG << "Finished loading index file from segment " << segment_dir;
}
} catch (std::exception& e) {
......@@ -619,10 +585,12 @@ Status
ExecutionEngineImpl::CopyToIndexFileToGpu(uint64_t device_id) {
#ifdef MILVUS_GPU_VERSION
// the ToIndexData is only a placeholder, cpu-copy-to-gpu action is performed in
gpu_num_ = device_id;
auto to_index_data = std::make_shared<ToIndexData>(PhysicalSize());
cache::DataObjPtr obj = std::static_pointer_cast<cache::DataObj>(to_index_data);
milvus::cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(location_ + "_placeholder", obj);
if (index_) {
gpu_num_ = device_id;
auto to_index_data = std::make_shared<ToIndexData>(index_->Size());
cache::DataObjPtr obj = std::static_pointer_cast<cache::DataObj>(to_index_data);
milvus::cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(location_ + "_placeholder", obj);
}
#endif
return Status::OK();
}
......@@ -765,7 +733,7 @@ ExecutionEngineImpl::BuildIndex(const std::string& location, EngineType engine_t
throw Exception(DB_ERROR, status.message());
}
ENGINE_LOG_DEBUG << "Finish build index file: " << location << " size: " << to_index->Size();
ENGINE_LOG_DEBUG << "Finish build index: " << location;
return std::make_shared<ExecutionEngineImpl>(to_index, location, engine_type, metric_type_, index_params_);
}
......
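With PhysicalSize() dropped from the cache accounting, Load() now records the footprint explicitly once the data is in memory: the raw-vector branches charge vectors->Size() (vector data plus entity ids) plus GetCount() bytes for the delete list, and the index branch charges index_->Size() plus index_->Count(). A rough, hypothetical back-of-the-envelope check of what that records for one binary segment (assuming 8-byte entity ids; the exact figures depend on the segment):

#include <cstdint>
#include <iostream>

int main() {
    const int64_t count = 100000;             // rows in the segment
    const int64_t dim = 512;                  // binary dimension, in bits

    int64_t data_bytes = count * dim / 8;     // binary vector data
    int64_t id_bytes = count * 8;             // assumed 8-byte entity ids
    int64_t bitset_bytes = count;             // delete list, charged 1 byte per row

    std::cout << "recorded cache size ~ "
              << (data_bytes + id_bytes + bitset_bytes) << " bytes\n";
    return 0;
}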
......@@ -41,14 +41,11 @@ class ExecutionEngineImpl : public ExecutionEngine {
size_t
Count() const override;
size_t
Size() const override;
size_t
Dimension() const override;
size_t
PhysicalSize() const override;
Size() const override;
Status
Serialize() override;
......
......@@ -22,6 +22,7 @@
#include "db/engine/EngineFactory.h"
#include "metrics/Metrics.h"
#include "scheduler/job/BuildIndexJob.h"
#include "utils/CommonUtil.h"
#include "utils/Exception.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
......@@ -95,7 +96,7 @@ XBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) {
return;
}
size_t file_size = to_index_engine_->PhysicalSize();
size_t file_size = to_index_engine_->Size();
std::string info = "Build index task load file id:" + std::to_string(file_->id_) + " " + type_str +
" file type:" + std::to_string(file_->file_type_) + " size:" + std::to_string(file_size) +
......@@ -207,7 +208,7 @@ XBuildIndexTask::Execute() {
// step 6: update meta
table_file.file_type_ = engine::meta::TableFileSchema::INDEX;
table_file.file_size_ = index->PhysicalSize();
table_file.file_size_ = server::CommonUtil::GetFileSize(table_file.location_);
table_file.row_count_ = file_->row_count_; // index->Count();
auto origin_file = *file_;
......@@ -221,7 +222,7 @@ XBuildIndexTask::Execute() {
fiu_do_on("XBuildIndexTask.Execute.update_table_file_fail", status = Status(SERVER_UNEXPECTED_ERROR, ""));
if (status.ok()) {
ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size " << index->PhysicalSize()
ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size " << table_file.file_size_
<< " bytes"
<< " from file " << origin_file.file_id_;
if (build_index_job->options().insert_cache_immediately_) {
......
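Since PhysicalSize() no longer exists on the engine interface, call sites that genuinely need the on-disk size, such as the meta update above, now go through server::CommonUtil::GetFileSize() on the file's location. A sketch of what such a helper presumably amounts to (assumption: a thin wrapper over a filesystem stat; the real implementation may differ):

#include <cstdint>
#include <filesystem>
#include <string>
#include <system_error>

// Illustrative only: return the size of a file on disk, or 0 if it cannot be read.
int64_t FileSizeOnDisk(const std::string& path) {
    std::error_code ec;
    auto size = std::filesystem::file_size(path, ec);
    return ec ? 0 : static_cast<int64_t>(size);
}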
......@@ -25,6 +25,7 @@
#include "scheduler/SchedInst.h"
#include "scheduler/job/SearchJob.h"
#include "segment/SegmentReader.h"
#include "utils/CommonUtil.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "utils/ValidationUtil.h"
......@@ -181,7 +182,7 @@ XSearchTask::Load(LoadType type, uint8_t device_id) {
return;
}
size_t file_size = index_engine_->PhysicalSize();
size_t file_size = index_engine_->Size();
std::string info = "Search task load file id:" + std::to_string(file_->id_) + " " + type_str +
" file type:" + std::to_string(file_->file_type_) + " size:" + std::to_string(file_size) +
......
......@@ -50,10 +50,7 @@ namespace engine {
int64_t
VecIndex::Size() {
if (size_ != 0) {
return size_;
}
return Count() * Dimension() * sizeof(float);
return size_;
}
void
......
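After this change VecIndex::Size() simply returns whatever set_size() recorded, with no dimension-based fallback, so every path that builds or loads an index is responsible for setting its own footprint (as the Load() and Serialize() changes above now do); the MockVecIndex in the cache test below follows the same contract by calling set_size() in its constructor. A tiny sketch of the resulting contract, with a hypothetical class name:

#include <cstdint>

class TinyIndex {
 public:
    // The only source of truth for the cache footprint.
    void set_size(int64_t size) { size_ = size; }

    // No fallback: if nobody called set_size(), this reports 0.
    int64_t Size() const { return size_; }

 private:
    int64_t size_ = 0;
};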
......@@ -36,6 +36,8 @@ class LessItemCacheMgr : public milvus::cache::CacheMgr<milvus::cache::DataObjPt
class MockVecIndex : public milvus::engine::VecIndex {
public:
MockVecIndex(int64_t dim, int64_t total) : dimension_(dim), ntotal_(total) {
int64_t data_size = Dimension() * Count() * sizeof(float);
set_size(data_size);
}
virtual milvus::Status
......
......@@ -22,32 +22,12 @@
namespace {
const char* COLLECTION_NAME = milvus_sdk::Utils::GenCollectionName().c_str();
constexpr int64_t COLLECTION_DIMENSION = 512;
constexpr int64_t COLLECTION_INDEX_FILE_SIZE = 128;
constexpr milvus::MetricType COLLECTION_METRIC_TYPE = milvus::MetricType::TANIMOTO;
constexpr int64_t BATCH_ENTITY_COUNT = 100000;
constexpr int64_t NQ = 5;
constexpr int64_t TOP_K = 10;
constexpr int64_t NPROBE = 32;
constexpr int64_t SEARCH_TARGET = 5000; // change this value, result is different, ensure less than BATCH_ENTITY_COUNT
constexpr int64_t ADD_ENTITY_LOOP = 20;
constexpr milvus::IndexType INDEX_TYPE = milvus::IndexType::IVFFLAT;
milvus::CollectionParam
BuildCollectionParam() {
milvus::CollectionParam
collection_param = {COLLECTION_NAME, COLLECTION_DIMENSION, COLLECTION_INDEX_FILE_SIZE, COLLECTION_METRIC_TYPE};
return collection_param;
}
milvus::IndexParam
BuildIndexParam() {
JSON json_params = {{"nlist", 1024}};
milvus::IndexParam index_param = {COLLECTION_NAME, INDEX_TYPE, json_params.dump()};
return index_param;
}
constexpr int64_t ADD_ENTITY_LOOP = 10;
void
BuildBinaryVectors(int64_t from, int64_t to, std::vector<milvus::Entity>& entity_array,
......@@ -59,7 +39,7 @@ BuildBinaryVectors(int64_t from, int64_t to, std::vector<milvus::Entity>& entity
entity_array.clear();
entity_ids.clear();
int64_t dim_byte = dimension/8;
int64_t dim_byte = dimension / 8;
for (int64_t k = from; k < to; k++) {
milvus::Entity entity;
entity.binary_data.resize(dim_byte);
......@@ -72,29 +52,16 @@ BuildBinaryVectors(int64_t from, int64_t to, std::vector<milvus::Entity>& entity
}
}
} // namespace
void
ClientTest::Test(const std::string& address, const std::string& port) {
std::shared_ptr<milvus::Connection> conn = milvus::Connection::Create();
TestProcess(std::shared_ptr<milvus::Connection> connection,
const milvus::CollectionParam& collection_param,
const milvus::IndexParam& index_param) {
milvus::Status stat;
{ // connect server
milvus::ConnectParam param = {address, port};
stat = conn->Connect(param);
std::cout << "Connect function call status: " << stat.message() << std::endl;
}
{ // create collection
milvus::CollectionParam collection_param = BuildCollectionParam();
stat = conn->CreateCollection(collection_param);
stat = connection->CreateCollection(collection_param);
std::cout << "CreateCollection function call status: " << stat.message() << std::endl;
milvus_sdk::Utils::PrintCollectionParam(collection_param);
bool has_collection = conn->HasCollection(collection_param.collection_name);
if (has_collection) {
std::cout << "Collection is created" << std::endl;
}
}
std::vector<std::pair<int64_t, milvus::Entity>> search_entity_array;
......@@ -109,7 +76,7 @@ ClientTest::Test(const std::string& address, const std::string& port) {
begin_index + BATCH_ENTITY_COUNT,
entity_array,
entity_ids,
COLLECTION_DIMENSION);
collection_param.dimension);
}
if (search_entity_array.size() < NQ) {
......@@ -118,49 +85,122 @@ ClientTest::Test(const std::string& address, const std::string& port) {
std::string title = "Insert " + std::to_string(entity_array.size()) + " entities No." + std::to_string(i);
milvus_sdk::TimeRecorder rc(title);
stat = conn->Insert(COLLECTION_NAME, "", entity_array, entity_ids);
stat = connection->Insert(collection_param.collection_name, "", entity_array, entity_ids);
std::cout << "Insert function call status: " << stat.message() << std::endl;
std::cout << "Returned id array count: " << entity_ids.size() << std::endl;
}
}
{ // flush buffer
stat = conn->FlushCollection(COLLECTION_NAME);
stat = connection->FlushCollection(collection_param.collection_name);
std::cout << "FlushCollection function call status: " << stat.message() << std::endl;
}
{ // search vectors
std::vector<std::string> partition_tags;
milvus::TopKQueryResult topk_query_result;
milvus_sdk::Utils::DoSearch(conn, COLLECTION_NAME, partition_tags, TOP_K, NPROBE, search_entity_array,
milvus_sdk::Utils::DoSearch(connection,
collection_param.collection_name,
partition_tags,
TOP_K,
NPROBE,
search_entity_array,
topk_query_result);
}
{ // wait unit build index finish
milvus_sdk::TimeRecorder rc("Create index");
std::cout << "Wait until create all index done" << std::endl;
milvus::IndexParam index1 = BuildIndexParam();
milvus_sdk::Utils::PrintIndexParam(index1);
stat = conn->CreateIndex(index1);
milvus_sdk::Utils::PrintIndexParam(index_param);
stat = connection->CreateIndex(index_param);
std::cout << "CreateIndex function call status: " << stat.message() << std::endl;
milvus::IndexParam index2;
stat = conn->DescribeIndex(COLLECTION_NAME, index2);
std::cout << "DescribeIndex function call status: " << stat.message() << std::endl;
milvus_sdk::Utils::PrintIndexParam(index2);
}
{ // search vectors
std::vector<std::string> partition_tags;
milvus::TopKQueryResult topk_query_result;
milvus_sdk::Utils::DoSearch(conn, COLLECTION_NAME, partition_tags, TOP_K, NPROBE, search_entity_array,
milvus_sdk::Utils::DoSearch(connection,
collection_param.collection_name,
partition_tags,
TOP_K,
NPROBE,
search_entity_array,
topk_query_result);
}
{ // drop collection
stat = conn->DropCollection(COLLECTION_NAME);
stat = connection->DropCollection(collection_param.collection_name);
std::cout << "DropCollection function call status: " << stat.message() << std::endl;
}
}
} // namespace
void
ClientTest::Test(const std::string& address, const std::string& port) {
std::shared_ptr<milvus::Connection> connection = milvus::Connection::Create();
{ // connect server
milvus::ConnectParam param = {address, port};
auto stat = connection->Connect(param);
std::cout << "Connect function call status: " << stat.message() << std::endl;
if (!stat.ok()) {
return;
}
}
{
milvus::CollectionParam collection_param = {
"collection_1",
512,
256,
milvus::MetricType::TANIMOTO
};
JSON json_params = {{"nlist", 1024}};
milvus::IndexParam index_param = {
collection_param.collection_name,
milvus::IndexType::IVFFLAT,
json_params.dump()
};
TestProcess(connection, collection_param, index_param);
}
{
milvus::CollectionParam collection_param = {
"collection_2",
256,
512,
milvus::MetricType::SUBSTRUCTURE
};
JSON json_params = {{"nlist", 2048}};
milvus::IndexParam index_param = {
collection_param.collection_name,
milvus::IndexType::FLAT,
json_params.dump()
};
TestProcess(connection, collection_param, index_param);
}
{
milvus::CollectionParam collection_param = {
"collection_3",
128,
1024,
milvus::MetricType::SUPERSTRUCTURE
};
JSON json_params = {{"nlist", 4092}};
milvus::IndexParam index_param = {
collection_param.collection_name,
milvus::IndexType::FLAT,
json_params.dump()
};
TestProcess(connection, collection_param, index_param);
}
milvus::Connection::Destroy(conn);
milvus::Connection::Destroy(connection);
}