未验证 提交 cdda746e 编写于 作者: S Smilencer 提交者: GitHub

refactor: use rpc_holder to receive RPC in pegasus_server_impl (#554)

上级 c16d6726
Subproject commit 11e1becc8f4c3f91fcbe84c6ee6c77e0b28f3186
Subproject commit 065c1edb3066d5ccfe88f19740caa4d8244c233e
......@@ -6,6 +6,16 @@
namespace pegasus {
namespace server {
typedef ::dsn::rpc_holder<::dsn::blob, ::dsn::apps::read_response> get_rpc;
typedef ::dsn::rpc_holder<dsn::apps::multi_get_request, dsn::apps::multi_get_response>
multi_get_rpc;
typedef ::dsn::rpc_holder<::dsn::blob, dsn::apps::count_response> sortkey_count_rpc;
typedef ::dsn::rpc_holder<::dsn::blob, dsn::apps::ttl_response> ttl_rpc;
typedef ::dsn::rpc_holder<::dsn::apps::get_scanner_request, dsn::apps::scan_response>
get_scanner_rpc;
typedef ::dsn::rpc_holder<::dsn::apps::scan_request, dsn::apps::scan_response> scan_rpc;
class pegasus_read_service : public dsn::replication::replication_app_base,
public dsn::replication::storage_serverlet<pegasus_read_service>
{
......@@ -23,77 +33,51 @@ public:
protected:
// all service handlers to be implemented further
// RPC_RRDB_RRDB_GET
virtual void on_get(const ::dsn::blob &args,
::dsn::rpc_replier<dsn::apps::read_response> &reply) = 0;
virtual void on_get(get_rpc rpc) = 0;
// RPC_RRDB_RRDB_MULTI_GET
virtual void on_multi_get(const dsn::apps::multi_get_request &args,
::dsn::rpc_replier<dsn::apps::multi_get_response> &reply) = 0;
virtual void on_multi_get(multi_get_rpc rpc) = 0;
// RPC_RRDB_RRDB_SORTKEY_COUNT
virtual void on_sortkey_count(const ::dsn::blob &args,
::dsn::rpc_replier<dsn::apps::count_response> &reply) = 0;
virtual void on_sortkey_count(sortkey_count_rpc rpc) = 0;
// RPC_RRDB_RRDB_TTL
virtual void on_ttl(const ::dsn::blob &args,
::dsn::rpc_replier<dsn::apps::ttl_response> &reply) = 0;
virtual void on_ttl(ttl_rpc rpc) = 0;
// RPC_RRDB_RRDB_GET_SCANNER
virtual void on_get_scanner(const dsn::apps::get_scanner_request &args,
::dsn::rpc_replier<dsn::apps::scan_response> &reply) = 0;
virtual void on_get_scanner(get_scanner_rpc rpc) = 0;
// RPC_RRDB_RRDB_SCAN
virtual void on_scan(const dsn::apps::scan_request &args,
::dsn::rpc_replier<dsn::apps::scan_response> &reply) = 0;
virtual void on_scan(scan_rpc rpc) = 0;
// RPC_RRDB_RRDB_CLEAR_SCANNER
virtual void on_clear_scanner(const int64_t &args) = 0;
static void register_rpc_handlers()
{
register_async_rpc_handler(dsn::apps::RPC_RRDB_RRDB_GET, "get", on_get);
register_async_rpc_handler(dsn::apps::RPC_RRDB_RRDB_MULTI_GET, "multi_get", on_multi_get);
register_async_rpc_handler(
register_rpc_handler_with_rpc_holder(dsn::apps::RPC_RRDB_RRDB_GET, "get", on_get);
register_rpc_handler_with_rpc_holder(
dsn::apps::RPC_RRDB_RRDB_MULTI_GET, "multi_get", on_multi_get);
register_rpc_handler_with_rpc_holder(
dsn::apps::RPC_RRDB_RRDB_SORTKEY_COUNT, "sortkey_count", on_sortkey_count);
register_async_rpc_handler(dsn::apps::RPC_RRDB_RRDB_TTL, "ttl", on_ttl);
register_async_rpc_handler(
register_rpc_handler_with_rpc_holder(dsn::apps::RPC_RRDB_RRDB_TTL, "ttl", on_ttl);
register_rpc_handler_with_rpc_holder(
dsn::apps::RPC_RRDB_RRDB_GET_SCANNER, "get_scanner", on_get_scanner);
register_async_rpc_handler(dsn::apps::RPC_RRDB_RRDB_SCAN, "scan", on_scan);
register_rpc_handler_with_rpc_holder(dsn::apps::RPC_RRDB_RRDB_SCAN, "scan", on_scan);
register_async_rpc_handler(
dsn::apps::RPC_RRDB_RRDB_CLEAR_SCANNER, "clear_scanner", on_clear_scanner);
}
private:
static void on_get(pegasus_read_service *svc,
const ::dsn::blob &args,
::dsn::rpc_replier<dsn::apps::read_response> &reply)
{
svc->on_get(args, reply);
}
static void on_multi_get(pegasus_read_service *svc,
const dsn::apps::multi_get_request &args,
::dsn::rpc_replier<dsn::apps::multi_get_response> &reply)
{
svc->on_multi_get(args, reply);
}
static void on_sortkey_count(pegasus_read_service *svc,
const ::dsn::blob &args,
::dsn::rpc_replier<dsn::apps::count_response> &reply)
{
svc->on_sortkey_count(args, reply);
}
static void on_ttl(pegasus_read_service *svc,
const ::dsn::blob &args,
::dsn::rpc_replier<dsn::apps::ttl_response> &reply)
static void on_get(pegasus_read_service *svc, get_rpc rpc) { svc->on_get(rpc); }
static void on_multi_get(pegasus_read_service *svc, multi_get_rpc rpc)
{
svc->on_ttl(args, reply);
svc->on_multi_get(rpc);
}
static void on_get_scanner(pegasus_read_service *svc,
const dsn::apps::get_scanner_request &args,
::dsn::rpc_replier<dsn::apps::scan_response> &reply)
static void on_sortkey_count(pegasus_read_service *svc, sortkey_count_rpc rpc)
{
svc->on_get_scanner(args, reply);
svc->on_sortkey_count(rpc);
}
static void on_scan(pegasus_read_service *svc,
const dsn::apps::scan_request &args,
::dsn::rpc_replier<dsn::apps::scan_response> &reply)
static void on_ttl(pegasus_read_service *svc, ttl_rpc rpc) { svc->on_ttl(rpc); }
static void on_get_scanner(pegasus_read_service *svc, get_scanner_rpc rpc)
{
svc->on_scan(args, reply);
svc->on_get_scanner(rpc);
}
static void on_scan(pegasus_read_service *svc, scan_rpc rpc) { svc->on_scan(rpc); }
static void on_clear_scanner(pegasus_read_service *svc, const int64_t &args)
{
svc->on_clear_scanner(args);
......
此差异已折叠。
......@@ -43,18 +43,12 @@ public:
~pegasus_server_impl() override;
// the following methods may set physical error if internal error occurs
void on_get(const ::dsn::blob &key,
::dsn::rpc_replier<::dsn::apps::read_response> &reply) override;
void on_multi_get(const ::dsn::apps::multi_get_request &args,
::dsn::rpc_replier<::dsn::apps::multi_get_response> &reply) override;
void on_sortkey_count(const ::dsn::blob &args,
::dsn::rpc_replier<::dsn::apps::count_response> &reply) override;
void on_ttl(const ::dsn::blob &key,
::dsn::rpc_replier<::dsn::apps::ttl_response> &reply) override;
void on_get_scanner(const ::dsn::apps::get_scanner_request &args,
::dsn::rpc_replier<::dsn::apps::scan_response> &reply) override;
void on_scan(const ::dsn::apps::scan_request &args,
::dsn::rpc_replier<::dsn::apps::scan_response> &reply) override;
void on_get(get_rpc rpc) override;
void on_multi_get(multi_get_rpc rpc) override;
void on_sortkey_count(sortkey_count_rpc rpc) override;
void on_ttl(ttl_rpc rpc) override;
void on_get_scanner(get_scanner_rpc rpc) override;
void on_scan(scan_rpc rpc) override;
void on_clear_scanner(const int64_t &args) override;
// input:
......
......@@ -41,14 +41,16 @@ public:
// do on_get/on_multi_get operation,
long before_count = _server->_pfc_recent_abnormal_count->get_integer_value();
if (!test.is_multi_get) {
::dsn::rpc_replier<::dsn::apps::read_response> reply(nullptr);
_server->on_get(test_key, reply);
get_rpc rpc(dsn::make_unique<dsn::blob>(test_key), dsn::apps::RPC_RRDB_RRDB_GET);
_server->on_get(rpc);
} else {
::dsn::apps::multi_get_request request;
request.__set_hash_key(dsn::blob(test_hash_key.data(), 0, test_hash_key.size()));
request.__set_sort_keys({dsn::blob(test_sort_key.data(), 0, test_sort_key.size())});
::dsn::rpc_replier<::dsn::apps::multi_get_response> reply(nullptr);
_server->on_multi_get(request, reply);
multi_get_rpc rpc(dsn::make_unique<::dsn::apps::multi_get_request>(request),
dsn::apps::RPC_RRDB_RRDB_MULTI_GET);
_server->on_multi_get(rpc);
}
long after_count = _server->_pfc_recent_abnormal_count->get_integer_value();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册