提交 bce021f9 编写于 作者: S Smilencer 提交者: neverchanje

feat(collector): add statistics for partition hotspot (#444)

上级 d4d6ef48
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
#pragma once
#include "shell/commands.h"
namespace pegasus {
namespace server {
// Load figures of a single partition, as handed to the hotspot policies.
struct hotspot_partition_data
{
    // Captures the total QPS, the total capacity units and the name of `row`.
    hotspot_partition_data(const row_data &row)
        : total_qps(row.get_total_qps()),
          total_cu(row.get_total_cu()),
          partition_name(row.row_name)
    {
    }

    // Creates an empty record. The numeric members are zero-initialized through their
    // default member initializers, so a default-constructed record never exposes
    // indeterminate values (containers of this type are default-constructed in bulk).
    hotspot_partition_data() = default;

    // Sum of all read and write QPS of the partition.
    double total_qps = 0;
    // Sum of the recent read and write capacity units of the partition.
    double total_cu = 0;
    // Name of the row this record was built from.
    std::string partition_name;
};
} // namespace server
} // namespace pegasus
......@@ -91,6 +91,9 @@ info_collector::~info_collector()
for (auto kv : _app_stat_counters) {
delete kv.second;
}
for (auto store : _hotspot_calculator_store) {
delete store.second;
}
}
void info_collector::start()
......@@ -125,100 +128,46 @@ void info_collector::stop() { _tracker.cancel_outstanding_tasks(); }
void info_collector::on_app_stat()
{
ddebug("start to stat apps");
std::vector<row_data> rows;
if (!get_app_stat(&_shell_context, "", rows)) {
std::map<std::string, std::vector<row_data>> all_rows;
if (!get_app_partition_stat(&_shell_context, all_rows)) {
derror("call get_app_stat() failed");
return;
}
std::vector<double> read_qps;
std::vector<double> write_qps;
rows.resize(rows.size() + 1);
read_qps.resize(rows.size());
write_qps.resize(rows.size());
row_data &all = rows.back();
all.row_name = "_all_";
for (int i = 0; i < rows.size() - 1; ++i) {
row_data &row = rows[i];
all.get_qps += row.get_qps;
all.multi_get_qps += row.multi_get_qps;
all.put_qps += row.put_qps;
all.multi_put_qps += row.multi_put_qps;
all.remove_qps += row.remove_qps;
all.multi_remove_qps += row.multi_remove_qps;
all.incr_qps += row.incr_qps;
all.check_and_set_qps += row.check_and_set_qps;
all.check_and_mutate_qps += row.check_and_mutate_qps;
all.scan_qps += row.scan_qps;
all.recent_read_cu += row.recent_read_cu;
all.recent_write_cu += row.recent_write_cu;
all.recent_expire_count += row.recent_expire_count;
all.recent_filter_count += row.recent_filter_count;
all.recent_abnormal_count += row.recent_abnormal_count;
all.recent_write_throttling_delay_count += row.recent_write_throttling_delay_count;
all.recent_write_throttling_reject_count += row.recent_write_throttling_reject_count;
all.storage_mb += row.storage_mb;
all.storage_count += row.storage_count;
all.rdb_block_cache_hit_count += row.rdb_block_cache_hit_count;
all.rdb_block_cache_total_count += row.rdb_block_cache_total_count;
all.rdb_index_and_filter_blocks_mem_usage += row.rdb_index_and_filter_blocks_mem_usage;
all.rdb_memtable_mem_usage += row.rdb_memtable_mem_usage;
all.rdb_estimate_num_keys += row.rdb_estimate_num_keys;
read_qps[i] = row.get_qps + row.multi_get_qps + row.scan_qps;
write_qps[i] = row.put_qps + row.multi_put_qps + row.remove_qps + row.multi_remove_qps +
row.incr_qps + row.check_and_set_qps + row.check_and_mutate_qps;
}
read_qps[read_qps.size() - 1] = all.get_qps + all.multi_get_qps + all.scan_qps;
write_qps[read_qps.size() - 1] = all.put_qps + all.multi_put_qps + all.remove_qps +
all.multi_remove_qps + all.incr_qps + all.check_and_set_qps +
all.check_and_mutate_qps;
for (int i = 0; i < rows.size(); ++i) {
row_data &row = rows[i];
AppStatCounters *counters = get_app_counters(row.row_name);
counters->get_qps->set(row.get_qps);
counters->multi_get_qps->set(row.multi_get_qps);
counters->put_qps->set(row.put_qps);
counters->multi_put_qps->set(row.multi_put_qps);
counters->remove_qps->set(row.remove_qps);
counters->multi_remove_qps->set(row.multi_remove_qps);
counters->incr_qps->set(row.incr_qps);
counters->check_and_set_qps->set(row.check_and_set_qps);
counters->check_and_mutate_qps->set(row.check_and_mutate_qps);
counters->scan_qps->set(row.scan_qps);
counters->recent_read_cu->set(row.recent_read_cu);
counters->recent_write_cu->set(row.recent_write_cu);
counters->recent_expire_count->set(row.recent_expire_count);
counters->recent_filter_count->set(row.recent_filter_count);
counters->recent_abnormal_count->set(row.recent_abnormal_count);
counters->recent_write_throttling_delay_count->set(row.recent_write_throttling_delay_count);
counters->recent_write_throttling_reject_count->set(
row.recent_write_throttling_reject_count);
counters->storage_mb->set(row.storage_mb);
counters->storage_count->set(row.storage_count);
counters->rdb_block_cache_hit_rate->set(
std::abs(row.rdb_block_cache_total_count) < 1e-6
? 0
: row.rdb_block_cache_hit_count / row.rdb_block_cache_total_count * 1000000);
counters->rdb_index_and_filter_blocks_mem_usage->set(
row.rdb_index_and_filter_blocks_mem_usage);
counters->rdb_memtable_mem_usage->set(row.rdb_memtable_mem_usage);
counters->rdb_estimate_num_keys->set(row.rdb_estimate_num_keys);
counters->read_qps->set(read_qps[i]);
counters->write_qps->set(write_qps[i]);
table_stats all_stats("_all_");
for (const auto &app_rows : all_rows) {
// get statistics data for app
table_stats app_stats(app_rows.first);
for (auto partition_row : app_rows.second) {
app_stats.aggregate(partition_row);
}
get_app_counters(app_stats.app_name)->set(app_stats);
// get row data statistics for all of the apps
all_stats.merge(app_stats);
// hotspot_calculator is to detect hotspots
hotspot_calculator *hotspot_calculator =
get_hotspot_calculator(app_rows.first, app_rows.second.size());
hotspot_calculator->aggregate(app_rows.second);
// new policy can be designed by strategy pattern in hotspot_partition_data.h
hotspot_calculator->start_alg();
}
get_app_counters(all_stats.app_name)->set(all_stats);
ddebug("stat apps succeed, app_count = %d, total_read_qps = %.2f, total_write_qps = %.2f",
(int)(rows.size() - 1),
read_qps[read_qps.size() - 1],
write_qps[read_qps.size() - 1]);
(int)(all_rows.size() - 1),
all_stats.get_total_read_qps(),
all_stats.get_total_write_qps());
}
info_collector::AppStatCounters *info_collector::get_app_counters(const std::string &app_name)
info_collector::app_stat_counters *info_collector::get_app_counters(const std::string &app_name)
{
::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_app_stat_counter_lock);
auto find = _app_stat_counters.find(app_name);
if (find != _app_stat_counters.end()) {
return find->second;
}
AppStatCounters *counters = new AppStatCounters();
app_stat_counters *counters = new app_stat_counters();
char counter_name[1024];
char counter_desc[1024];
......@@ -329,5 +278,17 @@ void info_collector::on_storage_size_stat(int remaining_retry_count)
_result_writer->set_result(st_stat.timestamp, "ss", st_stat.dump_to_json());
}
// Returns the hotspot calculator registered for `app_name`, creating and registering one
// with `partition_num` partitions on first use. The returned pointer stays owned by
// _hotspot_calculator_store.
hotspot_calculator *info_collector::get_hotspot_calculator(const std::string &app_name,
                                                           const int partition_num)
{
    const auto existing = _hotspot_calculator_store.find(app_name);
    if (existing == _hotspot_calculator_store.end()) {
        // first time this app is seen: build its calculator and remember it
        hotspot_calculator *created = new hotspot_calculator(app_name, partition_num);
        _hotspot_calculator_store.emplace(app_name, created);
        return created;
    }
    return existing->second;
}
} // namespace server
} // namespace pegasus
......@@ -16,9 +16,10 @@
#include <event2/event.h>
#include <event2/http.h>
#include <event2/bufferevent.h>
#include <fstream>
#include "../shell/commands.h"
#include "table_stats.h"
#include "table_hotspot_policy.h"
namespace pegasus {
namespace server {
......@@ -28,8 +29,44 @@ class result_writer;
class info_collector
{
public:
struct AppStatCounters
struct app_stat_counters
{
void set(const table_stats &row_stats)
{
get_qps->set(row_stats.total_get_qps);
multi_get_qps->set(row_stats.total_multi_get_qps);
put_qps->set(row_stats.total_put_qps);
multi_put_qps->set(row_stats.total_multi_put_qps);
remove_qps->set(row_stats.total_remove_qps);
multi_remove_qps->set(row_stats.total_multi_remove_qps);
incr_qps->set(row_stats.total_incr_qps);
check_and_set_qps->set(row_stats.total_check_and_set_qps);
check_and_mutate_qps->set(row_stats.total_check_and_mutate_qps);
scan_qps->set(row_stats.total_scan_qps);
recent_read_cu->set(row_stats.total_recent_read_cu);
recent_write_cu->set(row_stats.total_recent_write_cu);
recent_expire_count->set(row_stats.total_recent_expire_count);
recent_filter_count->set(row_stats.total_recent_filter_count);
recent_abnormal_count->set(row_stats.total_recent_abnormal_count);
recent_write_throttling_delay_count->set(
row_stats.total_recent_write_throttling_delay_count);
recent_write_throttling_reject_count->set(
row_stats.total_recent_write_throttling_reject_count);
storage_mb->set(row_stats.total_storage_mb);
storage_count->set(row_stats.total_storage_count);
rdb_block_cache_hit_rate->set(
std::abs(row_stats.total_rdb_block_cache_total_count) < 1e-6
? 0
: row_stats.total_rdb_block_cache_hit_count /
row_stats.total_rdb_block_cache_total_count * 1000000);
rdb_index_and_filter_blocks_mem_usage->set(
row_stats.total_rdb_index_and_filter_blocks_mem_usage);
rdb_memtable_mem_usage->set(row_stats.total_rdb_memtable_mem_usage);
rdb_estimate_num_keys->set(row_stats.total_rdb_estimate_num_keys);
read_qps->set(row_stats.get_total_read_qps());
write_qps->set(row_stats.get_total_write_qps());
}
::dsn::perf_counter_wrapper get_qps;
::dsn::perf_counter_wrapper multi_get_qps;
::dsn::perf_counter_wrapper put_qps;
......@@ -65,7 +102,7 @@ public:
void stop();
void on_app_stat();
AppStatCounters *get_app_counters(const std::string &app_name);
app_stat_counters *get_app_counters(const std::string &app_name);
void on_capacity_unit_stat(int remaining_retry_count);
bool has_capacity_unit_updated(const std::string &node_address, const std::string &timestamp);
......@@ -80,7 +117,7 @@ private:
uint32_t _app_stat_interval_seconds;
::dsn::task_ptr _app_stat_timer_task;
::dsn::utils::ex_lock_nr _app_stat_counter_lock;
std::map<std::string, AppStatCounters *> _app_stat_counters;
std::map<std::string, app_stat_counters *> _app_stat_counters;
// app for recording usage statistics, including read/write capacity unit and storage size.
std::string _usage_stat_app;
......@@ -99,6 +136,11 @@ private:
::dsn::utils::ex_lock_nr _capacity_unit_update_info_lock;
// mapping 'node address' --> 'last updated timestamp'
std::map<std::string, string> _capacity_unit_update_info;
std::map<std::string, hotspot_calculator *> _hotspot_calculator_store;
hotspot_calculator *get_hotspot_calculator(const std::string &app_name,
const int partition_num);
};
} // namespace server
} // namespace pegasus
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
#include "table_hotspot_policy.h"
#include <dsn/dist/fmt_logging.h>
namespace pegasus {
namespace server {
void hotspot_calculator::aggregate(const std::vector<row_data> &partitions)
{
while (_app_data.size() > kMaxQueueSize - 1) {
_app_data.pop();
}
std::vector<hotspot_partition_data> temp(partitions.size());
for (int i = 0; i < partitions.size(); i++) {
temp[i] = std::move(hotspot_partition_data(partitions[i]));
}
_app_data.emplace(temp);
}
void hotspot_calculator::init_perf_counter(const int perf_counter_count)
{
std::string counter_name;
std::string counter_desc;
for (int i = 0; i < perf_counter_count; i++) {
string paritition_desc = _app_name + '.' + std::to_string(i);
counter_name = fmt::format("app.stat.hotspots@{}", paritition_desc);
counter_desc = fmt::format("statistic the hotspots of app {}", paritition_desc);
_points[i].init_app_counter(
"app.pegasus", counter_name.c_str(), COUNTER_TYPE_NUMBER, counter_desc.c_str());
}
}
// Runs the configured policy over the collected history and lets it update _points.
// Does nothing while no snapshot has been aggregated yet: the policy reads the newest
// snapshot, which does not exist on an empty history.
void hotspot_calculator::start_alg()
{
    if (_app_data.empty()) {
        return;
    }
    _policy->analysis(_app_data, _points);
}
} // namespace server
} // namespace pegasus
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
#pragma once
#include "hotspot_partition_data.h"
#include <algorithm>
#include <gtest/gtest_prod.h>
#include <dsn/perf_counter/perf_counter.h>
namespace pegasus {
namespace server {
// Interface of a hotspot detection strategy.
class hotspot_policy
{
public:
    // Policies are owned and destroyed through a pointer to this base class, so the
    // destructor must be virtual.
    virtual ~hotspot_policy() = default;

    // Analyses the history of one app and writes the outcome into `hot_points`.
    //
    // hotspot_app_data: historical data related to hotspots, kept as a rolling queue for
    //                   one app; each queue element is a vector holding the data of the
    //                   app's partitions, one hotspot_partition_data per partition.
    // hot_points:       perf counters to be updated by the policy.
    virtual void analysis(const std::queue<std::vector<hotspot_partition_data>> &hotspot_app_data,
                          std::vector<::dsn::perf_counter_wrapper> &hot_points) = 0;
};
class hotspot_algo_qps_skew : public hotspot_policy
{
public:
void analysis(const std::queue<std::vector<hotspot_partition_data>> &hotspot_app_data,
std::vector<::dsn::perf_counter_wrapper> &hot_points)
{
const auto &anly_data = hotspot_app_data.back();
double min_total_qps = INT_MAX;
for (auto partition_anly_data : anly_data) {
min_total_qps = std::min(min_total_qps, partition_anly_data.total_qps);
}
min_total_qps = std::max(1.0, min_total_qps);
dassert(anly_data.size() == hot_points.size(), "partition counts error, please check");
for (int i = 0; i < hot_points.size(); i++) {
hot_points[i]->set(anly_data[i].total_qps / min_total_qps);
}
}
};
// hotspot_calculator is used to find the hotspot in Pegasus.
// It keeps a rolling history of per-partition snapshots for one app, runs a
// hotspot_policy over that history and exposes the result through one perf counter per
// partition.
class hotspot_calculator
{
public:
// Creates the calculator for `app_name` with `partition_num` perf counters, selects
// hotspot_algo_qps_skew as the policy and registers the counters.
// Note: the members below are initialized in declaration order, and
// init_perf_counter() relies on _app_name and _points being ready.
hotspot_calculator(const std::string &app_name, const int partition_num)
: _app_name(app_name), _points(partition_num), _policy(new hotspot_algo_qps_skew())
{
init_perf_counter(partition_num);
}
// Appends a snapshot built from `partitions` to the history, dropping the oldest
// snapshots so that no more than kMaxQueueSize are kept.
void aggregate(const std::vector<row_data> &partitions);
// Runs the policy over the history; the policy updates the perf counters.
void start_alg();
// Registers the first `perf_counter_count` perf counters of _points.
void init_perf_counter(const int perf_counter_count);
private:
// Name of the app this calculator belongs to; used to build the counter names.
const std::string _app_name;
// One perf counter per partition, written by the policy.
std::vector<::dsn::perf_counter_wrapper> _points;
// Rolling history; each element holds the data of all partitions at one point in time.
std::queue<std::vector<hotspot_partition_data>> _app_data;
// Strategy used by start_alg().
std::unique_ptr<hotspot_policy> _policy;
// Maximum number of snapshots kept in _app_data.
static const int kMaxQueueSize = 100;
// Lets the unit test read the private members (it inspects _points).
FRIEND_TEST(table_hotspot_policy, hotspot_algo_qps_skew);
};
} // namespace server
} // namespace pegasus
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
#pragma once
// Accumulated statistics of one table (app), or of several tables merged together.
// All totals start at zero; they grow through aggregate() (one partition row at a time)
// and merge() (another table_stats at a time).
// NOTE(review): this header uses std::string, row_data and INT_MAX without including
// anything itself — presumably the including file provides them; confirm.
struct table_stats
{
// Creates empty statistics labelled with `app_name`.
table_stats(const std::string &app_name) : app_name(app_name) {}
// Returns the combined QPS of the read operations: get, multi_get and scan.
double get_total_read_qps() const
{
return total_get_qps + total_multi_get_qps + total_scan_qps;
}
// Returns the combined QPS of the write operations: put, multi_put, remove,
// multi_remove, incr, check_and_set and check_and_mutate.
double get_total_write_qps() const
{
return total_put_qps + total_multi_put_qps + total_remove_qps + total_multi_remove_qps +
total_incr_qps + total_check_and_set_qps + total_check_and_mutate_qps;
}
// Adds every figure of one partition row to the matching total.
void aggregate(const row_data &row)
{
total_get_qps += row.get_qps;
total_multi_get_qps += row.multi_get_qps;
total_put_qps += row.put_qps;
total_multi_put_qps += row.multi_put_qps;
total_remove_qps += row.remove_qps;
total_multi_remove_qps += row.multi_remove_qps;
total_incr_qps += row.incr_qps;
total_check_and_set_qps += row.check_and_set_qps;
total_check_and_mutate_qps += row.check_and_mutate_qps;
total_scan_qps += row.scan_qps;
total_recent_read_cu += row.recent_read_cu;
total_recent_write_cu += row.recent_write_cu;
total_recent_expire_count += row.recent_expire_count;
total_recent_filter_count += row.recent_filter_count;
total_recent_abnormal_count += row.recent_abnormal_count;
total_recent_write_throttling_delay_count += row.recent_write_throttling_delay_count;
total_recent_write_throttling_reject_count += row.recent_write_throttling_reject_count;
total_storage_mb += row.storage_mb;
total_storage_count += row.storage_count;
total_rdb_block_cache_hit_count += row.rdb_block_cache_hit_count;
total_rdb_block_cache_total_count += row.rdb_block_cache_total_count;
total_rdb_index_and_filter_blocks_mem_usage += row.rdb_index_and_filter_blocks_mem_usage;
total_rdb_memtable_mem_usage += row.rdb_memtable_mem_usage;
total_rdb_estimate_num_keys += row.rdb_estimate_num_keys;
}
// Adds every total of `row_stats` to the matching total of this object.
// app_name and the max_/min_ fields below are left untouched.
void merge(const table_stats &row_stats)
{
total_get_qps += row_stats.total_get_qps;
total_multi_get_qps += row_stats.total_multi_get_qps;
total_put_qps += row_stats.total_put_qps;
total_multi_put_qps += row_stats.total_multi_put_qps;
total_remove_qps += row_stats.total_remove_qps;
total_multi_remove_qps += row_stats.total_multi_remove_qps;
total_incr_qps += row_stats.total_incr_qps;
total_check_and_set_qps += row_stats.total_check_and_set_qps;
total_check_and_mutate_qps += row_stats.total_check_and_mutate_qps;
total_scan_qps += row_stats.total_scan_qps;
total_recent_read_cu += row_stats.total_recent_read_cu;
total_recent_write_cu += row_stats.total_recent_write_cu;
total_recent_expire_count += row_stats.total_recent_expire_count;
total_recent_filter_count += row_stats.total_recent_filter_count;
total_recent_abnormal_count += row_stats.total_recent_abnormal_count;
total_recent_write_throttling_delay_count +=
row_stats.total_recent_write_throttling_delay_count;
total_recent_write_throttling_reject_count +=
row_stats.total_recent_write_throttling_reject_count;
total_storage_mb += row_stats.total_storage_mb;
total_storage_count += row_stats.total_storage_count;
total_rdb_block_cache_hit_count += row_stats.total_rdb_block_cache_hit_count;
total_rdb_block_cache_total_count += row_stats.total_rdb_block_cache_total_count;
total_rdb_index_and_filter_blocks_mem_usage +=
row_stats.total_rdb_index_and_filter_blocks_mem_usage;
total_rdb_memtable_mem_usage += row_stats.total_rdb_memtable_mem_usage;
total_rdb_estimate_num_keys += row_stats.total_rdb_estimate_num_keys;
}
// Label of these statistics, as given to the constructor.
std::string app_name;
// Per-operation QPS totals.
double total_get_qps = 0;
double total_multi_get_qps = 0;
double total_put_qps = 0;
double total_multi_put_qps = 0;
double total_remove_qps = 0;
double total_multi_remove_qps = 0;
double total_incr_qps = 0;
double total_check_and_set_qps = 0;
double total_check_and_mutate_qps = 0;
double total_scan_qps = 0;
// Capacity unit and recent event count totals.
double total_recent_read_cu = 0;
double total_recent_write_cu = 0;
double total_recent_expire_count = 0;
double total_recent_filter_count = 0;
double total_recent_abnormal_count = 0;
double total_recent_write_throttling_delay_count = 0;
double total_recent_write_throttling_reject_count = 0;
// Storage and rocksdb totals.
double total_storage_mb = 0;
double total_storage_count = 0;
double total_rdb_block_cache_hit_count = 0;
double total_rdb_block_cache_total_count = 0;
double total_rdb_index_and_filter_blocks_mem_usage = 0;
double total_rdb_memtable_mem_usage = 0;
double total_rdb_estimate_num_keys = 0;
// Extremes across partitions. Neither aggregate() nor merge() updates these fields, so
// within this struct they keep their initial values.
double max_total_qps = 0;
double min_total_qps = INT_MAX;
double max_total_cu = 0;
double min_total_cu = INT_MAX;
std::string max_qps_partition_id;
std::string max_cu_partition_id;
};
......@@ -7,6 +7,7 @@ set(MY_PROJ_SRC "../pegasus_server_impl.cpp"
"../pegasus_server_write.cpp"
"../capacity_unit_calculator.cpp"
"../pegasus_mutation_duplicator.cpp"
"../table_hotspot_policy.cpp"
)
set(MY_SRC_SEARCH_MODE "GLOB")
......
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
#include "server/table_hotspot_policy.h"
#include <gtest/gtest.h>
namespace pegasus {
namespace server {
// Feeds two partitions with different get QPS to a calculator and checks the value the
// qps-skew policy publishes for each partition.
TEST(table_hotspot_policy, hotspot_algo_qps_skew)
{
    // two partitions, only the get QPS differs
    std::vector<row_data> partitions(2);
    partitions[0].get_qps = 1234.0;
    partitions[1].get_qps = 4321.0;

    hotspot_calculator calculator("TEST", 2);
    calculator.aggregate(partitions);
    calculator.start_alg();

    // read back what the policy wrote into the per-partition counters
    std::vector<double> actual;
    actual.reserve(calculator._points.size());
    for (auto &point : calculator._points) {
        actual.push_back(point->get_value());
    }

    const std::vector<double> expected{1, 3};
    ASSERT_EQ(expected, actual);
}
} // namespace server
} // namespace pegasus
......@@ -512,6 +512,14 @@ inline bool parse_app_pegasus_perf_counter_name(const std::string &name,
struct row_data
{
// Returns the QPS of all operations of this row: the read operations (get, multi_get,
// scan) followed by the write operations, summed from left to right.
double get_total_qps() const
{
    const auto read_qps = get_qps + multi_get_qps + scan_qps;
    return read_qps + put_qps + multi_put_qps + remove_qps + multi_remove_qps + incr_qps +
           check_and_set_qps + check_and_mutate_qps;
}
// Returns the recent read capacity units plus the recent write capacity units of this row.
double get_total_cu() const
{
    return recent_read_cu + recent_write_cu;
}
std::string row_name;
int32_t app_id = 0;
int32_t partition_count = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册