未验证 提交 b0578482 编写于 作者: S Smilencer 提交者: GitHub

refactor(collector): sort out the structure of partition hotspot detection (#597)

上级 d2485ff1
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "hotspot_partition_calculator.h"
#include <algorithm>
#include <math.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/flags.h>
namespace pegasus {
namespace server {
// Configuration flag in section "pegasus.collector" (default value 100) that limits how
// many historical snapshots the calculator keeps. It is read below as
// FLAGS_max_hotspot_store_size by data_aggregate(), which evicts the oldest snapshots first.
DSN_DEFINE_int64("pegasus.collector",
max_hotspot_store_size,
100,
"the max count of historical data "
"stored in calculator, The FIFO "
"queue design is used to "
"eliminate outdated historical "
"data");
void hotspot_partition_calculator::data_aggregate(const std::vector<row_data> &partitions)
{
while (_partition_stat_histories.size() > FLAGS_max_hotspot_store_size - 1) {
_partition_stat_histories.pop();
}
std::vector<hotspot_partition_data> temp(partitions.size());
// TODO refactor the data structure
for (int i = 0; i < partitions.size(); i++) {
temp[i] = std::move(hotspot_partition_data(partitions[i]));
}
_partition_stat_histories.emplace(temp);
}
void hotspot_partition_calculator::init_perf_counter(int partition_count)
{
std::string counter_name;
std::string counter_desc;
for (int i = 0; i < partition_count; i++) {
string partition_desc = _app_name + '.' + std::to_string(i);
counter_name = fmt::format("app.stat.hotspots@{}", partition_desc);
counter_desc = fmt::format("statistic the hotspots of app {}", partition_desc);
_hot_points[i].init_app_counter(
"app.pegasus", counter_name.c_str(), COUNTER_TYPE_NUMBER, counter_desc.c_str());
}
}
// Scores every partition of the table against the stored QPS history and publishes the
// score through the partition's perf counter.
//
// The score ("hot point") of a partition is
//     (latest total_qps - mean) / standard deviation
// where mean and standard deviation are computed over every stored sample whose total_qps
// is greater than 1. Negative scores are clamped to 0 and the result is rounded up.
// Nothing is published when there is no history or no qualifying sample.
void hotspot_partition_calculator::data_analyse()
{
    if (_partition_stat_histories.empty()) {
        // Calling back() on an empty queue is undefined behaviour; there is nothing to
        // analyse before the first data_aggregate().
        ddebug("_partition_stat_histories size == 0");
        return;
    }
    dassert(_partition_stat_histories.back().size() == _hot_points.size(),
            "partition counts error, please check");

    std::vector<double> data_samples;
    data_samples.reserve(_partition_stat_histories.size() * _hot_points.size());
    double table_qps_sum = 0;
    // std::queue cannot be iterated, so walk (and drain) a copy of the history.
    auto temp_data = _partition_stat_histories;
    while (!temp_data.empty()) {
        for (const auto &partition_data : temp_data.front()) {
            // samples with (almost) no traffic are ignored
            if (partition_data.total_qps - 1.00 > 0) {
                data_samples.push_back(partition_data.total_qps);
                table_qps_sum += partition_data.total_qps;
            }
        }
        temp_data.pop();
    }
    if (data_samples.empty()) {
        ddebug("_partition_stat_histories size == 0");
        return;
    }

    const double sample_count = static_cast<double>(data_samples.size());
    const double table_qps_avg = table_qps_sum / sample_count;
    double squared_diff_sum = 0;
    for (const auto &data_sample : data_samples) {
        const double diff = data_sample - table_qps_avg;
        squared_diff_sum += diff * diff;
    }
    const double standard_deviation = sqrt(squared_diff_sum / sample_count);

    const auto &anly_data = _partition_stat_histories.back();
    for (size_t i = 0; i < _hot_points.size(); i++) {
        // When every sample is identical the deviation is 0; dividing by it would give
        // NaN/inf, which must not be converted to an integer counter value. In that case no
        // partition stands out, so report 0.
        double hot_point = 0;
        if (standard_deviation > 0) {
            hot_point = (anly_data[i].total_qps - table_qps_avg) / standard_deviation;
        }
        // perf_counter->set can only be unsigned __int64
        // use ceil to guarantee conversion results
        hot_point = ceil(std::max(hot_point, double(0)));
        _hot_points[i]->set(hot_point);
    }
}
} // namespace server
} // namespace pegasus
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "hotspot_partition_data.h"
#include <gtest/gtest_prod.h>
#include <dsn/perf_counter/perf_counter.h>
namespace pegasus {
namespace server {
// hotspot_partition_calculator looks for hot partitions inside one table: it keeps a
// history of per-partition statistics and owns one "hot-point" perf counter per partition.
class hotspot_partition_calculator
{
public:
    // Creates `partition_count` perf counters named after `app_name`.
    hotspot_partition_calculator(const std::string &app_name, int partition_count)
        : _app_name(app_name), _hot_points(partition_count)
    {
        init_perf_counter(partition_count);
    }

    // Stores the statistics of the current round (one row per partition) into the history.
    void data_aggregate(const std::vector<row_data> &partitions);

    // Evaluates the stored history and refreshes the hot-point counters.
    void data_analyse();

private:
    // Initializes the perf counter of each of the `partition_count` partitions.
    void init_perf_counter(int partition_count);

    const std::string _app_name;

    // One counter per partition; as a rule of thumb a partition whose "hot-point value"
    // is >= 3 can be considered a hotspot partition.
    std::vector<dsn::perf_counter_wrapper> _hot_points;

    // Historical snapshots; analysing more than the latest round improves accuracy.
    std::queue<std::vector<hotspot_partition_data>> _partition_stat_histories;

    FRIEND_TEST(hotspot_partition_calculator, hotspot_partition_policy);
};
} // namespace server
} // namespace pegasus
......@@ -4,21 +4,16 @@
#pragma once
#include "shell/commands.h"
#include "shell/command_helper.h"
namespace pegasus {
namespace server {
struct hotspot_partition_data
{
hotspot_partition_data(const row_data &row)
: total_qps(row.get_total_qps()),
total_cu(row.get_total_cu()),
partition_name(row.row_name){};
hotspot_partition_data(const row_data &row) : total_qps(row.get_total_qps()){};
hotspot_partition_data() {}
double total_qps;
double total_cu;
std::string partition_name;
};
} // namespace server
......
......@@ -14,6 +14,7 @@
#include "base/pegasus_const.h"
#include "result_writer.h"
#include "hotspot_partition_calculator.h"
using namespace ::dsn;
using namespace ::dsn::replication;
......@@ -78,10 +79,6 @@ info_collector::info_collector()
"storage_size_fetch_interval_seconds",
3600, // default value 1h
"storage size fetch interval seconds");
_hotspot_detect_algorithm = dsn_config_get_value_string("pegasus.collector",
"hotspot_detect_algorithm",
"hotspot_algo_qps_variance",
"hotspot_detect_algorithm");
// _storage_size_retry_wait_seconds is in range of [1, 60]
_storage_size_retry_wait_seconds =
std::min(60u, std::max(1u, _storage_size_fetch_interval_seconds / 10));
......@@ -96,9 +93,6 @@ info_collector::~info_collector()
for (auto kv : _app_stat_counters) {
delete kv.second;
}
for (auto store : _hotspot_calculator_store) {
delete store.second;
}
}
void info_collector::start()
......@@ -150,15 +144,11 @@ void info_collector::on_app_stat()
// get row data statistics for all of the apps
all_stats.merge(app_stats);
// hotspot_calculator is to detect hotspots
hotspot_calculator *hotspot_calculator =
// hotspot_partition_calculator is used for detecting hotspots
auto hotspot_partition_calculator =
get_hotspot_calculator(app_rows.first, app_rows.second.size());
if (!hotspot_calculator) {
continue;
}
hotspot_calculator->aggregate(app_rows.second);
// new policy can be designed by strategy pattern in hotspot_partition_data.h
hotspot_calculator->start_alg();
hotspot_partition_calculator->data_aggregate(app_rows.second);
hotspot_partition_calculator->data_analyse();
}
get_app_counters(all_stats.app_name)->set(all_stats);
......@@ -302,25 +292,16 @@ void info_collector::on_storage_size_stat(int remaining_retry_count)
_result_writer->set_result(st_stat.timestamp, "ss", st_stat.dump_to_json());
}
hotspot_calculator *info_collector::get_hotspot_calculator(const std::string &app_name,
const int partition_num)
std::shared_ptr<hotspot_partition_calculator>
info_collector::get_hotspot_calculator(const std::string &app_name, const int partition_count)
{
// use appname+partition_num as a key can prevent the impact of dynamic partition changes
std::string app_name_pcount = fmt::format("{}.{}", app_name, partition_num);
// use app_name+partition_count as a key can prevent the impact of dynamic partition changes
std::string app_name_pcount = fmt::format("{}.{}", app_name, partition_count);
auto iter = _hotspot_calculator_store.find(app_name_pcount);
if (iter != _hotspot_calculator_store.end()) {
return iter->second;
}
std::unique_ptr<hotspot_policy> policy;
if (_hotspot_detect_algorithm == "hotspot_algo_qps_variance") {
policy.reset(new hotspot_algo_qps_variance());
} else {
dwarn("hotspot detection is disabled");
_hotspot_calculator_store[app_name_pcount] = nullptr;
return nullptr;
}
hotspot_calculator *calculator =
new hotspot_calculator(app_name, partition_num, std::move(policy));
auto calculator = std::make_shared<hotspot_partition_calculator>(app_name, partition_count);
_hotspot_calculator_store[app_name_pcount] = calculator;
return calculator;
}
......
......@@ -19,12 +19,12 @@
#include "../shell/commands.h"
#include "table_stats.h"
#include "table_hotspot_policy.h"
namespace pegasus {
namespace server {
class result_writer;
class hotspot_partition_calculator;
class info_collector
{
......@@ -177,15 +177,16 @@ private:
uint32_t _storage_size_fetch_interval_seconds;
uint32_t _storage_size_retry_wait_seconds;
uint32_t _storage_size_retry_max_count;
std::string _hotspot_detect_algorithm;
::dsn::task_ptr _storage_size_stat_timer_task;
::dsn::utils::ex_lock_nr _capacity_unit_update_info_lock;
// mapping 'node address' --> 'last updated timestamp'
std::map<std::string, string> _capacity_unit_update_info;
std::map<std::string, hotspot_calculator *> _hotspot_calculator_store;
hotspot_calculator *get_hotspot_calculator(const std::string &app_name,
const int partition_num);
// _hotspot_calculator_store is to save hotspot_partition_calculator for each table, a
// hotspot_partition_calculator saves historical hotspot data and alert perf_counters of
// corresponding table
std::map<std::string, std::shared_ptr<hotspot_partition_calculator>> _hotspot_calculator_store;
std::shared_ptr<hotspot_partition_calculator>
get_hotspot_calculator(const std::string &app_name, const int partition_count);
};
} // namespace server
......
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
#include "table_hotspot_policy.h"
#include <dsn/dist/fmt_logging.h>
namespace pegasus {
namespace server {
void hotspot_calculator::aggregate(const std::vector<row_data> &partitions)
{
while (_app_data.size() > kMaxQueueSize - 1) {
_app_data.pop();
}
std::vector<hotspot_partition_data> temp(partitions.size());
for (int i = 0; i < partitions.size(); i++) {
temp[i] = std::move(hotspot_partition_data(partitions[i]));
}
_app_data.emplace(temp);
}
void hotspot_calculator::init_perf_counter(const int perf_counter_count)
{
std::string counter_name;
std::string counter_desc;
for (int i = 0; i < perf_counter_count; i++) {
string paritition_desc = _app_name + '.' + std::to_string(i);
counter_name = fmt::format("app.stat.hotspots@{}", paritition_desc);
counter_desc = fmt::format("statistic the hotspots of app {}", paritition_desc);
_points[i].init_app_counter(
"app.pegasus", counter_name.c_str(), COUNTER_TYPE_NUMBER, counter_desc.c_str());
}
}
// Passes the collected history and the per-partition perf counters to the configured
// policy's analysis().
void hotspot_calculator::start_alg()
{
    _policy->analysis(_app_data, _points);
}
} // namespace server
} // namespace pegasus
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
#pragma once
#include "hotspot_partition_data.h"
#include <algorithm>
#include <gtest/gtest_prod.h>
#include <math.h>
#include <dsn/perf_counter/perf_counter.h>
namespace pegasus {
namespace server {
// Interface of a hotspot detection algorithm.
class hotspot_policy
{
public:
    // Policies are owned and destroyed through std::unique_ptr<hotspot_policy>, so the
    // destructor has to be virtual for the derived object to be destroyed correctly.
    virtual ~hotspot_policy() = default;

    // hotspot_app_data store the historical data which related to hotspot
    // it uses rolling queue to save one app's data
    // vector is used to save the partitions' data of this app
    // hotspot_partition_data is used to save data of one partition
    //
    // perf_counters: one counter per partition, to be updated with the analysis result.
    virtual void analysis(const std::queue<std::vector<hotspot_partition_data>> &hotspot_app_data,
                          std::vector<::dsn::perf_counter_wrapper> &perf_counters) = 0;
};
// PauTa Criterion
class hotspot_algo_qps_variance : public hotspot_policy
{
public:
void analysis(const std::queue<std::vector<hotspot_partition_data>> &hotspot_app_data,
std::vector<::dsn::perf_counter_wrapper> &perf_counters)
{
dassert(hotspot_app_data.back().size() == perf_counters.size(),
"partition counts error, please check");
std::vector<double> data_samples;
data_samples.reserve(hotspot_app_data.size() * perf_counters.size());
auto temp_data = hotspot_app_data;
double total = 0, sd = 0, avg = 0;
int sample_count = 0;
// avg: Average number
// sd: Standard deviation
// sample_count: Number of samples
while (!temp_data.empty()) {
for (auto partition_data : temp_data.front()) {
if (partition_data.total_qps - 1.00 > 0) {
data_samples.push_back(partition_data.total_qps);
total += partition_data.total_qps;
sample_count++;
}
}
temp_data.pop();
}
if (sample_count == 0) {
ddebug("hotspot_app_data size == 0");
return;
}
avg = total / sample_count;
for (auto data_sample : data_samples) {
sd += pow((data_sample - avg), 2);
}
sd = sqrt(sd / sample_count);
const auto &anly_data = hotspot_app_data.back();
for (int i = 0; i < perf_counters.size(); i++) {
double hot_point = (anly_data[i].total_qps - avg) / sd;
// perf_counter->set can only be unsigned __int64
// use ceil to guarantee conversion results
hot_point = ceil(std::max(hot_point, double(0)));
perf_counters[i]->set(hot_point);
}
}
};
// hotspot_calculator finds the hotspot in Pegasus: it collects per-partition statistics of
// one app and hands them to a hotspot_policy for analysis.
class hotspot_calculator
{
public:
    // Takes ownership of `policy` and sets up `partition_num` perf counters for `app_name`.
    hotspot_calculator(const std::string &app_name,
                       const int partition_num,
                       std::unique_ptr<hotspot_policy> policy)
        : _app_name(app_name), _points(partition_num), _policy(std::move(policy))
    {
        init_perf_counter(partition_num);
    }

    // Stores the statistics of the current round (one row per partition).
    void aggregate(const std::vector<row_data> &partitions);

    // Runs the policy over the stored data.
    void start_alg();

    // Initializes the first `perf_counter_count` perf counters.
    void init_perf_counter(const int perf_counter_count);

private:
    // Upper bound on the number of snapshots kept in _app_data.
    static const int kMaxQueueSize = 100;

    const std::string _app_name;
    // One perf counter per partition.
    std::vector<::dsn::perf_counter_wrapper> _points;
    // History of snapshots; each snapshot holds the data of every partition.
    std::queue<std::vector<hotspot_partition_data>> _app_data;
    // Detection algorithm applied by start_alg().
    std::unique_ptr<hotspot_policy> _policy;

    FRIEND_TEST(table_hotspot_policy, hotspot_algo_qps_variance);
};
} // namespace server
} // namespace pegasus
......@@ -8,7 +8,7 @@ set(MY_PROJ_SRC "../pegasus_server_impl.cpp"
"../pegasus_server_write.cpp"
"../capacity_unit_calculator.cpp"
"../pegasus_mutation_duplicator.cpp"
"../table_hotspot_policy.cpp"
"../hotspot_partition_calculator.cpp"
"../meta_store.cpp"
)
......
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "server/table_hotspot_policy.h"
#include "server/hotspot_partition_calculator.h"
#include <gtest/gtest.h>
namespace pegasus {
namespace server {
TEST(table_hotspot_policy, hotspot_algo_qps_variance)
TEST(hotspot_partition_calculator, hotspot_partition_policy)
{
// TODO: refactor the unit test
std::vector<row_data> test_rows(8);
test_rows[0].get_qps = 1000.0;
test_rows[1].get_qps = 1000.0;
......@@ -20,13 +34,12 @@ TEST(table_hotspot_policy, hotspot_algo_qps_variance)
test_rows[5].get_qps = 1000.0;
test_rows[6].get_qps = 1000.0;
test_rows[7].get_qps = 5000.0;
std::unique_ptr<hotspot_policy> policy(new hotspot_algo_qps_variance());
hotspot_calculator test_hotspot_calculator("TEST", 8, std::move(policy));
test_hotspot_calculator.aggregate(test_rows);
test_hotspot_calculator.start_alg();
hotspot_partition_calculator test_hotspot_calculator("TEST", 8);
test_hotspot_calculator.data_aggregate(test_rows);
test_hotspot_calculator.data_analyse();
std::vector<double> result(8);
for (int i = 0; i < test_hotspot_calculator._points.size(); i++) {
result[i] = test_hotspot_calculator._points[i]->get_value();
for (int i = 0; i < test_hotspot_calculator._hot_points.size(); i++) {
result[i] = test_hotspot_calculator._hot_points[i]->get_value();
}
std::vector<double> expect_vector{0, 0, 0, 0, 0, 0, 0, 3};
ASSERT_EQ(expect_vector, result);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册