Unverified commit 5b1fdcf7, authored by groot, committed by GitHub

clean code (#3052)

Signed-off-by: yhmo <yihua.mo@zilliz.com>
Parent 4b52c573
......@@ -33,10 +33,6 @@ set(grpc_service_files
${MILVUS_ENGINE_SRC}/grpc/gen-status/status.pb.cc
)
# The context module is no longer used
aux_source_directory(${MILVUS_ENGINE_SRC}/context context_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler scheduler_main_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/action scheduler_action_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/event scheduler_event_files)
......@@ -102,7 +98,6 @@ add_subdirectory(metrics)
add_subdirectory(config)
add_subdirectory(tracing)
add_subdirectory(query)
add_subdirectory(search)
add_subdirectory(db) # target milvus_engine
add_subdirectory(log)
......@@ -122,7 +117,6 @@ set(server_libs
log
oatpp
query
search
utils
)
......
......@@ -81,7 +81,7 @@ StructuredIndexFormat::CreateStructuredIndex(const milvus::engine::meta::hybrid:
void
StructuredIndexFormat::Read(const milvus::storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
knowhere::IndexPtr& index) {
milvus::TimeRecorder recorder("SSStructuredIndexFormat::Read");
milvus::TimeRecorder recorder("StructuredIndexFormat::Read");
knowhere::BinarySet load_data_list;
std::string full_file_path = file_path + STRUCTURED_INDEX_POSTFIX;
......@@ -133,7 +133,7 @@ StructuredIndexFormat::Read(const milvus::storage::FSHandlerPtr& fs_ptr, const s
double span = recorder.RecordSection("End");
double rate = length * 1000000.0 / span / 1024 / 1024;
LOG_ENGINE_DEBUG_ << "SSStructuredIndexFormat::read(" << full_file_path << ") rate " << rate << "MB/s";
LOG_ENGINE_DEBUG_ << "StructuredIndexFormat::read(" << full_file_path << ") rate " << rate << "MB/s";
auto attr_type = static_cast<engine::meta::hybrid::DataType>(data_type);
index = CreateStructuredIndex(attr_type);
......@@ -143,7 +143,7 @@ StructuredIndexFormat::Read(const milvus::storage::FSHandlerPtr& fs_ptr, const s
void
StructuredIndexFormat::Write(const milvus::storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
engine::meta::hybrid::DataType data_type, const knowhere::IndexPtr& index) {
milvus::TimeRecorder recorder("SSStructuredIndexFormat::Write");
milvus::TimeRecorder recorder("StructuredIndexFormat::Write");
std::string full_file_path = file_path + STRUCTURED_INDEX_POSTFIX;
auto binaryset = index->Serialize(knowhere::Config());
......@@ -170,7 +170,7 @@ StructuredIndexFormat::Write(const milvus::storage::FSHandlerPtr& fs_ptr, const
double span = recorder.RecordSection("End");
double rate = fs_ptr->writer_ptr_->length() * 1000000.0 / span / 1024 / 1024;
LOG_ENGINE_DEBUG_ << "SSStructuredIndexFormat::write(" << full_file_path << ") rate " << rate << "MB/s";
LOG_ENGINE_DEBUG_ << "StructuredIndexFormat::write(" << full_file_path << ") rate " << rate << "MB/s";
}
} // namespace codec
......
......@@ -38,7 +38,7 @@ VectorCompressFormat::FilePostfix() {
void
VectorCompressFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
knowhere::BinaryPtr& compress) {
milvus::TimeRecorder recorder("SSVectorCompressFormat::Read");
milvus::TimeRecorder recorder("VectorCompressFormat::Read");
const std::string full_file_path = file_path + VECTOR_COMPRESS_POSTFIX;
if (!fs_ptr->reader_ptr_->open(full_file_path)) {
......@@ -61,13 +61,13 @@ VectorCompressFormat::Read(const storage::FSHandlerPtr& fs_ptr, const std::strin
double span = recorder.RecordSection("End");
double rate = length * 1000000.0 / span / 1024 / 1024;
LOG_ENGINE_DEBUG_ << "SSVectorCompressFormat::Read(" << full_file_path << ") rate " << rate << "MB/s";
LOG_ENGINE_DEBUG_ << "VectorCompressFormat::Read(" << full_file_path << ") rate " << rate << "MB/s";
}
void
VectorCompressFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
const knowhere::BinaryPtr& compress) {
milvus::TimeRecorder recorder("SSVectorCompressFormat::Write");
milvus::TimeRecorder recorder("VectorCompressFormat::Write");
const std::string full_file_path = file_path + VECTOR_COMPRESS_POSTFIX;
if (!fs_ptr->writer_ptr_->open(full_file_path)) {
......@@ -80,7 +80,7 @@ VectorCompressFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::stri
double span = recorder.RecordSection("End");
double rate = compress->size * 1000000.0 / span / 1024 / 1024;
LOG_ENGINE_DEBUG_ << "SSVectorCompressFormat::Write(" << full_file_path << ") rate " << rate << "MB/s";
LOG_ENGINE_DEBUG_ << "SVectorCompressFormat::Write(" << full_file_path << ") rate " << rate << "MB/s";
}
} // namespace codec
......
......@@ -41,7 +41,7 @@ VectorIndexFormat::FilePostfix() {
void
VectorIndexFormat::ReadRaw(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
knowhere::BinaryPtr& data) {
milvus::TimeRecorder recorder("SSVectorIndexFormat::ReadRaw");
milvus::TimeRecorder recorder("VectorIndexFormat::ReadRaw");
if (!fs_ptr->reader_ptr_->open(file_path.c_str())) {
std::string err_msg = "Failed to open raw file: " + file_path + ", error: " + std::strerror(errno);
......@@ -63,13 +63,13 @@ VectorIndexFormat::ReadRaw(const storage::FSHandlerPtr& fs_ptr, const std::strin
double span = recorder.RecordSection("End");
double rate = num_bytes * 1000000.0 / span / 1024 / 1024;
LOG_ENGINE_DEBUG_ << "SSVectorIndexFormat::ReadIndex(" << file_path << ") rate " << rate << "MB/s";
LOG_ENGINE_DEBUG_ << "VectorIndexFormat::ReadIndex(" << file_path << ") rate " << rate << "MB/s";
}
void
VectorIndexFormat::ReadIndex(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
knowhere::BinarySet& data) {
milvus::TimeRecorder recorder("SSVectorIndexFormat::ReadIndex");
milvus::TimeRecorder recorder("VectorIndexFormat::ReadIndex");
std::string full_file_path = file_path + VECTOR_INDEX_POSTFIX;
if (!fs_ptr->reader_ptr_->open(full_file_path)) {
......@@ -117,7 +117,7 @@ VectorIndexFormat::ReadIndex(const storage::FSHandlerPtr& fs_ptr, const std::str
double span = recorder.RecordSection("End");
double rate = length * 1000000.0 / span / 1024 / 1024;
LOG_ENGINE_DEBUG_ << "SSVectorIndexFormat::ReadIndex(" << full_file_path << ") rate " << rate << "MB/s";
LOG_ENGINE_DEBUG_ << "VectorIndexFormat::ReadIndex(" << full_file_path << ") rate " << rate << "MB/s";
}
void
......@@ -196,13 +196,13 @@ VectorIndexFormat::WriteIndex(const storage::FSHandlerPtr& fs_ptr, const std::st
double span = recorder.RecordSection("End");
double rate = fs_ptr->writer_ptr_->length() * 1000000.0 / span / 1024 / 1024;
LOG_ENGINE_DEBUG_ << "SSVectorIndexFormat::WriteIndex(" << full_file_path << ") rate " << rate << "MB/s";
LOG_ENGINE_DEBUG_ << "VectorIndexFormat::WriteIndex(" << full_file_path << ") rate " << rate << "MB/s";
}
void
VectorIndexFormat::WriteCompress(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
const knowhere::VecIndexPtr& index) {
milvus::TimeRecorder recorder("SSVectorIndexFormat::WriteCompress");
milvus::TimeRecorder recorder("VectorIndexFormat::WriteCompress");
auto binaryset = index->Serialize(knowhere::Config());
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <iostream>
#include <memory>
#include <string>
#include <vector>
#include "query/BinaryQuery.h"
#include "search/Task.h"
namespace milvus {
namespace search {
class Task;
using TaskPtr = std::shared_ptr<Task>;
} // namespace search
namespace context {
struct SearchContext {
query::GeneralQueryPtr general_query_;
std::vector<::milvus::search::TaskPtr> tasks_;
};
using SearchContextPtr = std::shared_ptr<SearchContext>;
} // namespace context
} // namespace milvus
......@@ -566,7 +566,7 @@ Status
DBImpl::Query(const server::ContextPtr& context, const query::QueryPtr& query_ptr, engine::QueryResultPtr& result) {
CHECK_INITIALIZED;
TimeRecorder rc("SSDBImpl::Query");
TimeRecorder rc("DBImpl::Query");
scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(nullptr, options_, query_ptr);
......@@ -908,7 +908,6 @@ DBImpl::TimingMetricThread() {
swn_metric_.Wait_For(std::chrono::seconds(BACKGROUND_METRIC_INTERVAL));
StartMetricTask();
meta::FilesHolder::PrintInfo();
}
}
......
......@@ -12,7 +12,6 @@
#pragma once
#include "db/Types.h"
#include "db/meta/FilesHolder.h"
#include "db/snapshot/IterateHandler.h"
#include "db/snapshot/Snapshot.h"
#include "segment/Segment.h"
......
......@@ -11,7 +11,6 @@
#pragma once
#include "db/meta/FilesHolder.h"
#include "db/snapshot/Snapshot.h"
#include <map>
......
......@@ -38,19 +38,6 @@ namespace milvus {
namespace engine {
namespace utils {
namespace {
const char* TABLES_FOLDER = "/tables/";
static std::string
ConstructParentFolder(const std::string& db_path, const meta::SegmentSchema& table_file) {
std::string table_path = db_path + TABLES_FOLDER + table_file.collection_id_;
std::string partition_path = table_path + "/" + table_file.segment_id_;
return partition_path;
}
} // namespace
int64_t
GetMicroSecTimeStamp() {
auto now = std::chrono::system_clock::now();
......@@ -59,124 +46,12 @@ GetMicroSecTimeStamp() {
return micros;
}
Status
CreateCollectionPath(const DBMetaOptions& options, const std::string& collection_id) {
std::string db_path = options.path_;
std::string table_path = db_path + TABLES_FOLDER + collection_id;
auto status = CommonUtil::CreateDirectory(table_path);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << status.message();
return status;
}
return Status::OK();
}
Status
DeleteCollectionPath(const DBMetaOptions& options, const std::string& collection_id, bool force) {
std::string table_path = options.path_ + TABLES_FOLDER + collection_id;
if (force) {
boost::filesystem::remove_all(table_path);
LOG_ENGINE_DEBUG_ << "Remove collection folder: " << table_path;
} else if (boost::filesystem::exists(table_path) && boost::filesystem::is_empty(table_path)) {
boost::filesystem::remove_all(table_path);
LOG_ENGINE_DEBUG_ << "Remove collection folder: " << table_path;
}
// bool s3_enable = false;
// server::Config& config = server::Config::GetInstance();
// config.GetStorageConfigS3Enable(s3_enable);
// if (s3_enable) {
// std::string table_path = options.path_ + TABLES_FOLDER + collection_id;
// auto& storage_inst = milvus::storage::S3ClientWrapper::GetInstance();
// Status stat = storage_inst.DeleteObjects(table_path);
// if (!stat.ok()) {
// return stat;
// }
// }
return Status::OK();
}
Status
CreateCollectionFilePath(const DBMetaOptions& options, meta::SegmentSchema& table_file) {
std::string parent_path = ConstructParentFolder(options.path_, table_file);
auto status = CommonUtil::CreateDirectory(parent_path);
fiu_do_on("CreateCollectionFilePath.fail_create", status = Status(DB_INVALID_PATH, ""));
if (!status.ok()) {
LOG_ENGINE_ERROR_ << status.message();
return status;
}
table_file.location_ = parent_path + "/" + table_file.file_id_;
return Status::OK();
}
Status
GetCollectionFilePath(const DBMetaOptions& options, meta::SegmentSchema& table_file) {
std::string parent_path = ConstructParentFolder(options.path_, table_file);
std::string file_path = parent_path + "/" + table_file.file_id_;
// bool s3_enable = false;
// server::Config& config = server::Config::GetInstance();
// config.GetStorageConfigS3Enable(s3_enable);
// fiu_do_on("GetCollectionFilePath.enable_s3", s3_enable = true);
// if (s3_enable) {
// /* need not check file existence */
// table_file.location_ = file_path;
// return Status::OK();
// }
if (boost::filesystem::exists(parent_path)) {
table_file.location_ = file_path;
return Status::OK();
}
std::string msg = "Collection file doesn't exist: " + file_path;
if (table_file.file_size_ > 0) { // no need to pop error for empty file
LOG_ENGINE_ERROR_ << msg << " in path: " << options.path_ << " for collection: " << table_file.collection_id_;
}
return Status(DB_ERROR, msg);
}
Status
DeleteCollectionFilePath(const DBMetaOptions& options, meta::SegmentSchema& table_file) {
utils::GetCollectionFilePath(options, table_file);
boost::filesystem::remove(table_file.location_);
return Status::OK();
}
Status
DeleteSegment(const DBMetaOptions& options, meta::SegmentSchema& table_file) {
utils::GetCollectionFilePath(options, table_file);
std::string segment_dir;
GetParentPath(table_file.location_, segment_dir);
boost::filesystem::remove_all(segment_dir);
return Status::OK();
}
Status
GetParentPath(const std::string& path, std::string& parent_path) {
boost::filesystem::path p(path);
parent_path = p.parent_path().string();
return Status::OK();
}
bool
IsSameIndex(const CollectionIndex& index1, const CollectionIndex& index2) {
return index1.index_name_ == index2.index_name_ && index1.extra_params_ == index2.extra_params_ &&
index1.metric_name_ == index2.metric_name_;
}
bool
IsRawIndexType(int32_t type) {
return (type == (int32_t)EngineType::FAISS_IDMAP) || (type == (int32_t)EngineType::FAISS_BIN_IDMAP);
}
bool
IsBinaryMetricType(int32_t metric_type) {
return (metric_type == (int32_t)engine::MetricType::HAMMING) ||
......@@ -247,31 +122,6 @@ ParseMetaUri(const std::string& uri, MetaUriInfo& info) {
return Status::OK();
}
std::string
GetIndexName(int32_t index_type) {
static std::map<int32_t, std::string> index_type_name = {
{(int32_t)engine::EngineType::FAISS_IDMAP, "FLAT"},
{(int32_t)engine::EngineType::FAISS_IVFFLAT, "IVF_FLAT"},
{(int32_t)engine::EngineType::FAISS_IVFSQ8, "IVF_SQ8"},
{(int32_t)engine::EngineType::FAISS_IVFSQ8H, "IVF_SQ8_HYBRID"},
{(int32_t)engine::EngineType::FAISS_PQ, "IVF_PQ"},
#ifdef MILVUS_SUPPORT_SPTAG
{(int32_t)engine::EngineType::SPTAG_KDT, "SPTAG_KDT_RNT"},
{(int32_t)engine::EngineType::SPTAG_BKT, "SPTAG_BKT_RNT"},
#endif
{(int32_t)engine::EngineType::FAISS_BIN_IDMAP, "BIN_FLAT"},
{(int32_t)engine::EngineType::FAISS_BIN_IVFFLAT, "BIN_IVF_FLAT"},
{(int32_t)engine::EngineType::HNSW, "HNSW"},
{(int32_t)engine::EngineType::NSG_MIX, "NSG"},
{(int32_t)engine::EngineType::ANNOY, "ANNOY"}};
if (index_type_name.find(index_type) == index_type_name.end()) {
return "Unknow";
}
return index_type_name[index_type];
}
void
SendExitSignal() {
LOG_SERVER_INFO_ << "Send SIGUSR2 signal to exit";
......@@ -279,50 +129,6 @@ SendExitSignal() {
kill(pid, SIGUSR2);
}
void
ExitOnWriteError(Status& status) {
if (status.code() == SERVER_WRITE_ERROR) {
utils::SendExitSignal();
}
}
void
EraseFromCache(const std::string& item_key) {
if (item_key.empty()) {
LOG_SERVER_ERROR_ << "Empty key cannot be erased from cache";
return;
}
cache::CpuCacheMgr::GetInstance()->EraseItem(item_key);
#ifdef MILVUS_GPU_VERSION
std::vector<int64_t> gpus = ParseGPUDevices(config.gpu.search_devices());
for (auto& gpu : gpus) {
cache::GpuCacheMgr::GetInstance(gpu)->EraseItem(item_key);
}
#endif
}
std::string
IndexTypeToStr(const int32_t type) {
auto pair = s_index_type2name.find(type);
if (pair == s_index_type2name.end()) {
return "";
}
return pair->second;
}
int32_t
StrToIndexType(const std::string& str) {
auto pair = s_index_name2type.find(str);
if (pair == s_index_name2type.end()) {
return 0;
}
return pair->second;
}
} // namespace utils
} // namespace engine
} // namespace milvus
......@@ -31,29 +31,9 @@ namespace utils {
int64_t
GetMicroSecTimeStamp();
Status
CreateCollectionPath(const DBMetaOptions& options, const std::string& collection_id);
Status
DeleteCollectionPath(const DBMetaOptions& options, const std::string& collection_id, bool force = true);
Status
CreateCollectionFilePath(const DBMetaOptions& options, meta::SegmentSchema& table_file);
Status
GetCollectionFilePath(const DBMetaOptions& options, meta::SegmentSchema& table_file);
Status
DeleteCollectionFilePath(const DBMetaOptions& options, meta::SegmentSchema& table_file);
Status
DeleteSegment(const DBMetaOptions& options, meta::SegmentSchema& table_file);
Status
GetParentPath(const std::string& path, std::string& parent_path);
bool
IsSameIndex(const CollectionIndex& index1, const CollectionIndex& index2);
bool
IsRawIndexType(int32_t type);
bool
IsBinaryMetricType(int32_t metric_type);
......@@ -76,23 +56,9 @@ struct MetaUriInfo {
Status
ParseMetaUri(const std::string& uri, MetaUriInfo& info);
std::string
GetIndexName(int32_t index_type);
void
SendExitSignal();
void
ExitOnWriteError(Status& status);
void
EraseFromCache(const std::string& item_key);
std::string
IndexTypeToStr(const int32_t type);
int32_t
StrToIndexType(const std::string& str);
} // namespace utils
} // namespace engine
} // namespace milvus
......@@ -15,6 +15,8 @@
#include "utils/Log.h"
#include <memory>
#include <set>
#include <vector>
namespace milvus {
namespace engine {
......@@ -30,5 +32,32 @@ EngineFactory::Build(const std::string& dir_root, const std::string& collection_
return execution_engine_ptr;
}
void
EngineFactory::GroupFieldsForIndex(const std::string& collection_name, TargetFieldGroups& field_groups) {
snapshot::ScopedSnapshotT ss;
auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_name);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << collection_name << " doesn't exist: " << status.message();
return;
}
std::set<std::string> structured_fields;
std::vector<std::string> field_names = ss->GetFieldNames();
for (auto& field_name : field_names) {
auto field = ss->GetField(field_name);
auto ftype = field->GetFtype();
if (ftype == meta::hybrid::DataType::VECTOR_FLOAT || ftype == meta::hybrid::DataType::VECTOR_BINARY) {
std::set<std::string> index_field = {field_name};
field_groups.emplace_back(index_field);
} else {
structured_fields.insert(field_name);
}
}
if (!structured_fields.empty()) {
field_groups.emplace_back(structured_fields);
}
}
} // namespace engine
} // namespace milvus
......@@ -24,6 +24,12 @@ class EngineFactory {
public:
static ExecutionEnginePtr
Build(const std::string& dir_root, const std::string& collection_name, int64_t segment_id);
// this method distributes fields into multiple groups:
// all structured fields go into one group,
// and each vector field forms its own group
static void
GroupFieldsForIndex(const std::string& collection_name, TargetFieldGroups& field_groups);
};
} // namespace engine
......
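The comment above describes the grouping rule that GroupFieldsForIndex implements earlier in this diff: each vector field becomes its own group, while all structured fields share one group. Below is a minimal standalone sketch of that rule, using simplified placeholder types (FieldType, GroupFields) rather than the actual Milvus classes:

```cpp
#include <iostream>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Simplified stand-in for meta::hybrid::DataType.
enum class FieldType { INT64, FLOAT, VECTOR_FLOAT, VECTOR_BINARY };

using FieldGroup = std::set<std::string>;
using FieldGroups = std::vector<FieldGroup>;

// Every vector field becomes its own group; all structured fields share one group.
FieldGroups
GroupFields(const std::vector<std::pair<std::string, FieldType>>& fields) {
    FieldGroups groups;
    FieldGroup structured;
    for (const auto& field : fields) {
        if (field.second == FieldType::VECTOR_FLOAT || field.second == FieldType::VECTOR_BINARY) {
            groups.push_back(FieldGroup{field.first});  // one group per vector field
        } else {
            structured.insert(field.first);             // structured fields are batched together
        }
    }
    if (!structured.empty()) {
        groups.push_back(structured);
    }
    return groups;
}

int main() {
    auto groups = GroupFields({{"age", FieldType::INT64},
                               {"score", FieldType::FLOAT},
                               {"embedding", FieldType::VECTOR_FLOAT}});
    std::cout << groups.size() << " groups\n";  // prints "2 groups"
    return 0;
}
```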
......@@ -13,6 +13,7 @@
#include <map>
#include <memory>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>
......@@ -25,10 +26,13 @@
namespace milvus {
namespace engine {
using TargetFields = std::set<std::string>;
using TargetFieldGroups = std::vector<TargetFields>;
struct ExecutionEngineContext {
query::QueryPtr query_ptr_;
QueryResultPtr query_result_;
std::vector<std::string> target_fields_; // for build index task, which field should be build
TargetFields target_fields_; // for the build-index task: which fields should be built
};
class ExecutionEngine {
......
......@@ -54,7 +54,7 @@ namespace engine {
namespace {
Status
GetRequiredIndexFields(const query::QueryPtr& query_ptr, std::vector<std::string>& field_names) {
GetRequiredIndexFields(const query::QueryPtr& query_ptr, TargetFields& field_names) {
return Status::OK();
}
......@@ -119,38 +119,18 @@ ExecutionEngineImpl::Load(ExecutionEngineContext& context) {
if (context.query_ptr_ != nullptr) {
return LoadForSearch(context.query_ptr_);
} else {
return LoadForIndex();
return Load(context.target_fields_);
}
}
Status
ExecutionEngineImpl::LoadForSearch(const query::QueryPtr& query_ptr) {
SegmentPtr segment_ptr;
segment_reader_->GetSegment(segment_ptr);
std::vector<std::string> field_names;
TargetFields field_names;
GetRequiredIndexFields(query_ptr, field_names);
return Load(field_names);
}
Status
ExecutionEngineImpl::LoadForIndex() {
std::vector<std::string> field_names;
auto field_visitors = segment_visitor_->GetFieldVisitors();
for (auto& pair : field_visitors) {
auto& field_visitor = pair.second;
auto element_visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_INDEX);
if (element_visitor != nullptr && element_visitor->GetFile() == nullptr) {
field_names.push_back(field_visitor->GetField()->GetName());
break;
}
}
return Load(field_names);
}
Status
ExecutionEngineImpl::CreateStructuredIndex(const milvus::engine::meta::hybrid::DataType field_type,
std::vector<uint8_t>& raw_data, knowhere::IndexPtr& index_ptr) {
......@@ -185,8 +165,9 @@ ExecutionEngineImpl::CreateStructuredIndex(const milvus::engine::meta::hybrid::D
}
Status
ExecutionEngineImpl::Load(const std::vector<std::string>& field_names) {
TimeRecorderAuto rc("SSExecutionEngineImpl::Load");
ExecutionEngineImpl::Load(const TargetFields& field_names) {
TimeRecorderAuto rc("ExecutionEngineImpl::Load");
SegmentPtr segment_ptr;
segment_reader_->GetSegment(segment_ptr);
......@@ -216,6 +197,8 @@ ExecutionEngineImpl::Load(const std::vector<std::string>& field_names) {
std::vector<uint8_t> raw;
segment_reader_->LoadField(name, raw);
}
target_fields_.insert(name);
}
return Status::OK();
......@@ -224,7 +207,7 @@ ExecutionEngineImpl::Load(const std::vector<std::string>& field_names) {
Status
ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
#ifdef MILVUS_GPU_VERSION
TimeRecorderAuto rc("SSExecutionEngineImpl::CopyToGpu");
TimeRecorderAuto rc("ExecutionEngineImpl::CopyToGpu");
SegmentPtr segment_ptr;
segment_reader_->GetSegment(segment_ptr);
......@@ -232,8 +215,10 @@ ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
engine::VECTOR_INDEX_MAP new_map;
engine::VECTOR_INDEX_MAP& indice = segment_ptr->GetVectorIndice();
for (auto& pair : indice) {
auto gpu_index = knowhere::cloner::CopyCpuToGpu(pair.second, device_id, knowhere::Config());
new_map.insert(std::make_pair(pair.first, gpu_index));
if (pair.second != nullptr) {
auto gpu_index = knowhere::cloner::CopyCpuToGpu(pair.second, device_id, knowhere::Config());
new_map.insert(std::make_pair(pair.first, gpu_index));
}
}
indice.swap(new_map);
......@@ -515,7 +500,7 @@ ExecutionEngineImpl::ProcessRangeQuery(const std::unordered_map<std::string, met
Status
ExecutionEngineImpl::BuildIndex() {
TimeRecorderAuto rc("SSExecutionEngineImpl::BuildIndex");
TimeRecorderAuto rc("ExecutionEngineImpl::BuildIndex");
SegmentPtr segment_ptr;
segment_reader_->GetSegment(segment_ptr);
......@@ -524,9 +509,8 @@ ExecutionEngineImpl::BuildIndex() {
auto collection = snapshot->GetCollection();
auto& segment = segment_visitor_->GetSegment();
auto field_visitors = segment_visitor_->GetFieldVisitors();
for (auto& pair : field_visitors) {
auto& field_visitor = pair.second;
for (auto& field_name : target_fields_) {
auto field_visitor = segment_visitor_->GetFieldVisitor(field_name);
auto element_visitor = field_visitor->GetElementVisitor(engine::FieldElementType::FET_INDEX);
if (element_visitor == nullptr) {
continue; // no index specified
......
......@@ -51,10 +51,7 @@ class ExecutionEngineImpl : public ExecutionEngine {
LoadForSearch(const query::QueryPtr& query_ptr);
Status
LoadForIndex();
Status
Load(const std::vector<std::string>& field_names);
Load(const TargetFields& field_names);
Status
ExecBinaryQuery(const query::GeneralQueryPtr& general_query, faiss::ConcurrentBitsetPtr& bitset,
......@@ -82,10 +79,13 @@ class ExecutionEngineImpl : public ExecutionEngine {
SegmentVisitorPtr segment_visitor_;
segment::SegmentReaderPtr segment_reader_;
TargetFields target_fields_;
knowhere::VecIndexPtr vec_index_ptr_ = nullptr;
std::unordered_map<std::string, knowhere::IndexPtr> attr_index_;
int64_t entity_count_;
int64_t gpu_num_ = 0;
bool gpu_enable_ = false;
};
......
......@@ -98,7 +98,7 @@ MemCollection::GetTableFileCount() {
Status
MemCollection::Serialize(uint64_t wal_lsn) {
TimeRecorder recorder("SSMemCollection::Serialize collection " + collection_id_);
TimeRecorder recorder("MemCollection::Serialize collection " + collection_id_);
if (!doc_ids_to_delete_.empty()) {
auto status = ApplyDeletes();
......@@ -170,7 +170,7 @@ MemCollection::ApplyDeletes() {
// LOG_ENGINE_DEBUG_ << "Applying " << doc_ids_to_delete_.size() << " deletes in collection: " << collection_id_;
//
// TimeRecorder recorder("SSMemCollection::ApplyDeletes for collection " + collection_id_);
// TimeRecorder recorder("MemCollection::ApplyDeletes for collection " + collection_id_);
//
// std::vector<int> file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX,
// meta::SegmentSchema::FILE_TYPE::BACKUP};
......
......@@ -12,7 +12,7 @@
#pragma once
#include "MemManager.h"
#include "db/meta/Meta.h"
#include "db/Options.h"
#include <memory>
......
......@@ -42,7 +42,7 @@ MemSegment::CreateSegment() {
snapshot::ScopedSnapshotT ss;
auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_id_);
if (!status.ok()) {
std::string err_msg = "SSMemSegment::CreateSegment failed: " + status.ToString();
std::string err_msg = "MemSegment::CreateSegment failed: " + status.ToString();
LOG_ENGINE_ERROR_ << err_msg;
return status;
}
......@@ -53,7 +53,7 @@ MemSegment::CreateSegment() {
operation_ = std::make_shared<snapshot::NewSegmentOperation>(context, ss);
status = operation_->CommitNewSegment(segment_);
if (!status.ok()) {
std::string err_msg = "SSMemSegment::CreateSegment failed: " + status.ToString();
std::string err_msg = "MemSegment::CreateSegment failed: " + status.ToString();
LOG_ENGINE_ERROR_ << err_msg;
return status;
}
......@@ -71,7 +71,7 @@ MemSegment::CreateSegment() {
snapshot::SegmentFilePtr seg_file;
status = operation_->CommitNewSegmentFile(sf_context, seg_file);
if (!status.ok()) {
std::string err_msg = "SSMemSegment::CreateSegment failed: " + status.ToString();
std::string err_msg = "MemSegment::CreateSegment failed: " + status.ToString();
LOG_ENGINE_ERROR_ << err_msg;
return status;
}
......@@ -89,7 +89,7 @@ MemSegment::CreateSegment() {
snapshot::SegmentFilePtr delete_doc_file, bloom_filter_file;
status = operation_->CommitNewSegmentFile(sf_context, delete_doc_file);
if (!status.ok()) {
std::string err_msg = "SSMemSegment::CreateSegment failed: " + status.ToString();
std::string err_msg = "MemSegment::CreateSegment failed: " + status.ToString();
LOG_ENGINE_ERROR_ << err_msg;
return status;
}
......@@ -97,7 +97,7 @@ MemSegment::CreateSegment() {
sf_context.field_element_name = engine::DEFAULT_BLOOM_FILTER_NAME;
status = operation_->CommitNewSegmentFile(sf_context, bloom_filter_file);
if (!status.ok()) {
std::string err_msg = "SSMemSegment::CreateSegment failed: " + status.ToString();
std::string err_msg = "MemSegment::CreateSegment failed: " + status.ToString();
LOG_ENGINE_ERROR_ << err_msg;
return status;
}
......@@ -117,7 +117,7 @@ MemSegment::GetSingleEntitySize(int64_t& single_size) {
snapshot::ScopedSnapshotT ss;
auto status = snapshot::Snapshots::GetInstance().GetSnapshot(ss, collection_id_);
if (!status.ok()) {
std::string err_msg = "SSMemSegment::SingleEntitySize failed: " + status.ToString();
std::string err_msg = "MemSegment::SingleEntitySize failed: " + status.ToString();
LOG_ENGINE_ERROR_ << err_msg;
return status;
}
......
......@@ -18,7 +18,6 @@
#include <vector>
#include "db/Types.h"
#include "db/meta/FilesHolder.h"
#include "utils/Status.h"
namespace milvus {
......
......@@ -69,7 +69,7 @@ MergeTask::Execute() {
snapshot::SegmentFilePtr seg_file;
status = op->CommitNewSegmentFile(sf_context, seg_file);
if (!status.ok()) {
std::string err_msg = "SSMergeTask create segment failed: " + status.ToString();
std::string err_msg = "MergeTask create segment failed: " + status.ToString();
LOG_ENGINE_ERROR_ << err_msg;
return status;
}
......@@ -87,7 +87,7 @@ MergeTask::Execute() {
snapshot::SegmentFilePtr delete_doc_file, bloom_filter_file;
status = op->CommitNewSegmentFile(sf_context, delete_doc_file);
if (!status.ok()) {
std::string err_msg = "SSMergeTask create bloom filter segment file failed: " + status.ToString();
std::string err_msg = "MergeTask create bloom filter segment file failed: " + status.ToString();
LOG_ENGINE_ERROR_ << err_msg;
return status;
}
......@@ -95,7 +95,7 @@ MergeTask::Execute() {
sf_context.field_element_name = engine::DEFAULT_BLOOM_FILTER_NAME;
status = op->CommitNewSegmentFile(sf_context, bloom_filter_file);
if (!status.ok()) {
std::string err_msg = "SSMergeTask create deleted-doc segment file failed: " + status.ToString();
std::string err_msg = "MergeTask create deleted-doc segment file failed: " + status.ToString();
LOG_ENGINE_ERROR_ << err_msg;
return status;
}
......@@ -116,7 +116,7 @@ MergeTask::Execute() {
std::make_shared<segment::SegmentReader>(options_.meta_.path_, read_visitor);
status = segment_writer->Merge(segment_reader);
if (!status.ok()) {
std::string err_msg = "SSMergeTask merge failed: " + status.ToString();
std::string err_msg = "MergeTask merge failed: " + status.ToString();
LOG_ENGINE_ERROR_ << err_msg;
return status;
}
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/meta/FilesHolder.h"
#include "utils/Log.h"
#include <utility>
namespace milvus {
namespace engine {
namespace meta {
//////////////////////////////////////////////////////////////////////////////////////////////////////////
FilesHolder::OngoingFileChecker&
FilesHolder::OngoingFileChecker::GetInstance() {
static OngoingFileChecker instance;
return instance;
}
Status
FilesHolder::OngoingFileChecker::MarkOngoingFile(const meta::SegmentSchema& table_file) {
std::lock_guard<std::mutex> lck(mutex_);
return MarkOngoingFileNoLock(table_file);
}
Status
FilesHolder::OngoingFileChecker::MarkOngoingFiles(const meta::SegmentsSchema& table_files) {
std::lock_guard<std::mutex> lck(mutex_);
for (auto& table_file : table_files) {
MarkOngoingFileNoLock(table_file);
}
return Status::OK();
}
Status
FilesHolder::OngoingFileChecker::UnmarkOngoingFile(const meta::SegmentSchema& table_file) {
std::lock_guard<std::mutex> lck(mutex_);
return UnmarkOngoingFileNoLock(table_file);
}
Status
FilesHolder::OngoingFileChecker::UnmarkOngoingFiles(const meta::SegmentsSchema& table_files) {
std::lock_guard<std::mutex> lck(mutex_);
for (auto& table_file : table_files) {
UnmarkOngoingFileNoLock(table_file);
}
return Status::OK();
}
bool
FilesHolder::OngoingFileChecker::CanBeDeleted(const meta::SegmentSchema& schema) {
std::lock_guard<std::mutex> lck(mutex_);
auto iter = ongoing_files_.find(schema.collection_id_);
if (iter == ongoing_files_.end()) {
return true;
} else {
auto it_file = iter->second.find(schema.id_);
if (it_file == iter->second.end()) {
return true;
} else {
return (it_file->second > 0) ? false : true;
}
}
}
void
FilesHolder::OngoingFileChecker::PrintInfo() {
std::lock_guard<std::mutex> lck(mutex_);
if (!ongoing_files_.empty()) {
LOG_ENGINE_DEBUG_ << "File reference information:";
for (meta::Table2FileRef::iterator iter = ongoing_files_.begin(); iter != ongoing_files_.end(); ++iter) {
LOG_ENGINE_DEBUG_ << "\t" << iter->first << ": " << iter->second.size() << " files in use";
}
}
}
Status
FilesHolder::OngoingFileChecker::MarkOngoingFileNoLock(const meta::SegmentSchema& table_file) {
if (table_file.collection_id_.empty() || table_file.file_id_.empty()) {
return Status(DB_ERROR, "Invalid collection files");
}
auto iter = ongoing_files_.find(table_file.collection_id_);
if (iter == ongoing_files_.end()) {
File2RefCount files_refcount;
files_refcount.insert(std::make_pair(table_file.id_, 1));
ongoing_files_.insert(std::make_pair(table_file.collection_id_, files_refcount));
} else {
auto it_file = iter->second.find(table_file.id_);
if (it_file == iter->second.end()) {
iter->second[table_file.id_] = 1;
} else {
it_file->second++;
}
}
LOG_ENGINE_DEBUG_ << "Mark ongoing file:" << table_file.file_id_
<< " refcount:" << ongoing_files_[table_file.collection_id_][table_file.id_];
return Status::OK();
}
Status
FilesHolder::OngoingFileChecker::UnmarkOngoingFileNoLock(const meta::SegmentSchema& table_file) {
if (table_file.collection_id_.empty() || table_file.file_id_.empty()) {
return Status(DB_ERROR, "Invalid collection files");
}
auto iter = ongoing_files_.find(table_file.collection_id_);
if (iter != ongoing_files_.end()) {
auto it_file = iter->second.find(table_file.id_);
if (it_file != iter->second.end()) {
it_file->second--;
LOG_ENGINE_DEBUG_ << "Unmark ongoing file:" << table_file.file_id_ << " refcount:" << it_file->second;
if (it_file->second <= 0) {
iter->second.erase(table_file.id_);
if (iter->second.empty()) {
ongoing_files_.erase(table_file.collection_id_);
}
}
}
}
return Status::OK();
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
FilesHolder::FilesHolder() {
}
FilesHolder::~FilesHolder() {
ReleaseFiles();
}
Status
FilesHolder::MarkFile(const meta::SegmentSchema& file) {
std::lock_guard<std::mutex> lck(mutex_);
return MarkFileInternal(file);
}
Status
FilesHolder::MarkFiles(const meta::SegmentsSchema& files) {
std::lock_guard<std::mutex> lck(mutex_);
for (auto& file : files) {
MarkFileInternal(file);
}
return Status::OK();
}
Status
FilesHolder::UnmarkFile(const meta::SegmentSchema& file) {
std::lock_guard<std::mutex> lck(mutex_);
return UnmarkFileInternal(file);
}
Status
FilesHolder::UnmarkFiles(const meta::SegmentsSchema& files) {
std::lock_guard<std::mutex> lck(mutex_);
for (auto& file : files) {
UnmarkFileInternal(file);
}
return Status::OK();
}
void
FilesHolder::ReleaseFiles() {
std::lock_guard<std::mutex> lck(mutex_);
OngoingFileChecker::GetInstance().UnmarkOngoingFiles(hold_files_);
hold_files_.clear();
unique_ids_.clear();
}
bool
FilesHolder::CanBeDeleted(const meta::SegmentSchema& file) {
return OngoingFileChecker::GetInstance().CanBeDeleted(file);
}
void
FilesHolder::PrintInfo() {
return OngoingFileChecker::GetInstance().PrintInfo();
}
Status
FilesHolder::MarkFileInternal(const meta::SegmentSchema& file) {
if (unique_ids_.find(file.id_) != unique_ids_.end()) {
return Status::OK(); // already marked
}
auto status = OngoingFileChecker::GetInstance().MarkOngoingFile(file);
if (status.ok()) {
unique_ids_.insert(file.id_);
hold_files_.push_back(file);
}
return status;
}
Status
FilesHolder::UnmarkFileInternal(const meta::SegmentSchema& file) {
if (unique_ids_.find(file.id_) == unique_ids_.end()) {
return Status::OK(); // no such file
}
auto status = OngoingFileChecker::GetInstance().UnmarkOngoingFile(file);
if (status.ok()) {
for (auto iter = hold_files_.begin(); iter != hold_files_.end(); ++iter) {
if (file.id_ == (*iter).id_) {
hold_files_.erase(iter);
break;
}
}
unique_ids_.erase(file.id_);
}
return status;
}
} // namespace meta
} // namespace engine
} // namespace milvus
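The OngoingFileChecker above boils down to per-file reference counting: marking a file increments its count, unmarking decrements it, and a file can be deleted only when no holder remains. A compact sketch of that idea follows, with a plain string key standing in for SegmentSchema (RefChecker is a placeholder name, not a Milvus class):

```cpp
#include <iostream>
#include <map>
#include <mutex>
#include <string>

// Toy version of the mark/unmark reference counting above: a file may be
// deleted only while no reader holds a reference to it.
class RefChecker {
 public:
    void Mark(const std::string& file) {
        std::lock_guard<std::mutex> lock(mutex_);
        ++refs_[file];  // operator[] default-initializes the count to 0
    }

    void Unmark(const std::string& file) {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = refs_.find(file);
        if (it != refs_.end() && --it->second <= 0) {
            refs_.erase(it);  // drop the entry once nobody holds the file
        }
    }

    bool CanBeDeleted(const std::string& file) {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = refs_.find(file);
        return it == refs_.end() || it->second <= 0;
    }

 private:
    std::mutex mutex_;
    std::map<std::string, int> refs_;
};

int main() {
    RefChecker checker;
    checker.Mark("segment_1/file_a");
    std::cout << std::boolalpha << checker.CanBeDeleted("segment_1/file_a") << "\n";  // false
    checker.Unmark("segment_1/file_a");
    std::cout << checker.CanBeDeleted("segment_1/file_a") << "\n";                    // true
    return 0;
}
```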
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include "db/meta/Meta.h"
#include "utils/Status.h"
#include <map>
#include <mutex>
#include <set>
#include <string>
namespace milvus {
namespace engine {
namespace meta {
class FilesHolder {
public:
FilesHolder();
virtual ~FilesHolder();
Status
MarkFile(const meta::SegmentSchema& file);
Status
MarkFiles(const meta::SegmentsSchema& files);
Status
UnmarkFile(const meta::SegmentSchema& file);
Status
UnmarkFiles(const meta::SegmentsSchema& files);
const milvus::engine::meta::SegmentsSchema&
HoldFiles() const {
return hold_files_;
}
milvus::engine::meta::SegmentsSchema&
HoldFiles() {
return hold_files_;
}
void
ReleaseFiles();
static bool
CanBeDeleted(const meta::SegmentSchema& file);
static void
PrintInfo();
private:
class OngoingFileChecker {
public:
static OngoingFileChecker&
GetInstance();
Status
MarkOngoingFile(const meta::SegmentSchema& file);
Status
MarkOngoingFiles(const meta::SegmentsSchema& files);
Status
UnmarkOngoingFile(const meta::SegmentSchema& file);
Status
UnmarkOngoingFiles(const meta::SegmentsSchema& files);
bool
CanBeDeleted(const meta::SegmentSchema& file);
void
PrintInfo();
private:
Status
MarkOngoingFileNoLock(const meta::SegmentSchema& file);
Status
UnmarkOngoingFileNoLock(const meta::SegmentSchema& file);
private:
std::mutex mutex_;
meta::Table2FileRef ongoing_files_; // collection id mapping to (file id mapping to ongoing ref-count)
};
private:
Status
MarkFileInternal(const meta::SegmentSchema& file);
Status
UnmarkFileInternal(const meta::SegmentSchema& file);
private:
std::mutex mutex_;
milvus::engine::meta::SegmentsSchema hold_files_;
std::set<uint64_t> unique_ids_;
};
} // namespace meta
} // namespace engine
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/meta/Meta.h"
namespace milvus {
namespace engine {
namespace meta {
const char* META_ENVIRONMENT = "Environment";
const char* META_TABLES = "Tables";
const char* META_TABLEFILES = "TableFiles";
const char* META_COLLECTIONS = "Collections";
const char* META_FIELDS = "Fields";
const char* META_COLLECTIONFILES = "CollectionFiles";
} // namespace meta
} // namespace engine
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <cstddef>
#include <memory>
#include <set>
#include <string>
#include <vector>
#include "MetaTypes.h"
#include "db/Options.h"
#include "db/Types.h"
#include "db/meta/FilesHolder.h"
#include "utils/Status.h"
namespace milvus {
namespace engine {
namespace meta {
extern const char* META_ENVIRONMENT;
extern const char* META_TABLES;
extern const char* META_TABLEFILES;
extern const char* META_COLLECTIONS;
extern const char* META_FIELDS;
extern const char* META_COLLECTIONFILES;
class FilesHolder;
class Meta {
/*
public:
class CleanUpFilter {
public:
virtual bool
IsIgnored(const SegmentSchema& schema) = 0;
};
*/
public:
virtual ~Meta() = default;
virtual Status
CreateCollection(CollectionSchema& table_schema) = 0;
virtual Status
DescribeCollection(CollectionSchema& table_schema) = 0;
virtual Status
HasCollection(const std::string& collection_id, bool& has_or_not, bool is_root = false) = 0;
virtual Status
AllCollections(std::vector<CollectionSchema>& table_schema_array, bool is_root = false) = 0;
virtual Status
UpdateCollectionFlag(const std::string& collection_id, int64_t flag) = 0;
virtual Status
UpdateCollectionFlushLSN(const std::string& collection_id, uint64_t flush_lsn) = 0;
virtual Status
GetCollectionFlushLSN(const std::string& collection_id, uint64_t& flush_lsn) = 0;
virtual Status
DropCollections(const std::vector<std::string>& collection_id_array) = 0;
virtual Status
DeleteCollectionFiles(const std::vector<std::string>& collection_id_array) = 0;
virtual Status
CreateCollectionFile(SegmentSchema& file_schema) = 0;
virtual Status
GetCollectionFiles(const std::string& collection_id, const std::vector<size_t>& ids, FilesHolder& files_holder) = 0;
virtual Status
GetCollectionFilesBySegmentId(const std::string& segment_id, FilesHolder& files_holder) = 0;
virtual Status
UpdateCollectionFile(SegmentSchema& file_schema) = 0;
virtual Status
UpdateCollectionFiles(SegmentsSchema& files) = 0;
virtual Status
UpdateCollectionFilesRowCount(SegmentsSchema& files) = 0;
virtual Status
UpdateCollectionIndex(const std::string& collection_id, const CollectionIndex& index) = 0;
virtual Status
UpdateCollectionFilesToIndex(const std::string& collection_id) = 0;
virtual Status
DescribeCollectionIndex(const std::string& collection_id, CollectionIndex& index) = 0;
virtual Status
DropCollectionIndex(const std::string& collection_id) = 0;
virtual Status
CreatePartition(const std::string& collection_name, const std::string& partition_name, const std::string& tag,
uint64_t lsn) = 0;
virtual Status
HasPartition(const std::string& collection_id, const std::string& tag, bool& has_or_not) = 0;
virtual Status
DropPartition(const std::string& partition_name) = 0;
virtual Status
ShowPartitions(const std::string& collection_name, std::vector<meta::CollectionSchema>& partition_schema_array) = 0;
virtual Status
GetPartitionName(const std::string& collection_name, const std::string& tag, std::string& partition_name) = 0;
virtual Status
FilesToSearch(const std::string& collection_id, FilesHolder& files_holder) = 0;
virtual Status
FilesToSearchEx(const std::string& root_collection, const std::set<std::string>& partition_id_array,
FilesHolder& files_holder) = 0;
virtual Status
FilesToMerge(const std::string& collection_id, FilesHolder& files_holder) = 0;
virtual Status
FilesToIndex(FilesHolder& files_holder) = 0;
virtual Status
FilesByType(const std::string& collection_id, const std::vector<int>& file_types, FilesHolder& files_holder) = 0;
virtual Status
FilesByTypeEx(const std::vector<meta::CollectionSchema>& collections, const std::vector<int>& file_types,
FilesHolder& files_holder) = 0;
virtual Status
FilesByID(const std::vector<size_t>& ids, FilesHolder& files_holder) = 0;
virtual Status
Size(uint64_t& result) = 0;
virtual Status
Archive() = 0;
virtual Status
CleanUpShadowFiles() = 0;
virtual Status
CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter = nullptr*/) = 0;
virtual Status
DropAll() = 0;
virtual Status
Count(const std::string& collection_id, uint64_t& result) = 0;
virtual Status
SetGlobalLastLSN(uint64_t lsn) = 0;
virtual Status
GetGlobalLastLSN(uint64_t& lsn) = 0;
virtual Status
CreateHybridCollection(CollectionSchema& collection_schema, hybrid::FieldsSchema& fields_schema) = 0;
virtual Status
DescribeHybridCollection(CollectionSchema& collection_schema, hybrid::FieldsSchema& fields_schema) = 0;
}; // MetaData
using MetaPtr = std::shared_ptr<Meta>;
} // namespace meta
} // namespace engine
} // namespace milvus
......@@ -114,80 +114,7 @@ enum class StructuredIndexType {
namespace meta {
constexpr int32_t DEFAULT_ENGINE_TYPE = (int)EngineType::FAISS_IDMAP;
constexpr int32_t DEFAULT_METRIC_TYPE = (int)MetricType::L2;
constexpr int32_t DEFAULT_INDEX_FILE_SIZE = 1024;
constexpr char CURRENT_VERSION[] = MILVUS_VERSION;
constexpr int64_t FLAG_MASK_NO_USERID = 0x1;
constexpr int64_t FLAG_MASK_HAS_USERID = 0x1 << 1;
using DateT = int;
const DateT EmptyDate = -1;
struct EnvironmentSchema {
uint64_t global_lsn_ = 0;
}; // EnvironmentSchema
struct CollectionSchema {
typedef enum {
NORMAL,
TO_DELETE,
} TABLE_STATE;
size_t id_ = 0;
std::string collection_id_;
int32_t state_ = (int)NORMAL;
int64_t dimension_ = 0;
int64_t created_on_ = 0;
int64_t flag_ = 0;
int64_t index_file_size_ = DEFAULT_INDEX_FILE_SIZE;
int32_t engine_type_ = DEFAULT_ENGINE_TYPE;
std::string index_params_ = "{}";
int32_t metric_type_ = DEFAULT_METRIC_TYPE;
std::string owner_collection_;
std::string partition_tag_;
std::string version_ = CURRENT_VERSION;
uint64_t flush_lsn_ = 0;
}; // CollectionSchema
struct SegmentSchema {
typedef enum {
NEW,
RAW,
TO_INDEX,
INDEX,
TO_DELETE,
NEW_MERGE,
NEW_INDEX,
BACKUP,
} FILE_TYPE;
size_t id_ = 0;
std::string collection_id_;
std::string segment_id_;
std::string file_id_;
int32_t file_type_ = NEW;
size_t file_size_ = 0;
size_t row_count_ = 0;
DateT date_ = EmptyDate;
int64_t dimension_ = 0;
// TODO(zhiru)
std::string location_;
int64_t updated_time_ = 0;
int64_t created_on_ = 0;
int64_t index_file_size_ = DEFAULT_INDEX_FILE_SIZE; // not persist to meta
int32_t engine_type_ = DEFAULT_ENGINE_TYPE;
std::string index_params_; // not persist to meta
int32_t metric_type_ = DEFAULT_METRIC_TYPE; // not persist to meta
uint64_t flush_lsn_ = 0;
}; // SegmentSchema
using SegmentSchemaPtr = std::shared_ptr<meta::SegmentSchema>;
using SegmentsSchema = std::vector<SegmentSchema>;
using File2RefCount = std::map<uint64_t, int64_t>;
using Table2FileRef = std::map<std::string, File2RefCount>;
namespace hybrid {
......@@ -210,73 +137,6 @@ enum DataType {
VECTOR_FLOAT = 101,
};
struct VectorFieldSchema {
std::string vector_id_;
int64_t dimension;
int64_t index_file_size_ = DEFAULT_INDEX_FILE_SIZE;
int32_t engine_type_ = DEFAULT_ENGINE_TYPE;
std::string index_params_ = "{}";
int32_t metric_type_ = DEFAULT_METRIC_TYPE;
};
struct VectorFieldsSchema {
std::vector<VectorFieldSchema> vector_fields_;
};
using VectorFieldSchemaPtr = std::shared_ptr<VectorFieldSchema>;
struct FieldSchema {
// TODO(yukun): need field_id?
std::string collection_id_;
std::string field_name_;
int32_t field_type_ = (int)INT8;
std::string index_name_;
std::string index_param_ = "{}";
std::string field_params_ = "{}";
};
struct FieldsSchema {
std::vector<FieldSchema> fields_schema_;
};
using FieldSchemaPtr = std::shared_ptr<FieldSchema>;
struct VectorFileSchema {
std::string field_name_;
int64_t index_file_size_ = DEFAULT_INDEX_FILE_SIZE; // not persist to meta
int32_t engine_type_ = DEFAULT_ENGINE_TYPE;
std::string index_params_ = "{}"; // not persist to meta
int32_t metric_type_ = DEFAULT_METRIC_TYPE; // not persist to meta
};
using VectorFileSchemaPtr = std::shared_ptr<VectorFileSchema>;
struct CollectionFileSchema {
typedef enum {
NEW,
RAW,
TO_INDEX,
INDEX,
TO_DELETE,
NEW_MERGE,
NEW_INDEX,
BACKUP,
} FILE_TYPE;
size_t id_ = 0;
std::string collection_id_;
std::string segment_id_;
std::string file_id_;
int32_t file_type_ = NEW;
size_t file_size_ = 0;
size_t row_count_ = 0;
DateT date_ = EmptyDate;
std::string location_;
int64_t updated_time_ = 0;
int64_t created_on_ = 0;
uint64_t flush_lsn_ = 0;
};
using CollectionFileSchemaPtr = std::shared_ptr<CollectionFileSchema>;
} // namespace hybrid
} // namespace meta
......
This diff is collapsed.
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <mysql++/mysql++.h>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <vector>
#include "Meta.h"
#include "MySQLConnectionPool.h"
#include "db/Options.h"
namespace milvus {
namespace engine {
namespace meta {
class MySQLMetaImpl : public Meta {
public:
MySQLMetaImpl(const DBMetaOptions& options, const int& mode);
~MySQLMetaImpl();
Status
CreateCollection(CollectionSchema& collection_schema) override;
Status
DescribeCollection(CollectionSchema& collection_schema) override;
Status
HasCollection(const std::string& collection_id, bool& has_or_not, bool is_root = false) override;
Status
AllCollections(std::vector<CollectionSchema>& collection_schema_array, bool is_root = false) override;
Status
DropCollections(const std::vector<std::string>& collection_id_array) override;
Status
DeleteCollectionFiles(const std::vector<std::string>& collection_id_array) override;
Status
CreateCollectionFile(SegmentSchema& file_schema) override;
Status
GetCollectionFiles(const std::string& collection_id, const std::vector<size_t>& ids,
FilesHolder& files_holder) override;
Status
GetCollectionFilesBySegmentId(const std::string& segment_id, FilesHolder& files_holder) override;
Status
UpdateCollectionIndex(const std::string& collection_id, const CollectionIndex& index) override;
Status
UpdateCollectionFlag(const std::string& collection_id, int64_t flag) override;
Status
UpdateCollectionFlushLSN(const std::string& collection_id, uint64_t flush_lsn) override;
Status
GetCollectionFlushLSN(const std::string& collection_id, uint64_t& flush_lsn) override;
Status
UpdateCollectionFile(SegmentSchema& file_schema) override;
Status
UpdateCollectionFilesToIndex(const std::string& collection_id) override;
Status
UpdateCollectionFiles(SegmentsSchema& files) override;
Status
UpdateCollectionFilesRowCount(SegmentsSchema& files) override;
Status
DescribeCollectionIndex(const std::string& collection_id, CollectionIndex& index) override;
Status
DropCollectionIndex(const std::string& collection_id) override;
Status
CreatePartition(const std::string& collection_id, const std::string& partition_name, const std::string& tag,
uint64_t lsn) override;
Status
HasPartition(const std::string& collection_id, const std::string& tag, bool& has_or_not) override;
Status
DropPartition(const std::string& partition_name) override;
Status
ShowPartitions(const std::string& collection_id,
std::vector<meta::CollectionSchema>& partition_schema_array) override;
Status
GetPartitionName(const std::string& collection_id, const std::string& tag, std::string& partition_name) override;
Status
FilesToSearch(const std::string& collection_id, FilesHolder& files_holder) override;
Status
FilesToSearchEx(const std::string& root_collection, const std::set<std::string>& partition_id_array,
FilesHolder& files_holder) override;
Status
FilesToMerge(const std::string& collection_id, FilesHolder& files_holder) override;
Status
FilesToIndex(FilesHolder& files_holder) override;
Status
FilesByType(const std::string& collection_id, const std::vector<int>& file_types,
FilesHolder& files_holder) override;
Status
FilesByTypeEx(const std::vector<meta::CollectionSchema>& collections, const std::vector<int>& file_types,
FilesHolder& files_holder) override;
Status
FilesByID(const std::vector<size_t>& ids, FilesHolder& files_holder) override;
Status
Archive() override;
Status
Size(uint64_t& result) override;
Status
CleanUpShadowFiles() override;
Status
CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter = nullptr*/) override;
Status
DropAll() override;
Status
Count(const std::string& collection_id, uint64_t& result) override;
Status
SetGlobalLastLSN(uint64_t lsn) override;
Status
GetGlobalLastLSN(uint64_t& lsn) override;
Status
CreateHybridCollection(CollectionSchema& collection_schema, hybrid::FieldsSchema& fields_schema) override;
Status
DescribeHybridCollection(CollectionSchema& collection_schema, hybrid::FieldsSchema& fields_schema) override;
private:
Status
NextFileId(std::string& file_id);
Status
NextCollectionId(std::string& collection_id);
Status
DiscardFiles(int64_t to_discard_size);
void
ValidateMetaSchema();
Status
Initialize();
private:
const DBMetaOptions options_;
const int mode_;
std::shared_ptr<MySQLConnectionPool> mysql_connection_pool_;
bool safe_grab_ = false; // Safely grabs a connection from mysql pool
std::mutex meta_mutex_;
std::mutex genid_mutex_;
// std::mutex connectionMutex_;
}; // DBMetaImpl
} // namespace meta
} // namespace engine
} // namespace milvus
......@@ -24,9 +24,6 @@ namespace milvus {
namespace engine {
namespace wal {
using TableSchemaPtr = std::shared_ptr<milvus::engine::meta::CollectionSchema>;
using TableMetaPtr = std::shared_ptr<std::unordered_map<std::string, TableSchemaPtr>>;
#define UNIT_MB (1024 * 1024)
#define UNIT_B 1
#define LSN_OFFSET_MASK 0x00000000ffffffff
......
......@@ -51,96 +51,6 @@ WalManager::WalManager(const MXLogConfiguration& config) {
WalManager::~WalManager() {
}
ErrorCode
WalManager::Init(const meta::MetaPtr& meta) {
uint64_t applied_lsn = 0;
p_meta_handler_ = std::make_shared<MXLogMetaHandler>(mxlog_config_.mxlog_path);
if (p_meta_handler_ != nullptr) {
p_meta_handler_->GetMXLogInternalMeta(applied_lsn);
}
uint64_t recovery_start = 0;
if (meta != nullptr) {
meta->GetGlobalLastLSN(recovery_start);
std::vector<meta::CollectionSchema> collention_schema_array;
auto status = meta->AllCollections(collention_schema_array);
if (!status.ok()) {
return WAL_META_ERROR;
}
if (!collention_schema_array.empty()) {
u_int64_t min_flushed_lsn = ~(u_int64_t)0;
u_int64_t max_flushed_lsn = 0;
auto update_limit_lsn = [&](u_int64_t lsn) {
if (min_flushed_lsn > lsn) {
min_flushed_lsn = lsn;
}
if (max_flushed_lsn < lsn) {
max_flushed_lsn = lsn;
}
};
for (auto& col_schema : collention_schema_array) {
auto& collection = collections_[col_schema.collection_id_];
auto& default_part = collection[""];
default_part.flush_lsn = col_schema.flush_lsn_;
update_limit_lsn(default_part.flush_lsn);
std::vector<meta::CollectionSchema> partition_schema_array;
status = meta->ShowPartitions(col_schema.collection_id_, partition_schema_array);
if (!status.ok()) {
return WAL_META_ERROR;
}
for (auto& par_schema : partition_schema_array) {
auto& partition = collection[par_schema.partition_tag_];
partition.flush_lsn = par_schema.flush_lsn_;
update_limit_lsn(partition.flush_lsn);
}
}
if (applied_lsn < max_flushed_lsn) {
// a new WAL folder?
applied_lsn = max_flushed_lsn;
}
if (recovery_start < min_flushed_lsn) {
// not flush all yet
recovery_start = min_flushed_lsn;
}
for (auto& col : collections_) {
for (auto& part : col.second) {
part.second.wal_lsn = applied_lsn;
}
}
}
}
// all tables are dropped and a new wal path?
if (applied_lsn < recovery_start) {
applied_lsn = recovery_start;
}
ErrorCode error_code = WAL_ERROR;
p_buffer_ = std::make_shared<MXLogBuffer>(mxlog_config_.mxlog_path, mxlog_config_.buffer_size);
if (p_buffer_ != nullptr) {
if (p_buffer_->Init(recovery_start, applied_lsn)) {
error_code = WAL_SUCCESS;
} else if (mxlog_config_.recovery_error_ignore) {
p_buffer_->Reset(applied_lsn);
error_code = WAL_SUCCESS;
} else {
error_code = WAL_FILE_ERROR;
}
}
// buffer size may changed
mxlog_config_.buffer_size = p_buffer_->GetBufferSize();
last_applied_lsn_ = applied_lsn;
return error_code;
}
ErrorCode
WalManager::Init() {
uint64_t applied_lsn = 0;
......
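The removed Init(meta) path above derives the WAL recovery window from per-collection and per-partition flush LSNs: replay starts no earlier than the smallest flushed LSN, and the applied LSN is raised to at least the largest one. A toy sketch of that clamping step, with plain integers standing in for the metadata (ClampLsnWindow is an illustrative name only):

```cpp
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

// Toy version of the LSN window computation in the removed Init(meta) path:
// recovery starts no earlier than the smallest flushed LSN, and the applied
// LSN is raised to at least the largest flushed LSN.
void
ClampLsnWindow(const std::vector<uint64_t>& flushed_lsns, uint64_t& applied_lsn, uint64_t& recovery_start) {
    if (flushed_lsns.empty()) {
        return;  // nothing flushed yet: keep the values reported by the meta handler
    }
    uint64_t min_flushed = *std::min_element(flushed_lsns.begin(), flushed_lsns.end());
    uint64_t max_flushed = *std::max_element(flushed_lsns.begin(), flushed_lsns.end());
    applied_lsn = std::max(applied_lsn, max_flushed);
    recovery_start = std::max(recovery_start, min_flushed);
}

int main() {
    uint64_t applied = 5, recovery = 0;
    ClampLsnWindow({3, 7, 9}, applied, recovery);
    std::cout << "applied=" << applied << " recovery_start=" << recovery << "\n";  // applied=9 recovery_start=3
    return 0;
}
```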
......@@ -33,14 +33,6 @@ class WalManager {
explicit WalManager(const MXLogConfiguration& config);
~WalManager();
/*
* init
* @param meta
* @retval error_code
*/
ErrorCode
Init(const meta::MetaPtr& meta);
ErrorCode
Init();
......
......@@ -15,7 +15,6 @@
#include <string>
#include <unordered_map>
#include "db/meta/Meta.h"
#include "db/meta/MetaFactory.h"
#include "db/meta/MetaTypes.h"
#include "db/wal/WalDefinations.h"
......
This diff is collapsed.
......@@ -28,9 +28,6 @@
namespace milvus {
namespace scheduler {
using SegmentSchemaPtr = engine::meta::SegmentSchemaPtr;
using SegmentSchema = engine::meta::SegmentSchema;
using ExecutionEnginePtr = engine::ExecutionEnginePtr;
using EngineFactory = engine::EngineFactory;
using EngineType = engine::EngineType;
......
......@@ -26,11 +26,16 @@ BuildIndexJob::BuildIndexJob(engine::DBOptions options, const std::string& colle
JobTasks
BuildIndexJob::CreateTasks() {
engine::TargetFieldGroups target_groups;
BuildIndexTask::GroupFieldsForIndex(collection_name_, target_groups);
std::vector<TaskPtr> tasks;
for (auto& id : segment_ids_) {
auto task = std::make_shared<BuildIndexTask>(options_, collection_name_, id, nullptr);
task->job_ = this;
tasks.emplace_back(task);
for (auto& group : target_groups) {
auto task = std::make_shared<BuildIndexTask>(options_, collection_name_, id, group, nullptr);
task->job_ = this;
tasks.emplace_back(task);
}
}
return tasks;
}
......
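The change above fans index building out from one task per segment to one task per (segment, field group) pair. A minimal sketch of that fan-out with placeholder types (FakeTask, CreateTasks) instead of the scheduler's real classes:

```cpp
#include <cstdint>
#include <iostream>
#include <set>
#include <string>
#include <vector>

// Placeholder for the scheduler's BuildIndexTask: one unit of work per
// (segment id, field group) combination.
struct FakeTask {
    int64_t segment_id;
    std::set<std::string> fields;
};

std::vector<FakeTask>
CreateTasks(const std::vector<int64_t>& segment_ids, const std::vector<std::set<std::string>>& field_groups) {
    std::vector<FakeTask> tasks;
    for (auto id : segment_ids) {
        for (const auto& group : field_groups) {
            tasks.push_back(FakeTask{id, group});  // each field group is indexed independently
        }
    }
    return tasks;
}

int main() {
    auto tasks = CreateTasks({1, 2}, {{"embedding"}, {"age", "score"}});
    std::cout << tasks.size() << " tasks\n";  // 2 segments x 2 field groups = 4 tasks
    return 0;
}
```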
......@@ -25,7 +25,6 @@
#include "Job.h"
#include "db/SnapshotVisitor.h"
#include "db/Types.h"
//#include "db/meta/MetaTypes.h"
#include "server/context/Context.h"
......
#-------------------------------------------------------------------------------
# Copyright (C) 2019-2020 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under the License.
#-------------------------------------------------------------------------------
aux_source_directory(${MILVUS_ENGINE_SRC}/search search_files)
add_library(search STATIC ${search_files})
target_include_directories(search PUBLIC ${MILVUS_ENGINE_SRC}/search)
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.