未验证 提交 34455e35 编写于 作者: S Smilencer 提交者: GitHub

feat(hotkey): implement the RPC handler in hotkey_collector (#621)

上级 1461dc55
......@@ -17,21 +17,39 @@
#include "hotkey_collector.h"
#include <dsn/dist/replication/replication_enums.h>
#include <dsn/utility/smart_pointers.h>
#include "base/pegasus_key_schema.h"
#include <dsn/dist/fmt_logging.h>
namespace pegasus {
namespace server {
hotkey_collector::hotkey_collector()
: _internal_collector(dsn::make_unique<hotkey_empty_data_collector>())
hotkey_collector::hotkey_collector(dsn::replication::hotkey_type::type hotkey_type,
dsn::replication::replica_base *r_base)
: replica_base(r_base),
_state(hotkey_collector_state::STOPPED),
_hotkey_type(hotkey_type),
_internal_collector(std::make_shared<hotkey_empty_data_collector>())
{
}
// TODO: (Tangyanzhao) implement these functions
void hotkey_collector::handle_rpc(const dsn::replication::detect_hotkey_request &req,
dsn::replication::detect_hotkey_response &resp)
{
switch (req.action) {
case dsn::replication::detect_action::START:
on_start_detect(resp);
return;
case dsn::replication::detect_action::STOP:
on_stop_detect(resp);
return;
default:
std::string hint = fmt::format("{}: can't find this detect action", req.action);
resp.err = dsn::ERR_INVALID_STATE;
resp.__set_err_hint(hint);
derror_replica(hint);
}
}
void hotkey_collector::capture_raw_key(const dsn::blob &raw_key, int64_t weight)
......@@ -49,5 +67,51 @@ void hotkey_collector::capture_hash_key(const dsn::blob &hash_key, int64_t weigh
void hotkey_collector::analyse_data() { _internal_collector->analyse_data(); }
void hotkey_collector::on_start_detect(dsn::replication::detect_hotkey_response &resp)
{
auto now_state = _state.load();
std::string hint;
switch (now_state) {
case hotkey_collector_state::COARSE_DETECTING:
case hotkey_collector_state::FINE_DETECTING:
resp.err = dsn::ERR_INVALID_STATE;
hint = fmt::format("still detecting {} hotkey, state is {}",
dsn::enum_to_string(_hotkey_type),
enum_to_string(now_state));
dwarn_replica(hint);
return;
case hotkey_collector_state::FINISHED:
resp.err = dsn::ERR_INVALID_STATE;
hint = fmt::format(
"{} hotkey result has been found, you can send a stop rpc to restart hotkey detection",
dsn::enum_to_string(_hotkey_type));
dwarn_replica(hint);
return;
case hotkey_collector_state::STOPPED:
// TODO: (Tangyanzhao) start coarse detecting
_state.store(hotkey_collector_state::COARSE_DETECTING);
resp.err = dsn::ERR_OK;
hint = fmt::format("starting to detect {} hotkey", dsn::enum_to_string(_hotkey_type));
ddebug_replica(hint);
return;
default:
hint = "invalid collector state";
resp.err = dsn::ERR_INVALID_STATE;
resp.__set_err_hint(hint);
derror_replica(hint);
dassert(false, "invalid collector state");
}
}
void hotkey_collector::on_stop_detect(dsn::replication::detect_hotkey_response &resp)
{
_state.store(hotkey_collector_state::STOPPED);
_internal_collector.reset();
resp.err = dsn::ERR_OK;
std::string hint =
fmt::format("{} hotkey stopped, cache cleared", dsn::enum_to_string(_hotkey_type));
ddebug_replica(hint);
}
} // namespace server
} // namespace pegasus
......@@ -17,9 +17,9 @@
#pragma once
#include <dsn/utility/string_view.h>
#include <dsn/dist/replication/replication_types.h>
#include "hotkey_collector_state.h"
#include <dsn/dist/replication/replica_base.h>
namespace pegasus {
namespace server {
......@@ -63,10 +63,11 @@ class internal_collector_base;
// | | | Hotkey |
// +--------------------+ +----------------------------------------------------+
class hotkey_collector
class hotkey_collector : public dsn::replication::replica_base
{
public:
hotkey_collector();
hotkey_collector(dsn::replication::hotkey_type::type hotkey_type,
dsn::replication::replica_base *r_base);
// TODO: (Tangyanzhao) capture_*_key should be consistent with hotspot detection
// weight: calculate the weight according to the specific situation
void capture_raw_key(const dsn::blob &raw_key, int64_t weight);
......@@ -76,8 +77,11 @@ public:
/*out*/ dsn::replication::detect_hotkey_response &resp);
private:
std::unique_ptr<internal_collector_base> _internal_collector;
void on_start_detect(dsn::replication::detect_hotkey_response &resp);
void on_stop_detect(dsn::replication::detect_hotkey_response &resp);
std::atomic<hotkey_collector_state> _state;
const dsn::replication::hotkey_type::type _hotkey_type;
std::shared_ptr<internal_collector_base> _internal_collector;
};
class internal_collector_base
......
......@@ -43,8 +43,10 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
_primary_address = dsn::rpc_address(dsn_primary_address()).to_string();
_gpid = get_gpid();
_read_hotkey_collector = std::make_shared<hotkey_collector>();
_write_hotkey_collector = std::make_shared<hotkey_collector>();
_read_hotkey_collector =
std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::READ, this);
_write_hotkey_collector =
std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::WRITE, this);
_verbose_log = dsn_config_get_value_bool("pegasus.server",
"rocksdb_verbose_log",
......
......@@ -29,7 +29,9 @@ public:
explicit mock_capacity_unit_calculator(dsn::replication::replica_base *r)
: capacity_unit_calculator(
r, std::make_shared<hotkey_collector>(), std::make_shared<hotkey_collector>())
r,
std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::READ, this),
std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::WRITE, this))
{
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册