未验证 提交 6e3991d1 编写于 作者: S shengjun.li 提交者: GitHub

#1426 Support configuring whether to enable autoflush (#1468)

Signed-off-by: Nshengjun.li <shengjun.li@zilliz.com>
上级 23e0816b
......@@ -40,6 +40,7 @@ Please mark all change in change log and use the issue from GitHub
- \#813 Add push mode for prometheus monitor
- \#815 Support MinIO storage
- \#823 Support binary vector tanimoto/jaccard/hamming metric
- \#830 Support WAL (write-ahead logging)
- \#853 Support HNSW
- \#861 Support DeleteById / SearchByID / GetVectorById / Flush
- \#910 Change Milvus c++ standard to c++17
......@@ -68,8 +69,10 @@ Please mark all change in change log and use the issue from GitHub
- \#1234 Do S3 server validation check when Milvus startup
- \#1263 Allow system conf modifiable and some take effect directly
- \#1320 Remove debug logging from faiss
- \#1426 Support configuring whether to enable autoflush and the autoflush interval
- \#1444 Improve delete
## Task
- \#1327 Exclude third-party code from codebeat
- \#1331 Exclude third-party code from codacy
......
......@@ -46,13 +46,14 @@ server_config:
# | loaded when Milvus server starts up. | | |
# | '*' means preload all existing tables. | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
# auto_flush_interval | The interval, in milliseconds, at which Milvus | Integer | 1000 (ms) |
# | automatically flushes data to disk. | | |
# auto_flush_interval | The interval, in seconds, at which Milvus automatically | Integer | 1 (s) |
# | flushes data to disk. | | |
# | 0 disables the regular flush. | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
db_config:
backend_url: sqlite://:@:/
preload_table:
auto_flush_interval: 1000
auto_flush_interval: 1
#----------------------+------------------------------------------------------------+------------+-----------------+
# Storage Config | Description | Type | Default |
......
......@@ -46,13 +46,14 @@ server_config:
# | loaded when Milvus server starts up. | | |
# | '*' means preload all existing tables. | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
# auto_flush_interval | The interval, in milliseconds, at which Milvus | Integer | 1000 (ms) |
# | automatically flushes data to disk. | | |
# auto_flush_interval | The interval, in seconds, at which Milvus automatically | Integer | 1 (s) |
# | flushes data to disk. | | |
# | 0 disables the regular flush. | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
db_config:
backend_url: sqlite://:@:/
preload_table:
auto_flush_interval: 1000
auto_flush_interval: 1
#----------------------+------------------------------------------------------------+------------+-----------------+
# Storage Config | Description | Type | Default |
......
......@@ -46,13 +46,14 @@ server_config:
# | loaded when Milvus server starts up. | | |
# | '*' means preload all existing tables. | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
# auto_flush_interval | The interval, in milliseconds, at which Milvus | Integer | 1000 (ms) |
# | automatically flushes data to disk. | | |
# auto_flush_interval | The interval, in seconds, at which Milvus automatically | Integer | 1 (s) |
# | flushes data to disk. | | |
# | 0 disables the regular flush. | | |
#----------------------+------------------------------------------------------------+------------+-----------------+
db_config:
backend_url: sqlite://:@:/
preload_table:
auto_flush_interval: 1000
auto_flush_interval: 1
#----------------------+------------------------------------------------------------+------------+-----------------+
# Storage Config | Description | Type | Default |
......
......@@ -146,7 +146,7 @@ DBImpl::Stop() {
if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
if (options_.wal_enable_) {
// wait flush merge/buildindex finish
wal_task_swn_.Notify();
bg_task_swn_.Notify();
bg_wal_thread_.join();
} else {
......@@ -156,6 +156,7 @@ DBImpl::Stop() {
ExecWalRecord(record);
// wait merge/buildindex finish
bg_task_swn_.Notify();
bg_timer_thread_.join();
}
......@@ -500,7 +501,7 @@ DBImpl::InsertVectors(const std::string& table_id, const std::string& partition_
} else if (!vectors.binary_data_.empty()) {
wal_mgr_->Insert(table_id, partition_tag, vectors.id_array_, vectors.binary_data_);
}
wal_task_swn_.Notify();
bg_task_swn_.Notify();
} else {
wal::MXLogRecord record;
......@@ -543,7 +544,7 @@ DBImpl::DeleteVectors(const std::string& table_id, IDNumbers vector_ids) {
Status status;
if (options_.wal_enable_) {
wal_mgr_->DeleteById(table_id, vector_ids);
wal_task_swn_.Notify();
bg_task_swn_.Notify();
} else {
wal::MXLogRecord record;
......@@ -583,7 +584,7 @@ DBImpl::Flush(const std::string& table_id) {
auto lsn = wal_mgr_->Flush(table_id);
ENGINE_LOG_DEBUG << "wal_mgr_->Flush";
if (lsn != 0) {
wal_task_swn_.Notify();
bg_task_swn_.Notify();
flush_task_swn_.Wait();
ENGINE_LOG_DEBUG << "flush_task_swn_.Wait()";
}
......@@ -614,7 +615,7 @@ DBImpl::Flush() {
ENGINE_LOG_DEBUG << "WAL flush";
auto lsn = wal_mgr_->Flush();
if (lsn != 0) {
wal_task_swn_.Notify();
bg_task_swn_.Notify();
flush_task_swn_.Wait();
}
} else {
......@@ -1204,7 +1205,11 @@ DBImpl::BackgroundTimerTask() {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(options_.auto_flush_interval_));
if (options_.auto_flush_interval_ > 0) {
bg_task_swn_.Wait_For(std::chrono::seconds(options_.auto_flush_interval_));
} else {
bg_task_swn_.Wait();
}
StartMetricTask();
StartMergeTask();
......@@ -1945,10 +1950,13 @@ void
DBImpl::BackgroundWalTask() {
server::SystemInfo::GetInstance().Init();
std::chrono::system_clock::time_point next_auto_flush_time;
auto get_next_auto_flush_time = [&]() {
return std::chrono::system_clock::now() + std::chrono::milliseconds(options_.auto_flush_interval_);
return std::chrono::system_clock::now() + std::chrono::seconds(options_.auto_flush_interval_);
};
auto next_auto_flush_time = get_next_auto_flush_time();
if (options_.auto_flush_interval_ > 0) {
next_auto_flush_time = get_next_auto_flush_time();
}
wal::MXLogRecord record;
......@@ -1963,9 +1971,11 @@ DBImpl::BackgroundWalTask() {
};
while (true) {
if (std::chrono::system_clock::now() >= next_auto_flush_time) {
auto_flush();
next_auto_flush_time = get_next_auto_flush_time();
if (options_.auto_flush_interval_ > 0) {
if (std::chrono::system_clock::now() >= next_auto_flush_time) {
auto_flush();
next_auto_flush_time = get_next_auto_flush_time();
}
}
auto error_code = wal_mgr_->GetNextRecord(record);
......@@ -1981,7 +1991,7 @@ DBImpl::BackgroundWalTask() {
flush_task_swn_.Notify();
// if the user manually flushed everything, also reset the next auto-flush time
if (record.table_id.empty()) {
if (record.table_id.empty() && options_.auto_flush_interval_ > 0) {
next_auto_flush_time = get_next_auto_flush_time();
}
}
......@@ -1995,7 +2005,11 @@ DBImpl::BackgroundWalTask() {
break;
}
wal_task_swn_.Wait_Until(next_auto_flush_time);
if (options_.auto_flush_interval_ > 0) {
bg_task_swn_.Wait_Until(next_auto_flush_time);
} else {
bg_task_swn_.Wait();
}
}
}
}
......
......@@ -260,6 +260,15 @@ class DBImpl : public DB {
notified_ = false;
}
// Blocks the calling thread until notified_ becomes true or tm_dur has
// elapsed, whichever comes first, then clears notified_.
//
// If notified_ is already true on entry, the call returns without waiting.
// NOTE(review): notified_ is presumably set by Notify(); its full body is not
// visible in this hunk -- confirm.
//
// tm_dur: maximum time to wait.
void
Wait_For(const std::chrono::system_clock::duration& tm_dur) {
    std::unique_lock<std::mutex> lck(mutex_);
    // Use the predicate overload: it re-checks notified_ after every wakeup, so
    // a spurious wakeup keeps waiting for the remaining time instead of
    // returning early as the predicate-less wait_for would.
    cv_.wait_for(lck, tm_dur, [&] { return notified_; });
    // Reset the flag whether we were notified or timed out.
    notified_ = false;
}
void
Notify() {
std::unique_lock<std::mutex> lck(mutex_);
......@@ -269,7 +278,7 @@ class DBImpl : public DB {
}
};
SimpleWaitNotify wal_task_swn_;
SimpleWaitNotify bg_task_swn_;
SimpleWaitNotify flush_task_swn_;
ThreadPool merge_thread_pool_;
......
......@@ -71,7 +71,7 @@ struct DBOptions {
size_t insert_buffer_size_ = 4 * ONE_GB;
bool insert_cache_immediately_ = false;
int64_t auto_flush_interval_ = 1000;
int64_t auto_flush_interval_ = 1;
// wal relative configurations
bool wal_enable_ = true;
......
......@@ -37,14 +37,7 @@ ParserLsn(uint64_t lsn, uint32_t& file_no, uint32_t& offset) {
}
MXLogBuffer::MXLogBuffer(const std::string& mxlog_path, const uint32_t buffer_size)
: mxlog_buffer_size_(buffer_size), mxlog_writer_(mxlog_path) {
if (mxlog_buffer_size_ < (uint32_t)WAL_BUFFER_MIN_SIZE) {
WAL_LOG_INFO << "config wal buffer size is too small " << mxlog_buffer_size_;
mxlog_buffer_size_ = (uint32_t)WAL_BUFFER_MIN_SIZE;
} else if (mxlog_buffer_size_ > (uint32_t)WAL_BUFFER_MAX_SIZE) {
WAL_LOG_INFO << "config wal buffer size is too larger " << mxlog_buffer_size_;
mxlog_buffer_size_ = (uint32_t)WAL_BUFFER_MAX_SIZE;
}
: mxlog_buffer_size_(buffer_size * UNIT_MB), mxlog_writer_(mxlog_path) {
}
MXLogBuffer::~MXLogBuffer() {
......
......@@ -25,8 +25,7 @@ namespace wal {
using TableSchemaPtr = std::shared_ptr<milvus::engine::meta::TableSchema>;
using TableMetaPtr = std::shared_ptr<std::unordered_map<std::string, TableSchemaPtr> >;
#define WAL_BUFFER_MAX_SIZE ((uint32_t)2 * 1024 * 1024 * 1024)
#define WAL_BUFFER_MIN_SIZE ((uint32_t)32 * 1024 * 1024)
#define UNIT_MB (1024 * 1024)
#define LSN_OFFSET_MASK 0x00000000ffffffff
enum class MXLogType { InsertBinary, InsertVector, Delete, Update, Flush, None };
......
......@@ -16,6 +16,7 @@
#include <algorithm>
#include <memory>
#include "server/Config.h"
#include "utils/CommonUtil.h"
#include "utils/Exception.h"
#include "utils/Log.h"
......@@ -25,8 +26,11 @@ namespace engine {
namespace wal {
WalManager::WalManager(const MXLogConfiguration& config) {
__glibcxx_assert(config.buffer_size <= milvus::server::CONFIG_WAL_BUFFER_SIZE_MAX / 2);
__glibcxx_assert(config.buffer_size >= milvus::server::CONFIG_WAL_BUFFER_SIZE_MIN / 2);
mxlog_config_.recovery_error_ignore = config.recovery_error_ignore;
mxlog_config_.buffer_size = config.buffer_size * 1024 * 1024;
mxlog_config_.buffer_size = config.buffer_size;
mxlog_config_.mxlog_path = config.mxlog_path;
// check that the path ends with '/'
......
......@@ -748,7 +748,7 @@ Status
Config::CheckDBConfigAutoFlushInterval(const std::string& value) {
if (!ValidationUtil::ValidateStringIsNumber(value).ok()) {
std::string msg = "Invalid db configuration auto_flush_interval: " + value +
". Possible reason: db.auto_flush_interval is not a positive integer.";
". Possible reason: db.auto_flush_interval is not a natural number.";
return Status(SERVER_INVALID_ARGUMENT, msg);
}
......@@ -1673,6 +1673,11 @@ Config::GetWalConfigBufferSize(int64_t& buffer_size) {
std::string str = GetConfigStr(CONFIG_WAL, CONFIG_WAL_BUFFER_SIZE, CONFIG_WAL_BUFFER_SIZE_DEFAULT);
CONFIG_CHECK(CheckWalConfigBufferSize(str));
buffer_size = std::stoll(str);
if (buffer_size > CONFIG_WAL_BUFFER_SIZE_MAX) {
buffer_size = CONFIG_WAL_BUFFER_SIZE_MAX;
} else if (buffer_size < CONFIG_WAL_BUFFER_SIZE_MIN) {
buffer_size = CONFIG_WAL_BUFFER_SIZE_MIN;
}
return Status::OK();
}
......
......@@ -60,7 +60,7 @@ static const char* CONFIG_DB_ARCHIVE_DAYS_THRESHOLD_DEFAULT = "0";
static const char* CONFIG_DB_PRELOAD_TABLE = "preload_table";
static const char* CONFIG_DB_PRELOAD_TABLE_DEFAULT = "";
static const char* CONFIG_DB_AUTO_FLUSH_INTERVAL = "auto_flush_interval";
static const char* CONFIG_DB_AUTO_FLUSH_INTERVAL_DEFAULT = "1000";
static const char* CONFIG_DB_AUTO_FLUSH_INTERVAL_DEFAULT = "1";
/* storage config */
static const char* CONFIG_STORAGE = "storage_config";
......@@ -142,6 +142,8 @@ static const char* CONFIG_WAL_RECOVERY_ERROR_IGNORE = "recovery_error_ignore";
static const char* CONFIG_WAL_RECOVERY_ERROR_IGNORE_DEFAULT = "true";
static const char* CONFIG_WAL_BUFFER_SIZE = "buffer_size";
static const char* CONFIG_WAL_BUFFER_SIZE_DEFAULT = "256";
static const int64_t CONFIG_WAL_BUFFER_SIZE_MAX = 4096;
static const int64_t CONFIG_WAL_BUFFER_SIZE_MIN = 64;
static const char* CONFIG_WAL_WAL_PATH = "wal_path";
static const char* CONFIG_WAL_WAL_PATH_DEFAULT = "/tmp/milvus/wal";
......
......@@ -182,7 +182,7 @@ TEST(WalTest, BUFFER_INIT_TEST) {
FILE* fi = nullptr;
char buff[128];
milvus::engine::wal::MXLogBuffer buffer(WAL_GTEST_PATH, 0);
milvus::engine::wal::MXLogBuffer buffer(WAL_GTEST_PATH, 32);
// start_lsn == end_lsn, start_lsn == 0
ASSERT_TRUE(buffer.Init(0, 0));
......@@ -260,7 +260,7 @@ TEST(WalTest, BUFFER_INIT_TEST) {
TEST(WalTest, BUFFER_TEST) {
MakeEmptyTestPath();
milvus::engine::wal::MXLogBuffer buffer(WAL_GTEST_PATH, WAL_BUFFER_MAX_SIZE + 1);
milvus::engine::wal::MXLogBuffer buffer(WAL_GTEST_PATH, 2048);
uint32_t file_no = 4;
uint32_t buf_off = 100;
......
......@@ -151,6 +151,8 @@ BaseTest::GetOptions() {
auto options = milvus::engine::DBFactory::BuildOption();
options.meta_.path_ = CONFIG_PATH;
options.meta_.backend_uri_ = "sqlite://:@:/";
// BaseTest does not enable WAL
options.wal_enable_ = false;
return options;
}
......@@ -206,6 +208,8 @@ DBTest2::GetOptions() {
options.meta_.path_ = "/tmp/milvus_test";
options.meta_.archive_conf_ = milvus::engine::ArchiveConf("delete", "disk:1");
options.meta_.backend_uri_ = "sqlite://:@:/";
// DBTest2 does not enable WAL
options.wal_enable_ = false;
return options;
}
......@@ -229,7 +233,7 @@ milvus::engine::DBOptions
DBTestWALRecovery::GetOptions() {
auto options = DBTestWAL::GetOptions();
//disable auto flush
options.auto_flush_interval_ = 10000;
options.auto_flush_interval_ = 0;
return options;
}
......@@ -240,7 +244,8 @@ DBTestWALRecovery_Error::GetOptions() {
options.meta_.path_ = CONFIG_PATH;
options.meta_.backend_uri_ = "sqlite://:@:/";
options.auto_flush_interval_ = 10000;
//disable auto flush
options.auto_flush_interval_ = 0;
options.wal_enable_ = true;
options.recovery_error_ignore_ = false;
options.buffer_size_ = 128;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册