未验证 提交 74d577ee 编写于 作者: S Smilencer 提交者: GitHub

refactor(hotspot): remove hotkey_detect_request in rrdb.thrift (#612)

上级 e147550a
......@@ -70,18 +70,6 @@ const std::map<int, const char *> _mutate_operation_VALUES_TO_NAMES(
::apache::thrift::TEnumIterator(2, _kmutate_operationValues, _kmutate_operationNames),
::apache::thrift::TEnumIterator(-1, NULL, NULL));
int _khotkey_typeValues[] = {hotkey_type::READ, hotkey_type::WRITE};
const char *_khotkey_typeNames[] = {"READ", "WRITE"};
const std::map<int, const char *> _hotkey_type_VALUES_TO_NAMES(
::apache::thrift::TEnumIterator(2, _khotkey_typeValues, _khotkey_typeNames),
::apache::thrift::TEnumIterator(-1, NULL, NULL));
int _khotkey_detect_actionValues[] = {hotkey_detect_action::START, hotkey_detect_action::STOP};
const char *_khotkey_detect_actionNames[] = {"START", "STOP"};
const std::map<int, const char *> _hotkey_detect_action_VALUES_TO_NAMES(
::apache::thrift::TEnumIterator(2, _khotkey_detect_actionValues, _khotkey_detect_actionNames),
::apache::thrift::TEnumIterator(-1, NULL, NULL));
update_request::~update_request() throw() {}
void update_request::__set_key(const ::dsn::blob &val) { this->key = val; }
......@@ -4603,250 +4591,5 @@ void duplicate_response::printTo(std::ostream &out) const
(__isset.error_hint ? (out << to_string(error_hint)) : (out << "<null>"));
out << ")";
}
hotkey_detect_request::~hotkey_detect_request() throw() {}
void hotkey_detect_request::__set_type(const hotkey_type::type val) { this->type = val; }
void hotkey_detect_request::__set_action(const hotkey_detect_action::type val)
{
this->action = val;
}
uint32_t hotkey_detect_request::read(::apache::thrift::protocol::TProtocol *iprot)
{
apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
uint32_t xfer = 0;
std::string fname;
::apache::thrift::protocol::TType ftype;
int16_t fid;
xfer += iprot->readStructBegin(fname);
using ::apache::thrift::protocol::TProtocolException;
while (true) {
xfer += iprot->readFieldBegin(fname, ftype, fid);
if (ftype == ::apache::thrift::protocol::T_STOP) {
break;
}
switch (fid) {
case 1:
if (ftype == ::apache::thrift::protocol::T_I32) {
int32_t ecast134;
xfer += iprot->readI32(ecast134);
this->type = (hotkey_type::type)ecast134;
this->__isset.type = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 2:
if (ftype == ::apache::thrift::protocol::T_I32) {
int32_t ecast135;
xfer += iprot->readI32(ecast135);
this->action = (hotkey_detect_action::type)ecast135;
this->__isset.action = true;
} else {
xfer += iprot->skip(ftype);
}
break;
default:
xfer += iprot->skip(ftype);
break;
}
xfer += iprot->readFieldEnd();
}
xfer += iprot->readStructEnd();
return xfer;
}
uint32_t hotkey_detect_request::write(::apache::thrift::protocol::TProtocol *oprot) const
{
uint32_t xfer = 0;
apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
xfer += oprot->writeStructBegin("hotkey_detect_request");
xfer += oprot->writeFieldBegin("type", ::apache::thrift::protocol::T_I32, 1);
xfer += oprot->writeI32((int32_t)this->type);
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldBegin("action", ::apache::thrift::protocol::T_I32, 2);
xfer += oprot->writeI32((int32_t)this->action);
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
}
void swap(hotkey_detect_request &a, hotkey_detect_request &b)
{
using ::std::swap;
swap(a.type, b.type);
swap(a.action, b.action);
swap(a.__isset, b.__isset);
}
hotkey_detect_request::hotkey_detect_request(const hotkey_detect_request &other136)
{
type = other136.type;
action = other136.action;
__isset = other136.__isset;
}
hotkey_detect_request::hotkey_detect_request(hotkey_detect_request &&other137)
{
type = std::move(other137.type);
action = std::move(other137.action);
__isset = std::move(other137.__isset);
}
hotkey_detect_request &hotkey_detect_request::operator=(const hotkey_detect_request &other138)
{
type = other138.type;
action = other138.action;
__isset = other138.__isset;
return *this;
}
hotkey_detect_request &hotkey_detect_request::operator=(hotkey_detect_request &&other139)
{
type = std::move(other139.type);
action = std::move(other139.action);
__isset = std::move(other139.__isset);
return *this;
}
void hotkey_detect_request::printTo(std::ostream &out) const
{
using ::apache::thrift::to_string;
out << "hotkey_detect_request(";
out << "type=" << to_string(type);
out << ", "
<< "action=" << to_string(action);
out << ")";
}
hotkey_detect_response::~hotkey_detect_response() throw() {}
void hotkey_detect_response::__set_err(const int32_t val) { this->err = val; }
void hotkey_detect_response::__set_err_hint(const std::string &val)
{
this->err_hint = val;
__isset.err_hint = true;
}
uint32_t hotkey_detect_response::read(::apache::thrift::protocol::TProtocol *iprot)
{
apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
uint32_t xfer = 0;
std::string fname;
::apache::thrift::protocol::TType ftype;
int16_t fid;
xfer += iprot->readStructBegin(fname);
using ::apache::thrift::protocol::TProtocolException;
while (true) {
xfer += iprot->readFieldBegin(fname, ftype, fid);
if (ftype == ::apache::thrift::protocol::T_STOP) {
break;
}
switch (fid) {
case 1:
if (ftype == ::apache::thrift::protocol::T_I32) {
xfer += iprot->readI32(this->err);
this->__isset.err = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 2:
if (ftype == ::apache::thrift::protocol::T_STRING) {
xfer += iprot->readString(this->err_hint);
this->__isset.err_hint = true;
} else {
xfer += iprot->skip(ftype);
}
break;
default:
xfer += iprot->skip(ftype);
break;
}
xfer += iprot->readFieldEnd();
}
xfer += iprot->readStructEnd();
return xfer;
}
uint32_t hotkey_detect_response::write(::apache::thrift::protocol::TProtocol *oprot) const
{
uint32_t xfer = 0;
apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
xfer += oprot->writeStructBegin("hotkey_detect_response");
xfer += oprot->writeFieldBegin("err", ::apache::thrift::protocol::T_I32, 1);
xfer += oprot->writeI32(this->err);
xfer += oprot->writeFieldEnd();
if (this->__isset.err_hint) {
xfer += oprot->writeFieldBegin("err_hint", ::apache::thrift::protocol::T_STRING, 2);
xfer += oprot->writeString(this->err_hint);
xfer += oprot->writeFieldEnd();
}
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
}
void swap(hotkey_detect_response &a, hotkey_detect_response &b)
{
using ::std::swap;
swap(a.err, b.err);
swap(a.err_hint, b.err_hint);
swap(a.__isset, b.__isset);
}
hotkey_detect_response::hotkey_detect_response(const hotkey_detect_response &other140)
{
err = other140.err;
err_hint = other140.err_hint;
__isset = other140.__isset;
}
hotkey_detect_response::hotkey_detect_response(hotkey_detect_response &&other141)
{
err = std::move(other141.err);
err_hint = std::move(other141.err_hint);
__isset = std::move(other141.__isset);
}
hotkey_detect_response &hotkey_detect_response::operator=(const hotkey_detect_response &other142)
{
err = other142.err;
err_hint = other142.err_hint;
__isset = other142.__isset;
return *this;
}
hotkey_detect_response &hotkey_detect_response::operator=(hotkey_detect_response &&other143)
{
err = std::move(other143.err);
err_hint = std::move(other143.err_hint);
__isset = std::move(other143.__isset);
return *this;
}
void hotkey_detect_response::printTo(std::ostream &out) const
{
using ::apache::thrift::to_string;
out << "hotkey_detect_response(";
out << "err=" << to_string(err);
out << ", "
<< "err_hint=";
(__isset.err_hint ? (out << to_string(err_hint)) : (out << "<null>"));
out << ")";
}
}
} // namespace
......@@ -46,20 +46,6 @@ enum mutate_operation
MO_DELETE
}
enum hotkey_type
{
READ,
WRITE
}
enum hotkey_detect_action
{
START,
STOP
}
struct update_request
{
1:dsn.blob key;
......@@ -291,20 +277,6 @@ struct duplicate_response
2: optional string error_hint;
}
struct hotkey_detect_request {
1: hotkey_type type
2: hotkey_detect_action action
}
struct hotkey_detect_response {
// Possible error:
// - ERR_OK: start/stop hotkey detect succeed
// - ERR_SERVICE_NOT_FOUND: wrong rpc type
// - ERR_SERVICE_ALREADY_EXIST: hotkey detection is running now
1: i32 err;
2: optional string err_hint;
}
service rrdb
{
update_response put(1:update_request update);
......
......@@ -71,28 +71,6 @@ struct mutate_operation
extern const std::map<int, const char *> _mutate_operation_VALUES_TO_NAMES;
struct hotkey_type
{
enum type
{
READ = 0,
WRITE = 1
};
};
extern const std::map<int, const char *> _hotkey_type_VALUES_TO_NAMES;
struct hotkey_detect_action
{
enum type
{
START = 0,
STOP = 1
};
};
extern const std::map<int, const char *> _hotkey_detect_action_VALUES_TO_NAMES;
class update_request;
class update_response;
......@@ -139,10 +117,6 @@ class duplicate_request;
class duplicate_response;
class hotkey_detect_request;
class hotkey_detect_response;
typedef struct _update_request__isset
{
_update_request__isset() : key(false), value(false), expire_ts_seconds(false) {}
......@@ -1964,112 +1938,6 @@ inline std::ostream &operator<<(std::ostream &out, const duplicate_response &obj
obj.printTo(out);
return out;
}
typedef struct _hotkey_detect_request__isset
{
_hotkey_detect_request__isset() : type(false), action(false) {}
bool type : 1;
bool action : 1;
} _hotkey_detect_request__isset;
class hotkey_detect_request
{
public:
hotkey_detect_request(const hotkey_detect_request &);
hotkey_detect_request(hotkey_detect_request &&);
hotkey_detect_request &operator=(const hotkey_detect_request &);
hotkey_detect_request &operator=(hotkey_detect_request &&);
hotkey_detect_request() : type((hotkey_type::type)0), action((hotkey_detect_action::type)0) {}
virtual ~hotkey_detect_request() throw();
hotkey_type::type type;
hotkey_detect_action::type action;
_hotkey_detect_request__isset __isset;
void __set_type(const hotkey_type::type val);
void __set_action(const hotkey_detect_action::type val);
bool operator==(const hotkey_detect_request &rhs) const
{
if (!(type == rhs.type))
return false;
if (!(action == rhs.action))
return false;
return true;
}
bool operator!=(const hotkey_detect_request &rhs) const { return !(*this == rhs); }
bool operator<(const hotkey_detect_request &) const;
uint32_t read(::apache::thrift::protocol::TProtocol *iprot);
uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const;
virtual void printTo(std::ostream &out) const;
};
void swap(hotkey_detect_request &a, hotkey_detect_request &b);
inline std::ostream &operator<<(std::ostream &out, const hotkey_detect_request &obj)
{
obj.printTo(out);
return out;
}
typedef struct _hotkey_detect_response__isset
{
_hotkey_detect_response__isset() : err(false), err_hint(false) {}
bool err : 1;
bool err_hint : 1;
} _hotkey_detect_response__isset;
class hotkey_detect_response
{
public:
hotkey_detect_response(const hotkey_detect_response &);
hotkey_detect_response(hotkey_detect_response &&);
hotkey_detect_response &operator=(const hotkey_detect_response &);
hotkey_detect_response &operator=(hotkey_detect_response &&);
hotkey_detect_response() : err(0), err_hint() {}
virtual ~hotkey_detect_response() throw();
int32_t err;
std::string err_hint;
_hotkey_detect_response__isset __isset;
void __set_err(const int32_t val);
void __set_err_hint(const std::string &val);
bool operator==(const hotkey_detect_response &rhs) const
{
if (!(err == rhs.err))
return false;
if (__isset.err_hint != rhs.__isset.err_hint)
return false;
else if (__isset.err_hint && !(err_hint == rhs.err_hint))
return false;
return true;
}
bool operator!=(const hotkey_detect_response &rhs) const { return !(*this == rhs); }
bool operator<(const hotkey_detect_response &) const;
uint32_t read(::apache::thrift::protocol::TProtocol *iprot);
uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const;
virtual void printTo(std::ostream &out) const;
};
void swap(hotkey_detect_response &a, hotkey_detect_response &b);
inline std::ostream &operator<<(std::ostream &out, const hotkey_detect_response &obj)
{
obj.printTo(out);
return out;
}
}
} // namespace
......
......@@ -20,15 +20,12 @@
#include <algorithm>
#include <math.h>
#include <dsn/dist/fmt_logging.h>
#include <rrdb/rrdb_types.h>
#include <dsn/utility/flags.h>
#include <dsn/tool-api/rpc_address.h>
#include <dsn/tool-api/group_address.h>
#include <dsn/utility/error_code.h>
#include <rrdb/rrdb_types.h>
#include <dsn/dist/replication/duplication_common.h>
#include <dsn/tool-api/task_tracker.h>
#include "pegasus_read_service.h"
#include <dsn/utility/fail_point.h>
namespace pegasus {
......@@ -44,7 +41,7 @@ DSN_DEFINE_int64("pegasus.collector",
"data");
DSN_DEFINE_bool("pegasus.collector",
enable_hotkey_detect,
enable_detect_hotkey,
false,
"auto detect hot key in the hot paritition");
......@@ -145,7 +142,7 @@ void hotspot_partition_calculator::data_analyse()
stat_histories_analyse(data_type, hot_points);
update_hot_point(data_type, hot_points);
}
if (!FLAGS_enable_hotkey_detect) {
if (!FLAGS_enable_detect_hotkey) {
return;
}
for (int data_type = 0; data_type <= 1; data_type++) {
......@@ -162,12 +159,12 @@ void hotspot_partition_calculator::detect_hotkey_in_hotpartition(int data_type)
(data_type == partition_qps_type::READ_HOTSPOT_DATA ? "read" : "write"),
_app_name,
index);
send_hotkey_detect_request(_app_name,
send_detect_hotkey_request(_app_name,
index,
(data_type == dsn::apps::hotkey_type::type::READ)
? dsn::apps::hotkey_type::type::READ
: dsn::apps::hotkey_type::type::WRITE,
dsn::apps::hotkey_detect_action::type::START);
(data_type == dsn::replication::hotkey_type::type::READ)
? dsn::replication::hotkey_type::type::READ
: dsn::replication::hotkey_type::type::WRITE,
dsn::replication::detect_action::type::START);
}
} else {
_hotpartition_counter[index][data_type] =
......@@ -176,19 +173,19 @@ void hotspot_partition_calculator::detect_hotkey_in_hotpartition(int data_type)
}
}
/*static*/ void hotspot_partition_calculator::send_hotkey_detect_request(
/*static*/ void hotspot_partition_calculator::send_detect_hotkey_request(
const std::string &app_name,
const uint64_t partition_index,
const dsn::apps::hotkey_type::type hotkey_type,
const dsn::apps::hotkey_detect_action::type action)
const dsn::replication::hotkey_type::type hotkey_type,
const dsn::replication::detect_action::type action)
{
FAIL_POINT_INJECT_F("send_hotkey_detect_request", [](dsn::string_view) {});
auto request = dsn::make_unique<dsn::apps::hotkey_detect_request>();
FAIL_POINT_INJECT_F("send_detect_hotkey_request", [](dsn::string_view) {});
auto request = dsn::make_unique<dsn::replication::detect_hotkey_request>();
request->type = hotkey_type;
request->action = action;
ddebug_f("{} {} hotkey detection in {}.{}",
(action == dsn::apps::hotkey_detect_action::STOP) ? "Stop" : "Start",
(hotkey_type == dsn::apps::hotkey_type::WRITE) ? "write" : "read",
(action == dsn::replication::detect_action::STOP) ? "Stop" : "Start",
(hotkey_type == dsn::replication::hotkey_type::WRITE) ? "write" : "read",
app_name,
partition_index);
dsn::rpc_address meta_server;
......@@ -199,6 +196,7 @@ void hotspot_partition_calculator::detect_hotkey_in_hotpartition(int data_type)
meta_server.group_address()->add(address);
}
auto cluster_name = dsn::replication::get_current_cluster_name();
// TODO: (Tangyanzhao) refactor partition_resolver to replication_ddl_client
auto resolver = partition_resolver::get_resolver(cluster_name, meta_servers, app_name.c_str());
dsn::task_tracker tracker;
detect_hotkey_rpc rpc(
......
......@@ -46,10 +46,10 @@ public:
void data_aggregate(const std::vector<row_data> &partitions);
// analyse the saved data to find hotspot partition
void data_analyse();
static void send_hotkey_detect_request(const std::string &app_name,
static void send_detect_hotkey_request(const std::string &app_name,
const uint64_t partition_index,
const dsn::apps::hotkey_type::type hotkey_type,
const dsn::apps::hotkey_detect_action::type action);
const dsn::replication::hotkey_type::type hotkey_type,
const dsn::replication::detect_action::type action);
private:
// empirical rule to calculate hot point of each partition
......@@ -72,6 +72,8 @@ private:
// hotkey on the replica automatically
std::vector<std::array<int, 2>> _hotpartition_counter;
typedef dsn::rpc_holder<detect_hotkey_request, detect_hotkey_response> detect_hotkey_rpc;
friend class hotspot_partition_test;
};
......
......@@ -32,8 +32,6 @@ typedef ::dsn::rpc_holder<::dsn::blob, dsn::apps::ttl_response> ttl_rpc;
typedef ::dsn::rpc_holder<::dsn::apps::get_scanner_request, dsn::apps::scan_response>
get_scanner_rpc;
typedef ::dsn::rpc_holder<::dsn::apps::scan_request, dsn::apps::scan_response> scan_rpc;
typedef ::dsn::rpc_holder<::dsn::apps::hotkey_detect_request, dsn::apps::hotkey_detect_response>
detect_hotkey_rpc;
class pegasus_read_service : public dsn::replication::replication_app_base,
public dsn::replication::storage_serverlet<pegasus_read_service>
......
......@@ -511,4 +511,4 @@ onebox = 0.0.0.0:34701
onebox2 = 0.0.0.0:35701
[pegasus.collector]
enable_hotkey_detect = true
enable_detect_hotkey = true
......@@ -32,7 +32,7 @@ public:
hotspot_partition_test() : calculator("TEST", 8)
{
dsn::fail::setup();
dsn::fail::cfg("send_hotkey_detect_request", "return()");
dsn::fail::cfg("send_detect_hotkey_request", "return()");
};
~hotspot_partition_test() { dsn::fail::teardown(); }
......@@ -124,7 +124,7 @@ TEST_F(hotspot_partition_test, hotspot_partition_policy)
clear_calculator_histories();
}
TEST_F(hotspot_partition_test, send_hotkey_detect_request)
TEST_F(hotspot_partition_test, send_detect_hotkey_request)
{
const int READ_HOT_PARTITION = 7;
const int WRITE_HOT_PARTITION = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册