未验证 提交 4a8a779d 编写于 作者: S Shuo 提交者: GitHub

feat(rocksdb): Adapt rate limiter to prevent bust writes and huge compaction (#543)

上级 764ee4f8
......@@ -311,6 +311,8 @@
rocksdb_multi_get_max_iteration_size = 31457280
rocksdb_max_iteration_count = 1000
rocksdb_iteration_threshold_time_ms = 30000
rocksdb_limiter_max_write_megabytes_per_sec = 500
rocksdb_limiter_enable_auto_tune = false
checkpoint_reserve_min_count = 2
checkpoint_reserve_time_seconds = 1800
......
......@@ -42,12 +42,16 @@ static bool chkpt_init_from_dir(const char *name, int64_t &decree)
std::string(name) == chkpt_get_dir_name(decree);
}
std::shared_ptr<rocksdb::RateLimiter> pegasus_server_impl::_s_rate_limiter;
int64_t pegasus_server_impl::_rocksdb_limiter_last_total_through;
std::shared_ptr<rocksdb::Cache> pegasus_server_impl::_s_block_cache;
::dsn::task_ptr pegasus_server_impl::_update_server_rdb_stat;
::dsn::perf_counter_wrapper pegasus_server_impl::_pfc_rdb_block_cache_mem_usage;
::dsn::perf_counter_wrapper pegasus_server_impl::_pfc_rdb_write_limiter_rate_bytes;
const std::string pegasus_server_impl::COMPRESSION_HEADER = "per_level:";
const std::string pegasus_server_impl::DATA_COLUMN_FAMILY_NAME = "default";
const std::string pegasus_server_impl::META_COLUMN_FAMILY_NAME = "pegasus_meta_cf";
const std::chrono::seconds pegasus_server_impl::kServerStatUpdateTimeSec = std::chrono::seconds(10);
void pegasus_server_impl::parse_checkpoints()
{
......@@ -1424,16 +1428,18 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
[this]() { this->update_replica_rocksdb_statistics(); },
_update_rdb_stat_interval);
// Block cache is a singleton on this server shared by all replicas, its metrics update task
// should be scheduled once an interval on the server view.
// These counters are singletons on this server shared by all replicas, their metrics update
// task should be scheduled once an interval on the server view.
static std::once_flag flag;
std::call_once(flag, [&]() {
// The timer task will always running even though there is no replicas
dassert(kServerStatUpdateTimeSec.count() != 0,
"kServerStatUpdateTimeSec shouldn't be zero");
_update_server_rdb_stat = ::dsn::tasking::enqueue_timer(
LPC_REPLICATION_LONG_COMMON,
nullptr, // TODO: the tracker is nullptr, we will fix it later
[this]() { update_server_rocksdb_statistics(); },
_update_rdb_stat_interval);
kServerStatUpdateTimeSec);
});
// initialize cu calculator and write service after server being initialized.
......@@ -2107,9 +2113,16 @@ void pegasus_server_impl::update_server_rocksdb_statistics()
if (_s_block_cache) {
uint64_t val = _s_block_cache->GetUsage();
_pfc_rdb_block_cache_mem_usage->set(val);
dinfo_f("_pfc_rdb_block_cache_mem_usage: {} bytes", val);
} else {
dinfo("_pfc_rdb_block_cache_mem_usage: 0 bytes because block cache is disabled");
}
// Update _pfc_rdb_write_limiter_rate_bytes
if (_s_rate_limiter) {
uint64_t current_total_through = _s_rate_limiter->GetTotalBytesThrough();
uint64_t through_bytes_per_sec =
(current_total_through - _rocksdb_limiter_last_total_through) /
kServerStatUpdateTimeSec.count();
_pfc_rdb_write_limiter_rate_bytes->set(through_bytes_per_sec);
_rocksdb_limiter_last_total_through = current_total_through;
}
}
......
......@@ -13,6 +13,7 @@
#include <dsn/dist/replication/replication.codes.h>
#include <rrdb/rrdb_types.h>
#include <gtest/gtest_prod.h>
#include <rocksdb/rate_limiter.h>
#include "key_ttl_compaction_filter.h"
#include "pegasus_scan_context.h"
......@@ -317,6 +318,7 @@ private:
::dsn::error_code flush_all_family_columns(bool wait);
private:
static const std::chrono::seconds kServerStatUpdateTimeSec;
static const std::string COMPRESSION_HEADER;
// Column family names.
static const std::string DATA_COLUMN_FAMILY_NAME;
......@@ -346,6 +348,8 @@ private:
rocksdb::ColumnFamilyHandle *_data_cf;
rocksdb::ColumnFamilyHandle *_meta_cf;
static std::shared_ptr<rocksdb::Cache> _s_block_cache;
static std::shared_ptr<rocksdb::RateLimiter> _s_rate_limiter;
static int64_t _rocksdb_limiter_last_total_through;
volatile bool _is_open;
uint32_t _pegasus_data_version;
std::atomic<int64_t> _last_durable_decree;
......@@ -392,6 +396,7 @@ private:
// rocksdb internal statistics
// server level
static ::dsn::perf_counter_wrapper _pfc_rdb_write_limiter_rate_bytes;
static ::dsn::perf_counter_wrapper _pfc_rdb_block_cache_mem_usage;
// replica level
::dsn::perf_counter_wrapper _pfc_rdb_sst_count;
......
......@@ -4,6 +4,7 @@
#include "pegasus_server_impl.h"
#include <dsn/utility/flags.h>
#include <rocksdb/filter_policy.h>
#include "capacity_unit_calculator.h"
......@@ -14,6 +15,18 @@
namespace pegasus {
namespace server {
DSN_DEFINE_int64(
"pegasus.server",
rocksdb_limiter_max_write_megabytes_per_sec,
500,
"max rate of rocksdb flush and compaction(MB/s), if less than or equal to 0 means close limit");
DSN_DEFINE_bool("pegasus.server",
rocksdb_limiter_enable_auto_tune,
false,
"whether to enable write rate auto tune when open rocksdb write limit");
pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
: pegasus_read_service(r),
_db(nullptr),
......@@ -251,6 +264,22 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
tbl_opts.block_cache = _s_block_cache;
}
// FLAGS_rocksdb_limiter_max_write_megabytes_per_sec <= 0 means close the rate limit.
// For more detail arguments see
// https://github.com/facebook/rocksdb/blob/v6.6.4/include/rocksdb/rate_limiter.h#L111-L137
if (FLAGS_rocksdb_limiter_max_write_megabytes_per_sec > 0) {
static std::once_flag flag;
std::call_once(flag, [&]() {
_s_rate_limiter = std::shared_ptr<rocksdb::RateLimiter>(rocksdb::NewGenericRateLimiter(
FLAGS_rocksdb_limiter_max_write_megabytes_per_sec << 20,
100 * 1000, // refill_period_us
10, // fairness
rocksdb::RateLimiter::Mode::kWritesOnly,
FLAGS_rocksdb_limiter_enable_auto_tune));
});
_db_opts.rate_limiter = _s_rate_limiter;
}
// Bloom filter configurations.
bool disable_bloom_filter = dsn_config_get_value_bool(
"pegasus.server", "rocksdb_disable_bloom_filter", false, "Whether to disable bloom filter");
......@@ -400,8 +429,8 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
COUNTER_TYPE_NUMBER,
"statistic the total count of rocksdb block cache");
// Block cache is a singleton on this server shared by all replicas, so we initialize
// `_pfc_rdb_block_cache_mem_usage` only once.
// These counters are singletons on this server shared by all replicas, so we initialize
// them only once.
static std::once_flag flag;
std::call_once(flag, [&]() {
_pfc_rdb_block_cache_mem_usage.init_global_counter(
......@@ -410,6 +439,13 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
"rdb.block_cache.memory_usage",
COUNTER_TYPE_NUMBER,
"statistic the memory usage of rocksdb block cache");
_pfc_rdb_write_limiter_rate_bytes.init_global_counter(
"replica",
"app.pegasus",
"rdb.write_limiter_rate_bytes",
COUNTER_TYPE_NUMBER,
"statistic the through bytes of rocksdb write rate limiter");
});
snprintf(name, 255, "rdb.index_and_filter_blocks.memory_usage@%s", str_gpid.c_str());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册