未验证 提交 1963f3b9 编写于 作者: B BossZou 提交者: GitHub

Make cache_insert_data take effect in-service(# 1521) (#1540)

* make cache_insert_data take effect in-service (fix #1521)
Signed-off-by: NYhz <yinghao.zou@zilliz.com>

* complete omitted config setter
Signed-off-by: NYhz <yinghao.zou@zilliz.com>

* cache_insert_data take effect for MemTable
Signed-off-by: NYhz <yinghao.zou@zilliz.com>

* add config test
Signed-off-by: NYhz <yinghao.zou@zilliz.com>

* rename config handler
Signed-off-by: NYhz <yinghao.zou@zilliz.com>
上级 8cd1ec98
......@@ -37,8 +37,9 @@ Please mark all change in change log and use the issue from GitHub
- \#1510 Add set interfaces for WAL configurations
- \#1511 Fix big integer cannot pass to server correctly
- \#1518 Table count did not match after deleting vectors and compact
- \#1530 Set table file with correct engine type in meta
- \#1521 Make cache_insert_data take effect in-service
- \#1525 Add setter API for config preload_table
- \#1530 Set table file with correct engine type in meta
- \#1535 Degradation searching performance with metric_type: binary_idmap
## Feature
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "config/handler/CacheConfigHandler.h"
#include "server/Config.h"
namespace milvus {
namespace server {
CacheConfigHandler::CacheConfigHandler() {
auto& config = Config::GetInstance();
config.GetCacheConfigCpuCacheCapacity(cpu_cache_capacity_);
config.GetCacheConfigInsertBufferSize(insert_buffer_size_);
config.GetCacheConfigCacheInsertData(cache_insert_data_);
}
CacheConfigHandler::~CacheConfigHandler() {
RemoveCpuCacheCapacityListener();
RemoveInsertBufferSizeListener();
RemoveCacheInsertDataListener();
}
void
CacheConfigHandler::OnCpuCacheCapacityChanged(int64_t value) {
cpu_cache_capacity_ = value;
}
void
CacheConfigHandler::OnInsertBufferSizeChanged(int64_t value) {
insert_buffer_size_ = value;
}
void
CacheConfigHandler::OnCacheInsertDataChanged(bool value) {
cache_insert_data_ = value;
}
void
CacheConfigHandler::AddCpuCacheCapacityListener() {
server::ConfigCallBackF lambda = [this](const std::string& value) -> Status {
server::Config& config = server::Config::GetInstance();
int64_t capacity;
auto status = config.GetCacheConfigCpuCacheCapacity(capacity);
if (status.ok()) {
OnCpuCacheCapacityChanged(capacity);
}
return status;
};
auto& config = server::Config::GetInstance();
config.RegisterCallBack(server::CONFIG_CACHE, server::CONFIG_CACHE_CPU_CACHE_CAPACITY, identity_, lambda);
}
void
CacheConfigHandler::AddInsertBufferSizeListener() {
server::ConfigCallBackF lambda = [this](const std::string& value) -> Status {
server::Config& config = server::Config::GetInstance();
int64_t size;
auto status = config.GetCacheConfigInsertBufferSize(size);
if (status.ok()) {
OnInsertBufferSizeChanged(size);
}
return status;
};
auto& config = server::Config::GetInstance();
config.RegisterCallBack(server::CONFIG_CACHE, server::CONFIG_CACHE_INSERT_BUFFER_SIZE, identity_, lambda);
}
void
CacheConfigHandler::AddCacheInsertDataListener() {
server::ConfigCallBackF lambda = [this](const std::string& value) -> Status {
auto& config = server::Config::GetInstance();
bool ok;
auto status = config.GetCacheConfigCacheInsertData(ok);
if (status.ok()) {
OnCacheInsertDataChanged(ok);
}
return status;
};
auto& config = server::Config::GetInstance();
config.RegisterCallBack(server::CONFIG_CACHE, server::CONFIG_CACHE_CACHE_INSERT_DATA, identity_, lambda);
}
void
CacheConfigHandler::RemoveCpuCacheCapacityListener() {
auto& config = server::Config::GetInstance();
config.CancelCallBack(server::CONFIG_CACHE, server::CONFIG_CACHE_CPU_CACHE_CAPACITY, identity_);
}
void
CacheConfigHandler::RemoveInsertBufferSizeListener() {
auto& config = server::Config::GetInstance();
config.CancelCallBack(server::CONFIG_CACHE, server::CONFIG_CACHE_INSERT_BUFFER_SIZE, identity_);
}
void
CacheConfigHandler::RemoveCacheInsertDataListener() {
auto& config = server::Config::GetInstance();
config.CancelCallBack(server::CONFIG_CACHE, server::CONFIG_CACHE_CACHE_INSERT_DATA, identity_);
}
} // namespace server
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <string>
#include "config/handler/ConfigHandler.h"
namespace milvus {
namespace server {
class CacheConfigHandler : virtual public ConfigHandler {
public:
CacheConfigHandler();
~CacheConfigHandler();
protected:
virtual void
OnCpuCacheCapacityChanged(int64_t value);
virtual void
OnInsertBufferSizeChanged(int64_t value);
virtual void
OnCacheInsertDataChanged(bool value);
protected:
void
AddCpuCacheCapacityListener();
void
AddInsertBufferSizeListener();
void
AddCacheInsertDataListener();
void
RemoveCpuCacheCapacityListener();
void
RemoveInsertBufferSizeListener();
void
RemoveCacheInsertDataListener();
private:
int64_t cpu_cache_capacity_ = 4 /*GiB*/;
int64_t insert_buffer_size_ = 1 /*GiB*/;
bool cache_insert_data_ = false;
};
} // namespace server
} // namespace milvus
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <string>
#include "server/Config.h"
#include "utils/Log.h"
namespace milvus {
namespace server {
class ConfigHandler {
protected:
void
SetIdentity(const std::string& identity) {
auto& config = server::Config::GetInstance();
config.GenUniqueIdentityID(identity, identity_);
}
protected:
std::string identity_;
};
} // namespace server
} // namespace milvus
......@@ -9,7 +9,7 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#ifdef MILVUS_GPU_VERSION
#include "config/handler/GpuBuildResHandler.h"
#include "config/handler/GpuBuildConfigHandler.h"
#include <string>
#include <vector>
......@@ -17,24 +17,23 @@
namespace milvus {
namespace server {
GpuBuildResHandler::GpuBuildResHandler() {
GpuBuildConfigHandler::GpuBuildConfigHandler() {
server::Config& config = server::Config::GetInstance();
config.GetGpuResourceConfigBuildIndexResources(build_gpus_);
}
GpuBuildResHandler::~GpuBuildResHandler() {
server::Config& config = server::Config::GetInstance();
config.CancelCallBack(server::CONFIG_GPU_RESOURCE, server::CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES, identity_);
GpuBuildConfigHandler::~GpuBuildConfigHandler() {
RemoveGpuBuildResListener();
}
////////////////////////////////////////////////////////////////
void
GpuBuildResHandler::OnGpuBuildResChanged(const std::vector<int64_t>& gpus) {
GpuBuildConfigHandler::OnGpuBuildResChanged(const std::vector<int64_t>& gpus) {
build_gpus_ = gpus;
}
void
GpuBuildResHandler::AddGpuBuildResListener() {
GpuBuildConfigHandler::AddGpuBuildResListener() {
server::Config& config = server::Config::GetInstance();
server::ConfigCallBackF lambda = [this](const std::string& value) -> Status {
server::Config& config = server::Config::GetInstance();
......@@ -51,7 +50,7 @@ GpuBuildResHandler::AddGpuBuildResListener() {
}
void
GpuBuildResHandler::RemoveGpuBuildResListener() {
GpuBuildConfigHandler::RemoveGpuBuildResListener() {
auto& config = server::Config::GetInstance();
config.CancelCallBack(server::CONFIG_GPU_RESOURCE, server::CONFIG_GPU_RESOURCE_BUILD_INDEX_RESOURCES, identity_);
}
......
......@@ -13,16 +13,16 @@
#include <vector>
#include "config/handler/GpuResourcesHandler.h"
#include "config/handler/GpuConfigHandler.h"
namespace milvus {
namespace server {
class GpuBuildResHandler : virtual public GpuResourcesHandler {
class GpuBuildConfigHandler : virtual public GpuConfigHandler {
public:
GpuBuildResHandler();
GpuBuildConfigHandler();
~GpuBuildResHandler();
~GpuBuildConfigHandler();
public:
virtual void
......
......@@ -10,38 +10,32 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#ifdef MILVUS_GPU_VERSION
#include "config/handler/GpuResourcesHandler.h"
#include "config/handler/GpuConfigHandler.h"
namespace milvus {
namespace server {
GpuResourcesHandler::GpuResourcesHandler() {
GpuConfigHandler::GpuConfigHandler() {
server::Config& config = server::Config::GetInstance();
config.GetGpuResourceConfigEnable(gpu_enable_);
}
GpuResourcesHandler::~GpuResourcesHandler() {
GpuConfigHandler::~GpuConfigHandler() {
RemoveGpuEnableListener();
}
//////////////////////////////////////////////////////////////
void
GpuResourcesHandler::OnGpuEnableChanged(bool enable) {
GpuConfigHandler::OnGpuEnableChanged(bool enable) {
gpu_enable_ = enable;
}
void
GpuResourcesHandler::SetIdentity(const std::string& identity) {
server::Config& config = server::Config::GetInstance();
config.GenUniqueIdentityID(identity, identity_);
}
void
GpuResourcesHandler::AddGpuEnableListener() {
server::Config& config = server::Config::GetInstance();
GpuConfigHandler::AddGpuEnableListener() {
auto& config = server::Config::GetInstance();
server::ConfigCallBackF lambda = [this](const std::string& value) -> Status {
server::Config& config = server::Config::GetInstance();
auto& config = server::Config::GetInstance();
bool enable;
auto status = config.GetGpuResourceConfigEnable(enable);
if (status.ok()) {
......@@ -54,7 +48,7 @@ GpuResourcesHandler::AddGpuEnableListener() {
}
void
GpuResourcesHandler::RemoveGpuEnableListener() {
GpuConfigHandler::RemoveGpuEnableListener() {
server::Config& config = server::Config::GetInstance();
config.CancelCallBack(server::CONFIG_GPU_RESOURCE, server::CONFIG_GPU_RESOURCE_ENABLE, identity_);
}
......
......@@ -15,25 +15,23 @@
#include <limits>
#include <string>
#include "config/handler/ConfigHandler.h"
#include "server/Config.h"
namespace milvus {
namespace server {
class GpuResourcesHandler {
class GpuConfigHandler : virtual public ConfigHandler {
public:
GpuResourcesHandler();
GpuConfigHandler();
~GpuResourcesHandler();
~GpuConfigHandler();
protected:
virtual void
OnGpuEnableChanged(bool enable);
protected:
void
SetIdentity(const std::string& identity);
void
AddGpuEnableListener();
......@@ -42,7 +40,6 @@ class GpuResourcesHandler {
protected:
bool gpu_enable_ = true;
std::string identity_;
};
} // namespace server
......
......@@ -9,7 +9,7 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#ifdef MILVUS_GPU_VERSION
#include "config/handler/GpuSearchResHandler.h"
#include "config/handler/GpuSearchConfigHandler.h"
#include <string>
#include <vector>
......@@ -19,7 +19,7 @@
namespace milvus {
namespace server {
GpuSearchResHandler::GpuSearchResHandler() {
GpuSearchConfigHandler::GpuSearchConfigHandler() {
server::Config& config = server::Config::GetInstance();
Status s = config.GetEngineConfigGpuSearchThreshold(threshold_);
......@@ -30,24 +30,24 @@ GpuSearchResHandler::GpuSearchResHandler() {
config.GetGpuResourceConfigSearchResources(search_gpus_);
}
GpuSearchResHandler::~GpuSearchResHandler() {
GpuSearchConfigHandler::~GpuSearchConfigHandler() {
RemoveGpuSearchThresholdListener();
RemoveGpuSearchResListener();
}
////////////////////////////////////////////////////////////////////////
void
GpuSearchResHandler::OnGpuSearchThresholdChanged(int64_t threshold) {
GpuSearchConfigHandler::OnGpuSearchThresholdChanged(int64_t threshold) {
threshold_ = threshold;
}
void
GpuSearchResHandler::OnGpuSearchResChanged(const std::vector<int64_t>& gpus) {
GpuSearchConfigHandler::OnGpuSearchResChanged(const std::vector<int64_t>& gpus) {
search_gpus_ = gpus;
}
void
GpuSearchResHandler::AddGpuSearchThresholdListener() {
GpuSearchConfigHandler::AddGpuSearchThresholdListener() {
server::Config& config = server::Config::GetInstance();
server::ConfigCallBackF lambda_gpu_threshold = [this](const std::string& value) -> Status {
......@@ -65,7 +65,7 @@ GpuSearchResHandler::AddGpuSearchThresholdListener() {
}
void
GpuSearchResHandler::AddGpuSearchResListener() {
GpuSearchConfigHandler::AddGpuSearchResListener() {
server::Config& config = server::Config::GetInstance();
server::ConfigCallBackF lambda_gpu_search_res = [this](const std::string& value) -> Status {
......@@ -83,13 +83,13 @@ GpuSearchResHandler::AddGpuSearchResListener() {
}
void
GpuSearchResHandler::RemoveGpuSearchThresholdListener() {
GpuSearchConfigHandler::RemoveGpuSearchThresholdListener() {
server::Config& config = server::Config::GetInstance();
config.CancelCallBack(server::CONFIG_ENGINE, server::CONFIG_ENGINE_GPU_SEARCH_THRESHOLD, identity_);
}
void
GpuSearchResHandler::RemoveGpuSearchResListener() {
GpuSearchConfigHandler::RemoveGpuSearchResListener() {
auto& config = server::Config::GetInstance();
config.CancelCallBack(server::CONFIG_GPU_RESOURCE, server::CONFIG_GPU_RESOURCE_SEARCH_RESOURCES, identity_);
}
......
......@@ -14,16 +14,16 @@
#include <limits>
#include <vector>
#include "config/handler/GpuResourcesHandler.h"
#include "config/handler/GpuConfigHandler.h"
namespace milvus {
namespace server {
class GpuSearchResHandler : virtual public GpuResourcesHandler {
class GpuSearchConfigHandler : virtual public GpuConfigHandler {
public:
GpuSearchResHandler();
GpuSearchConfigHandler();
~GpuSearchResHandler();
~GpuSearchConfigHandler();
public:
virtual void
......
......@@ -75,10 +75,14 @@ DBImpl::DBImpl(const DBOptions& options)
wal_mgr_ = std::make_shared<wal::WalManager>(mxlog_config);
}
SetIdentity("DBImpl");
AddCacheInsertDataListener();
Start();
}
DBImpl::~DBImpl() {
RemoveCacheInsertDataListener();
Stop();
}
......@@ -2084,5 +2088,10 @@ DBImpl::BackgroundWalTask() {
}
}
void
DBImpl::OnCacheInsertDataChanged(bool value) {
options_.insert_cache_immediately_ = value;
}
} // namespace engine
} // namespace milvus
......@@ -22,7 +22,8 @@
#include <thread>
#include <vector>
#include "DB.h"
#include "config/handler/CacheConfigHandler.h"
#include "db/DB.h"
#include "db/IndexFailedChecker.h"
#include "db/OngoingFileChecker.h"
#include "db/Types.h"
......@@ -37,7 +38,7 @@ namespace meta {
class Meta;
}
class DBImpl : public DB {
class DBImpl : public DB, public server::CacheConfigHandler {
public:
explicit DBImpl(const DBOptions& options);
~DBImpl();
......@@ -146,6 +147,10 @@ class DBImpl : public DB {
Status
Size(uint64_t& result) override;
protected:
void
OnCacheInsertDataChanged(bool value) override;
private:
Status
QueryAsync(const std::shared_ptr<server::Context>& context, const std::string& table_id,
......@@ -226,7 +231,7 @@ class DBImpl : public DB {
BackgroundWalTask();
private:
const DBOptions options_;
DBOptions options_;
std::atomic<bool> initialized_;
......
......@@ -274,5 +274,10 @@ MemManagerImpl::GetMaxLSN(const MemList& tables) {
return max_lsn;
}
void
MemManagerImpl::OnInsertBufferSizeChanged(int64_t value) {
options_.insert_buffer_size_ = value * ONE_GB;
}
} // namespace engine
} // namespace milvus
......@@ -19,8 +19,9 @@
#include <string>
#include <vector>
#include "MemManager.h"
#include "MemTable.h"
#include "config/handler/CacheConfigHandler.h"
#include "db/insert/MemManager.h"
#include "db/insert/MemTable.h"
#include "db/meta/Meta.h"
#include "server/Config.h"
#include "utils/Status.h"
......@@ -28,33 +29,15 @@
namespace milvus {
namespace engine {
class MemManagerImpl : public MemManager {
class MemManagerImpl : public MemManager, public server::CacheConfigHandler {
public:
using Ptr = std::shared_ptr<MemManagerImpl>;
using MemIdMap = std::map<std::string, MemTablePtr>;
using MemList = std::vector<MemTablePtr>;
MemManagerImpl(const meta::MetaPtr& meta, const DBOptions& options) : meta_(meta), options_(options) {
server::Config& config = server::Config::GetInstance();
config.GenUniqueIdentityID("MemManagerImpl", identity_);
server::ConfigCallBackF lambda = [this](const std::string& value) -> Status {
server::Config& config = server::Config::GetInstance();
int64_t buffer_size;
auto status = config.GetCacheConfigInsertBufferSize(buffer_size);
if (status.ok()) {
options_.insert_buffer_size_ = buffer_size * ONE_GB;
}
return status;
};
config.RegisterCallBack(server::CONFIG_CACHE, server::CONFIG_CACHE_INSERT_BUFFER_SIZE, identity_, lambda);
}
~MemManagerImpl() {
server::Config& config = server::Config::GetInstance();
config.CancelCallBack(server::CONFIG_CACHE, server::CONFIG_CACHE_INSERT_BUFFER_SIZE, identity_);
SetIdentity("MemManagerImpl");
AddInsertBufferSizeListener();
}
Status
......@@ -92,6 +75,10 @@ class MemManagerImpl : public MemManager {
size_t
GetCurrentMem() override;
protected:
void
OnInsertBufferSizeChanged(int64_t value) override;
private:
MemTablePtr
GetMemByTable(const std::string& table_id);
......@@ -108,7 +95,6 @@ class MemManagerImpl : public MemManager {
uint64_t
GetMaxLSN(const MemList& tables);
std::string identity_;
MemIdMap mem_id_map_;
MemList immu_mem_list_;
meta::MetaPtr meta_;
......
......@@ -30,6 +30,8 @@ namespace engine {
MemTable::MemTable(const std::string& table_id, const meta::MetaPtr& meta, const DBOptions& options)
: table_id_(table_id), meta_(meta), options_(options) {
SetIdentity("MemTable");
AddCacheInsertDataListener();
}
Status
......@@ -381,5 +383,10 @@ MemTable::SetLSN(uint64_t lsn) {
lsn_ = lsn;
}
void
MemTable::OnCacheInsertDataChanged(bool value) {
options_.insert_cache_immediately_ = value;
}
} // namespace engine
} // namespace milvus
......@@ -18,14 +18,15 @@
#include <string>
#include <vector>
#include "MemTableFile.h"
#include "VectorSource.h"
#include "config/handler/CacheConfigHandler.h"
#include "db/insert/MemTableFile.h"
#include "db/insert/VectorSource.h"
#include "utils/Status.h"
namespace milvus {
namespace engine {
class MemTable {
class MemTable : public server::CacheConfigHandler {
public:
using MemTableFileList = std::vector<MemTableFilePtr>;
......@@ -64,6 +65,10 @@ class MemTable {
void
SetLSN(uint64_t lsn);
protected:
void
OnCacheInsertDataChanged(bool value) override;
private:
Status
ApplyDeletes();
......
......@@ -40,6 +40,13 @@ MemTableFile::MemTableFile(const std::string& table_id, const meta::MetaPtr& met
utils::GetParentPath(table_file_schema_.location_, directory);
segment_writer_ptr_ = std::make_shared<segment::SegmentWriter>(directory);
}
SetIdentity("MemTableFile");
AddCacheInsertDataListener();
}
MemTableFile::~MemTableFile() {
RemoveCacheInsertDataListener();
}
Status
......@@ -216,5 +223,10 @@ MemTableFile::GetSegmentId() const {
return table_file_schema_.segment_id_;
}
void
MemTableFile::OnCacheInsertDataChanged(bool value) {
options_.insert_cache_immediately_ = value;
}
} // namespace engine
} // namespace milvus
......@@ -17,18 +17,22 @@
#include <string>
#include <vector>
#include "VectorSource.h"
#include "config/handler/CacheConfigHandler.h"
#include "db/engine/ExecutionEngine.h"
#include "db/insert/VectorSource.h"
#include "db/meta/Meta.h"
#include "utils/Status.h"
namespace milvus {
namespace engine {
class MemTableFile {
class MemTableFile : public server::CacheConfigHandler {
public:
MemTableFile(const std::string& table_id, const meta::MetaPtr& meta, const DBOptions& options);
~MemTableFile();
public:
Status
Add(const VectorSourcePtr& source);
......@@ -53,6 +57,10 @@ class MemTableFile {
const std::string&
GetSegmentId() const;
protected:
void
OnCacheInsertDataChanged(bool value) override;
private:
Status
CreateTableFile();
......
......@@ -19,6 +19,12 @@ namespace scheduler {
BuildIndexJob::BuildIndexJob(engine::meta::MetaPtr meta_ptr, engine::DBOptions options)
: Job(JobType::BUILD), meta_ptr_(std::move(meta_ptr)), options_(std::move(options)) {
SetIdentity("BuildIndexJob");
AddCacheInsertDataListener();
}
BuildIndexJob::~BuildIndexJob() {
RemoveCacheInsertDataListener();
}
bool
......@@ -58,5 +64,10 @@ BuildIndexJob::Dump() const {
return ret;
}
void
BuildIndexJob::OnCacheInsertDataChanged(bool value) {
options_.insert_cache_immediately_ = value;
}
} // namespace scheduler
} // namespace milvus
......@@ -21,9 +21,10 @@
#include <unordered_map>
#include <vector>
#include "Job.h"
#include "config/handler/CacheConfigHandler.h"
#include "db/meta/Meta.h"
#include "scheduler/Definition.h"
#include "scheduler/job/Job.h"
namespace milvus {
namespace scheduler {
......@@ -33,10 +34,12 @@ using engine::meta::TableFileSchemaPtr;
using Id2ToIndexMap = std::unordered_map<size_t, TableFileSchemaPtr>;
using Id2ToTableFileMap = std::unordered_map<size_t, TableFileSchema>;
class BuildIndexJob : public Job {
class BuildIndexJob : public Job, public server::CacheConfigHandler {
public:
explicit BuildIndexJob(engine::meta::MetaPtr meta_ptr, engine::DBOptions options);
~BuildIndexJob();
public:
bool
AddToIndexFiles(const TableFileSchemaPtr& to_index_file);
......@@ -71,6 +74,10 @@ class BuildIndexJob : public Job {
return options_;
}
protected:
void
OnCacheInsertDataChanged(bool value) override;
private:
Id2ToIndexMap to_index_files_;
engine::meta::MetaPtr meta_ptr_;
......
......@@ -22,13 +22,13 @@
#include <unordered_map>
#include <vector>
#include "config/handler/GpuBuildResHandler.h"
#include "config/handler/GpuBuildConfigHandler.h"
#include "scheduler/optimizer/Pass.h"
namespace milvus {
namespace scheduler {
class BuildIndexPass : public Pass, public server::GpuBuildResHandler {
class BuildIndexPass : public Pass, public server::GpuBuildConfigHandler {
public:
BuildIndexPass() = default;
......
......@@ -23,13 +23,13 @@
#include <unordered_map>
#include <vector>
#include "config/handler/GpuSearchResHandler.h"
#include "config/handler/GpuSearchConfigHandler.h"
#include "scheduler/optimizer/Pass.h"
namespace milvus {
namespace scheduler {
class FaissFlatPass : public Pass, public server::GpuSearchResHandler {
class FaissFlatPass : public Pass, public server::GpuSearchConfigHandler {
public:
FaissFlatPass() = default;
......
......@@ -23,13 +23,13 @@
#include <unordered_map>
#include <vector>
#include "config/handler/GpuSearchResHandler.h"
#include "config/handler/GpuSearchConfigHandler.h"
#include "scheduler/optimizer/Pass.h"
namespace milvus {
namespace scheduler {
class FaissIVFFlatPass : public Pass, public server::GpuSearchResHandler {
class FaissIVFFlatPass : public Pass, public server::GpuSearchConfigHandler {
public:
FaissIVFFlatPass() = default;
......
......@@ -23,13 +23,13 @@
#include <unordered_map>
#include <vector>
#include "config/handler/GpuSearchResHandler.h"
#include "config/handler/GpuSearchConfigHandler.h"
#include "scheduler/optimizer/Pass.h"
namespace milvus {
namespace scheduler {
class FaissIVFPQPass : public Pass, public server::GpuSearchResHandler {
class FaissIVFPQPass : public Pass, public server::GpuSearchConfigHandler {
public:
FaissIVFPQPass() = default;
......
......@@ -23,13 +23,13 @@
#include <unordered_map>
#include <vector>
#include "config/handler/GpuSearchResHandler.h"
#include "config/handler/GpuSearchConfigHandler.h"
#include "scheduler/optimizer/Pass.h"
namespace milvus {
namespace scheduler {
class FaissIVFSQ8HPass : public Pass, public server::GpuSearchResHandler {
class FaissIVFSQ8HPass : public Pass, public server::GpuSearchConfigHandler {
public:
FaissIVFSQ8HPass() = default;
......
......@@ -23,13 +23,13 @@
#include <unordered_map>
#include <vector>
#include "config/handler/GpuSearchResHandler.h"
#include "config/handler/GpuSearchConfigHandler.h"
#include "scheduler/optimizer/Pass.h"
namespace milvus {
namespace scheduler {
class FaissIVFSQ8Pass : public Pass, public server::GpuSearchResHandler {
class FaissIVFSQ8Pass : public Pass, public server::GpuSearchConfigHandler {
public:
FaissIVFSQ8Pass() = default;
......
......@@ -262,6 +262,7 @@ Config::ResetDefaultConfig() {
CONFIG_CHECK(SetDBConfigPreloadTable(CONFIG_DB_PRELOAD_TABLE_DEFAULT));
CONFIG_CHECK(SetDBConfigArchiveDiskThreshold(CONFIG_DB_ARCHIVE_DISK_THRESHOLD_DEFAULT));
CONFIG_CHECK(SetDBConfigArchiveDaysThreshold(CONFIG_DB_ARCHIVE_DAYS_THRESHOLD_DEFAULT));
CONFIG_CHECK(SetDBConfigAutoFlushInterval(CONFIG_DB_AUTO_FLUSH_INTERVAL_DEFAULT));
/* storage config */
CONFIG_CHECK(SetStorageConfigPrimaryPath(CONFIG_STORAGE_PRIMARY_PATH_DEFAULT));
......@@ -352,6 +353,8 @@ Config::SetConfigCli(const std::string& parent_key, const std::string& child_key
status = SetDBConfigBackendUrl(value);
} else if (child_key == CONFIG_DB_PRELOAD_TABLE) {
status = SetDBConfigPreloadTable(value);
} else if (child_key == CONFIG_DB_AUTO_FLUSH_INTERVAL) {
status = SetDBConfigAutoFlushInterval(value);
} else {
status = Status(SERVER_UNEXPECTED_ERROR, invalid_node_str);
}
......@@ -428,7 +431,11 @@ Config::SetConfigCli(const std::string& parent_key, const std::string& child_key
}
#endif
} else if (parent_key == CONFIG_TRACING) {
return Status(SERVER_UNSUPPORTED_ERROR, "Not support set tracing_config currently");
if (child_key == CONFIG_TRACING_JSON_CONFIG_PATH) {
status = SetTracingConfigJsonConfigPath(value);
} else {
status = Status(SERVER_UNEXPECTED_ERROR, invalid_node_str);
}
} else if (parent_key == CONFIG_WAL) {
if (child_key == CONFIG_WAL_ENABLE) {
status = SetWalConfigEnable(value);
......@@ -504,7 +511,7 @@ Config::GenUniqueIdentityID(const std::string& identity, std::string& uid) {
// get current timestamp
auto time_now = std::chrono::system_clock::now();
auto duration_in_ms = std::chrono::duration_cast<std::chrono::microseconds>(time_now.time_since_epoch());
auto duration_in_ms = std::chrono::duration_cast<std::chrono::nanoseconds>(time_now.time_since_epoch());
elements.push_back(std::to_string(duration_in_ms.count()));
StringHelpFunctions::MergeStringWithDelimeter(elements, "-", uid);
......@@ -763,12 +770,17 @@ Config::CheckDBConfigBackendUrl(const std::string& value) {
Status
Config::CheckDBConfigPreloadTable(const std::string& value) {
fiu_return_on("check_config_preload_table_fail", Status(SERVER_INVALID_ARGUMENT, ""));
if (value.empty() || value == "*") {
return Status::OK();
}
std::vector<std::string> tables;
StringHelpFunctions::SplitStringByDelimeter(value, ",", tables);
std::unordered_set<std::string> table_set;
for (auto& table : tables) {
if (!ValidationUtil::ValidateTableName(table).ok()) {
return Status(SERVER_INVALID_ARGUMENT, "Invalid table name: " + table);
......@@ -778,6 +790,14 @@ Config::CheckDBConfigPreloadTable(const std::string& value) {
if (!(status.ok() && exist)) {
return Status(SERVER_TABLE_NOT_EXIST, "Table " + table + " not exist");
}
table_set.insert(table);
}
if (table_set.size() != tables.size()) {
std::string msg =
"Invalid preload tables. "
"Possible reason: db_config.preload_table contains duplicate table.";
return Status(SERVER_INVALID_ARGUMENT, msg);
}
return Status::OK();
......@@ -811,7 +831,10 @@ Config::CheckDBConfigArchiveDaysThreshold(const std::string& value) {
Status
Config::CheckDBConfigAutoFlushInterval(const std::string& value) {
if (!ValidationUtil::ValidateStringIsNumber(value).ok()) {
auto exist_error = !ValidationUtil::ValidateStringIsNumber(value).ok();
fiu_do_on("check_config_auto_flush_interval_fail", exist_error = true);
if (exist_error) {
std::string msg = "Invalid db configuration auto_flush_interval: " + value +
". Possible reason: db.auto_flush_interval is not a natural number.";
return Status(SERVER_INVALID_ARGUMENT, msg);
......@@ -1263,6 +1286,13 @@ Config::CheckGpuResourceConfigBuildIndexResources(const std::vector<std::string>
}
#endif
/* tracing config */
Status
Config::CheckTracingConfigJsonConfigPath(const std::string& value) {
std::string msg = "Invalid wal config: " + value +
". Possible reason: tracing_config.json_config_path is not supported to configure.";
return Status(SERVER_INVALID_ARGUMENT, msg);
}
/* wal config */
Status
......@@ -1840,6 +1870,12 @@ Config::SetDBConfigArchiveDaysThreshold(const std::string& value) {
return SetConfigValueInMem(CONFIG_DB, CONFIG_DB_ARCHIVE_DAYS_THRESHOLD, value);
}
Status
Config::SetDBConfigAutoFlushInterval(const std::string& value) {
CONFIG_CHECK(CheckDBConfigAutoFlushInterval(value));
return SetConfigValueInMem(CONFIG_DB, CONFIG_DB_AUTO_FLUSH_INTERVAL, value);
}
/* storage config */
Status
Config::SetStorageConfigPrimaryPath(const std::string& value) {
......@@ -1957,6 +1993,13 @@ Config::SetEngineConfigUseAVX512(const std::string& value) {
return SetConfigValueInMem(CONFIG_ENGINE, CONFIG_ENGINE_USE_AVX512, value);
}
/* tracing config */
Status
Config::SetTracingConfigJsonConfigPath(const std::string& value) {
CONFIG_CHECK(CheckTracingConfigJsonConfigPath(value));
return SetConfigValueInMem(CONFIG_TRACING, CONFIG_TRACING_JSON_CONFIG_PATH, value);
}
/* wal config */
Status
Config::SetWalConfigEnable(const std::string& value) {
......
......@@ -283,6 +283,10 @@ class Config {
CheckGpuResourceConfigBuildIndexResources(const std::vector<std::string>& value);
#endif
/* tracing config */
Status
CheckTracingConfigJsonConfigPath(const std::string& value);
/* wal config */
Status
CheckWalConfigEnable(const std::string& value);
......@@ -429,6 +433,8 @@ class Config {
SetDBConfigArchiveDiskThreshold(const std::string& value);
Status
SetDBConfigArchiveDaysThreshold(const std::string& value);
Status
SetDBConfigAutoFlushInterval(const std::string& value);
/* storage config */
Status
......@@ -474,6 +480,10 @@ class Config {
Status
SetEngineConfigUseAVX512(const std::string& value);
/* tracing config */
Status
SetTracingConfigJsonConfigPath(const std::string& value);
/* wal config */
Status
SetWalConfigEnable(const std::string& value);
......
......@@ -184,6 +184,11 @@ TEST_F(ConfigTest, SERVER_CONFIG_VALID_TEST) {
ASSERT_TRUE(config.GetDBConfigArchiveDaysThreshold(int64_val).ok());
ASSERT_TRUE(int64_val == db_archive_days_threshold);
int64_t db_auto_flush_interval = 1;
ASSERT_TRUE(config.SetDBConfigAutoFlushInterval(std::to_string(db_auto_flush_interval)).ok());
ASSERT_TRUE(config.GetDBConfigAutoFlushInterval(int64_val).ok());
ASSERT_TRUE(int64_val == db_auto_flush_interval);
/* storage config */
std::string storage_primary_path = "/home/zilliz";
ASSERT_TRUE(config.SetStorageConfigPrimaryPath(storage_primary_path).ok());
......@@ -573,6 +578,8 @@ TEST_F(ConfigTest, SERVER_CONFIG_INVALID_TEST) {
ASSERT_FALSE(config.SetDBConfigArchiveDaysThreshold("0x10").ok());
ASSERT_FALSE(config.SetDBConfigAutoFlushInterval("0.1").ok());
/* storage config */
ASSERT_FALSE(config.SetStorageConfigPrimaryPath("").ok());
......@@ -943,6 +950,11 @@ TEST_F(ConfigTest, SERVER_CONFIG_RESET_DEFAULT_CONFIG_FAIL_TEST) {
ASSERT_FALSE(s.ok());
fiu_disable("check_config_backend_url_fail");
fiu_enable("check_config_preload_table_fail", 1, NULL, 0);
s = config.ResetDefaultConfig();
ASSERT_FALSE(s.ok());
fiu_disable("check_config_preload_table_fail");
fiu_enable("check_config_archive_disk_threshold_fail", 1, NULL, 0);
s = config.ResetDefaultConfig();
ASSERT_FALSE(s.ok());
......@@ -953,6 +965,11 @@ TEST_F(ConfigTest, SERVER_CONFIG_RESET_DEFAULT_CONFIG_FAIL_TEST) {
ASSERT_FALSE(s.ok());
fiu_disable("check_config_archive_days_threshold_fail");
fiu_enable("check_config_auto_flush_interval_fail", 1, NULL, 0);
s = config.ResetDefaultConfig();
ASSERT_FALSE(s.ok());
fiu_disable("check_config_auto_flush_interval_fail");
fiu_enable("check_config_insert_buffer_size_fail", 1, NULL, 0);
s = config.ResetDefaultConfig();
ASSERT_FALSE(s.ok());
......
......@@ -1627,7 +1627,7 @@ TEST_F(WebControllerTest, CONFIG) {
OString table_name = "milvus_test_webcontroller_test_preload_table";
GenTable(table_name, 16, 10, "L2");
OString table_name_s = "milvus_test_webcontroller_test_preload_table";
OString table_name_s = "milvus_test_webcontroller_test_preload_table_s";
GenTable(table_name_s, 16, 10, "L2");
OString body_str = "{\"db_config\": {\"preload_table\": \"" + table_name + "\"}}";
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册