未验证 提交 22238e7b 编写于 作者: S Smilencer 提交者: GitHub

fix(hotspot): replace partition_resolver with ddl_client (#641)

上级 2a74a7c6
......@@ -17,16 +17,13 @@
#include "hotspot_partition_calculator.h"
#include <algorithm>
#include <math.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/cpp/serialization_helper/dsn.layer2_types.h>
#include <dsn/utility/flags.h>
#include <dsn/tool-api/rpc_address.h>
#include <dsn/tool-api/group_address.h>
#include <dsn/utility/error_code.h>
#include <dsn/dist/replication/duplication_common.h>
#include <dsn/tool-api/task_tracker.h>
#include <dsn/utility/fail_point.h>
#include <dsn/dist/replication/duplication_common.h>
namespace pegasus {
namespace server {
......@@ -180,44 +177,39 @@ void hotspot_partition_calculator::detect_hotkey_in_hotpartition(int data_type)
const dsn::replication::detect_action::type action)
{
FAIL_POINT_INJECT_F("send_detect_hotkey_request", [](dsn::string_view) {});
auto request = dsn::make_unique<dsn::replication::detect_hotkey_request>();
request->type = hotkey_type;
request->action = action;
ddebug_f("{} {} hotkey detection in {}.{}",
int app_id;
int partition_count;
std::vector<dsn::partition_configuration> partitions;
_shell_context->ddl_client->list_app(app_name, app_id, partition_count, partitions);
auto target_address = partitions[partition_index].primary;
dsn::replication::detect_hotkey_response resp;
dsn::replication::detect_hotkey_request req;
req.type = hotkey_type;
req.action = action;
auto error = _shell_context->ddl_client->detect_hotkey(target_address, req, resp);
ddebug_f("{} {} hotkey detection in {}.{}, server address: {}",
(action == dsn::replication::detect_action::STOP) ? "Stop" : "Start",
(hotkey_type == dsn::replication::hotkey_type::WRITE) ? "write" : "read",
app_name,
partition_index);
dsn::rpc_address meta_server;
meta_server.assign_group("meta-servers");
std::vector<dsn::rpc_address> meta_servers;
replica_helper::load_meta_servers(meta_servers);
for (const auto &address : meta_servers) {
meta_server.group_address()->add(address);
partition_index,
target_address.to_string());
if (error != dsn::ERR_OK) {
derror_f("Hotkey detect rpc sending failed, in {}.{}, error_hint:{}",
app_name,
partition_index,
error.to_string());
}
auto cluster_name = dsn::replication::get_current_cluster_name();
// TODO: (Tangyanzhao) refactor partition_resolver to replication_ddl_client
auto resolver = partition_resolver::get_resolver(cluster_name, meta_servers, app_name.c_str());
dsn::task_tracker tracker;
detect_hotkey_rpc rpc(
std::move(request), RPC_DETECT_HOTKEY, std::chrono::seconds(10), partition_index);
rpc.call(resolver,
&tracker,
[app_name, partition_index](dsn::error_code error) {
if (error != dsn::ERR_OK) {
derror_f("Hotkey detect rpc sending failed, in {}.{}, error_hint:{}",
app_name,
partition_index,
error.to_string());
}
})
->wait();
if (rpc.response().err != dsn::ERR_OK) {
derror_f("Hotkey detect rpc sending failed, in {}.{}, error_hint:{} {}",
if (resp.err != dsn::ERR_OK) {
derror_f("Hotkey detect rpc executing failed, in {}.{}, error_hint:{} {}",
app_name,
partition_index,
rpc.response().err,
rpc.response().err_hint);
resp.err,
resp.err_hint);
}
}
......
......@@ -37,7 +37,9 @@ typedef std::vector<std::array<dsn::perf_counter_wrapper, 2>> hot_partition_coun
class hotspot_partition_calculator
{
public:
hotspot_partition_calculator(const std::string &app_name, int partition_count)
hotspot_partition_calculator(const std::string &app_name,
int partition_count,
std::shared_ptr<shell_context> context)
: _app_name(app_name), _hot_points(partition_count), _hotpartition_counter(partition_count)
{
init_perf_counter(partition_count);
......@@ -46,10 +48,10 @@ public:
void data_aggregate(const std::vector<row_data> &partitions);
// analyse the saved data to find hotspot partition
void data_analyse();
static void send_detect_hotkey_request(const std::string &app_name,
const uint64_t partition_index,
const dsn::replication::hotkey_type::type hotkey_type,
const dsn::replication::detect_action::type action);
void send_detect_hotkey_request(const std::string &app_name,
const uint64_t partition_index,
const dsn::replication::hotkey_type::type hotkey_type,
const dsn::replication::detect_action::type action);
private:
// empirical rule to calculate hot point of each partition
......@@ -65,6 +67,7 @@ private:
hot_partition_counters _hot_points;
// saving historical data can improve accuracy
stat_histories _partitions_stat_histories;
std::shared_ptr<shell_context> _shell_context;
// _hotpartition_counter p[index_of_partitions][type_of_read(0)/write(1)_stat]
// it's a counter to find partitions that often exceed the threshold
......
......@@ -57,9 +57,10 @@ info_collector::info_collector()
_cluster_name = dsn::replication::get_current_cluster_name();
_shell_context.current_cluster_name = _cluster_name;
_shell_context.meta_list = meta_servers;
_shell_context.ddl_client.reset(new replication_ddl_client(meta_servers));
_shell_context = std::make_shared<shell_context>();
_shell_context->current_cluster_name = _cluster_name;
_shell_context->meta_list = meta_servers;
_shell_context->ddl_client.reset(new replication_ddl_client(meta_servers));
_app_stat_interval_seconds = (uint32_t)dsn_config_get_value_uint64("pegasus.collector",
"app_stat_interval_seconds",
......@@ -143,7 +144,7 @@ void info_collector::on_app_stat()
{
ddebug("start to stat apps");
std::map<std::string, std::vector<row_data>> all_rows;
if (!get_app_partition_stat(&_shell_context, all_rows)) {
if (!get_app_partition_stat(_shell_context.get(), all_rows)) {
derror("call get_app_stat() failed");
return;
}
......@@ -241,7 +242,7 @@ void info_collector::on_capacity_unit_stat(int remaining_retry_count)
{
ddebug("start to stat capacity unit, remaining_retry_count = %d", remaining_retry_count);
std::vector<node_capacity_unit_stat> nodes_stat;
if (!get_capacity_unit_stat(&_shell_context, nodes_stat)) {
if (!get_capacity_unit_stat(_shell_context.get(), nodes_stat)) {
if (remaining_retry_count > 0) {
dwarn("get capacity unit stat failed, remaining_retry_count = %d, "
"wait %u seconds to retry",
......@@ -288,7 +289,7 @@ void info_collector::on_storage_size_stat(int remaining_retry_count)
{
ddebug("start to stat storage size, remaining_retry_count = %d", remaining_retry_count);
app_storage_size_stat st_stat;
if (!get_storage_size_stat(&_shell_context, st_stat)) {
if (!get_storage_size_stat(_shell_context.get(), st_stat)) {
if (remaining_retry_count > 0) {
dwarn("get storage size stat failed, remaining_retry_count = %d, "
"wait %u seconds to retry",
......@@ -316,7 +317,8 @@ info_collector::get_hotspot_calculator(const std::string &app_name, const int pa
if (iter != _hotspot_calculator_store.end()) {
return iter->second;
}
auto calculator = std::make_shared<hotspot_partition_calculator>(app_name, partition_count);
auto calculator =
std::make_shared<hotspot_partition_calculator>(app_name, partition_count, _shell_context);
_hotspot_calculator_store[app_name_pcount] = calculator;
return calculator;
}
......
......@@ -173,7 +173,7 @@ private:
dsn::task_tracker _tracker;
::dsn::rpc_address _meta_servers;
std::string _cluster_name;
shell_context _shell_context;
std::shared_ptr<shell_context> _shell_context;
uint32_t _app_stat_interval_seconds;
::dsn::task_ptr _app_stat_timer_task;
::dsn::utils::ex_lock_nr _app_stat_counter_lock;
......
......@@ -29,7 +29,7 @@ DSN_DECLARE_int32(occurrence_threshold);
class hotspot_partition_test : public pegasus_server_test_base
{
public:
hotspot_partition_test() : calculator("TEST", 8)
hotspot_partition_test() : calculator("TEST", 8, nullptr)
{
dsn::fail::setup();
dsn::fail::cfg("send_detect_hotkey_request", "return()");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册