未验证 提交 5c833ca6 编写于 作者: C Cai Yudong 提交者: GitHub

snapshot gc (#2634)

* opt test_snapshot
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* rename BaseHolders to ResourceHolder
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* rename DBBaseResource to BaseResource
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* remove ResourceHolder.inl
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* fix clang-format
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* add GarbageCollector.h
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* combine ReferenceProxy.cpp into ReferenceProxy.h
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* fix clang-format
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* fix clang-format
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* reorg Resources.h
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* rename GarbageCollector to EventExecutor
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* fix test_snapshot
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* fix clang-format
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* opt executor
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>
上级 bcf79134
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "BaseHolders.h"
#include "Operations.h"
#include <iostream>
#include <memory>
namespace milvus {
namespace engine {
namespace snapshot {
template <typename ResourceT, typename Derived>
void ResourceHolder<ResourceT, Derived>::Dump(const std::string& tag) {
std::unique_lock<std::mutex> lock(mutex_);
std::cout << typeid(*this).name() << " Dump Start [" << tag << "]:" << id_map_.size() << std::endl;
for (auto& kv : id_map_) {
/* std::cout << "\t" << kv.second->ToString() << std::endl; */
std::cout << "\t" << kv.first << " ref_count " << kv.second->ref_count() << std::endl;
}
std::cout << typeid(*this).name() << " Dump End [" << tag << "]" << std::endl;
}
template <typename ResourceT, typename Derived>
void ResourceHolder<ResourceT, Derived>::Reset() {
id_map_.clear();
}
template <typename ResourceT, typename Derived>
typename ResourceHolder<ResourceT, Derived>::ResourcePtr
ResourceHolder<ResourceT, Derived>::DoLoad(Store& store, ID_TYPE id) {
LoadOperationContext context;
context.id = id;
auto op = std::make_shared<LoadOperation<ResourceT>>(context);
(*op)(store);
typename ResourceT::Ptr c;
auto status = op->GetResource(c);
if (status.ok()) {
Add(c);
return c;
}
return nullptr;
}
template <typename ResourceT, typename Derived>
typename ResourceHolder<ResourceT, Derived>::ScopedT
ResourceHolder<ResourceT, Derived>::Load(Store& store, ID_TYPE id, bool scoped) {
{
std::unique_lock<std::mutex> lock(mutex_);
auto cit = id_map_.find(id);
if (cit != id_map_.end()) {
return ScopedT(cit->second, scoped);
}
}
auto ret = DoLoad(store, id);
if (!ret) return ScopedT();
return ScopedT(ret, scoped);
}
template <typename ResourceT, typename Derived>
typename ResourceHolder<ResourceT, Derived>::ScopedT
ResourceHolder<ResourceT, Derived>::GetResource(ID_TYPE id, bool scoped) {
// TODO: Temp to use Load here. Will be removed when resource is loaded just post Compound
// Operations.
return Load(Store::GetInstance(), id, scoped);
{
std::unique_lock<std::mutex> lock(mutex_);
auto cit = id_map_.find(id);
if (cit != id_map_.end()) {
return ScopedT(cit->second, scoped);
}
}
return ScopedT();
}
template <typename ResourceT, typename Derived>
void
ResourceHolder<ResourceT, Derived>::OnNoRefCallBack(typename ResourceHolder<ResourceT, Derived>::ResourcePtr resource) {
HardDelete(resource->GetID());
Release(resource->GetID());
}
template <typename ResourceT, typename Derived>
bool ResourceHolder<ResourceT, Derived>::ReleaseNoLock(ID_TYPE id) {
auto it = id_map_.find(id);
if (it == id_map_.end()) {
return false;
}
id_map_.erase(it);
return true;
}
template <typename ResourceT, typename Derived>
bool ResourceHolder<ResourceT, Derived>::Release(ID_TYPE id) {
std::unique_lock<std::mutex> lock(mutex_);
return ReleaseNoLock(id);
}
template <typename ResourceT, typename Derived>
bool
ResourceHolder<ResourceT, Derived>::HardDelete(ID_TYPE id) {
auto op = std::make_shared<HardDeleteOperation<ResourceT>>(id);
// TODO:
(*op)(Store::GetInstance());
return true;
}
template <typename ResourceT, typename Derived>
bool ResourceHolder<ResourceT, Derived>::AddNoLock(typename ResourceHolder<ResourceT, Derived>::ResourcePtr resource) {
if (!resource) return false;
if (id_map_.find(resource->GetID()) != id_map_.end()) {
return false;
}
id_map_[resource->GetID()] = resource;
resource->RegisterOnNoRefCB(std::bind(&Derived::OnNoRefCallBack, this, resource));
return true;
}
template <typename ResourceT, typename Derived>
bool ResourceHolder<ResourceT, Derived>::Add(typename ResourceHolder<ResourceT, Derived>::ResourcePtr resource) {
std::unique_lock<std::mutex> lock(mutex_);
return AddNoLock(resource);
}
} // namespace snapshot
} // namespace engine
} // namespace milvus
......@@ -10,7 +10,10 @@
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <memory>
#include <string>
#include "ReferenceProxy.h"
namespace milvus::engine::snapshot {
......@@ -25,4 +28,6 @@ class BaseResource : public ReferenceProxy {
~BaseResource() override = default;
};
using BaseResourcePtr = std::shared_ptr<BaseResource>;
} // namespace milvus::engine::snapshot
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include "ResourceTypes.h"
#include "utils/BlockingQueue.h"
namespace milvus {
namespace engine {
namespace snapshot {
enum class EventType {
EVENT_INVALID = 0,
EVENT_GC = 1,
};
struct EventContext {
ID_TYPE id;
std::string res_type;
};
struct Event {
EventType type;
EventContext context;
std::string
ToString() {
return context.res_type + "_" + std::to_string(context.id);
}
};
using EventPtr = std::shared_ptr<Event>;
using ThreadPtr = std::shared_ptr<std::thread>;
using EventQueue = BlockingQueue<EventPtr>;
class EventExecutor {
public:
EventExecutor() = default;
EventExecutor(const EventExecutor&) = delete;
~EventExecutor() {
Stop();
}
static EventExecutor&
GetInstance() {
static EventExecutor inst;
return inst;
}
Status
Submit(EventPtr& evt) {
if (evt == nullptr) {
return Status(SS_INVALID_ARGUMENT_ERROR, "Invalid Resource");
}
Enqueue(evt);
return Status::OK();
}
void
Start() {
if (thread_ptr_ == nullptr) {
thread_ptr_ = std::make_shared<std::thread>(&EventExecutor::ThreadMain, this);
}
}
void
Stop() {
if (thread_ptr_ != nullptr) {
Enqueue(nullptr);
thread_ptr_->join();
thread_ptr_ = nullptr;
std::cout << "EventExecutor Stopped" << std::endl;
}
}
private:
void
ThreadMain() {
while (true) {
EventPtr evt = queue_.Take();
if (evt == nullptr) {
break;
}
std::cout << std::this_thread::get_id() << " Dequeue Event " << evt->ToString() << std::endl;
switch (evt->type) {
case EventType::EVENT_GC:
break;
default:
break;
}
}
}
void
Enqueue(EventPtr evt) {
queue_.Put(evt);
if (evt != nullptr) {
std::cout << std::this_thread::get_id() << " Enqueue Event " << evt->ToString() << std::endl;
}
}
private:
ThreadPtr thread_ptr_ = nullptr;
EventQueue queue_;
};
} // namespace snapshot
} // namespace engine
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/snapshot/OperationExecutor.h"
#include <iostream>
namespace milvus::engine::snapshot {
OperationExecutor::OperationExecutor() = default;
OperationExecutor::~OperationExecutor() {
Stop();
}
OperationExecutor&
OperationExecutor::GetInstance() {
static OperationExecutor executor;
return executor;
}
Status
OperationExecutor::Submit(const OperationsPtr& operation, bool sync) {
if (!operation)
return Status(SS_INVALID_ARGUMENT_ERROR, "Invalid Operation");
/* Store::GetInstance().Apply(*operation); */
/* return true; */
Enqueue(operation);
if (sync)
return operation->WaitToFinish();
return Status::OK();
}
void
OperationExecutor::Start() {
thread_ = std::thread(&OperationExecutor::ThreadMain, this);
running_ = true;
/* std::cout << "OperationExecutor Started" << std::endl; */
}
void
OperationExecutor::Stop() {
if (!running_)
return;
Enqueue(nullptr);
thread_.join();
running_ = false;
std::cout << "OperationExecutor Stopped" << std::endl;
}
void
OperationExecutor::Enqueue(const OperationsPtr& operation) {
/* std::cout << std::this_thread::get_id() << " Enqueue Operation " << operation->GetID() << std::endl; */
queue_.Put(operation);
}
void
OperationExecutor::ThreadMain() {
while (true) {
OperationsPtr operation = queue_.Take();
if (!operation) {
std::cout << "Stopping operation executor thread " << std::this_thread::get_id() << std::endl;
break;
}
/* std::cout << std::this_thread::get_id() << " Dequeue Operation " << operation->GetID() << std::endl; */
Store::GetInstance().Apply(*operation);
}
}
} // namespace milvus::engine::snapshot
......@@ -22,40 +22,74 @@ namespace milvus::engine::snapshot {
using ThreadPtr = std::shared_ptr<std::thread>;
using OperationQueue = BlockingQueue<OperationsPtr>;
using OperationQueuePtr = std::shared_ptr<OperationQueue>;
class OperationExecutor {
public:
using Ptr = std::shared_ptr<OperationExecutor>;
OperationExecutor() = default;
OperationExecutor(const OperationExecutor&) = delete;
~OperationExecutor() {
Stop();
}
static OperationExecutor&
GetInstance();
GetInstance() {
static OperationExecutor executor;
return executor;
}
Status
Submit(const OperationsPtr& operation, bool sync = true);
Submit(const OperationsPtr& operation, bool sync = true) {
if (!operation) {
return Status(SS_INVALID_ARGUMENT_ERROR, "Invalid Operation");
}
/* Store::GetInstance().Apply(*operation); */
/* return true; */
Enqueue(operation);
if (sync) {
return operation->WaitToFinish();
}
return Status::OK();
}
void
Start();
Start() {
if (thread_ptr_ == nullptr) {
thread_ptr_ = std::make_shared<std::thread>(&OperationExecutor::ThreadMain, this);
}
}
void
Stop();
~OperationExecutor();
protected:
OperationExecutor();
Stop() {
if (thread_ptr_ != nullptr) {
Enqueue(nullptr);
thread_ptr_->join();
thread_ptr_ = nullptr;
std::cout << "OperationExecutor Stopped" << std::endl;
}
}
private:
void
ThreadMain();
ThreadMain() {
while (true) {
OperationsPtr operation = queue_.Take();
if (!operation) {
break;
}
/* std::cout << std::this_thread::get_id() << " Dequeue Operation " << operation->GetID() << std::endl; */
Store::GetInstance().Apply(*operation);
}
}
void
Enqueue(const OperationsPtr& operation);
Enqueue(const OperationsPtr& operation) {
/* std::cout << std::this_thread::get_id() << " Enqueue Operation " << operation->GetID() << std::endl; */
queue_.Put(operation);
}
protected:
bool running_ = false;
std::thread thread_;
private:
ThreadPtr thread_ptr_ = nullptr;
OperationQueue queue_;
};
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/snapshot/ReferenceProxy.h"
namespace milvus::engine::snapshot {
void
ReferenceProxy::Ref() {
++ref_count_;
}
void
ReferenceProxy::UnRef() {
if (ref_count_ == 0)
return;
if (ref_count_.fetch_sub(1) == 1) {
for (auto& cb : on_no_ref_cbs_) {
cb();
}
}
}
void
ReferenceProxy::RegisterOnNoRefCB(const OnNoRefCBF& cb) {
on_no_ref_cbs_.emplace_back(cb);
}
} // namespace milvus::engine::snapshot
......@@ -23,20 +23,29 @@ using OnNoRefCBF = std::function<void(void)>;
class ReferenceProxy {
public:
ReferenceProxy() = default;
virtual ~ReferenceProxy() = default;
// TODO: Copy constructor is used in Mock Test. Should never be used. To be removed
ReferenceProxy(const ReferenceProxy& o) {
ref_count_ = 0;
}
void
RegisterOnNoRefCB(const OnNoRefCBF& cb);
virtual void
Ref();
Ref() {
++ref_count_;
}
virtual void
UnRef();
UnRef() {
if (ref_count_ == 0) {
return;
}
if (ref_count_.fetch_sub(1) == 1) {
for (auto& cb : on_no_ref_cbs_) {
cb();
}
}
}
[[nodiscard]] int64_t
ref_count() const {
......@@ -48,7 +57,10 @@ class ReferenceProxy {
ref_count_ = 0;
}
virtual ~ReferenceProxy() = default;
void
RegisterOnNoRefCB(const OnNoRefCBF& cb) {
on_no_ref_cbs_.emplace_back(cb);
}
protected:
std::atomic<int64_t> ref_count_ = {0};
......
......@@ -291,6 +291,8 @@ class NameField {
std::string name_;
};
///////////////////////////////////////////////////////////////////////////////
class Collection : public BaseResource,
public NameField,
public IdField,
......@@ -311,95 +313,6 @@ class Collection : public BaseResource,
using CollectionPtr = Collection::Ptr;
class SchemaCommit : public BaseResource,
public CollectionIdField,
public MappingsField,
public IdField,
public LsnField,
public StatusField,
public CreatedOnField,
public UpdatedOnField {
public:
using Ptr = std::shared_ptr<SchemaCommit>;
using MapT = std::map<ID_TYPE, Ptr>;
using ScopedMapT = std::map<ID_TYPE, ScopedResource<SchemaCommit>>;
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "SchemaCommit";
explicit SchemaCommit(ID_TYPE collection_id, const MappingT& mappings = {}, ID_TYPE id = 0, LSN_TYPE lsn = 0,
State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
};
using SchemaCommitPtr = SchemaCommit::Ptr;
class Field : public BaseResource,
public NameField,
public NumField,
public IdField,
public LsnField,
public StatusField,
public CreatedOnField,
public UpdatedOnField {
public:
using Ptr = std::shared_ptr<Field>;
using MapT = std::map<ID_TYPE, Ptr>;
using ScopedMapT = std::map<ID_TYPE, ScopedResource<Field>>;
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "Field";
Field(const std::string& name, NUM_TYPE num, ID_TYPE id = 0, LSN_TYPE lsn = 0, State status = PENDING,
TS_TYPE created_on = GetMicroSecTimeStamp(), TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
};
using FieldPtr = Field::Ptr;
class FieldCommit : public BaseResource,
public CollectionIdField,
public FieldIdField,
public MappingsField,
public IdField,
public LsnField,
public StatusField,
public CreatedOnField,
public UpdatedOnField {
public:
using Ptr = std::shared_ptr<FieldCommit>;
using MapT = std::map<ID_TYPE, Ptr>;
using ScopedMapT = std::map<ID_TYPE, ScopedResource<FieldCommit>>;
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "FieldCommit";
FieldCommit(ID_TYPE collection_id, ID_TYPE field_id, const MappingT& mappings = {}, ID_TYPE id = 0,
LSN_TYPE lsn = 0, State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
};
using FieldCommitPtr = FieldCommit::Ptr;
class FieldElement : public BaseResource,
public CollectionIdField,
public FieldIdField,
public NameField,
public FtypeField,
public IdField,
public LsnField,
public StatusField,
public CreatedOnField,
public UpdatedOnField {
public:
using Ptr = std::shared_ptr<FieldElement>;
using MapT = std::map<ID_TYPE, Ptr>;
using ScopedMapT = std::map<ID_TYPE, ScopedResource<FieldElement>>;
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "FieldElement";
FieldElement(ID_TYPE collection_id, ID_TYPE field_id, const std::string& name, FTYPE_TYPE ftype, ID_TYPE id = 0,
LSN_TYPE lsn = 0, State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
};
using FieldElementPtr = FieldElement::Ptr;
class CollectionCommit : public BaseResource,
public CollectionIdField,
public SchemaIdField,
......@@ -422,6 +335,8 @@ class CollectionCommit : public BaseResource,
using CollectionCommitPtr = CollectionCommit::Ptr;
///////////////////////////////////////////////////////////////////////////////
class Partition : public BaseResource,
public NameField,
public CollectionIdField,
......@@ -469,6 +384,8 @@ class PartitionCommit : public BaseResource,
using PartitionCommitPtr = PartitionCommit::Ptr;
///////////////////////////////////////////////////////////////////////////////
class Segment : public BaseResource,
public PartitionIdField,
public NumField,
......@@ -520,6 +437,8 @@ class SegmentCommit : public BaseResource,
using SegmentCommitPtr = SegmentCommit::Ptr;
///////////////////////////////////////////////////////////////////////////////
class SegmentFile : public BaseResource,
public PartitionIdField,
public SegmentIdField,
......@@ -543,4 +462,99 @@ class SegmentFile : public BaseResource,
using SegmentFilePtr = SegmentFile::Ptr;
///////////////////////////////////////////////////////////////////////////////
class SchemaCommit : public BaseResource,
public CollectionIdField,
public MappingsField,
public IdField,
public LsnField,
public StatusField,
public CreatedOnField,
public UpdatedOnField {
public:
using Ptr = std::shared_ptr<SchemaCommit>;
using MapT = std::map<ID_TYPE, Ptr>;
using ScopedMapT = std::map<ID_TYPE, ScopedResource<SchemaCommit>>;
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "SchemaCommit";
explicit SchemaCommit(ID_TYPE collection_id, const MappingT& mappings = {}, ID_TYPE id = 0, LSN_TYPE lsn = 0,
State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
};
using SchemaCommitPtr = SchemaCommit::Ptr;
///////////////////////////////////////////////////////////////////////////////
class Field : public BaseResource,
public NameField,
public NumField,
public IdField,
public LsnField,
public StatusField,
public CreatedOnField,
public UpdatedOnField {
public:
using Ptr = std::shared_ptr<Field>;
using MapT = std::map<ID_TYPE, Ptr>;
using ScopedMapT = std::map<ID_TYPE, ScopedResource<Field>>;
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "Field";
Field(const std::string& name, NUM_TYPE num, ID_TYPE id = 0, LSN_TYPE lsn = 0, State status = PENDING,
TS_TYPE created_on = GetMicroSecTimeStamp(), TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
};
using FieldPtr = Field::Ptr;
class FieldCommit : public BaseResource,
public CollectionIdField,
public FieldIdField,
public MappingsField,
public IdField,
public LsnField,
public StatusField,
public CreatedOnField,
public UpdatedOnField {
public:
using Ptr = std::shared_ptr<FieldCommit>;
using MapT = std::map<ID_TYPE, Ptr>;
using ScopedMapT = std::map<ID_TYPE, ScopedResource<FieldCommit>>;
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "FieldCommit";
FieldCommit(ID_TYPE collection_id, ID_TYPE field_id, const MappingT& mappings = {}, ID_TYPE id = 0,
LSN_TYPE lsn = 0, State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
};
using FieldCommitPtr = FieldCommit::Ptr;
///////////////////////////////////////////////////////////////////////////////
class FieldElement : public BaseResource,
public CollectionIdField,
public FieldIdField,
public NameField,
public FtypeField,
public IdField,
public LsnField,
public StatusField,
public CreatedOnField,
public UpdatedOnField {
public:
using Ptr = std::shared_ptr<FieldElement>;
using MapT = std::map<ID_TYPE, Ptr>;
using ScopedMapT = std::map<ID_TYPE, ScopedResource<FieldElement>>;
using VecT = std::vector<Ptr>;
static constexpr const char* Name = "FieldElement";
FieldElement(ID_TYPE collection_id, ID_TYPE field_id, const std::string& name, FTYPE_TYPE ftype, ID_TYPE id = 0,
LSN_TYPE lsn = 0, State status = PENDING, TS_TYPE created_on = GetMicroSecTimeStamp(),
TS_TYPE UpdatedOnField = GetMicroSecTimeStamp());
};
using FieldElementPtr = FieldElement::Ptr;
} // namespace milvus::engine::snapshot
......@@ -58,9 +58,10 @@ class Store {
DoCommit(ResourceT&&... resources) {
auto t = std::make_tuple(std::forward<ResourceT>(resources)...);
auto& t_size = std::tuple_size<decltype(t)>::value;
if (t_size == 0)
if (t_size == 0) {
return false;
StartTransanction();
}
StartTransaction();
std::apply([this](auto&&... resource) { ((std::cout << CommitResource(resource) << "\n"), ...); }, t);
FinishTransaction();
return true;
......@@ -83,8 +84,9 @@ class Store {
}
void
StartTransanction() {
StartTransaction() {
}
void
FinishTransaction() {
}
......
......@@ -19,15 +19,16 @@
#include <algorithm>
#include "db/utils.h"
#include "db/snapshot/CompoundOperations.h"
#include "db/snapshot/Context.h"
#include "db/snapshot/EventExecutor.h"
#include "db/snapshot/OperationExecutor.h"
#include "db/snapshot/ReferenceProxy.h"
#include "db/snapshot/ScopedResource.h"
#include "db/snapshot/WrappedTypes.h"
#include "db/snapshot/ResourceHolders.h"
#include "db/snapshot/OperationExecutor.h"
#include "db/snapshot/Store.h"
#include "db/snapshot/Context.h"
#include "db/snapshot/CompoundOperations.h"
#include "db/snapshot/ScopedResource.h"
#include "db/snapshot/Snapshots.h"
#include "db/snapshot/Store.h"
#include "db/snapshot/WrappedTypes.h"
using ID_TYPE = milvus::engine::snapshot::ID_TYPE;
using IDS_TYPE = milvus::engine::snapshot::IDS_TYPE;
......
......@@ -25,6 +25,7 @@
#include "cache/GpuCacheMgr.h"
#include "db/DBFactory.h"
#include "db/Options.h"
#include "db/snapshot/EventExecutor.h"
#include "db/snapshot/OperationExecutor.h"
#include "db/snapshot/Snapshots.h"
#include "db/snapshot/ResourceHolders.h"
......@@ -361,6 +362,7 @@ void
SnapshotTest::SetUp() {
BaseTest::SetUp();
milvus::engine::snapshot::OperationExecutor::GetInstance().Start();
milvus::engine::snapshot::EventExecutor::GetInstance().Start();
milvus::engine::snapshot::CollectionCommitsHolder::GetInstance().Reset();
milvus::engine::snapshot::CollectionsHolder::GetInstance().Reset();
milvus::engine::snapshot::SchemaCommitsHolder::GetInstance().Reset();
......@@ -382,6 +384,7 @@ void
SnapshotTest::TearDown() {
// TODO: Temp to delay some time. OperationExecutor should wait all resources be destructed before stop
std::this_thread::sleep_for(std::chrono::milliseconds(20));
milvus::engine::snapshot::EventExecutor::GetInstance().Stop();
milvus::engine::snapshot::OperationExecutor::GetInstance().Stop();
BaseTest::TearDown();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册