未验证 提交 2fe4cc10 编写于 作者: X XuPeng-SH 提交者: GitHub

Snapshot code optimization 1 (#2440)

* (db/snapshot): add merge operation test
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): add UpdatedOnField
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): add Utils.h
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): refactor Snapshot
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): refactor snapshot 2
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): refactor snapshot 3
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): refactor snapshot 4
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): refactor snapshot 5
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>

* (db/snapshot): refactor snapshot 6
Signed-off-by: Npeng.xu <peng.xu@zilliz.com>
上级 2264aab0
......@@ -18,52 +18,61 @@ namespace milvus {
namespace engine {
namespace snapshot {
Collection::Collection(const std::string& name, ID_TYPE id, State status, TS_TYPE created_on)
: NameField(name), IdField(id), StatusField(status), CreatedOnField(created_on) {
// Constructs a Collection resource by forwarding each argument to the
// corresponding field mixin base (name, id, status, creation timestamp and
// last-update timestamp). The constructor body is intentionally empty.
Collection::Collection(const std::string& name, ID_TYPE id, State status, TS_TYPE created_on, TS_TYPE updated_on)
: NameField(name), IdField(id), StatusField(status), CreatedOnField(created_on), UpdatedOnField(updated_on) {
}
SchemaCommit::SchemaCommit(ID_TYPE collection_id, const MappingT& mappings, ID_TYPE id, State status,
TS_TYPE created_on)
TS_TYPE created_on, TS_TYPE updated_on)
: CollectionIdField(collection_id),
MappingsField(mappings),
IdField(id),
StatusField(status),
CreatedOnField(created_on) {
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
FieldCommit::FieldCommit(ID_TYPE collection_id, ID_TYPE field_id, const MappingT& mappings, ID_TYPE id, State status,
TS_TYPE created_on)
TS_TYPE created_on, TS_TYPE updated_on)
: CollectionIdField(collection_id),
FieldIdField(field_id),
MappingsField(mappings),
IdField(id),
StatusField(status),
CreatedOnField(created_on) {
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
Field::Field(const std::string& name, NUM_TYPE num, ID_TYPE id, State status, TS_TYPE created_on)
: NameField(name), NumField(num), IdField(id), StatusField(status), CreatedOnField(created_on) {
// Constructs a Field resource by forwarding each argument to the corresponding
// field mixin base.
//
// name       - field name, stored by NameField
// num        - field number, stored by NumField
// id         - resource id, stored by IdField
// status     - resource state, stored by StatusField
// created_on - creation timestamp, stored by CreatedOnField
// updated_on - last-update timestamp, stored by UpdatedOnField
Field::Field(const std::string& name, NUM_TYPE num, ID_TYPE id, State status, TS_TYPE created_on, TS_TYPE updated_on)
    : NameField(name),
      NumField(num),
      IdField(id),
      StatusField(status),
      CreatedOnField(created_on),
      // Pass the constructor parameter, not the inherited member `updated_on_`:
      // the member is still uninitialized while UpdatedOnField is being
      // constructed, so reading it here would store an indeterminate value.
      UpdatedOnField(updated_on) {
}
FieldElement::FieldElement(ID_TYPE collection_id, ID_TYPE field_id, const std::string& name, FTYPE_TYPE ftype,
ID_TYPE id, State status, TS_TYPE created_on)
ID_TYPE id, State status, TS_TYPE created_on, TS_TYPE updated_on)
: CollectionIdField(collection_id),
FieldIdField(field_id),
NameField(name),
FtypeField(ftype),
IdField(id),
StatusField(status),
CreatedOnField(created_on) {
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
CollectionCommit::CollectionCommit(ID_TYPE collection_id, ID_TYPE schema_id, const MappingT& mappings, ID_TYPE id,
State status, TS_TYPE created_on)
State status, TS_TYPE created_on, TS_TYPE updated_on)
: CollectionIdField(collection_id),
SchemaIdField(schema_id),
MappingsField(mappings),
IdField(id),
StatusField(status),
CreatedOnField(created_on) {
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
/* std::string CollectionCommit::ToString() const { */
......@@ -80,18 +89,25 @@ CollectionCommit::CollectionCommit(ID_TYPE collection_id, ID_TYPE schema_id, con
/* return ss.str(); */
/* } */
Partition::Partition(const std::string& name, ID_TYPE collection_id, ID_TYPE id, State status, TS_TYPE created_on)
: NameField(name), CollectionIdField(collection_id), IdField(id), StatusField(status), CreatedOnField(created_on) {
// Constructs a Partition resource by forwarding each argument to the
// corresponding field mixin base: partition name, owning collection id,
// resource id, status, creation timestamp and last-update timestamp.
// The constructor body is intentionally empty.
Partition::Partition(const std::string& name, ID_TYPE collection_id, ID_TYPE id, State status, TS_TYPE created_on,
TS_TYPE updated_on)
: NameField(name),
CollectionIdField(collection_id),
IdField(id),
StatusField(status),
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
PartitionCommit::PartitionCommit(ID_TYPE collection_id, ID_TYPE partition_id, const MappingT& mappings, ID_TYPE id,
State status, TS_TYPE created_on)
State status, TS_TYPE created_on, TS_TYPE updated_on)
: CollectionIdField(collection_id),
PartitionIdField(partition_id),
MappingsField(mappings),
IdField(id),
StatusField(status),
CreatedOnField(created_on) {
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
std::string
......@@ -107,8 +123,13 @@ PartitionCommit::ToString() const {
return ss.str();
}
Segment::Segment(ID_TYPE partition_id, ID_TYPE num, ID_TYPE id, State status, TS_TYPE created_on)
: PartitionIdField(partition_id), NumField(num), IdField(id), StatusField(status), CreatedOnField(created_on) {
// Constructs a Segment resource by forwarding each argument to the
// corresponding field mixin base: owning partition id, segment number,
// resource id, status, creation timestamp and last-update timestamp.
// NOTE(review): `num` is declared as ID_TYPE here while Field's constructor
// takes NUM_TYPE for the value it hands to NumField - confirm this is intended.
Segment::Segment(ID_TYPE partition_id, ID_TYPE num, ID_TYPE id, State status, TS_TYPE created_on, TS_TYPE updated_on)
: PartitionIdField(partition_id),
NumField(num),
IdField(id),
StatusField(status),
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
std::string
......@@ -123,14 +144,15 @@ Segment::ToString() const {
}
SegmentCommit::SegmentCommit(ID_TYPE schema_id, ID_TYPE partition_id, ID_TYPE segment_id, const MappingT& mappings,
ID_TYPE id, State status, TS_TYPE created_on)
ID_TYPE id, State status, TS_TYPE created_on, TS_TYPE updated_on)
: SchemaIdField(schema_id),
PartitionIdField(partition_id),
SegmentIdField(segment_id),
MappingsField(mappings),
IdField(id),
StatusField(status),
CreatedOnField(created_on) {
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
std::string
......@@ -145,13 +167,14 @@ SegmentCommit::ToString() const {
}
SegmentFile::SegmentFile(ID_TYPE partition_id, ID_TYPE segment_id, ID_TYPE field_element_id, ID_TYPE id, State status,
TS_TYPE created_on)
TS_TYPE created_on, TS_TYPE updated_on)
: PartitionIdField(partition_id),
SegmentIdField(segment_id),
FieldElementIdField(field_element_id),
IdField(id),
StatusField(status),
CreatedOnField(created_on) {
CreatedOnField(created_on),
UpdatedOnField(updated_on) {
}
} // namespace snapshot
......
......@@ -99,6 +99,19 @@ class CreatedOnField {
TS_TYPE created_on_;
};
// Mixin base that stores a resource's last-update timestamp and exposes it
// through a read-only accessor.
class UpdatedOnField {
public:
// Stores `updated_on`. When the caller omits the argument, the default
// expression GetMicroSecTimeStamp() is evaluated at the call site
// (presumably "now" in microseconds - confirm against its definition).
explicit UpdatedOnField(TS_TYPE updated_on = GetMicroSecTimeStamp()) : updated_on_(updated_on) {
}
// Returns the stored last-update timestamp.
TS_TYPE
GetUpdatedTime() const {
return updated_on_;
}
protected:
// Protected so that derived resource classes can access the value directly.
TS_TYPE updated_on_;
};
class IdField {
public:
explicit IdField(ID_TYPE id) : id_(id) {
......@@ -249,15 +262,17 @@ class Collection : public DBBaseResource<>,
public NameField,
public IdField,
public StatusField,
public CreatedOnField {
public CreatedOnField,
public UpdatedOnField {
public:
using Ptr = std::shared_ptr<Collection>;
using MapT = std::map<ID_TYPE, Ptr>;
using ScopedMapT = std::map<ID_TYPE, ScopedResource<Collection>>;
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "Collection";
Collection(const std::string& name, ID_TYPE id = 0, State status = PENDING,
TS_TYPE created_on = GetMicroSecTimeStamp());
TS_TYPE created_on = GetMicroSecTimeStamp(), TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
};
using CollectionPtr = Collection::Ptr;
......@@ -267,15 +282,17 @@ class SchemaCommit : public DBBaseResource<>,
public MappingsField,
public IdField,
public StatusField,
public CreatedOnField {
public CreatedOnField,
public UpdatedOnField {
public:
using Ptr = std::shared_ptr<SchemaCommit>;
using MapT = std::map<ID_TYPE, Ptr>;
using ScopedMapT = std::map<ID_TYPE, ScopedResource<SchemaCommit>>;
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "SchemaCommit";
SchemaCommit(ID_TYPE collection_id, const MappingT& mappings = {}, ID_TYPE id = 0, State status = PENDING,
TS_TYPE created_on = GetMicroSecTimeStamp());
TS_TYPE created_on = GetMicroSecTimeStamp(), TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
};
using SchemaCommitPtr = SchemaCommit::Ptr;
......@@ -285,15 +302,17 @@ class Field : public DBBaseResource<>,
public NumField,
public IdField,
public StatusField,
public CreatedOnField {
public CreatedOnField,
public UpdatedOnField {
public:
using Ptr = std::shared_ptr<Field>;
using MapT = std::map<ID_TYPE, Ptr>;
using ScopedMapT = std::map<ID_TYPE, ScopedResource<Field>>;
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "Field";
Field(const std::string& name, NUM_TYPE num, ID_TYPE id = 0, State status = PENDING,
TS_TYPE created_on = GetMicroSecTimeStamp());
TS_TYPE created_on = GetMicroSecTimeStamp(), TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
};
using FieldPtr = Field::Ptr;
......@@ -304,15 +323,18 @@ class FieldCommit : public DBBaseResource<>,
public MappingsField,
public IdField,
public StatusField,
public CreatedOnField {
public CreatedOnField,
public UpdatedOnField {
public:
using Ptr = std::shared_ptr<FieldCommit>;
using MapT = std::map<ID_TYPE, Ptr>;
using ScopedMapT = std::map<ID_TYPE, ScopedResource<FieldCommit>>;
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "FieldCommit";
FieldCommit(ID_TYPE collection_id, ID_TYPE field_id, const MappingT& mappings = {}, ID_TYPE id = 0,
State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp());
State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
};
using FieldCommitPtr = FieldCommit::Ptr;
......@@ -324,14 +346,17 @@ class FieldElement : public DBBaseResource<>,
public FtypeField,
public IdField,
public StatusField,
public CreatedOnField {
public CreatedOnField,
public UpdatedOnField {
public:
using Ptr = std::shared_ptr<FieldElement>;
using MapT = std::map<ID_TYPE, Ptr>;
using ScopedMapT = std::map<ID_TYPE, ScopedResource<FieldElement>>;
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "FieldElement";
FieldElement(ID_TYPE collection_id, ID_TYPE field_id, const std::string& name, FTYPE_TYPE ftype, ID_TYPE id = 0,
State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp());
State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
};
using FieldElementPtr = FieldElement::Ptr;
......@@ -342,14 +367,17 @@ class CollectionCommit : public DBBaseResource<>,
public MappingsField,
public IdField,
public StatusField,
public CreatedOnField {
public CreatedOnField,
public UpdatedOnField {
public:
static constexpr const char* Name = "CollectionCommit";
using Ptr = std::shared_ptr<CollectionCommit>;
using MapT = std::map<ID_TYPE, Ptr>;
using ScopedMapT = std::map<ID_TYPE, ScopedResource<CollectionCommit>>;
using VecT = std::vector<Ptr>;
CollectionCommit(ID_TYPE collection_id, ID_TYPE schema_id, const MappingT& mappings = {}, ID_TYPE id = 0,
State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp());
State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
};
using CollectionCommitPtr = CollectionCommit::Ptr;
......@@ -359,15 +387,17 @@ class Partition : public DBBaseResource<>,
public CollectionIdField,
public IdField,
public StatusField,
public CreatedOnField {
public CreatedOnField,
public UpdatedOnField {
public:
using Ptr = std::shared_ptr<Partition>;
using MapT = std::map<ID_TYPE, Ptr>;
using ScopedMapT = std::map<ID_TYPE, ScopedResource<Partition>>;
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "Partition";
Partition(const std::string& name, ID_TYPE collection_id, ID_TYPE id = 0, State status = PENDING,
TS_TYPE created_on = GetMicroSecTimeStamp());
TS_TYPE created_on = GetMicroSecTimeStamp(), TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
};
using PartitionPtr = Partition::Ptr;
......@@ -378,15 +408,18 @@ class PartitionCommit : public DBBaseResource<>,
public MappingsField,
public IdField,
public StatusField,
public CreatedOnField {
public CreatedOnField,
public UpdatedOnField {
public:
using Ptr = std::shared_ptr<PartitionCommit>;
using MapT = std::map<ID_TYPE, Ptr>;
using ScopedMapT = std::map<ID_TYPE, ScopedResource<PartitionCommit>>;
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "PartitionCommit";
PartitionCommit(ID_TYPE collection_id, ID_TYPE partition_id, const MappingT& mappings = {}, ID_TYPE id = 0,
State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp());
State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
std::string
ToString() const override;
......@@ -399,15 +432,17 @@ class Segment : public DBBaseResource<>,
public NumField,
public IdField,
public StatusField,
public CreatedOnField {
public CreatedOnField,
public UpdatedOnField {
public:
using Ptr = std::shared_ptr<Segment>;
using MapT = std::map<ID_TYPE, Ptr>;
using ScopedMapT = std::map<ID_TYPE, ScopedResource<Segment>>;
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "Segment";
Segment(ID_TYPE partition_id, ID_TYPE num = 0, ID_TYPE id = 0, State status = PENDING,
TS_TYPE created_on = GetMicroSecTimeStamp());
TS_TYPE created_on = GetMicroSecTimeStamp(), TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
std::string
ToString() const override;
......@@ -422,15 +457,18 @@ class SegmentCommit : public DBBaseResource<>,
public MappingsField,
public IdField,
public StatusField,
public CreatedOnField {
public CreatedOnField,
public UpdatedOnField {
public:
using Ptr = std::shared_ptr<SegmentCommit>;
using MapT = std::map<ID_TYPE, Ptr>;
using ScopedMapT = std::map<ID_TYPE, ScopedResource<SegmentCommit>>;
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "SegmentCommit";
SegmentCommit(ID_TYPE schema_id, ID_TYPE partition_id, ID_TYPE segment_id, const MappingT& mappings = {},
ID_TYPE id = 0, State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp());
ID_TYPE id = 0, State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
std::string
ToString() const override;
......@@ -444,15 +482,18 @@ class SegmentFile : public DBBaseResource<>,
public FieldElementIdField,
public IdField,
public StatusField,
public CreatedOnField {
public CreatedOnField,
public UpdatedOnField {
public:
using Ptr = std::shared_ptr<SegmentFile>;
using MapT = std::map<ID_TYPE, Ptr>;
using ScopedMapT = std::map<ID_TYPE, ScopedResource<SegmentFile>>;
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "SegmentFile";
SegmentFile(ID_TYPE partition_id, ID_TYPE segment_id, ID_TYPE field_element_id, ID_TYPE id = 0,
State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp());
State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
};
using SegmentFilePtr = SegmentFile::Ptr;
......
......@@ -17,116 +17,30 @@ namespace milvus {
namespace engine {
namespace snapshot {
void
Snapshot::DumpSegments(const std::string& tag) {
std::cout << typeid(*this).name() << " DumpSegments Start [" << tag << "]:" << segments_.size() << std::endl;
for (auto& kv : segments_) {
/* std::cout << "\t" << kv.first << " RefCnt " << kv.second->RefCnt() << std::endl; */
std::cout << "\t" << kv.second->ToString() << std::endl;
}
std::cout << typeid(*this).name() << " DumpSegments End [" << tag << "]" << std::endl;
}
void
Snapshot::DumpPartitionCommits(const std::string& tag) {
std::cout << typeid(*this).name() << " DumpPartitionCommits Start [";
std::cout << tag << "]:" << partition_commits_.size() << std::endl;
for (auto& kv : partition_commits_) {
std::cout << "\t" << kv.second->ToString() << std::endl;
}
std::cout << typeid(*this).name() << " DumpPartitionCommits End [" << tag << "]" << std::endl;
}
void
Snapshot::DumpSegmentCommits(const std::string& tag) {
std::cout << typeid(*this).name() << " DumpSegmentCommits Start [";
std::cout << tag << "]:" << segment_commits_.size() << std::endl;
for (auto& kv : segment_commits_) {
std::cout << "\t" << kv.second->ToString() << std::endl;
}
std::cout << typeid(*this).name() << " DumpSegmentCommits End [" << tag << "]" << std::endl;
}
void
Snapshot::RefAll() {
collection_commit_->Ref();
for (auto& schema : schema_commits_) {
schema.second->Ref();
}
for (auto& element : field_elements_) {
element.second->Ref();
}
for (auto& field : fields_) {
field.second->Ref();
}
for (auto& field_commit : field_commits_) {
field_commit.second->Ref();
}
collection_->Ref();
for (auto& partition : partitions_) {
partition.second->Ref();
}
for (auto& partition_commit : partition_commits_) {
partition_commit.second->Ref();
}
for (auto& segment : segments_) {
segment.second->Ref();
}
for (auto& segment_commit : segment_commits_) {
segment_commit.second->Ref();
}
for (auto& segment_file : segment_files_) {
segment_file.second->Ref();
}
std::apply([this](auto&... resource) { ((DoRef(resource)), ...); }, resources_);
}
void
Snapshot::UnRefAll() {
/* std::cout << this << " UnRefAll " << collection_commit_->GetID() << " RefCnt=" << RefCnt() << std::endl; */
collection_commit_->UnRef();
for (auto& schema : schema_commits_) {
schema.second->UnRef();
}
for (auto& element : field_elements_) {
element.second->UnRef();
}
for (auto& field : fields_) {
field.second->UnRef();
}
for (auto& field_commit : field_commits_) {
field_commit.second->UnRef();
}
collection_->UnRef();
for (auto& partition : partitions_) {
partition.second->UnRef();
}
for (auto& partition_commit : partition_commits_) {
partition_commit.second->UnRef();
}
for (auto& segment : segments_) {
segment.second->UnRef();
}
for (auto& segment_commit : segment_commits_) {
segment_commit.second->UnRef();
}
for (auto& segment_file : segment_files_) {
segment_file.second->UnRef();
}
std::apply([this](auto&... resource) { ((DoUnRef(resource)), ...); }, resources_);
}
Snapshot::Snapshot(ID_TYPE id) {
collection_commit_ = CollectionCommitsHolder::GetInstance().GetResource(id, false);
assert(collection_commit_);
auto collection_commit = CollectionCommitsHolder::GetInstance().GetResource(id, false);
AddResource<CollectionCommit>(collection_commit);
auto& schema_holder = SchemaCommitsHolder::GetInstance();
auto current_schema = schema_holder.GetResource(collection_commit_->GetSchemaId(), false);
schema_commits_[current_schema->GetID()] = current_schema;
auto current_schema = schema_holder.GetResource(collection_commit->GetSchemaId(), false);
AddResource<SchemaCommit>(current_schema);
current_schema_id_ = current_schema->GetID();
auto& field_commits_holder = FieldCommitsHolder::GetInstance();
auto& fields_holder = FieldsHolder::GetInstance();
auto& field_elements_holder = FieldElementsHolder::GetInstance();
collection_ = CollectionsHolder::GetInstance().GetResource(collection_commit_->GetCollectionId(), false);
auto& mappings = collection_commit_->GetMappings();
auto collection = CollectionsHolder::GetInstance().GetResource(collection_commit->GetCollectionId(), false);
AddResource<Collection>(collection);
auto& mappings = collection_commit->GetMappings();
auto& partition_commits_holder = PartitionCommitsHolder::GetInstance();
auto& partitions_holder = PartitionsHolder::GetInstance();
auto& segments_holder = SegmentsHolder::GetInstance();
......@@ -136,28 +50,28 @@ Snapshot::Snapshot(ID_TYPE id) {
for (auto& id : mappings) {
auto partition_commit = partition_commits_holder.GetResource(id, false);
auto partition = partitions_holder.GetResource(partition_commit->GetPartitionId(), false);
partition_commits_[partition_commit->GetID()] = partition_commit;
AddResource<PartitionCommit>(partition_commit);
p_pc_map_[partition_commit->GetPartitionId()] = partition_commit->GetID();
partitions_[partition_commit->GetPartitionId()] = partition;
AddResource<Partition>(partition);
p_max_seg_num_[partition->GetID()] = 0;
auto& s_c_mappings = partition_commit->GetMappings();
for (auto& s_c_id : s_c_mappings) {
auto segment_commit = segment_commits_holder.GetResource(s_c_id, false);
auto segment = segments_holder.GetResource(segment_commit->GetSegmentId(), false);
auto schema = schema_holder.GetResource(segment_commit->GetSchemaId(), false);
schema_commits_[schema->GetID()] = schema;
segment_commits_[segment_commit->GetID()] = segment_commit;
AddResource<SchemaCommit>(schema);
AddResource<SegmentCommit>(segment_commit);
if (segment->GetNum() > p_max_seg_num_[segment->GetPartitionId()]) {
p_max_seg_num_[segment->GetPartitionId()] = segment->GetNum();
}
segments_[segment->GetID()] = segment;
AddResource<Segment>(segment);
seg_segc_map_[segment->GetID()] = segment_commit->GetID();
auto& s_f_mappings = segment_commit->GetMappings();
for (auto& s_f_id : s_f_mappings) {
auto segment_file = segment_files_holder.GetResource(s_f_id, false);
auto field_element = field_elements_holder.GetResource(segment_file->GetFieldElementId(), false);
field_elements_[field_element->GetID()] = field_element;
segment_files_[s_f_id] = segment_file;
AddResource<FieldElement>(field_element);
AddResource<SegmentFile>(segment_file);
auto entry = element_segfiles_map_.find(segment_file->GetFieldElementId());
if (entry == element_segfiles_map_.end()) {
element_segfiles_map_[segment_file->GetFieldElementId()] = {
......@@ -169,21 +83,21 @@ Snapshot::Snapshot(ID_TYPE id) {
}
}
for (auto& kv : schema_commits_) {
for (auto& kv : GetResources<SchemaCommit>()) {
if (kv.first > latest_schema_commit_id_)
latest_schema_commit_id_ = kv.first;
auto& schema_commit = kv.second;
auto& s_c_m = current_schema->GetMappings();
for (auto field_commit_id : s_c_m) {
auto field_commit = field_commits_holder.GetResource(field_commit_id, false);
field_commits_[field_commit_id] = field_commit;
AddResource<FieldCommit>(field_commit);
auto field = fields_holder.GetResource(field_commit->GetFieldId(), false);
fields_[field->GetID()] = field;
AddResource<Field>(field);
field_names_map_[field->GetName()] = field->GetID();
auto& f_c_m = field_commit->GetMappings();
for (auto field_element_id : f_c_m) {
auto field_element = field_elements_holder.GetResource(field_element_id, false);
field_elements_[field_element_id] = field_element;
AddResource<FieldElement>(field_element);
auto entry = field_element_names_map_.find(field->GetName());
if (entry == field_element_names_map_.end()) {
field_element_names_map_[field->GetName()] = {{field_element->GetName(), field_element->GetID()}};
......
......@@ -23,43 +23,45 @@
#include <shared_mutex>
#include <string>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>
#include "db/snapshot/Utils.h"
#include "db/snapshot/WrappedTypes.h"
namespace milvus {
namespace engine {
namespace snapshot {
using ScopedResourcesT =
std::tuple<CollectionCommit::ScopedMapT, Collection::ScopedMapT, SchemaCommit::ScopedMapT, FieldCommit::ScopedMapT,
Field::ScopedMapT, FieldElement::ScopedMapT, PartitionCommit::ScopedMapT, Partition::ScopedMapT,
SegmentCommit::ScopedMapT, Segment::ScopedMapT, SegmentFile::ScopedMapT>;
class Snapshot : public ReferenceProxy {
public:
using Ptr = std::shared_ptr<Snapshot>;
explicit Snapshot(ID_TYPE id);
ID_TYPE
GetID() const {
return collection_commit_->GetID();
GetID() {
return GetCollectionCommit()->GetID();
}
ID_TYPE
GetCollectionId() const {
return collection_->GetID();
auto it = GetResources<Collection>().begin();
return it->first;
}
const std::string&
GetName() const {
return collection_->GetName();
return GetResources<Collection>().begin()->second->GetName();
}
CollectionCommitPtr
GetCollectionCommit() {
return collection_commit_.Get();
}
std::vector<std::string>
GetPartitionNames() const {
std::vector<std::string> names;
for (auto& kv : partitions_) {
std::cout << "Partition: " << kv.second->GetName() << std::endl;
names.push_back(kv.second->GetName());
}
return names;
return GetResources<CollectionCommit>().begin()->second.Get();
}
ID_TYPE
......@@ -67,26 +69,13 @@ class Snapshot : public ReferenceProxy {
return latest_schema_commit_id_;
}
PartitionPtr
GetPartition(ID_TYPE partition_id) {
auto it = partitions_.find(partition_id);
if (it == partitions_.end()) {
return nullptr;
}
return it->second.Get();
}
// PXU TODO: add const. Need to change Scopedxxxx::Get
SegmentCommitPtr
GetSegmentCommit(ID_TYPE segment_id) {
auto it = seg_segc_map_.find(segment_id);
if (it == seg_segc_map_.end())
return nullptr;
auto itsc = segment_commits_.find(it->second);
if (itsc == segment_commits_.end()) {
return nullptr;
}
return itsc->second.Get();
return GetResource<SegmentCommit>(it->second);
}
PartitionCommitPtr
......@@ -94,20 +83,7 @@ class Snapshot : public ReferenceProxy {
auto it = p_pc_map_.find(partition_id);
if (it == p_pc_map_.end())
return nullptr;
auto itpc = partition_commits_.find(it->second);
if (itpc == partition_commits_.end()) {
return nullptr;
}
return itpc->second.Get();
}
IDS_TYPE
GetPartitionIds() const {
IDS_TYPE ids;
for (auto& kv : partitions_) {
ids.push_back(kv.first);
}
return std::move(ids);
return GetResource<PartitionCommit>(it->second);
}
std::vector<std::string>
......@@ -164,34 +140,6 @@ class Snapshot : public ReferenceProxy {
return itfe->second;
}
std::vector<std::string>
GetFieldElementNames() const {
std::vector<std::string> names;
for (auto& kv : field_elements_) {
names.emplace_back(kv.second->GetName());
}
return std::move(names);
}
IDS_TYPE
GetSegmentIds() const {
IDS_TYPE ids;
for (auto& kv : segments_) {
ids.push_back(kv.first);
}
return std::move(ids);
}
IDS_TYPE
GetSegmentFileIds() const {
IDS_TYPE ids;
for (auto& kv : segment_files_) {
ids.push_back(kv.first);
}
return std::move(ids);
}
NUM_TYPE
GetMaxSegmentNumByPartition(ID_TYPE partition_id) {
auto it = p_max_seg_num_.find(partition_id);
......@@ -205,27 +153,70 @@ class Snapshot : public ReferenceProxy {
void
UnRefAll();
template <typename ResourceT>
void
DumpSegments(const std::string& tag = "");
DumpResource(const std::string& tag = "") {
auto& resources = GetResources<ResourceT>();
std::cout << typeid(*this).name() << " Dump" << ResourceT::Name << " Start [" << tag << "]:" << resources.size()
<< std::endl;
for (auto& kv : resources) {
std::cout << "\t" << kv.second->ToString() << std::endl;
}
std::cout << typeid(*this).name() << " Dump" << ResourceT::Name << " End [" << tag << "]:" << resources.size()
<< std::endl;
}
template <typename T>
void
DumpSegmentCommits(const std::string& tag = "");
DoUnRef(T& resource_map) {
for (auto& kv : resource_map) {
kv.second->UnRef();
}
}
template <typename T>
void
DumpPartitionCommits(const std::string& tag = "");
DoRef(T& resource_map) {
for (auto& kv : resource_map) {
kv.second->Ref();
}
}
template <typename ResourceT>
typename ResourceT::ScopedMapT&
GetResources() {
return std::get<Index<typename ResourceT::ScopedMapT, ScopedResourcesT>::value>(resources_);
}
template <typename ResourceT>
const typename ResourceT::ScopedMapT&
GetResources() const {
return std::get<Index<typename ResourceT::ScopedMapT, ScopedResourcesT>::value>(resources_);
}
// Looks up a single resource of type ResourceT by id.
// Returns the pointer obtained from the stored scoped resource, or nullptr
// when this snapshot holds no ResourceT with that id.
template <typename ResourceT>
typename ResourceT::Ptr
GetResource(ID_TYPE id) {
    auto& scoped_map = GetResources<ResourceT>();
    if (auto found = scoped_map.find(id); found != scoped_map.end()) {
        return found->second.Get();
    }
    return nullptr;
}
// Registers `resource` in the map for its type, keyed by the resource's own id.
// An entry already stored under the same id is overwritten by assignment.
template <typename ResourceT>
void
AddResource(ScopedResource<ResourceT>& resource) {
    const auto resource_id = resource->GetID();
    GetResources<ResourceT>()[resource_id] = resource;
}
private:
// PXU TODO: Re-org below data structures to reduce memory usage
CollectionScopedT collection_;
ScopedResourcesT resources_;
ID_TYPE current_schema_id_;
SchemaCommitsT schema_commits_;
FieldsT fields_;
FieldCommitsT field_commits_;
FieldElementsT field_elements_;
CollectionCommitScopedT collection_commit_;
PartitionsT partitions_;
PartitionCommitsT partition_commits_;
SegmentsT segments_;
SegmentCommitsT segment_commits_;
SegmentFilesT segment_files_;
std::map<std::string, ID_TYPE> field_names_map_;
std::map<std::string, std::map<std::string, ID_TYPE>> field_element_names_map_;
std::map<ID_TYPE, std::map<ID_TYPE, ID_TYPE>> element_segfiles_map_;
......
......@@ -13,6 +13,7 @@
#include "db/snapshot/ResourceTypes.h"
#include "db/snapshot/Resources.h"
#include "db/snapshot/Utils.h"
#include <stdlib.h>
#include <time.h>
......@@ -35,19 +36,6 @@ namespace milvus {
namespace engine {
namespace snapshot {
template <class T, class Tuple>
struct Index;
template <class T, class... Types>
struct Index<T, std::tuple<T, Types...>> {
static const std::size_t value = 0;
};
template <class T, class U, class... Types>
struct Index<T, std::tuple<U, Types...>> {
static const std::size_t value = 1 + Index<T, std::tuple<Types...>>::value;
};
class Store {
public:
using MockIDST =
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <tuple>
namespace milvus {
namespace engine {
namespace snapshot {
// Index<T, std::tuple<Ts...>>::value is the zero-based position of the first
// occurrence of T in Ts... . It is a compile-time constant and can therefore be
// used as the index argument of std::get.
//
// If T does not occur in the tuple, the recursion reaches
// Index<T, std::tuple<>>, which is intentionally left undefined, so the misuse
// is reported as a compile error.
template <class T, class Tuple>
struct Index;

// Match: T is the head of the tuple.
template <class T, class... Types>
struct Index<T, std::tuple<T, Types...>> {
    // constexpr (implicitly inline in C++17) so that ODR-uses such as binding
    // `value` to a const reference do not need an out-of-class definition.
    static constexpr std::size_t value = 0;
};

// No match at the head: skip U and continue the search in the tail.
template <class T, class U, class... Types>
struct Index<T, std::tuple<U, Types...>> {
    static constexpr std::size_t value = 1 + Index<T, std::tuple<Types...>>::value;
};
} // namespace snapshot
} // namespace engine
} // namespace milvus
......@@ -15,6 +15,7 @@
#include <random>
#include <string>
#include <set>
#include "db/utils.h"
#include "db/snapshot/ReferenceProxy.h"
......@@ -149,6 +150,9 @@ TEST_F(SnapshotTest, OperationTest) {
ASSERT_TRUE(collection_commit);
}
milvus::engine::snapshot::OperationContext merge_ctx;
std::set<milvus::engine::snapshot::ID_TYPE> stale_segment_commit_ids;
// Check build operation correctness
{
milvus::engine::snapshot::OperationContext context;
......@@ -167,6 +171,11 @@ TEST_F(SnapshotTest, OperationTest) {
milvus::engine::snapshot::MappingT expected_mappings = prev_segment_commit_mappings;
expected_mappings.insert(seg_file->GetID());
ASSERT_EQ(expected_mappings, segment_commit_mappings);
auto seg = ss->GetResource<milvus::engine::snapshot::Segment>(seg_file->GetSegmentId());
ASSERT_TRUE(seg);
merge_ctx.stale_segments.push_back(seg);
stale_segment_commit_ids.insert(segment_commit->GetID());
}
// Check stale snapshot has been deleted from store
......@@ -177,9 +186,10 @@ TEST_F(SnapshotTest, OperationTest) {
}
ss_id = ss->GetID();
milvus::engine::snapshot::ID_TYPE partition_id;
{
milvus::engine::snapshot::OperationContext context;
context.prev_partition = ss->GetPartition(1);
context.prev_partition = ss->GetResource<milvus::engine::snapshot::Partition>(1);
auto op = std::make_shared<milvus::engine::snapshot::NewSegmentOperation>(context, ss);
auto new_seg = op->CommitNewSegment();
auto seg_file = op->CommitNewSegmentFile(sf_context);
......@@ -193,6 +203,34 @@ TEST_F(SnapshotTest, OperationTest) {
milvus::engine::snapshot::MappingT expected_segment_mappings;
expected_segment_mappings.insert(seg_file->GetID());
ASSERT_EQ(expected_segment_mappings, segment_commit_mappings);
merge_ctx.stale_segments.push_back(new_seg);
partition_id = segment_commit->GetPartitionId();
stale_segment_commit_ids.insert(segment_commit->GetID());
auto partition = ss->GetResource<milvus::engine::snapshot::Partition>(partition_id);
merge_ctx.prev_partition = partition;
}
ss_id = ss->GetID();
{
auto prev_partition_commit = ss->GetPartitionCommitByPartitionId(partition_id);
auto op = std::make_shared<milvus::engine::snapshot::MergeOperation>(merge_ctx, ss);
auto new_seg = op->CommitNewSegment();
sf_context.segment_id = new_seg->GetID();
auto seg_file = op->CommitNewSegmentFile(sf_context);
op->Push();
ss = op->GetSnapshot();
ASSERT_TRUE(ss->GetID() > ss_id);
auto segment_commit = ss->GetSegmentCommit(new_seg->GetID());
auto new_partition_commit = ss->GetPartitionCommitByPartitionId(partition_id);
auto new_mappings = new_partition_commit->GetMappings();
auto prev_mappings = prev_partition_commit->GetMappings();
auto expected_mappings = prev_mappings;
for (auto id : stale_segment_commit_ids) {
expected_mappings.erase(id);
}
expected_mappings.insert(segment_commit->GetID());
ASSERT_EQ(expected_mappings, new_mappings);
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册