未验证 提交 525257a2 编写于 作者: C Cai Yudong 提交者: GitHub

scheduler support snapshot (#2944)

* using BlockingQueue in JobMgr
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* update changelog
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* update SSDBImpl::HybridQuery
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* add QueryTest
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* add SearchJob interface
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* add SSSearchJob and SSSearchTask
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* update AddSegmentVisitor
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* add SSBuildIndexTask and SSBuildIndexJob
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* add SSTestTask
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* add test_ss_job and test_ss_task
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* fix clang-format
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>

* update TaskCreator
Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>
上级 7ba30c78
......@@ -12,9 +12,11 @@
#include "db/SSDBImpl.h"
#include "cache/CpuCacheMgr.h"
#include "db/IDGenerator.h"
#include "db/SnapshotVisitor.h"
#include "db/merge/MergeManagerFactory.h"
#include "db/merge/SSMergeTask.h"
#include "db/snapshot/CompoundOperations.h"
#include "db/snapshot/IterateHandler.h"
#include "db/snapshot/ResourceHelper.h"
#include "db/snapshot/ResourceTypes.h"
#include "db/snapshot/Snapshots.h"
......@@ -24,6 +26,7 @@
#include "metrics/SystemInfo.h"
#include "scheduler/Definition.h"
#include "scheduler/SchedInst.h"
#include "scheduler/job/SSSearchJob.h"
#include "segment/SSSegmentReader.h"
#include "segment/SSSegmentWriter.h"
#include "utils/Exception.h"
......@@ -627,24 +630,54 @@ Status
SSDBImpl::Query(const server::ContextPtr& context, const query::QueryPtr& query_ptr, engine::QueryResult& result) {
CHECK_INITIALIZED;
auto query_ctx = context->Child("Query");
milvus::server::ContextChild tracer(context, "Query");
TimeRecorder rc("SSDBImpl::Query");
// snapshot::ScopedSnapshotT ss;
// STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name));
// auto handler = std::make_shared<HybridQueryHelperSegmentHandler>(nullptr, ss, partition_patterns);
// handler->Iterate();
// STATUS_CHECK(handler->GetStatus());
//
// LOG_ENGINE_DEBUG_ << LogOut("Engine query begin, segment count: %ld", handler->segments_.size());
// /* collect all valid segment */
// std::vector<SegmentVisitor::Ptr> segment_visitors;
// auto exec = [&] (const snapshot::Segment::Ptr& segment, snapshot::SegmentIterator* handler) -> Status {
// auto p_id = segment->GetPartitionId();
// auto p_ptr = ss->GetResource<snapshot::Partition>(p_id);
// auto& p_name = p_ptr->GetName();
//
// /* check partition match pattern */
// bool match = false;
// if (partition_patterns.empty()) {
// match = true;
// } else {
// for (auto &pattern : partition_patterns) {
// if (StringHelpFunctions::IsRegexMatch(p_name, pattern)) {
// match = true;
// break;
// }
// }
// }
//
// if (match) {
// auto visitor = SegmentVisitor::Build(ss, segment->GetID());
// if (!visitor) {
// return Status(milvus::SS_ERROR, "Cannot build segment visitor");
// }
// segment_visitors.push_back(visitor);
// }
// return Status::OK();
// };
//
// auto segment_iter = std::make_shared<snapshot::SegmentIterator>(ss, exec);
// segment_iter->Iterate();
// STATUS_CHECK(segment_iter->GetStatus());
//
// LOG_ENGINE_DEBUG_ << LogOut("Engine query begin, segment count: %ld", segment_visitors.size());
//
// VectorsData vectors;
// scheduler::SearchJobPtr job =
// std::make_shared<scheduler::SearchJob>(query_ctx, general_query, query_ptr, attr_type, vectors);
// for (auto& segment : handler->segments_) {
// // job->AddSegment(segment);
// scheduler::SSSearchJobPtr job =
// std::make_shared<scheduler::SSSearchJob>(tracer.Context(), general_query, query_ptr, attr_type, vectors);
// for (auto& sv : segment_visitors) {
// job->AddSegmentVisitor(sv);
// }
//
// // step 2: put search job to scheduler and wait result
......@@ -659,30 +692,28 @@ SSDBImpl::Query(const server::ContextPtr& context, const query::QueryPtr& query_
// result.row_num_ = job->vector_count();
// result.result_ids_ = job->GetResultIds();
// result.result_distances_ = job->GetResultDistances();
//
// // step 4: get entities by result ids
// STATUS_CHECK(GetEntityByID(collection_name, result.result_ids_, field_names, result.vectors_, result.attrs_));
//
// // step 5: filter entities by field names
// // std::vector<engine::AttrsData> filter_attrs;
// // for (auto attr : result.attrs_) {
// // AttrsData attrs_data;
// // attrs_data.attr_type_ = attr.attr_type_;
// // attrs_data.attr_count_ = attr.attr_count_;
// // attrs_data.id_array_ = attr.id_array_;
// // for (auto& name : field_names) {
// // if (attr.attr_data_.find(name) != attr.attr_data_.end()) {
// // attrs_data.attr_data_.insert(std::make_pair(name, attr.attr_data_.at(name)));
// // }
// // }
// // filter_attrs.emplace_back(attrs_data);
// // }
// //
// // result.attrs_ = filter_attrs;
// step 4: get entities by result ids
// STATUS_CHECK(GetEntityByID(collection_name, result.result_ids_, field_names, result.vectors_, result.attrs_));
// step 5: filter entities by field names
// std::vector<engine::AttrsData> filter_attrs;
// for (auto attr : result.attrs_) {
// AttrsData attrs_data;
// attrs_data.attr_type_ = attr.attr_type_;
// attrs_data.attr_count_ = attr.attr_count_;
// attrs_data.id_array_ = attr.id_array_;
// for (auto& name : field_names) {
// if (attr.attr_data_.find(name) != attr.attr_data_.end()) {
// attrs_data.attr_data_.insert(std::make_pair(name, attr.attr_data_.at(name)));
// }
// }
// filter_attrs.emplace_back(attrs_data);
// }
rc.ElapseFromBegin("Engine query totally cost");
query_ctx->GetTraceContext()->GetSpan()->Finish();
// tracer.Context()->GetTraceContext()->GetSpan()->Finish();
return Status::OK();
}
......
......@@ -18,7 +18,6 @@
#include "db/snapshot/Snapshot.h"
#include "knowhere/index/vector_index/helpers/IndexParameter.h"
#include "segment/SSSegmentReader.h"
#include "utils/StringHelpFunctions.h"
#include <unordered_map>
#include <utility>
......@@ -169,28 +168,6 @@ GetEntityByIdSegmentHandler::Handle(const snapshot::SegmentPtr& segment) {
}
///////////////////////////////////////////////////////////////////////////////
HybridQueryHelperSegmentHandler::HybridQueryHelperSegmentHandler(const server::ContextPtr& context,
engine::snapshot::ScopedSnapshotT ss,
const std::vector<std::string>& partition_patterns)
: BaseT(ss), context_(context), partition_patterns_(partition_patterns), segments_() {
}
Status
HybridQueryHelperSegmentHandler::Handle(const snapshot::SegmentPtr& segment) {
if (partition_patterns_.empty()) {
segments_.push_back(segment);
} else {
auto p_id = segment->GetPartitionId();
auto p_ptr = ss_->GetResource<snapshot::Partition>(p_id);
auto& p_name = p_ptr->GetName();
for (auto& pattern : partition_patterns_) {
if (StringHelpFunctions::IsRegexMatch(p_name, pattern)) {
segments_.push_back(segment);
break;
}
}
}
return Status::OK();
}
} // namespace engine
} // namespace milvus
......@@ -81,19 +81,6 @@ struct GetEntityByIdSegmentHandler : public snapshot::IterateHandler<snapshot::S
};
///////////////////////////////////////////////////////////////////////////////
struct HybridQueryHelperSegmentHandler : public snapshot::IterateHandler<snapshot::Segment> {
using ResourceT = snapshot::Segment;
using BaseT = snapshot::IterateHandler<ResourceT>;
HybridQueryHelperSegmentHandler(const server::ContextPtr& context, snapshot::ScopedSnapshotT ss,
const std::vector<std::string>& partition_patterns);
Status
Handle(const typename ResourceT::Ptr&) override;
const server::ContextPtr context_;
const std::vector<std::string> partition_patterns_;
std::vector<snapshot::SegmentPtr> segments_;
};
} // namespace engine
} // namespace milvus
......@@ -10,9 +10,14 @@
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "scheduler/TaskCreator.h"
#include "SchedInst.h"
#include "tasklabel/BroadcastLabel.h"
#include "tasklabel/SpecResLabel.h"
#include "scheduler/SchedInst.h"
#include "scheduler/task/BuildIndexTask.h"
#include "scheduler/task/DeleteTask.h"
#include "scheduler/task/SSBuildIndexTask.h"
#include "scheduler/task/SSSearchTask.h"
#include "scheduler/task/SearchTask.h"
#include "scheduler/tasklabel/BroadcastLabel.h"
#include "scheduler/tasklabel/SpecResLabel.h"
namespace milvus {
namespace scheduler {
......@@ -29,6 +34,12 @@ TaskCreator::Create(const JobPtr& job) {
case JobType::BUILD: {
return Create(std::static_pointer_cast<BuildIndexJob>(job));
}
case JobType::SS_SEARCH: {
return Create(std::static_pointer_cast<SSSearchJob>(job));
}
case JobType::SS_BUILD: {
return Create(std::static_pointer_cast<SSBuildIndexJob>(job));
}
default: {
// TODO(wxyu): error
return std::vector<TaskPtr>();
......@@ -70,5 +81,27 @@ TaskCreator::Create(const BuildIndexJobPtr& job) {
return tasks;
}
std::vector<TaskPtr>
TaskCreator::Create(const SSSearchJobPtr& job) {
std::vector<TaskPtr> tasks;
for (auto& sv : job->segment_visitor_map()) {
auto task = std::make_shared<XSSSearchTask>(job->GetContext(), sv.second, nullptr);
task->job_ = job;
tasks.emplace_back(task);
}
return tasks;
}
std::vector<TaskPtr>
TaskCreator::Create(const SSBuildIndexJobPtr& job) {
std::vector<TaskPtr> tasks;
for (auto& sv : job->segment_visitor_map()) {
auto task = std::make_shared<XSSBuildIndexTask>(sv.second, nullptr);
task->job_ = job;
tasks.emplace_back(task);
}
return tasks;
}
} // namespace scheduler
} // namespace milvus
......@@ -21,12 +21,11 @@
#include <unordered_map>
#include <vector>
#include "job/BuildIndexJob.h"
#include "job/DeleteJob.h"
#include "job/Job.h"
#include "job/SSBuildIndexJob.h"
#include "job/SSSearchJob.h"
#include "job/SearchJob.h"
#include "task/BuildIndexTask.h"
#include "task/DeleteTask.h"
#include "task/SearchTask.h"
#include "task/Task.h"
namespace milvus {
......@@ -46,6 +45,12 @@ class TaskCreator {
static std::vector<TaskPtr>
Create(const BuildIndexJobPtr& job);
static std::vector<TaskPtr>
Create(const SSSearchJobPtr& job);
static std::vector<TaskPtr>
Create(const SSBuildIndexJobPtr& job);
};
} // namespace scheduler
......
......@@ -21,21 +21,26 @@
#include <unordered_map>
#include <vector>
#include "db/SnapshotVisitor.h"
#include "db/snapshot/ResourceTypes.h"
#include "scheduler/interface/interfaces.h"
#include "server/context/Context.h"
namespace milvus {
namespace scheduler {
enum class JobType {
INVALID,
SEARCH,
DELETE,
BUILD,
INVALID = -1,
SEARCH = 0,
DELETE = 1,
BUILD = 2,
SS_SEARCH = 10,
SS_BUILD = 11,
};
using JobId = std::uint64_t;
using SegmentVisitorMap = std::unordered_map<engine::snapshot::ID_TYPE, engine::SegmentVisitorPtr>;
class Job : public interface::dumpable {
public:
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "scheduler/job/SSBuildIndexJob.h"
#include <utility>
#include "utils/Log.h"
namespace milvus {
namespace scheduler {
SSBuildIndexJob::SSBuildIndexJob(engine::DBOptions options) : Job(JobType::SS_BUILD), options_(std::move(options)) {
SetIdentity("SSBuildIndexJob");
AddCacheInsertDataListener();
}
void
SSBuildIndexJob::AddSegmentVisitor(const engine::SegmentVisitorPtr& visitor) {
if (visitor != nullptr) {
segment_visitor_map_[visitor->GetSegment()->GetID()] = visitor;
}
}
void
SSBuildIndexJob::WaitBuildIndexFinish() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this] { return segment_visitor_map_.empty(); });
LOG_SERVER_DEBUG_ << LogOut("[%s][%ld] BuildIndexJob %ld all done", "build index", 0, id());
}
void
SSBuildIndexJob::BuildIndexDone(const engine::snapshot::ID_TYPE seg_id) {
std::unique_lock<std::mutex> lock(mutex_);
segment_visitor_map_.erase(seg_id);
cv_.notify_all();
LOG_SERVER_DEBUG_ << LogOut("[%s][%ld] BuildIndexJob %ld finish segment: %ld", "build index", 0, id(), seg_id);
}
json
SSBuildIndexJob::Dump() const {
json ret{
{"number_of_to_index_file", segment_visitor_map_.size()},
};
auto base = Job::Dump();
ret.insert(base.begin(), base.end());
return ret;
}
void
SSBuildIndexJob::OnCacheInsertDataChanged(bool value) {
options_.insert_cache_immediately_ = value;
}
} // namespace scheduler
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <condition_variable>
#include <deque>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
#include "config/handler/CacheConfigHandler.h"
//#include "db/meta/Meta.h"
#include "scheduler/Definition.h"
#include "scheduler/job/Job.h"
namespace milvus {
namespace scheduler {
// using engine::meta::SegmentSchemaPtr;
// using Id2ToIndexMap = std::unordered_map<size_t, SegmentSchemaPtr>;
// using Id2ToTableFileMap = std::unordered_map<size_t, SegmentSchema>;
class SSBuildIndexJob : public Job, public server::CacheConfigHandler {
public:
explicit SSBuildIndexJob(engine::DBOptions options);
~SSBuildIndexJob() = default;
public:
// bool
// AddToIndexFiles(const SegmentSchemaPtr& to_index_file);
void
AddSegmentVisitor(const engine::SegmentVisitorPtr& visitor);
void
WaitBuildIndexFinish();
void
BuildIndexDone(const engine::snapshot::ID_TYPE seg_id);
json
Dump() const override;
public:
Status&
GetStatus() {
return status_;
}
// Id2ToIndexMap&
// to_index_files() {
// return to_index_files_;
// }
// engine::meta::MetaPtr
// meta() const {
// return meta_ptr_;
// }
const SegmentVisitorMap&
segment_visitor_map() {
return segment_visitor_map_;
}
engine::DBOptions
options() const {
return options_;
}
protected:
void
OnCacheInsertDataChanged(bool value) override;
private:
// Id2ToIndexMap to_index_files_;
// engine::meta::MetaPtr meta_ptr_;
engine::DBOptions options_;
SegmentVisitorMap segment_visitor_map_;
Status status_;
std::mutex mutex_;
std::condition_variable cv_;
};
using SSBuildIndexJobPtr = std::shared_ptr<SSBuildIndexJob>;
} // namespace scheduler
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "scheduler/job/SSSearchJob.h"
#include "utils/Log.h"
namespace milvus {
namespace scheduler {
SSSearchJob::SSSearchJob(const server::ContextPtr& context, int64_t topk, const milvus::json& extra_params,
engine::VectorsData& vectors)
: Job(JobType::SS_SEARCH), context_(context), topk_(topk), extra_params_(extra_params), vectors_(vectors) {
}
SSSearchJob::SSSearchJob(const server::ContextPtr& context, milvus::query::GeneralQueryPtr general_query,
query::QueryPtr query_ptr,
std::unordered_map<std::string, engine::meta::hybrid::DataType>& attr_type,
engine::VectorsData& vectors)
: Job(JobType::SS_SEARCH),
context_(context),
general_query_(general_query),
query_ptr_(query_ptr),
attr_type_(attr_type),
vectors_(vectors) {
}
void
SSSearchJob::AddSegmentVisitor(const engine::SegmentVisitorPtr& visitor) {
if (visitor != nullptr) {
segment_visitor_map_[visitor->GetSegment()->GetID()] = visitor;
}
}
void
SSSearchJob::WaitResult() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this] { return segment_visitor_map_.empty(); });
// LOG_SERVER_DEBUG_ << LogOut("[%s][%ld] SearchJob %ld: query_time %f, map_uids_time %f, reduce_time %f",
// "search", 0,
// id(), this->time_stat().query_time, this->time_stat().map_uids_time,
// this->time_stat().reduce_time);
LOG_SERVER_DEBUG_ << LogOut("[%s][%ld] SearchJob %ld all done", "search", 0, id());
}
void
SSSearchJob::SearchDone(const engine::snapshot::ID_TYPE seg_id) {
std::unique_lock<std::mutex> lock(mutex_);
segment_visitor_map_.erase(seg_id);
if (segment_visitor_map_.empty()) {
cv_.notify_all();
}
LOG_SERVER_DEBUG_ << LogOut("[%s][%ld] SearchJob %ld finish segment: %ld", "search", 0, id(), seg_id);
}
ResultIds&
SSSearchJob::GetResultIds() {
return result_ids_;
}
ResultDistances&
SSSearchJob::GetResultDistances() {
return result_distances_;
}
Status&
SSSearchJob::GetStatus() {
return status_;
}
json
SSSearchJob::Dump() const {
json ret{
{"topk", topk_},
{"nq", vectors_.vector_count_},
{"extra_params", extra_params_.dump()},
};
auto base = Job::Dump();
ret.insert(base.begin(), base.end());
return ret;
}
const std::shared_ptr<server::Context>&
SSSearchJob::GetContext() const {
return context_;
}
} // namespace scheduler
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <condition_variable>
#include <deque>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
#include "Job.h"
#include "db/SnapshotVisitor.h"
#include "db/Types.h"
#include "db/meta/MetaTypes.h"
#include "query/GeneralQuery.h"
#include "server/context/Context.h"
namespace milvus {
namespace scheduler {
using engine::meta::SegmentSchemaPtr;
using Id2IndexMap = std::unordered_map<size_t, SegmentSchemaPtr>;
using ResultIds = engine::ResultIds;
using ResultDistances = engine::ResultDistances;
// struct SearchTimeStat {
// double query_time = 0.0;
// double map_uids_time = 0.0;
// double reduce_time = 0.0;
//};
class SSSearchJob : public Job {
public:
SSSearchJob(const server::ContextPtr& context, int64_t topk, const milvus::json& extra_params,
engine::VectorsData& vectors);
SSSearchJob(const server::ContextPtr& context, query::GeneralQueryPtr general_query, query::QueryPtr query_ptr,
std::unordered_map<std::string, engine::meta::hybrid::DataType>& attr_type,
engine::VectorsData& vectorsData);
public:
void
AddSegmentVisitor(const engine::SegmentVisitorPtr& visitor);
void
WaitResult();
void
SearchDone(const engine::snapshot::ID_TYPE seg_id);
ResultIds&
GetResultIds();
ResultDistances&
GetResultDistances();
void
SetVectors(engine::VectorsData& vectors) {
vectors_ = vectors;
}
Status&
GetStatus();
json
Dump() const override;
public:
const server::ContextPtr&
GetContext() const;
int64_t
topk() {
return topk_;
}
int64_t
nq() const {
return vectors_.vector_count_;
}
const milvus::json&
extra_params() const {
return extra_params_;
}
const engine::VectorsData&
vectors() const {
return vectors_;
}
const SegmentVisitorMap&
segment_visitor_map() {
return segment_visitor_map_;
}
std::mutex&
mutex() {
return mutex_;
}
query::GeneralQueryPtr
general_query() {
return general_query_;
}
query::QueryPtr
query_ptr() {
return query_ptr_;
}
std::unordered_map<std::string, engine::meta::hybrid::DataType>&
attr_type() {
return attr_type_;
}
int64_t
vector_count() {
return vector_count_;
}
// SearchTimeStat&
// time_stat() {
// return time_stat_;
// }
private:
const server::ContextPtr context_;
int64_t topk_ = 0;
milvus::json extra_params_;
// TODO: smart pointer
engine::VectorsData& vectors_;
SegmentVisitorMap segment_visitor_map_;
// TODO: column-base better ?
ResultIds result_ids_;
ResultDistances result_distances_;
Status status_;
query::GeneralQueryPtr general_query_;
query::QueryPtr query_ptr_;
std::unordered_map<std::string, engine::meta::hybrid::DataType> attr_type_;
int64_t vector_count_;
std::mutex mutex_;
std::condition_variable cv_;
// SearchTimeStat time_stat_;
};
using SSSearchJobPtr = std::shared_ptr<SSSearchJob>;
} // namespace scheduler
} // namespace milvus
......@@ -23,9 +23,9 @@
#include <vector>
#include "Job.h"
#include "db/SnapshotVisitor.h"
#include "db/Types.h"
#include "db/meta/MetaTypes.h"
#include "query/GeneralQuery.h"
#include "server/context/Context.h"
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "scheduler/task/SSBuildIndexTask.h"
#include <fiu-local.h>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include "db/Utils.h"
#include "db/engine/EngineFactory.h"
#include "metrics/Metrics.h"
#include "scheduler/job/SSBuildIndexJob.h"
#include "utils/CommonUtil.h"
#include "utils/Exception.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
namespace milvus {
namespace scheduler {
XSSBuildIndexTask::XSSBuildIndexTask(const engine::SegmentVisitorPtr& visitor, TaskLabelPtr label)
: Task(TaskType::BuildIndexTask, std::move(label)), visitor_(visitor) {
// if (file_) {
// EngineType engine_type;
// if (file->file_type_ == SegmentSchema::FILE_TYPE::RAW ||
// file->file_type_ == SegmentSchema::FILE_TYPE::TO_INDEX ||
// file->file_type_ == SegmentSchema::FILE_TYPE::BACKUP) {
// engine_type = engine::utils::IsBinaryMetricType(file->metric_type_) ? EngineType::FAISS_BIN_IDMAP
// : EngineType::FAISS_IDMAP;
// } else {
// engine_type = (EngineType)file->engine_type_;
// }
//
// auto json = milvus::json::parse(file_->index_params_);
// to_index_engine_ = EngineFactory::Build(file_->dimension_, file_->location_, engine_type,
// (MetricType)file_->metric_type_, json);
// }
}
void
XSSBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) {
TimeRecorder rc("XSSBuildIndexTask::Load");
Status stat = Status::OK();
std::string error_msg;
std::string type_str;
if (auto job = job_.lock()) {
auto build_index_job = std::static_pointer_cast<scheduler::SSBuildIndexJob>(job);
auto options = build_index_job->options();
try {
if (type == LoadType::DISK2CPU) {
stat = to_index_engine_->Load(options.insert_cache_immediately_);
type_str = "DISK2CPU";
} else if (type == LoadType::CPU2GPU) {
stat = to_index_engine_->CopyToIndexFileToGpu(device_id);
type_str = "CPU2GPU:" + std::to_string(device_id);
} else {
error_msg = "Wrong load type";
stat = Status(SERVER_UNEXPECTED_ERROR, error_msg);
}
fiu_do_on("XSSBuildIndexTask.Load.throw_std_exception", throw std::exception());
} catch (std::exception& ex) {
// typical error: out of disk space or permission denied
error_msg = "Failed to load to_index file: " + std::string(ex.what());
LOG_ENGINE_ERROR_ << error_msg;
stat = Status(SERVER_UNEXPECTED_ERROR, error_msg);
}
fiu_do_on("XSSBuildIndexTask.Load.out_of_memory", stat = Status(SERVER_UNEXPECTED_ERROR, "out of memory"));
if (!stat.ok()) {
Status s;
if (stat.ToString().find("out of memory") != std::string::npos) {
error_msg = "out of memory: " + type_str;
s = Status(SERVER_UNEXPECTED_ERROR, error_msg);
} else {
error_msg = "Failed to load to_index file: " + type_str;
s = Status(SERVER_UNEXPECTED_ERROR, error_msg);
}
LOG_ENGINE_ERROR_ << s.message();
if (auto job = job_.lock()) {
auto build_index_job = std::static_pointer_cast<scheduler::SSBuildIndexJob>(job);
build_index_job->GetStatus() = s;
build_index_job->BuildIndexDone(visitor_->GetSegment()->GetID());
}
return;
}
// size_t file_size = to_index_engine_->Size();
//
// std::string info = "Build index task load file id:" + std::to_string(file_->id_) + " " + type_str +
// " file type:" + std::to_string(file_->file_type_) + " size:" +
// std::to_string(file_size) + " bytes from location: " + file_->location_ + " totally
// cost";
// rc.ElapseFromBegin(info);
//
// to_index_id_ = file_->id_;
// to_index_type_ = file_->file_type_;
}
}
void
XSSBuildIndexTask::Execute() {
auto seg_id = visitor_->GetSegment()->GetID();
TimeRecorderAuto rc("XSSBuildIndexTask::Execute " + std::to_string(seg_id));
if (auto job = job_.lock()) {
auto build_index_job = std::static_pointer_cast<scheduler::SSBuildIndexJob>(job);
if (to_index_engine_ == nullptr) {
build_index_job->BuildIndexDone(seg_id);
build_index_job->GetStatus() = Status(DB_ERROR, "source index is null");
return;
}
// std::string location = file_->location_;
// std::shared_ptr<engine::ExecutionEngine> index;
//
// // step 1: create collection file
// engine::meta::SegmentSchema table_file;
// table_file.collection_id_ = file_->collection_id_;
// table_file.segment_id_ = file_->file_id_;
// table_file.date_ = file_->date_;
// table_file.file_type_ = engine::meta::SegmentSchema::NEW_INDEX;
//
// engine::meta::MetaPtr meta_ptr = build_index_job->meta();
// Status status = meta_ptr->CreateCollectionFile(table_file);
//
// fiu_do_on("XSSBuildIndexTask.Execute.create_table_success", status = Status::OK());
// if (!status.ok()) {
// LOG_ENGINE_ERROR_ << "Failed to create collection file: " << status.ToString();
// build_index_job->BuildIndexDone(to_index_id_);
// build_index_job->GetStatus() = status;
// to_index_engine_ = nullptr;
// return;
// }
//
// auto failed_build_index = [&](std::string log_msg, std::string err_msg) {
// table_file.file_type_ = engine::meta::SegmentSchema::TO_DELETE;
// status = meta_ptr->UpdateCollectionFile(table_file);
// LOG_ENGINE_ERROR_ << log_msg;
//
// build_index_job->BuildIndexDone(to_index_id_);
// build_index_job->GetStatus() = Status(DB_ERROR, err_msg);
// to_index_engine_ = nullptr;
// };
//
// // step 2: build index
// try {
// LOG_ENGINE_DEBUG_ << "Begin build index for file:" + table_file.location_;
// index = to_index_engine_->BuildIndex(table_file.location_, (EngineType)table_file.engine_type_);
// fiu_do_on("XSSBuildIndexTask.Execute.build_index_fail", index = nullptr);
// if (index == nullptr) {
// std::string log_msg = "Failed to build index " + table_file.file_id_ + ", reason: source index
// is null"; failed_build_index(log_msg, "source index is null"); return;
// }
// } catch (std::exception& ex) {
// std::string msg = "Failed to build index " + table_file.file_id_ + ", reason: " +
// std::string(ex.what()); failed_build_index(msg, ex.what()); return;
// }
//
// // step 3: if collection has been deleted, dont save index file
// bool has_collection = false;
// meta_ptr->HasCollection(file_->collection_id_, has_collection);
// fiu_do_on("XSSBuildIndexTask.Execute.has_collection", has_collection = true);
//
// if (!has_collection) {
// std::string msg = "Failed to build index " + table_file.file_id_ + ", reason: collection has been
// deleted"; failed_build_index(msg, "Collection has been deleted"); return;
// }
//
// // step 4: save index file
// try {
// fiu_do_on("XSSBuildIndexTask.Execute.throw_std_exception", throw std::exception());
// status = index->Serialize();
// if (!status.ok()) {
// std::string msg =
// "Failed to persist index file: " + table_file.location_ + ", reason: " + status.message();
// failed_build_index(msg, status.message());
// return;
// }
// } catch (std::exception& ex) {
// // if failed to serialize index file to disk
// // typical error: out of disk space, out of memory or permition denied
// std::string msg =
// "Failed to persist index file:" + table_file.location_ + ", exception:" +
// std::string(ex.what());
// failed_build_index(msg, ex.what());
// return;
// }
//
// // step 5: update meta
// table_file.file_type_ = engine::meta::SegmentSchema::INDEX;
// table_file.file_size_ = CommonUtil::GetFileSize(table_file.location_);
// table_file.row_count_ = file_->row_count_; // index->Count();
//
// auto origin_file = *file_;
// origin_file.file_type_ = engine::meta::SegmentSchema::BACKUP;
//
// engine::meta::SegmentsSchema update_files = {table_file, origin_file};
//
// if (status.ok()) { // makesure index file is sucessfully serialized to disk
// status = meta_ptr->UpdateCollectionFiles(update_files);
// }
//
// fiu_do_on("XSSBuildIndexTask.Execute.update_table_file_fail", status = Status(SERVER_UNEXPECTED_ERROR,
// "")); if (status.ok()) {
// LOG_ENGINE_DEBUG_ << "New index file " << table_file.file_id_ << " of size " <<
// table_file.file_size_
// << " bytes"
// << " from file " << origin_file.file_id_;
// // XXX_Index_NM doesn't support it now.
// // if (build_index_job->options().insert_cache_immediately_) {
// // index->Cache();
// // }
// } else {
// // failed to update meta, mark the new file as to_delete, don't delete old file
// origin_file.file_type_ = engine::meta::SegmentSchema::TO_INDEX;
// status = meta_ptr->UpdateCollectionFile(origin_file);
// LOG_ENGINE_DEBUG_ << "Failed to update file to index, mark file: " << origin_file.file_id_
// << " to to_index";
//
// table_file.file_type_ = engine::meta::SegmentSchema::TO_DELETE;
// status = meta_ptr->UpdateCollectionFile(table_file);
// LOG_ENGINE_DEBUG_ << "Failed to up date file to index, mark file: " << table_file.file_id_
// << " to to_delete";
// }
//
// build_index_job->BuildIndexDone(to_index_id_);
}
to_index_engine_ = nullptr;
}
} // namespace scheduler
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include "db/SnapshotVisitor.h"
#include "scheduler/Definition.h"
#include "scheduler/job/SSBuildIndexJob.h"
#include "scheduler/task/Task.h"
namespace milvus {
namespace scheduler {
class XSSBuildIndexTask : public Task {
public:
explicit XSSBuildIndexTask(const engine::SegmentVisitorPtr& visitor, TaskLabelPtr label);
void
Load(LoadType type, uint8_t device_id) override;
void
Execute() override;
public:
engine::SegmentVisitorPtr visitor_;
// SegmentSchemaPtr file_;
// SegmentSchema table_file_;
// size_t to_index_id_ = 0;
int to_index_type_ = 0;
ExecutionEnginePtr to_index_engine_ = nullptr;
};
} // namespace scheduler
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "scheduler/task/SearchTask.h"
#include <fiu-local.h>
#include <algorithm>
#include <memory>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include "db/Utils.h"
#include "db/engine/EngineFactory.h"
#include "metrics/Metrics.h"
#include "scheduler/SchedInst.h"
#include "scheduler/job/SSSearchJob.h"
#include "scheduler/task/SSSearchTask.h"
#include "segment/SegmentReader.h"
#include "utils/CommonUtil.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
namespace milvus {
namespace scheduler {
// void
// CollectFileMetrics(int file_type, size_t file_size) {
// server::MetricsBase& inst = server::Metrics::GetInstance();
// switch (file_type) {
// case SegmentSchema::RAW:
// case SegmentSchema::TO_INDEX: {
// inst.RawFileSizeHistogramObserve(file_size);
// inst.RawFileSizeTotalIncrement(file_size);
// inst.RawFileSizeGaugeSet(file_size);
// break;
// }
// default: {
// inst.IndexFileSizeHistogramObserve(file_size);
// inst.IndexFileSizeTotalIncrement(file_size);
// inst.IndexFileSizeGaugeSet(file_size);
// break;
// }
// }
//}
XSSSearchTask::XSSSearchTask(const server::ContextPtr& context, const engine::SegmentVisitorPtr& visitor,
TaskLabelPtr label)
: Task(TaskType::SearchTask, std::move(label)), context_(context), visitor_(visitor) {
// if (file_) {
// // distance -- value 0 means two vectors equal, ascending reduce, L2/HAMMING/JACCARD/TONIMOTO ...
// // similarity -- infinity value means two vectors equal, descending reduce, IP
// if (file_->metric_type_ == static_cast<int>(MetricType::IP) &&
// file_->engine_type_ != static_cast<int>(EngineType::FAISS_PQ)) {
// ascending_reduce = false;
// }
//
// EngineType engine_type;
// if (file->file_type_ == SegmentSchema::FILE_TYPE::RAW ||
// file->file_type_ == SegmentSchema::FILE_TYPE::TO_INDEX ||
// file->file_type_ == SegmentSchema::FILE_TYPE::BACKUP) {
// engine_type = engine::utils::IsBinaryMetricType(file->metric_type_) ? EngineType::FAISS_BIN_IDMAP
// : EngineType::FAISS_IDMAP;
// } else {
// engine_type = (EngineType)file->engine_type_;
// }
//
// milvus::json json_params;
// if (!file_->index_params_.empty()) {
// json_params = milvus::json::parse(file_->index_params_);
// }
// index_engine_ = EngineFactory::Build(file_->dimension_, file_->location_, engine_type,
// (MetricType)file_->metric_type_, json_params);
// }
}
void
XSSSearchTask::Load(LoadType type, uint8_t device_id) {
// milvus::server::ContextFollower tracer(context_, "XSearchTask::Load " + std::to_string(file_->id_));
TimeRecorder rc(LogOut("[%s][%ld]", "search", 0));
Status stat = Status::OK();
std::string error_msg;
std::string type_str;
try {
fiu_do_on("XSearchTask.Load.throw_std_exception", throw std::exception());
if (type == LoadType::DISK2CPU) {
// stat = index_engine_->Load();
// stat = index_engine_->LoadAttr();
type_str = "DISK2CPU";
} else if (type == LoadType::CPU2GPU) {
// bool hybrid = false;
// if (index_engine_->IndexEngineType() == engine::EngineType::FAISS_IVFSQ8H) {
// hybrid = true;
// }
// stat = index_engine_->CopyToGpu(device_id, hybrid);
type_str = "CPU2GPU" + std::to_string(device_id);
} else if (type == LoadType::GPU2CPU) {
// stat = index_engine_->CopyToCpu();
type_str = "GPU2CPU";
} else {
error_msg = "Wrong load type";
stat = Status(SERVER_UNEXPECTED_ERROR, error_msg);
}
} catch (std::exception& ex) {
// typical error: out of disk space or permition denied
error_msg = "Failed to load index file: " + std::string(ex.what());
LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] Encounter execption: %s", "search", 0, error_msg.c_str());
stat = Status(SERVER_UNEXPECTED_ERROR, error_msg);
}
fiu_do_on("XSearchTask.Load.out_of_memory", stat = Status(SERVER_UNEXPECTED_ERROR, "out of memory"));
if (!stat.ok()) {
Status s;
if (stat.ToString().find("out of memory") != std::string::npos) {
error_msg = "out of memory: " + type_str + " : " + stat.message();
s = Status(SERVER_OUT_OF_MEMORY, error_msg);
} else {
error_msg = "Failed to load index file: " + type_str + " : " + stat.message();
s = Status(SERVER_UNEXPECTED_ERROR, error_msg);
}
if (auto job = job_.lock()) {
auto search_job = std::static_pointer_cast<scheduler::SSSearchJob>(job);
search_job->SearchDone(visitor_->GetSegment()->GetID());
search_job->GetStatus() = s;
}
return;
}
// size_t file_size = index_engine_->Size();
// std::string info = "Search task load file id:" + std::to_string(file_->id_) + " " + type_str +
// " file type:" + std::to_string(file_->file_type_) + " size:" + std::to_string(file_size) +
// " bytes from location: " + file_->location_ + " totally cost";
// rc.ElapseFromBegin(info);
//
// CollectFileMetrics(file_->file_type_, file_size);
//
// // step 2: return search task for later execution
// index_id_ = file_->id_;
// index_type_ = file_->file_type_;
// search_contexts_.swap(search_contexts_);
}
void
XSSSearchTask::Execute() {
auto seg_id = visitor_->GetSegment()->GetID();
milvus::server::ContextFollower tracer(context_, "XSearchTask::Execute " + std::to_string(seg_id));
TimeRecorder rc(LogOut("[%s][%ld] DoSearch file id:%ld", "search", 0, seg_id));
server::CollectDurationMetrics metrics(index_type_);
std::vector<int64_t> output_ids;
std::vector<float> output_distance;
double span;
if (auto job = job_.lock()) {
auto search_job = std::static_pointer_cast<scheduler::SSSearchJob>(job);
if (index_engine_ == nullptr) {
search_job->SearchDone(seg_id);
return;
}
/* step 1: allocate memory */
query::GeneralQueryPtr general_query = search_job->general_query();
uint64_t nq = search_job->nq();
uint64_t topk = search_job->topk();
fiu_do_on("XSearchTask.Execute.throw_std_exception", throw std::exception());
// try {
// /* step 2: search */
// bool hybrid = false;
// if (index_engine_->IndexEngineType() == engine::EngineType::FAISS_IVFSQ8H &&
// ResMgrInst::GetInstance()->GetResource(path().Last())->type() == ResourceType::CPU) {
// hybrid = true;
// }
// Status s;
// if (general_query != nullptr) {
// std::unordered_map<std::string, DataType> types;
// auto attr_type = search_job->attr_type();
// auto type_it = attr_type.begin();
// for (; type_it != attr_type.end(); type_it++) {
// types.insert(std::make_pair(type_it->first, (DataType)(type_it->second)));
// }
//
// auto query_ptr = search_job->query_ptr();
//
// s = index_engine_->HybridSearch(search_job, types, output_distance, output_ids, hybrid);
// auto vector_query = query_ptr->vectors.begin()->second;
// topk = vector_query->topk;
// nq = vector_query->query_vector.float_data.size() / file_->dimension_;
// search_job->vector_count() = nq;
// } else {
// s = index_engine_->Search(output_ids, output_distance, search_job, hybrid);
// }
//
// fiu_do_on("XSearchTask.Execute.search_fail", s = Status(SERVER_UNEXPECTED_ERROR, ""));
// if (!s.ok()) {
// search_job->GetStatus() = s;
// search_job->SearchDone(index_id_);
// return;
// }
//
// span = rc.RecordSection("search done");
//
// /* step 3: pick up topk result */
// auto spec_k = file_->row_count_ < topk ? file_->row_count_ : topk;
// if (spec_k == 0) {
// LOG_ENGINE_WARNING_ << LogOut("[%s][%ld] Searching in an empty file. file location = %s",
// "search", 0,
// file_->location_.c_str());
// } else {
// std::unique_lock<std::mutex> lock(search_job->mutex());
// XSearchTask::MergeTopkToResultSet(output_ids, output_distance, spec_k, nq, topk,
// ascending_reduce,
// search_job->GetResultIds(),
// search_job->GetResultDistances());
// }
//
// span = rc.RecordSection("reduce topk done");
// search_job->time_stat().reduce_time += span / 1000;
// } catch (std::exception& ex) {
// LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] SearchTask encounter exception: %s", "search", 0,
// ex.what()); search_job->GetStatus() = Status(SERVER_UNEXPECTED_ERROR, ex.what());
// }
/* step 4: notify to send result to client */
search_job->SearchDone(seg_id);
}
rc.ElapseFromBegin("totally cost");
// release index in resource
index_engine_ = nullptr;
}
void
XSSSearchTask::MergeTopkToResultSet(const scheduler::ResultIds& src_ids,
const scheduler::ResultDistances& src_distances, size_t src_k, size_t nq,
size_t topk, bool ascending, scheduler::ResultIds& tar_ids,
scheduler::ResultDistances& tar_distances) {
if (src_ids.empty()) {
LOG_ENGINE_DEBUG_ << LogOut("[%s][%d] Search result is empty.", "search", 0);
return;
}
size_t tar_k = tar_ids.size() / nq;
size_t buf_k = std::min(topk, src_k + tar_k);
scheduler::ResultIds buf_ids(nq * buf_k, -1);
scheduler::ResultDistances buf_distances(nq * buf_k, 0.0);
for (uint64_t i = 0; i < nq; i++) {
size_t buf_k_j = 0, src_k_j = 0, tar_k_j = 0;
size_t buf_idx, src_idx, tar_idx;
size_t buf_k_multi_i = buf_k * i;
size_t src_k_multi_i = topk * i;
size_t tar_k_multi_i = tar_k * i;
while (buf_k_j < buf_k && src_k_j < src_k && tar_k_j < tar_k) {
src_idx = src_k_multi_i + src_k_j;
tar_idx = tar_k_multi_i + tar_k_j;
buf_idx = buf_k_multi_i + buf_k_j;
if ((tar_ids[tar_idx] == -1) || // initialized value
(ascending && src_distances[src_idx] < tar_distances[tar_idx]) ||
(!ascending && src_distances[src_idx] > tar_distances[tar_idx])) {
buf_ids[buf_idx] = src_ids[src_idx];
buf_distances[buf_idx] = src_distances[src_idx];
src_k_j++;
} else {
buf_ids[buf_idx] = tar_ids[tar_idx];
buf_distances[buf_idx] = tar_distances[tar_idx];
tar_k_j++;
}
buf_k_j++;
}
if (buf_k_j < buf_k) {
if (src_k_j < src_k) {
while (buf_k_j < buf_k && src_k_j < src_k) {
buf_idx = buf_k_multi_i + buf_k_j;
src_idx = src_k_multi_i + src_k_j;
buf_ids[buf_idx] = src_ids[src_idx];
buf_distances[buf_idx] = src_distances[src_idx];
src_k_j++;
buf_k_j++;
}
} else {
while (buf_k_j < buf_k && tar_k_j < tar_k) {
buf_idx = buf_k_multi_i + buf_k_j;
tar_idx = tar_k_multi_i + tar_k_j;
buf_ids[buf_idx] = tar_ids[tar_idx];
buf_distances[buf_idx] = tar_distances[tar_idx];
tar_k_j++;
buf_k_j++;
}
}
}
}
tar_ids.swap(buf_ids);
tar_distances.swap(buf_distances);
}
// const std::string&
// XSSSearchTask::GetLocation() const {
// return file_->location_;
//}
// size_t
// XSSSearchTask::GetIndexId() const {
// return file_->id_;
//}
} // namespace scheduler
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <memory>
#include <string>
#include <vector>
#include "db/SnapshotVisitor.h"
#include "scheduler/Definition.h"
#include "scheduler/job/SSSearchJob.h"
#include "scheduler/task/Task.h"
namespace milvus {
namespace scheduler {
// TODO(wxyu): rewrite
class XSSSearchTask : public Task {
public:
explicit XSSSearchTask(const server::ContextPtr& context, const engine::SegmentVisitorPtr& visitor,
TaskLabelPtr label);
void
Load(LoadType type, uint8_t device_id) override;
void
Execute() override;
public:
static void
MergeTopkToResultSet(const scheduler::ResultIds& src_ids, const scheduler::ResultDistances& src_distances,
size_t src_k, size_t nq, size_t topk, bool ascending, scheduler::ResultIds& tar_ids,
scheduler::ResultDistances& tar_distances);
// const std::string&
// GetLocation() const;
// size_t
// GetIndexId() const;
public:
const server::ContextPtr context_;
engine::SegmentVisitorPtr visitor_;
// size_t index_id_ = 0;
int index_type_ = 0;
ExecutionEnginePtr index_engine_ = nullptr;
// distance -- value 0 means two vectors equal, ascending reduce, L2/HAMMING/JACCARD/TONIMOTO ...
// similarity -- infinity value means two vectors equal, descending reduce, IP
bool ascending_reduce = true;
};
} // namespace scheduler
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include <utility>
#include "cache/GpuCacheMgr.h"
#include "scheduler/task/SSTestTask.h"
namespace milvus {
namespace scheduler {
SSTestTask::SSTestTask(const server::ContextPtr& context, const engine::SegmentVisitorPtr& visitor, TaskLabelPtr label)
: XSSSearchTask(context, visitor, std::move(label)) {
}
void
SSTestTask::Load(LoadType type, uint8_t device_id) {
load_count_++;
}
void
SSTestTask::Execute() {
{
std::lock_guard<std::mutex> lock(mutex_);
exec_count_++;
done_ = true;
}
cv_.notify_one();
}
void
SSTestTask::Wait() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [&] { return done_; });
}
} // namespace scheduler
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <memory>
#include "SSSearchTask.h"
namespace milvus {
namespace scheduler {
class SSTestTask : public XSSSearchTask {
public:
explicit SSTestTask(const server::ContextPtr& context, const engine::SegmentVisitorPtr& visitor,
TaskLabelPtr label);
public:
void
Load(LoadType type, uint8_t device_id) override;
void
Execute() override;
void
Wait();
public:
int64_t load_count_ = 0;
int64_t exec_count_ = 0;
bool done_ = false;
std::mutex mutex_;
std::condition_variable cv_;
};
} // namespace scheduler
} // namespace milvus
......@@ -24,17 +24,17 @@ namespace milvus {
namespace scheduler {
enum class LoadType {
DISK2CPU,
CPU2GPU,
GPU2CPU,
TEST,
DISK2CPU = 0,
CPU2GPU = 1,
GPU2CPU = 2,
TEST = 99,
};
enum class TaskType {
SearchTask,
DeleteTask,
BuildIndexTask,
TestTask,
SearchTask = 0,
DeleteTask = 1,
BuildIndexTask = 2,
TestTask = 99,
};
class Task;
......
......@@ -16,7 +16,10 @@ set(test_files
${CMAKE_CURRENT_SOURCE_DIR}/test_snapshot.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_segment.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_db.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_ss_meta.cpp)
${CMAKE_CURRENT_SOURCE_DIR}/test_ss_meta.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_ss_job.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test_ss_task.cpp
)
add_executable(test_ssdb
${common_files}
......
......@@ -400,6 +400,58 @@ TEST_F(SSDBTest, VisitorTest) {
std::cout << ss->ToString() << std::endl;
}
TEST_F(SSDBTest, QueryTest) {
LSN_TYPE lsn = 0;
auto next_lsn = [&]() -> decltype(lsn) {
return ++lsn;
};
std::string c1 = "c1";
auto status = CreateCollection(db_, c1, next_lsn());
ASSERT_TRUE(status.ok());
std::stringstream p_name;
auto num = RandomInt(1, 3);
for (auto i = 0; i < num; ++i) {
p_name.str("");
p_name << "partition_" << i;
status = db_->CreatePartition(c1, p_name.str());
ASSERT_TRUE(status.ok());
}
ScopedSnapshotT ss;
status = Snapshots::GetInstance().GetSnapshot(ss, c1);
ASSERT_TRUE(status.ok());
SegmentFileContext sf_context;
SFContextBuilder(sf_context, ss);
auto new_total = 0;
auto &partitions = ss->GetResources<Partition>();
ID_TYPE partition_id;
for (auto &kv : partitions) {
num = RandomInt(1, 3);
auto row_cnt = 100;
for (auto i = 0; i < num; ++i) {
ASSERT_TRUE(CreateSegment(ss, kv.first, next_lsn(), sf_context, row_cnt).ok());
}
new_total += num;
partition_id = kv.first;
}
status = Snapshots::GetInstance().GetSnapshot(ss, c1);
ASSERT_TRUE(status.ok());
milvus::server::ContextPtr ctx1;
std::vector<std::string> partition_patterns;
milvus::query::GeneralQueryPtr general_query;
milvus::query::QueryPtr query_ptr;
std::vector<std::string> field_names;
std::unordered_map<std::string, milvus::engine::meta::hybrid::DataType> attr_type;
milvus::engine::QueryResult result;
//db_->Query(ctx1, c1, partition_patterns, general_query, query_ptr, field_names, attr_type, result);
}
TEST_F(SSDBTest, InsertTest) {
std::string collection_name = "MERGE_TEST";
auto status = CreateCollection2(db_, collection_name, 0);
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include <gtest/gtest.h>
#include "scheduler/job/SSBuildIndexJob.h"
#include "scheduler/job/SSSearchJob.h"
namespace milvus {
namespace scheduler {
class TestJob : public Job {
public:
TestJob() : Job(JobType::INVALID) {}
};
TEST(SSJobTest, TestJob) {
engine::DBOptions options;
auto build_index_ptr = std::make_shared<SSBuildIndexJob>(options);
build_index_ptr->Dump();
build_index_ptr->AddSegmentVisitor(nullptr);
TestJob test_job;
test_job.Dump();
engine::VectorsData vectors;
auto search_ptr = std::make_shared<SSSearchJob>(nullptr, 1, 1, vectors);
search_ptr->Dump();
search_ptr->AddSegmentVisitor(nullptr);
}
} // namespace scheduler
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include <fiu-local.h>
#include <fiu-control.h>
#include <gtest/gtest.h>
#include <opentracing/mocktracer/tracer.h>
//#include "db/meta/SqliteMetaImpl.h"
#include "db/DBFactory.h"
#include "scheduler/SchedInst.h"
#include "scheduler/job/SSBuildIndexJob.h"
#include "scheduler/job/SSSearchJob.h"
#include "scheduler/resource/CpuResource.h"
#include "scheduler/tasklabel/BroadcastLabel.h"
#include "scheduler/task/SSBuildIndexTask.h"
#include "scheduler/task/SSSearchTask.h"
#include "scheduler/task/SSTestTask.h"
namespace milvus {
namespace scheduler {
TEST(SSTaskTest, INVALID_INDEX) {
auto dummy_context = std::make_shared<milvus::server::Context>("dummy_request_id");
opentracing::mocktracer::MockTracerOptions tracer_options;
auto mock_tracer =
std::shared_ptr<opentracing::Tracer>{new opentracing::mocktracer::MockTracer{std::move(tracer_options)}};
auto mock_span = mock_tracer->StartSpan("mock_span");
auto trace_context = std::make_shared<milvus::tracing::TraceContext>(mock_span);
dummy_context->SetTraceContext(trace_context);
auto search_task = std::make_shared<XSSSearchTask>(dummy_context, nullptr, nullptr);
search_task->Load(LoadType::TEST, 10);
auto build_task = std::make_shared<XSSBuildIndexTask>(nullptr, nullptr);
build_task->Load(LoadType::TEST, 10);
// build_task->Execute();
}
TEST(SSTaskTest, TEST_TASK) {
auto dummy_context = std::make_shared<milvus::server::Context>("dummy_request_id");
// auto file = std::make_shared<SegmentSchema>();
// file->index_params_ = "{ \"nlist\": 16384 }";
// file->dimension_ = 64;
auto label = std::make_shared<BroadcastLabel>();
SSTestTask task(dummy_context, nullptr, label);
task.Load(LoadType::CPU2GPU, 0);
auto th = std::thread([&]() {
task.Execute();
});
task.Wait();
if (th.joinable()) {
th.join();
}
// static const char* CONFIG_PATH = "/tmp/milvus_test";
// auto options = milvus::engine::DBFactory::BuildOption();
// options.meta_.path_ = CONFIG_PATH;
// options.meta_.backend_uri_ = "sqlite://:@:/";
// options.insert_cache_immediately_ = true;
//
// file->collection_id_ = "111";
// file->location_ = "/tmp/milvus_test/index_file1.txt";
// auto build_index_job = std::make_shared<milvus::scheduler::SSBuildIndexJob>(options);
// XSSBuildIndexTask build_index_task(nullptr, label);
// build_index_task.job_ = build_index_job;
//
// build_index_task.Load(LoadType::TEST, 0);
//
// fiu_init(0);
// fiu_enable("XBuildIndexTask.Load.throw_std_exception", 1, NULL, 0);
// build_index_task.Load(LoadType::TEST, 0);
// fiu_disable("XBuildIndexTask.Load.throw_std_exception");
//
// fiu_enable("XBuildIndexTask.Load.out_of_memory", 1, NULL, 0);
// build_index_task.Load(LoadType::TEST, 0);
// fiu_disable("XBuildIndexTask.Load.out_of_memory");
//
// build_index_task.Execute();
// // always enable 'create_table_success'
// fiu_enable("XBuildIndexTask.Execute.create_table_success", 1, NULL, 0);
//
// milvus::json json = {{"nlist", 16384}};
// build_index_task.to_index_engine_ =
// EngineFactory::Build(file->dimension_, file->location_, (EngineType)file->engine_type_,
// (MetricType)file->metric_type_, json);
//
// build_index_task.Execute();
//
// fiu_enable("XBuildIndexTask.Execute.build_index_fail", 1, NULL, 0);
// build_index_task.to_index_engine_ =
// EngineFactory::Build(file->dimension_, file->location_, (EngineType)file->engine_type_,
// (MetricType)file->metric_type_, json);
// build_index_task.Execute();
// fiu_disable("XBuildIndexTask.Execute.build_index_fail");
//
// // always enable 'has_collection'
// fiu_enable("XBuildIndexTask.Execute.has_collection", 1, NULL, 0);
// build_index_task.to_index_engine_ =
// EngineFactory::Build(file->dimension_, file->location_, (EngineType)file->engine_type_,
// (MetricType)file->metric_type_, json);
// build_index_task.Execute();
//
// fiu_enable("XBuildIndexTask.Execute.throw_std_exception", 1, NULL, 0);
// build_index_task.to_index_engine_ =
// EngineFactory::Build(file->dimension_, file->location_, (EngineType)file->engine_type_,
// (MetricType)file->metric_type_, json);
// build_index_task.Execute();
// fiu_disable("XBuildIndexTask.Execute.throw_std_exception");
//
// fiu_enable("XBuildIndexTask.Execute.update_table_file_fail", 1, NULL, 0);
// build_index_task.to_index_engine_ =
// EngineFactory::Build(file->dimension_, file->location_, (EngineType)file->engine_type_,
// (MetricType)file->metric_type_, json);
// build_index_task.Execute();
// fiu_disable("XBuildIndexTask.Execute.update_table_file_fail");
//
// fiu_disable("XBuildIndexTask.Execute.throw_std_exception");
// fiu_disable("XBuildIndexTask.Execute.has_collection");
// fiu_disable("XBuildIndexTask.Execute.create_table_success");
// build_index_task.Execute();
//
// // search task
// engine::VectorsData vector;
// auto search_job = std::make_shared<SearchJob>(dummy_context, 1, 1, vector);
// file->metric_type_ = static_cast<int>(MetricType::IP);
// file->engine_type_ = static_cast<int>(engine::EngineType::FAISS_IVFSQ8H);
// opentracing::mocktracer::MockTracerOptions tracer_options;
// auto mock_tracer =
// std::shared_ptr<opentracing::Tracer>{new opentracing::mocktracer::MockTracer{std::move(tracer_options)}};
// auto mock_span = mock_tracer->StartSpan("mock_span");
// auto trace_context = std::make_shared<milvus::tracing::TraceContext>(mock_span);
// dummy_context->SetTraceContext(trace_context);
// XSearchTask search_task(dummy_context, file, label);
// search_task.job_ = search_job;
// std::string cpu_resouce_name = "cpu_name1";
// std::vector<std::string> path = {cpu_resouce_name};
// search_task.task_path_ = Path(path, 0);
// ResMgrInst::GetInstance()->Add(std::make_shared<CpuResource>(cpu_resouce_name, 1, true));
//
// search_task.Load(LoadType::CPU2GPU, 0);
// search_task.Load(LoadType::GPU2CPU, 0);
//
// fiu_enable("XSearchTask.Load.throw_std_exception", 1, NULL, 0);
// search_task.Load(LoadType::GPU2CPU, 0);
// fiu_disable("XSearchTask.Load.throw_std_exception");
//
// fiu_enable("XSearchTask.Load.out_of_memory", 1, NULL, 0);
// search_task.Load(LoadType::GPU2CPU, 0);
// fiu_disable("XSearchTask.Load.out_of_memory");
//
// fiu_enable("XSearchTask.Execute.search_fail", 1, NULL, 0);
// search_task.Execute();
// fiu_disable("XSearchTask.Execute.search_fail");
//
// fiu_enable("XSearchTask.Execute.throw_std_exception", 1, NULL, 0);
// search_task.Execute();
// fiu_disable("XSearchTask.Execute.throw_std_exception");
//
// search_task.Execute();
//
// scheduler::ResultIds ids, tar_ids;
// scheduler::ResultDistances distances, tar_distances;
// XSearchTask::MergeTopkToResultSet(ids, distances, 1, 1, 1, true, tar_ids, tar_distances);
}
TEST(SSTaskTest, TEST_PATH) {
Path path;
auto empty_path = path.Current();
ASSERT_TRUE(empty_path.empty());
empty_path = path.Next();
ASSERT_TRUE(empty_path.empty());
empty_path = path.Last();
ASSERT_TRUE(empty_path.empty());
}
} // namespace scheduler
} // namespace milvus
......@@ -30,11 +30,11 @@
#include "db/snapshot/OperationExecutor.h"
#include "db/snapshot/Snapshots.h"
#include "db/snapshot/ResourceHolders.h"
#ifdef MILVUS_GPU_VERSION
#include "knowhere/index/vector_index/helpers/FaissGpuResourceMgr.h"
#endif
#include "scheduler/ResourceFactory.h"
#include "scheduler/SchedInst.h"
#include "utils/CommonUtil.h"
......@@ -138,26 +138,16 @@ BaseTest::InitLog() {
}
void
BaseTest::SetUp() {
InitLog();
}
void
BaseTest::TearDown() {
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void
SnapshotTest::SetUp() {
BaseTest::SetUp();
BaseTest::SnapshotStart(bool mock_store) {
/* auto uri = "mysql://root:12345678@127.0.0.1:3307/milvus"; */
auto uri = "mock://:@:/";
auto store = Store::Build(uri);
milvus::engine::snapshot::OperationExecutor::Init(store);
milvus::engine::snapshot::OperationExecutor::GetInstance().Start();
milvus::engine::snapshot::EventExecutor::Init(store);
milvus::engine::snapshot::EventExecutor::GetInstance().Start();
store->Mock();
milvus::engine::snapshot::CollectionCommitsHolder::GetInstance().Reset();
milvus::engine::snapshot::CollectionsHolder::GetInstance().Reset();
milvus::engine::snapshot::SchemaCommitsHolder::GetInstance().Reset();
......@@ -170,17 +160,43 @@ SnapshotTest::SetUp() {
milvus::engine::snapshot::SegmentCommitsHolder::GetInstance().Reset();
milvus::engine::snapshot::SegmentFilesHolder::GetInstance().Reset();
milvus::engine::snapshot::Snapshots::GetInstance().Reset();
if (mock_store) {
store->Mock();
} else {
store->DoReset();
}
milvus::engine::snapshot::Snapshots::GetInstance().Reset();
milvus::engine::snapshot::Snapshots::GetInstance().Init(store);
}
void
SnapshotTest::TearDown() {
BaseTest::SnapshotStop() {
// TODO: Temp to delay some time. OperationExecutor should wait all resources be destructed before stop
std::this_thread::sleep_for(std::chrono::milliseconds(20));
milvus::engine::snapshot::EventExecutor::GetInstance().Stop();
milvus::engine::snapshot::OperationExecutor::GetInstance().Stop();
}
void
BaseTest::SetUp() {
InitLog();
}
void
BaseTest::TearDown() {
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void
SnapshotTest::SetUp() {
BaseTest::SetUp();
BaseTest::SnapshotStart(true);
}
void
SnapshotTest::TearDown() {
BaseTest::SnapshotStop();
BaseTest::TearDown();
}
......@@ -197,39 +213,37 @@ SSDBTest::GetOptions() {
void
SSDBTest::SetUp() {
BaseTest::SetUp();
/* auto uri = "mysql://root:123456@127.0.0.1:3306/milvus"; */
auto uri = "mock://:@:/";
auto store = Store::Build(uri);
milvus::engine::snapshot::OperationExecutor::Init(store);
milvus::engine::snapshot::OperationExecutor::GetInstance().Start();
milvus::engine::snapshot::EventExecutor::Init(store);
milvus::engine::snapshot::EventExecutor::GetInstance().Start();
milvus::engine::snapshot::CollectionCommitsHolder::GetInstance().Reset();
milvus::engine::snapshot::CollectionsHolder::GetInstance().Reset();
milvus::engine::snapshot::SchemaCommitsHolder::GetInstance().Reset();
milvus::engine::snapshot::FieldCommitsHolder::GetInstance().Reset();
milvus::engine::snapshot::FieldsHolder::GetInstance().Reset();
milvus::engine::snapshot::FieldElementsHolder::GetInstance().Reset();
milvus::engine::snapshot::PartitionsHolder::GetInstance().Reset();
milvus::engine::snapshot::PartitionCommitsHolder::GetInstance().Reset();
milvus::engine::snapshot::SegmentsHolder::GetInstance().Reset();
milvus::engine::snapshot::SegmentCommitsHolder::GetInstance().Reset();
milvus::engine::snapshot::SegmentFilesHolder::GetInstance().Reset();
store->DoReset();
milvus::engine::snapshot::Snapshots::GetInstance().Reset();
milvus::engine::snapshot::Snapshots::GetInstance().Init(store);
BaseTest::SnapshotStart(false);
db_ = std::make_shared<milvus::engine::SSDBImpl>(GetOptions());
auto res_mgr = milvus::scheduler::ResMgrInst::GetInstance();
res_mgr->Clear();
res_mgr->Add(milvus::scheduler::ResourceFactory::Create("disk", "DISK", 0, false));
res_mgr->Add(milvus::scheduler::ResourceFactory::Create("cpu", "CPU", 0));
auto default_conn = milvus::scheduler::Connection("IO", 500.0);
auto PCIE = milvus::scheduler::Connection("IO", 11000.0);
res_mgr->Connect("disk", "cpu", default_conn);
#ifdef MILVUS_GPU_VERSION
res_mgr->Add(milvus::scheduler::ResourceFactory::Create("0", "GPU", 0));
res_mgr->Connect("cpu", "0", PCIE);
#endif
res_mgr->Start();
milvus::scheduler::SchedInst::GetInstance()->Start();
milvus::scheduler::JobMgrInst::GetInstance()->Start();
milvus::scheduler::CPUBuilderInst::GetInstance()->Start();
}
void
SSDBTest::TearDown() {
db_ = nullptr;
// TODO: Temp to delay some time. OperationExecutor should wait all resources be destructed before stop
std::this_thread::sleep_for(std::chrono::milliseconds(20));
milvus::engine::snapshot::EventExecutor::GetInstance().Stop();
milvus::engine::snapshot::OperationExecutor::GetInstance().Stop();
milvus::scheduler::JobMgrInst::GetInstance()->Stop();
milvus::scheduler::SchedInst::GetInstance()->Stop();
milvus::scheduler::CPUBuilderInst::GetInstance()->Stop();
milvus::scheduler::ResMgrInst::GetInstance()->Stop();
milvus::scheduler::ResMgrInst::GetInstance()->Clear();
BaseTest::SnapshotStop();
db_ = nullptr;
auto options = GetOptions();
boost::filesystem::remove_all(options.meta_.path_);
......@@ -240,28 +254,7 @@ SSDBTest::TearDown() {
void
SSSegmentTest::SetUp() {
BaseTest::SetUp();
auto uri = "mock://:@:/";
auto store = Store::Build(uri);
milvus::engine::snapshot::OperationExecutor::Init(store);
milvus::engine::snapshot::OperationExecutor::GetInstance().Start();
milvus::engine::snapshot::EventExecutor::Init(store);
milvus::engine::snapshot::EventExecutor::GetInstance().Start();
milvus::engine::snapshot::CollectionCommitsHolder::GetInstance().Reset();
milvus::engine::snapshot::CollectionsHolder::GetInstance().Reset();
milvus::engine::snapshot::SchemaCommitsHolder::GetInstance().Reset();
milvus::engine::snapshot::FieldCommitsHolder::GetInstance().Reset();
milvus::engine::snapshot::FieldsHolder::GetInstance().Reset();
milvus::engine::snapshot::FieldElementsHolder::GetInstance().Reset();
milvus::engine::snapshot::PartitionsHolder::GetInstance().Reset();
milvus::engine::snapshot::PartitionCommitsHolder::GetInstance().Reset();
milvus::engine::snapshot::SegmentsHolder::GetInstance().Reset();
milvus::engine::snapshot::SegmentCommitsHolder::GetInstance().Reset();
milvus::engine::snapshot::SegmentFilesHolder::GetInstance().Reset();
store->DoReset();
milvus::engine::snapshot::Snapshots::GetInstance().Reset();
milvus::engine::snapshot::Snapshots::GetInstance().Init(store);
BaseTest::SnapshotStart(false);
auto options = milvus::engine::DBOptions();
options.wal_enable_ = false;
......@@ -270,12 +263,8 @@ SSSegmentTest::SetUp() {
void
SSSegmentTest::TearDown() {
BaseTest::SnapshotStop();
db_ = nullptr;
// TODO: Temp to delay some time. OperationExecutor should wait all resources be destructed before stop
std::this_thread::sleep_for(std::chrono::milliseconds(20));
milvus::engine::snapshot::EventExecutor::GetInstance().Stop();
milvus::engine::snapshot::OperationExecutor::GetInstance().Stop();
BaseTest::TearDown();
}
......
......@@ -286,6 +286,10 @@ class BaseTest : public ::testing::Test {
protected:
void
InitLog();
void
SnapshotStart(bool mock_store);
void
SnapshotStop();
void
SetUp() override;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册