提交 19dae331 编写于 作者: S Smilencer 提交者: neverchanje

feat(hotspot): new algorithm of hotspot detection (#479)

上级 8eae8a51
......@@ -77,6 +77,10 @@ info_collector::info_collector()
"storage_size_fetch_interval_seconds",
3600, // default value 1h
"storage size fetch interval seconds");
_hotspot_detect_algorithm = dsn_config_get_value_string("pegasus.collector",
"hotspot_detect_algorithm",
"hotspot_algo_qps_variance",
"hotspot_detect_algorithm");
// _storage_size_retry_wait_seconds is in range of [1, 60]
_storage_size_retry_wait_seconds =
std::min(60u, std::max(1u, _storage_size_fetch_interval_seconds / 10));
......@@ -148,6 +152,9 @@ void info_collector::on_app_stat()
// hotspot_calculator is to detect hotspots
hotspot_calculator *hotspot_calculator =
get_hotspot_calculator(app_rows.first, app_rows.second.size());
if (!hotspot_calculator) {
continue;
}
hotspot_calculator->aggregate(app_rows.second);
// new policy can be designed by strategy pattern in hotspot_partition_data.h
hotspot_calculator->start_alg();
......@@ -285,9 +292,20 @@ hotspot_calculator *info_collector::get_hotspot_calculator(const std::string &ap
if (iter != _hotspot_calculator_store.end()) {
return iter->second;
}
hotspot_calculator *calculator_address = new hotspot_calculator(app_name, partition_num);
_hotspot_calculator_store[app_name] = calculator_address;
return calculator_address;
std::unique_ptr<hotspot_policy> policy;
if (_hotspot_detect_algorithm == "hotspot_algo_qps_variance") {
policy.reset(new hotspot_algo_qps_variance());
} else if (_hotspot_detect_algorithm == "hotspot_algo_qps_skew") {
policy.reset(new hotspot_algo_qps_skew());
} else {
dwarn("hotspot detection is disabled");
_hotspot_calculator_store[app_name] = nullptr;
return nullptr;
}
hotspot_calculator *calculator =
new hotspot_calculator(app_name, partition_num, std::move(policy));
_hotspot_calculator_store[app_name] = calculator;
return calculator;
}
} // namespace server
......
......@@ -132,6 +132,7 @@ private:
uint32_t _storage_size_fetch_interval_seconds;
uint32_t _storage_size_retry_wait_seconds;
uint32_t _storage_size_retry_max_count;
std::string _hotspot_detect_algorithm;
::dsn::task_ptr _storage_size_stat_timer_task;
::dsn::utils::ex_lock_nr _capacity_unit_update_info_lock;
// mapping 'node address' --> 'last updated timestamp'
......
......@@ -8,6 +8,8 @@
#include <algorithm>
#include <gtest/gtest_prod.h>
#include <math.h>
#include <dsn/perf_counter/perf_counter.h>
namespace pegasus {
......@@ -20,14 +22,14 @@ public:
// vector is used to save the partitions' data of this app
// hotspot_partition_data is used to save data of one partition
virtual void analysis(const std::queue<std::vector<hotspot_partition_data>> &hotspot_app_data,
std::vector<::dsn::perf_counter_wrapper> &hot_points) = 0;
std::vector<::dsn::perf_counter_wrapper> &perf_counters) = 0;
};
class hotspot_algo_qps_skew : public hotspot_policy
{
public:
void analysis(const std::queue<std::vector<hotspot_partition_data>> &hotspot_app_data,
std::vector<::dsn::perf_counter_wrapper> &hot_points)
std::vector<::dsn::perf_counter_wrapper> &perf_counters)
{
const auto &anly_data = hotspot_app_data.back();
double min_total_qps = INT_MAX;
......@@ -35,9 +37,56 @@ public:
min_total_qps = std::min(min_total_qps, partition_anly_data.total_qps);
}
min_total_qps = std::max(1.0, min_total_qps);
dassert(anly_data.size() == hot_points.size(), "partition counts error, please check");
for (int i = 0; i < hot_points.size(); i++) {
hot_points[i]->set(anly_data[i].total_qps / min_total_qps);
dassert(anly_data.size() == perf_counters.size(), "partition counts error, please check");
for (int i = 0; i < perf_counters.size(); i++) {
perf_counters[i]->set(anly_data[i].total_qps / min_total_qps);
}
}
};
// PauTa Criterion
//
// Scores every partition of the most recently pushed snapshot by how many standard
// deviations its total_qps lies above the mean of all samples kept in the history
// queue. Samples whose total_qps is not greater than 1 are left out of the statistics.
class hotspot_algo_qps_variance : public hotspot_policy
{
public:
    // hotspot_app_data: history of per-partition snapshots; back() is the one scored.
    // perf_counters:    one counter per partition; counter i receives the score of
    //                   partition i (a non-negative value rounded up to a whole number).
    void analysis(const std::queue<std::vector<hotspot_partition_data>> &hotspot_app_data,
                  std::vector<::dsn::perf_counter_wrapper> &perf_counters)
    {
        // back() on an empty queue is undefined behaviour, so there is nothing to score.
        if (hotspot_app_data.empty()) {
            return;
        }
        dassert(hotspot_app_data.back().size() == perf_counters.size(),
                "partition counts error, please check");

        std::vector<double> data_samples;
        data_samples.reserve(hotspot_app_data.size() * perf_counters.size());

        // std::queue cannot be iterated, so the samples are drained from a copy.
        auto temp_data = hotspot_app_data;
        double total = 0;
        while (!temp_data.empty()) {
            for (const auto &partition_data : temp_data.front()) {
                if (partition_data.total_qps - 1.00 > 0) {
                    data_samples.push_back(partition_data.total_qps);
                    total += partition_data.total_qps;
                }
            }
            temp_data.pop();
        }
        if (data_samples.empty()) {
            ddebug("hotspot_app_data size == 0");
            return;
        }

        // avg: Average number
        // sd: Standard deviation
        // sample_count: Number of samples
        const double sample_count = static_cast<double>(data_samples.size());
        const double avg = total / sample_count;
        double sd = 0;
        for (const double data_sample : data_samples) {
            const double diff = data_sample - avg;
            sd += diff * diff;
        }
        sd = sqrt(sd / sample_count);

        const auto &anly_data = hotspot_app_data.back();
        for (size_t i = 0; i < perf_counters.size(); i++) {
            // When all samples are identical sd is 0 and the division would yield
            // NaN/inf, which must never reach the integer-valued counter. In that
            // case no partition stands out, so its score is 0.
            double hot_point = 0;
            if (sd > 0) {
                // perf_counter->set can only be unsigned __int64
                // use ceil to guarantee conversion results
                hot_point = ceil(std::max((anly_data[i].total_qps - avg) / sd, double(0)));
            }
            perf_counters[i]->set(hot_point);
        }
    }
};
......@@ -46,8 +95,10 @@ public:
class hotspot_calculator
{
public:
hotspot_calculator(const std::string &app_name, const int partition_num)
: _app_name(app_name), _points(partition_num), _policy(new hotspot_algo_qps_skew())
hotspot_calculator(const std::string &app_name,
const int partition_num,
std::unique_ptr<hotspot_policy> policy)
: _app_name(app_name), _points(partition_num), _policy(std::move(policy))
{
init_perf_counter(partition_num);
}
......@@ -62,6 +113,7 @@ private:
std::unique_ptr<hotspot_policy> _policy;
static const int kMaxQueueSize = 100;
FRIEND_TEST(table_hotspot_policy, hotspot_algo_qps_variance);
FRIEND_TEST(table_hotspot_policy, hotspot_algo_qps_skew);
};
} // namespace server
......
......@@ -14,7 +14,8 @@ TEST(table_hotspot_policy, hotspot_algo_qps_skew)
std::vector<row_data> test_rows(2);
test_rows[0].get_qps = 1234.0;
test_rows[1].get_qps = 4321.0;
hotspot_calculator test_hotspot_calculator("TEST", 2);
std::unique_ptr<hotspot_policy> policy(new hotspot_algo_qps_skew());
hotspot_calculator test_hotspot_calculator("TEST", 2, std::move(policy));
test_hotspot_calculator.aggregate(test_rows);
test_hotspot_calculator.start_alg();
std::vector<double> result(2);
......@@ -25,5 +26,28 @@ TEST(table_hotspot_policy, hotspot_algo_qps_skew)
ASSERT_EQ(expect_vector, result);
}
// Checks the variance policy on eight partitions: seven with equal get_qps are
// expected to score 0 and the single heavier partition is expected to score 3.
TEST(table_hotspot_policy, hotspot_algo_qps_variance)
{
    const std::vector<double> expect_vector{0, 0, 0, 0, 0, 0, 0, 3};

    // Partitions 0..6 carry the same load; partition 7 is the outlier.
    std::vector<row_data> test_rows(8);
    for (size_t idx = 0; idx < test_rows.size(); ++idx) {
        test_rows[idx].get_qps = (idx == 7) ? 5000.0 : 1000.0;
    }

    hotspot_calculator test_hotspot_calculator(
        "TEST", 8, std::unique_ptr<hotspot_policy>(new hotspot_algo_qps_variance()));
    test_hotspot_calculator.aggregate(test_rows);
    test_hotspot_calculator.start_alg();

    // Read back the per-partition counters written by the policy.
    std::vector<double> result(8);
    for (size_t idx = 0; idx < test_hotspot_calculator._points.size(); ++idx) {
        result[idx] = test_hotspot_calculator._points[idx]->get_value();
    }
    ASSERT_EQ(expect_vector, result);
}
} // namespace server
} // namespace pegasus
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册