Unverified commit d22ff25a authored by groot, committed by GitHub

fix delete entity bug (#3161)

Signed-off-by: yhmo <yihua.mo@zilliz.com>
Parent c73a58a1
......@@ -58,8 +58,8 @@ DeletedDocsFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string&
size_t num_bytes;
fs_ptr->reader_ptr_->read(&num_bytes, sizeof(size_t));
auto deleted_docs_size = num_bytes / sizeof(segment::offset_t);
std::vector<segment::offset_t> deleted_docs_list;
auto deleted_docs_size = num_bytes / sizeof(engine::offset_t);
std::vector<engine::offset_t> deleted_docs_list;
deleted_docs_list.resize(deleted_docs_size);
fs_ptr->reader_ptr_->read(deleted_docs_list.data(), num_bytes);
......@@ -82,7 +82,7 @@ DeletedDocsFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string&
// Write to the temp file, in order to avoid possible race condition with search (concurrent read and write)
size_t old_num_bytes;
std::vector<segment::offset_t> delete_ids;
std::vector<engine::offset_t> delete_ids;
if (exists) {
if (!fs_ptr->reader_ptr_->open(temp_path)) {
std::string err_msg = "Failed to read from file: " + temp_path; // + ", error: " + std::strerror(errno);
......@@ -98,7 +98,7 @@ DeletedDocsFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string&
}
auto deleted_docs_list = deleted_docs->GetDeletedDocs();
size_t new_num_bytes = old_num_bytes + sizeof(segment::offset_t) * deleted_docs->GetCount();
size_t new_num_bytes = old_num_bytes + sizeof(engine::offset_t) * deleted_docs->GetCount();
if (!deleted_docs_list.empty()) {
delete_ids.insert(delete_ids.end(), deleted_docs_list.begin(), deleted_docs_list.end());
}
......@@ -129,7 +129,7 @@ DeletedDocsFormat::ReadSize(const storage::FSHandlerPtr& fs_ptr, const std::stri
size_t num_bytes;
fs_ptr->reader_ptr_->read(&num_bytes, sizeof(size_t));
size = num_bytes / sizeof(segment::offset_t);
size = num_bytes / sizeof(engine::offset_t);
fs_ptr->reader_ptr_->close();
}
......
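The Read/Write paths above store the deleted-doc offsets as a length-prefixed binary blob: a size_t byte count followed by the packed offset_t values. A minimal standalone sketch of reading such a blob with plain iostreams instead of the FSHandler abstraction (ReadDeletedOffsets is a hypothetical name, not part of the Milvus API):

#include <cstdint>
#include <fstream>
#include <string>
#include <vector>

using offset_t = int32_t;

// Read a deleted-docs blob laid out as: [size_t num_bytes][num_bytes of packed offset_t].
std::vector<offset_t>
ReadDeletedOffsets(const std::string& path) {
    std::ifstream file(path, std::ios::binary);
    if (!file) {
        return {};  // treat a missing file as "no deleted docs"
    }
    size_t num_bytes = 0;
    file.read(reinterpret_cast<char*>(&num_bytes), sizeof(size_t));
    std::vector<offset_t> offsets(num_bytes / sizeof(offset_t));
    file.read(reinterpret_cast<char*>(offsets.data()), num_bytes);
    return offsets;
}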
......@@ -632,6 +632,16 @@ DBImpl::ListIDInSegment(const std::string& collection_name, int64_t segment_id,
STATUS_CHECK(segment_reader->LoadUids(entity_ids));
// remove deleted ids from the id list
segment::DeletedDocsPtr deleted_docs_ptr;
STATUS_CHECK(segment_reader->LoadDeletedDocs(deleted_docs_ptr));
if (deleted_docs_ptr) {
const std::vector<offset_t>& delete_ids = deleted_docs_ptr->GetDeletedDocs();
for (auto offset : delete_ids) {
entity_ids.erase(entity_ids.begin() + offset, entity_ids.begin() + offset + 1);
}
}
return Status::OK();
}
......
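The ListIDInSegment change above strips deleted rows out of the uid list by their stored offsets. A minimal standalone sketch of that filtering, assuming nothing beyond the standard library (RemoveDeletedIds is a hypothetical helper, not the DBImpl API); keeping the deleted offsets in a set makes the per-row check O(1) and avoids mutating the list while walking it:

#include <cstdint>
#include <unordered_set>
#include <vector>

// Return only the ids whose row offset is not marked as deleted.
std::vector<int64_t>
RemoveDeletedIds(const std::vector<int64_t>& uids, const std::vector<int32_t>& deleted_offsets) {
    std::unordered_set<int32_t> deleted(deleted_offsets.begin(), deleted_offsets.end());
    std::vector<int64_t> alive;
    alive.reserve(uids.size());
    for (int32_t i = 0; i < static_cast<int32_t>(uids.size()); ++i) {
        if (deleted.count(i) == 0) {
            alive.push_back(uids[i]);
        }
    }
    return alive;
}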
......@@ -29,6 +29,9 @@
namespace milvus {
namespace engine {
using id_t = int64_t;
using offset_t = int32_t;
using DateT = int;
using IDNumber = int64_t;
......@@ -74,7 +77,7 @@ class VaribleData : public cache::DataObj {
public:
int64_t
Size() {
return data_.size();
return data_.size() + offset_.size() * sizeof(int64_t);
}
public:
......
......@@ -226,8 +226,8 @@ ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
}
void
MapAndCopyResult(const knowhere::DatasetPtr& dataset, const std::vector<milvus::segment::doc_id_t>& uids, int64_t nq,
int64_t k, float* distances, int64_t* labels) {
MapAndCopyResult(const knowhere::DatasetPtr& dataset, const std::vector<id_t>& uids, int64_t nq, int64_t k,
float* distances, int64_t* labels) {
int64_t* res_ids = dataset->Get<int64_t*>(knowhere::meta::IDS);
float* res_dist = dataset->Get<float*>(knowhere::meta::DISTANCE);
......@@ -787,7 +787,7 @@ ExecutionEngineImpl::BuildKnowhereIndex(const std::string& field_name, const Col
}
LOG_ENGINE_DEBUG_ << "Index config: " << conf.dump();
std::vector<segment::doc_id_t> uids;
std::vector<id_t> uids;
faiss::ConcurrentBitsetPtr blacklist;
if (from_index) {
auto dataset =
......
......@@ -74,19 +74,19 @@ MemCollection::Add(int64_t partition_id, const milvus::engine::VectorSourcePtr&
}
Status
MemCollection::Delete(const std::vector<segment::doc_id_t>& doc_ids) {
MemCollection::Delete(std::vector<id_t>& ids) {
// Locate which collection file the doc id lands in
{
std::lock_guard<std::mutex> lock(mutex_);
for (auto& partition_segments : mem_segments_) {
MemSegmentList& segments = partition_segments.second;
for (auto& segment : segments) {
segment->Delete(doc_ids);
segment->Delete(ids);
}
}
}
// Add the ids to the delete list so they can be applied to other segments on disk during the next flush
for (auto& id : doc_ids) {
for (auto& id : ids) {
doc_ids_to_delete_.insert(id);
}
......@@ -179,7 +179,7 @@ MemCollection::ApplyDeletes() {
STATUS_CHECK(segment_reader->LoadBloomFilter(pre_bloom_filter));
// Step 1: Check delete_id in mem
std::vector<segment::doc_id_t> delete_ids;
std::vector<id_t> delete_ids;
for (auto& id : doc_ids_to_delete_) {
if (pre_bloom_filter->Check(id)) {
delete_ids.push_back(id);
......@@ -193,7 +193,7 @@ MemCollection::ApplyDeletes() {
// Step 2: Load previous delete_id and merge into 'delete_ids'
segment::DeletedDocsPtr prev_del_docs;
STATUS_CHECK(segment_reader->LoadDeletedDocs(prev_del_docs));
std::vector<segment::offset_t> pre_del_ids;
std::vector<engine::offset_t> pre_del_ids;
if (prev_del_docs) {
pre_del_ids = prev_del_docs->GetDeletedDocs();
if (!pre_del_ids.empty())
......@@ -206,7 +206,7 @@ MemCollection::ApplyDeletes() {
std::string collection_root_path = options_.meta_.path_ + COLLECTIONS_FOLDER;
std::sort(delete_ids.begin(), delete_ids.end());
std::set<segment::doc_id_t> ids_to_check(delete_ids.begin(), delete_ids.end());
std::set<id_t> ids_to_check(delete_ids.begin(), delete_ids.end());
// Step 3: Mark previous deleted docs file and bloom filter file stale
auto& field_visitors_map = seg_visitor->GetFieldVisitors();
......@@ -263,7 +263,7 @@ MemCollection::ApplyDeletes() {
segment::IdBloomFilterPtr bloom_filter;
STATUS_CHECK(segment_writer->CreateBloomFilter(bloom_filter_file_path, bloom_filter));
auto delete_docs = std::make_shared<segment::DeletedDocs>();
std::vector<segment::doc_id_t> uids;
std::vector<id_t> uids;
STATUS_CHECK(segment_reader->LoadUids(uids));
for (size_t i = 0; i < uids.size(); i++) {
if (std::binary_search(ids_to_check.begin(), ids_to_check.end(), uids[i])) {
......
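ApplyDeletes above works in three steps per segment: pre-filter the pending ids with the bloom filter, merge the surviving ids with the segment's previously recorded deleted offsets, then rebuild the deleted-docs and bloom-filter files. A condensed standalone sketch of the offset computation (ComputeDeletedOffsets is a hypothetical name; the bloom-filter step is omitted, so every id is checked against the uid list directly):

#include <algorithm>
#include <cstdint>
#include <set>
#include <vector>

// Translate ids scheduled for deletion into row offsets of one segment and
// merge them with the offsets already recorded for that segment.
std::vector<int32_t>
ComputeDeletedOffsets(const std::set<int64_t>& ids_to_delete,
                      const std::vector<int64_t>& segment_uids,
                      const std::vector<int32_t>& previous_offsets) {
    std::vector<int32_t> offsets = previous_offsets;
    for (int32_t i = 0; i < static_cast<int32_t>(segment_uids.size()); ++i) {
        if (ids_to_delete.count(segment_uids[i]) != 0) {
            offsets.push_back(i);
        }
    }
    std::sort(offsets.begin(), offsets.end());
    offsets.erase(std::unique(offsets.begin(), offsets.end()), offsets.end());
    return offsets;
}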
......@@ -40,7 +40,7 @@ class MemCollection {
Add(int64_t partition_id, const VectorSourcePtr& source);
Status
Delete(const std::vector<segment::doc_id_t>& doc_ids);
Delete(std::vector<id_t>& ids);
Status
EraseMem(int64_t partition_id);
......@@ -73,7 +73,7 @@ class MemCollection {
std::mutex mutex_;
std::set<segment::doc_id_t> doc_ids_to_delete_;
std::set<id_t> doc_ids_to_delete_;
std::atomic<uint64_t> lsn_;
}; // MemCollection
......
......@@ -196,55 +196,25 @@ MemSegment::Add(const VectorSourcePtr& source) {
}
Status
MemSegment::Delete(segment::doc_id_t doc_id) {
MemSegment::Delete(std::vector<id_t>& ids) {
engine::SegmentPtr segment_ptr;
segment_writer_ptr_->GetSegment(segment_ptr);
// Check whether the doc_id is present; if yes, delete its corresponding buffer
engine::BinaryDataPtr raw_data;
auto status = segment_ptr->GetFixedFieldData(engine::DEFAULT_UID_NAME, raw_data);
if (!status.ok()) {
return Status::OK();
}
int64_t* uids = reinterpret_cast<int64_t*>(raw_data->data_.data());
int64_t row_count = segment_ptr->GetRowCount();
for (int64_t i = 0; i < row_count; i++) {
if (doc_id == uids[i]) {
segment_ptr->DeleteEntity(i);
std::vector<id_t> uids;
segment_writer_ptr_->LoadUids(uids);
std::vector<offset_t> offsets;
for (auto id : ids) {
auto found = std::find(uids.begin(), uids.end(), id);
if (found == uids.end()) {
continue;
}
}
return Status::OK();
}
Status
MemSegment::Delete(const std::vector<segment::doc_id_t>& doc_ids) {
engine::SegmentPtr segment_ptr;
segment_writer_ptr_->GetSegment(segment_ptr);
// Check whether the doc_id is present; if yes, delete its corresponding buffer
std::vector<segment::doc_id_t> temp;
temp.resize(doc_ids.size());
memcpy(temp.data(), doc_ids.data(), doc_ids.size() * sizeof(segment::doc_id_t));
std::sort(temp.begin(), temp.end());
engine::BinaryDataPtr raw_data;
auto status = segment_ptr->GetFixedFieldData(engine::DEFAULT_UID_NAME, raw_data);
if (!status.ok()) {
return Status::OK();
}
int64_t* uids = reinterpret_cast<int64_t*>(raw_data->data_.data());
int64_t row_count = segment_ptr->GetRowCount();
size_t deleted = 0;
for (int64_t i = 0; i < row_count; ++i) {
if (std::binary_search(temp.begin(), temp.end(), uids[i])) {
segment_ptr->DeleteEntity(i - deleted);
++deleted;
}
auto offset = std::distance(uids.begin(), found);
offsets.push_back(offset);
}
segment_ptr->DeleteEntity(offsets);
return Status::OK();
}
......
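The rewritten MemSegment::Delete above no longer scans the raw uid buffer per id; it resolves each id to its row offset via the writer's uid list and hands the whole batch to Segment::DeleteEntity. The id-to-offset mapping in isolation (IdsToOffsets is a hypothetical helper name):

#include <algorithm>
#include <cstdint>
#include <vector>

// Map entity ids to their row offsets inside one segment; ids not present are skipped.
std::vector<int32_t>
IdsToOffsets(const std::vector<int64_t>& uids, const std::vector<int64_t>& ids) {
    std::vector<int32_t> offsets;
    for (int64_t id : ids) {
        auto found = std::find(uids.begin(), uids.end(), id);
        if (found != uids.end()) {
            offsets.push_back(static_cast<int32_t>(std::distance(uids.begin(), found)));
        }
    }
    return offsets;
}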
......@@ -36,10 +36,7 @@ class MemSegment {
Add(const VectorSourcePtr& source);
Status
Delete(segment::doc_id_t doc_id);
Status
Delete(const std::vector<segment::doc_id_t>& doc_ids);
Delete(std::vector<id_t>& ids);
int64_t
GetCurrentMem();
......
......@@ -20,15 +20,16 @@
namespace milvus {
namespace segment {
DeletedDocs::DeletedDocs(const std::vector<offset_t>& deleted_doc_offsets) : deleted_doc_offsets_(deleted_doc_offsets) {
DeletedDocs::DeletedDocs(const std::vector<engine::offset_t>& deleted_doc_offsets)
: deleted_doc_offsets_(deleted_doc_offsets) {
}
void
DeletedDocs::AddDeletedDoc(offset_t offset) {
DeletedDocs::AddDeletedDoc(engine::offset_t offset) {
deleted_doc_offsets_.emplace_back(offset);
}
const std::vector<offset_t>&
const std::vector<engine::offset_t>&
DeletedDocs::GetDeletedDocs() const {
return deleted_doc_offsets_;
}
......@@ -45,7 +46,7 @@ DeletedDocs::GetCount() const {
int64_t
DeletedDocs::Size() {
return deleted_doc_offsets_.size() * sizeof(offset_t);
return deleted_doc_offsets_.size() * sizeof(engine::offset_t);
}
} // namespace segment
......
......@@ -21,22 +21,21 @@
#include <vector>
#include "cache/DataObj.h"
#include "db/Types.h"
namespace milvus {
namespace segment {
using offset_t = int32_t;
class DeletedDocs : public cache::DataObj {
public:
explicit DeletedDocs(const std::vector<offset_t>& deleted_doc_offsets);
explicit DeletedDocs(const std::vector<engine::offset_t>& deleted_doc_offsets);
DeletedDocs() = default;
void
AddDeletedDoc(offset_t offset);
AddDeletedDoc(engine::offset_t offset);
const std::vector<offset_t>&
const std::vector<engine::offset_t>&
GetDeletedDocs() const;
// // TODO
......@@ -62,7 +61,7 @@ class DeletedDocs : public cache::DataObj {
operator=(DeletedDocs&&) = delete;
private:
std::vector<offset_t> deleted_doc_offsets_;
std::vector<engine::offset_t> deleted_doc_offsets_;
// faiss::ConcurrentBitsetPtr bitset_;
// const std::string name_ = "deleted_docs";
};
......
......@@ -41,14 +41,14 @@ IdBloomFilter::GetBloomFilter() {
}
bool
IdBloomFilter::Check(doc_id_t uid) {
IdBloomFilter::Check(id_t uid) {
std::string s = std::to_string(uid);
const std::lock_guard<std::mutex> lock(mutex_);
return scaling_bloom_check(bloom_filter_, s.c_str(), s.size());
}
Status
IdBloomFilter::Add(doc_id_t uid) {
IdBloomFilter::Add(id_t uid) {
std::string s = std::to_string(uid);
const std::lock_guard<std::mutex> lock(mutex_);
if (scaling_bloom_add(bloom_filter_, s.c_str(), s.size(), uid) == -1) {
......@@ -60,7 +60,7 @@ IdBloomFilter::Add(doc_id_t uid) {
}
Status
IdBloomFilter::Remove(doc_id_t uid) {
IdBloomFilter::Remove(id_t uid) {
std::string s = std::to_string(uid);
const std::lock_guard<std::mutex> lock(mutex_);
if (scaling_bloom_remove(bloom_filter_, s.c_str(), s.size(), uid) == -1) {
......
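IdBloomFilter wraps the dablooms scaling bloom filter; Check() is a cheap membership pre-test that can return false positives but never false negatives, which is why ApplyDeletes still verifies each candidate id against the segment's real uid list. A from-scratch toy filter illustrating that property (ToyIdBloomFilter is a standard-library sketch, not the dablooms API):

#include <cstdint>
#include <functional>
#include <vector>

// Toy bloom filter: two hash positions over a fixed-size bit array.
// May answer true for ids that were never added (false positive),
// but never answers false for an id that was added.
class ToyIdBloomFilter {
 public:
    explicit ToyIdBloomFilter(size_t num_bits) : bits_(num_bits, false) {
    }

    void Add(int64_t uid) {
        bits_[Pos(uid, 0x9E3779B97F4A7C15ULL)] = true;
        bits_[Pos(uid, 0xC2B2AE3D27D4EB4FULL)] = true;
    }

    bool Check(int64_t uid) const {
        return bits_[Pos(uid, 0x9E3779B97F4A7C15ULL)] && bits_[Pos(uid, 0xC2B2AE3D27D4EB4FULL)];
    }

 private:
    size_t Pos(int64_t uid, uint64_t seed) const {
        return std::hash<uint64_t>{}(static_cast<uint64_t>(uid) ^ seed) % bits_.size();
    }

    std::vector<bool> bits_;
};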
......@@ -27,8 +27,6 @@
namespace milvus {
namespace segment {
using doc_id_t = int64_t;
class IdBloomFilter : public cache::DataObj {
public:
explicit IdBloomFilter(scaling_bloom_t* bloom_filter);
......@@ -39,13 +37,13 @@ class IdBloomFilter : public cache::DataObj {
GetBloomFilter();
bool
Check(doc_id_t uid);
Check(id_t uid);
Status
Add(doc_id_t uid);
Add(id_t uid);
Status
Remove(doc_id_t uid);
Remove(id_t uid);
int64_t
Size() override;
......
......@@ -18,6 +18,8 @@
#include "segment/Segment.h"
#include "utils/Log.h"
#include <algorithm>
#include <functional>
#include <utility>
namespace milvus {
......@@ -140,24 +142,32 @@ Segment::AddChunk(const DataChunkPtr& chunk_ptr, int64_t from, int64_t to) {
}
Status
Segment::DeleteEntity(int64_t offset) {
if (offset > row_count_) {
return Status(DB_ERROR, "Invalid input");
}
Segment::DeleteEntity(std::vector<offset_t>& offsets) {
// sort offsets in descending order so that earlier erases do not shift later offsets
std::sort(offsets.begin(), offsets.end(), std::greater<offset_t>());
// delete entity data
for (auto& pair : fixed_fields_) {
int64_t width = fixed_fields_width_[pair.first];
if (width != 0) {
auto step = offset * width;
BinaryDataPtr& data = pair.second;
if (data == nullptr) {
continue;
if (width == 0 || pair.second == nullptr) {
continue;
}
BinaryDataPtr& data = pair.second;
for (auto offset : offsets) {
if (offset >= 0 && offset < row_count_) {
auto step = offset * width;
data->data_.erase(data->data_.begin() + step, data->data_.begin() + step + width);
}
}
}
data->data_.erase(data->data_.begin() + step, data->data_.begin() + step + width);
// decrease the row count once per valid offset
for (auto offset : offsets) {
if (offset >= 0 && offset < row_count_) {
row_count_--;
}
}
row_count_--;
return Status::OK();
}
......
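Segment::DeleteEntity above erases whole rows from every fixed-width field buffer by byte range. The core erase pattern on a single flat buffer, in isolation (EraseRows is a hypothetical name); offsets are processed from the highest down so that earlier erases do not shift the byte positions of later ones:

#include <algorithm>
#include <cstdint>
#include <functional>
#include <vector>

// Erase the rows at 'offsets' from a flat buffer holding fixed-width rows.
// Returns the new row count.
int64_t
EraseRows(std::vector<uint8_t>& buffer, int64_t width, int64_t row_count, std::vector<int64_t> offsets) {
    if (width <= 0) {
        return row_count;
    }
    std::sort(offsets.begin(), offsets.end(), std::greater<int64_t>());
    offsets.erase(std::unique(offsets.begin(), offsets.end()), offsets.end());
    for (int64_t offset : offsets) {
        if (offset < 0 || offset >= row_count) {
            continue;  // ignore out-of-range offsets
        }
        auto begin = buffer.begin() + offset * width;
        buffer.erase(begin, begin + width);
        --row_count;
    }
    return row_count;
}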
......@@ -43,7 +43,7 @@ class Segment {
AddChunk(const DataChunkPtr& chunk_ptr, int64_t from, int64_t to);
Status
DeleteEntity(int64_t offset);
DeleteEntity(std::vector<offset_t>& offsets);
Status
GetFieldType(const std::string& field_name, DataType& type);
......
......@@ -213,7 +213,7 @@ SegmentReader::LoadFieldsEntities(const std::vector<std::string>& fields_name, c
}
Status
SegmentReader::LoadUids(std::vector<int64_t>& uids) {
SegmentReader::LoadUids(std::vector<engine::id_t>& uids) {
engine::BinaryDataPtr raw;
auto status = LoadField(engine::DEFAULT_UID_NAME, raw);
if (!status.ok()) {
......@@ -221,14 +221,18 @@ SegmentReader::LoadUids(std::vector<int64_t>& uids) {
return status;
}
if (raw->data_.size() % sizeof(int64_t) != 0) {
if (raw == nullptr) {
return Status(DB_ERROR, "Failed to load id field");
}
if (raw->data_.size() % sizeof(engine::id_t) != 0) {
std::string err_msg = "Failed to load uids: illegal file size";
LOG_ENGINE_ERROR_ << err_msg;
return Status(DB_ERROR, err_msg);
}
uids.clear();
uids.resize(raw->data_.size() / sizeof(int64_t));
uids.resize(raw->data_.size() / sizeof(engine::id_t));
memcpy(uids.data(), raw->data_.data(), raw->data_.size());
return Status::OK();
......
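LoadUids above converts the raw bytes of the uid field into a typed id vector, guarding against a missing blob and a size that is not a whole number of ids. The same validation-and-copy pattern in isolation (BytesToIds is a hypothetical name, using std::vector<uint8_t> in place of BinaryDataPtr):

#include <cstdint>
#include <cstring>
#include <stdexcept>
#include <vector>

// Reinterpret a raw byte blob as 64-bit ids after validating its size.
std::vector<int64_t>
BytesToIds(const std::vector<uint8_t>& raw) {
    if (raw.size() % sizeof(int64_t) != 0) {
        throw std::runtime_error("Failed to load uids: illegal file size");
    }
    std::vector<int64_t> uids(raw.size() / sizeof(int64_t));
    if (!raw.empty()) {
        std::memcpy(uids.data(), raw.data(), raw.size());
    }
    return uids;
}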
......@@ -50,7 +50,7 @@ class SegmentReader {
engine::DataChunkPtr& data_chunk);
Status
LoadUids(std::vector<int64_t>& uids);
LoadUids(std::vector<engine::id_t>& uids);
Status
LoadVectorIndex(const std::string& field_name, knowhere::VecIndexPtr& index_ptr, bool flat = false);
......
......@@ -326,10 +326,8 @@ SegmentWriter::Merge(const SegmentReaderPtr& segment_reader) {
}
if (src_deleted_docs) {
const std::vector<offset_t>& delete_ids = src_deleted_docs->GetDeletedDocs();
for (auto offset : delete_ids) {
src_segment->DeleteEntity(offset);
}
std::vector<engine::offset_t> delete_ids = src_deleted_docs->GetDeletedDocs();
src_segment->DeleteEntity(delete_ids);
}
// merge field raw data
......@@ -360,6 +358,32 @@ SegmentWriter::RowCount() {
return segment_ptr_->GetRowCount();
}
Status
SegmentWriter::LoadUids(std::vector<engine::id_t>& uids) {
engine::BinaryDataPtr raw;
auto status = segment_ptr_->GetFixedFieldData(engine::DEFAULT_UID_NAME, raw);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << status.message();
return status;
}
if (raw == nullptr) {
return Status(DB_ERROR, "Invalid id field");
}
if (raw->data_.size() % sizeof(engine::id_t) != 0) {
std::string err_msg = "Failed to load uids: illegal file size";
LOG_ENGINE_ERROR_ << err_msg;
return Status(DB_ERROR, err_msg);
}
uids.clear();
uids.resize(raw->data_.size() / sizeof(engine::id_t));
memcpy(uids.data(), raw->data_.data(), raw->data_.size());
return Status::OK();
}
Status
SegmentWriter::SetVectorIndex(const std::string& field_name, const milvus::knowhere::VecIndexPtr& index) {
return segment_ptr_->SetVectorIndex(field_name, index);
......
......@@ -60,6 +60,9 @@ class SegmentWriter {
size_t
RowCount();
Status
LoadUids(std::vector<engine::id_t>& uids);
Status
SetVectorIndex(const std::string& field_name, const knowhere::VecIndexPtr& index);
......
......@@ -630,7 +630,7 @@ TEST_F(DBTest, CompactTest) {
auto status = CreateCollection2(db_, collection_name, 0);
ASSERT_TRUE(status.ok());
const uint64_t entity_count = 10000;
const uint64_t entity_count = 1000;
milvus::engine::DataChunkPtr data_chunk;
BuildEntities(entity_count, 0, data_chunk);
......@@ -640,29 +640,61 @@ TEST_F(DBTest, CompactTest) {
status = db_->Flush();
ASSERT_TRUE(status.ok());
milvus::engine::IDNumbers entity_ids;
milvus::engine::utils::GetIDFromChunk(data_chunk, entity_ids);
ASSERT_EQ(entity_ids.size(), entity_count);
milvus::engine::IDNumbers batch_entity_ids;
milvus::engine::utils::GetIDFromChunk(data_chunk, batch_entity_ids);
ASSERT_EQ(batch_entity_ids.size(), entity_count);
int64_t delete_count = 10;
entity_ids.resize(delete_count);
status = db_->DeleteEntityByID(collection_name, entity_ids);
auto validate_entity_data = [&]() -> void {
std::vector<std::string> field_names = {"field_0"};
std::vector<bool> valid_row;
milvus::engine::DataChunkPtr fetch_chunk;
status = db_->GetEntityByID(collection_name, batch_entity_ids, field_names, valid_row, fetch_chunk);
ASSERT_TRUE(status.ok());
ASSERT_EQ(valid_row.size(), batch_entity_ids.size());
auto& chunk = fetch_chunk->fixed_fields_["field_0"];
int32_t* p = (int32_t*)(chunk->data_.data());
int64_t index = 0;
for (uint64_t i = 0; i < valid_row.size(); ++i) {
if (!valid_row[i]) {
continue;
}
ASSERT_EQ(p[index++], i);
}
};
validate_entity_data();
int64_t delete_count = 100;
int64_t gap = entity_count / delete_count - 1;
std::vector<milvus::engine::id_t> delete_ids;
for (auto i = 1; i <= delete_count; ++i) {
delete_ids.push_back(batch_entity_ids[i * gap]);
}
status = db_->DeleteEntityByID(collection_name, delete_ids);
ASSERT_TRUE(status.ok());
status = db_->Flush();
ASSERT_TRUE(status.ok());
int64_t row_count = 0;
status = db_->CountEntities(collection_name, row_count);
ASSERT_TRUE(status.ok());
ASSERT_EQ(row_count, entity_count - delete_count);
auto validate_compact = [&](double threshold) -> void {
int64_t row_count = 0;
status = db_->CountEntities(collection_name, row_count);
ASSERT_TRUE(status.ok());
ASSERT_EQ(row_count, entity_count - delete_count);
status = db_->Compact(dummy_context_, collection_name);
ASSERT_TRUE(status.ok());
status = db_->Compact(dummy_context_, collection_name, threshold);
ASSERT_TRUE(status.ok());
status = db_->CountEntities(collection_name, row_count);
ASSERT_TRUE(status.ok());
ASSERT_EQ(row_count, entity_count - delete_count);
validate_entity_data();
status = db_->CountEntities(collection_name, row_count);
ASSERT_TRUE(status.ok());
ASSERT_EQ(row_count, entity_count - delete_count);
validate_entity_data();
};
validate_compact(0.001);  // threshold too low, compaction is skipped
validate_compact(0.5);    // compaction is performed
}
TEST_F(DBTest, IndexTest) {
......@@ -903,11 +935,11 @@ TEST_F(DBTest, FetchTest) {
std::cout << status.message() << std::endl;
ASSERT_TRUE(status.ok());
// if (tag == partition_name) {
// ASSERT_EQ(segment_entity_ids.size(), batch_entity_ids.size() - delete_entity_ids.size());
// } else {
if (tag == partition_name) {
ASSERT_EQ(segment_entity_ids.size(), batch_entity_ids.size() - delete_entity_ids.size());
} else {
ASSERT_EQ(segment_entity_ids.size(), batch_entity_ids.size());
// }
}
}
}
......
......@@ -94,7 +94,7 @@ TEST_F(SegmentTest, SegmentTest) {
break;
}
std::vector<milvus::segment::doc_id_t> raw_uids = {123};
std::vector<milvus::engine::id_t> raw_uids = {123};
std::vector<uint8_t> raw_vectors = {1, 2, 3, 4};
{
......