提交 51338a75 编写于 作者: W Wu Tao 提交者: neverchanje

feat(dup): implement pegasus_mutation_duplicator (#399)

上级 b981490b
......@@ -150,4 +150,10 @@ inline uint64_t pegasus_key_hash(const ::dsn::blob &key)
}
}
/// Calculate hash value from hash key.
inline uint64_t pegasus_hash_key_hash(const ::dsn::blob &hash_key)
{
return dsn::utils::crc64_calc(hash_key.data(), hash_key.length(), 0);
}
} // namespace pegasus
......@@ -22,6 +22,8 @@ using incr_rpc = dsn::rpc_holder<dsn::apps::incr_request, dsn::apps::incr_respon
using check_and_set_rpc =
dsn::rpc_holder<dsn::apps::check_and_set_request, dsn::apps::check_and_set_response>;
using duplicate_rpc = dsn::apps::duplicate_rpc;
using check_and_mutate_rpc =
dsn::rpc_holder<dsn::apps::check_and_mutate_request, dsn::apps::check_and_mutate_response>;
......
......@@ -4254,5 +4254,288 @@ void scan_response::printTo(std::ostream &out) const
<< "server=" << to_string(server);
out << ")";
}
duplicate_request::~duplicate_request() throw() {}
void duplicate_request::__set_timestamp(const int64_t val)
{
this->timestamp = val;
__isset.timestamp = true;
}
void duplicate_request::__set_task_code(const ::dsn::task_code &val)
{
this->task_code = val;
__isset.task_code = true;
}
void duplicate_request::__set_raw_message(const ::dsn::blob &val)
{
this->raw_message = val;
__isset.raw_message = true;
}
void duplicate_request::__set_cluster_id(const int8_t val)
{
this->cluster_id = val;
__isset.cluster_id = true;
}
uint32_t duplicate_request::read(::apache::thrift::protocol::TProtocol *iprot)
{
apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
uint32_t xfer = 0;
std::string fname;
::apache::thrift::protocol::TType ftype;
int16_t fid;
xfer += iprot->readStructBegin(fname);
using ::apache::thrift::protocol::TProtocolException;
while (true) {
xfer += iprot->readFieldBegin(fname, ftype, fid);
if (ftype == ::apache::thrift::protocol::T_STOP) {
break;
}
switch (fid) {
case 1:
if (ftype == ::apache::thrift::protocol::T_I64) {
xfer += iprot->readI64(this->timestamp);
this->__isset.timestamp = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 2:
if (ftype == ::apache::thrift::protocol::T_STRUCT) {
xfer += this->task_code.read(iprot);
this->__isset.task_code = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 3:
if (ftype == ::apache::thrift::protocol::T_STRUCT) {
xfer += this->raw_message.read(iprot);
this->__isset.raw_message = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 4:
if (ftype == ::apache::thrift::protocol::T_BYTE) {
xfer += iprot->readByte(this->cluster_id);
this->__isset.cluster_id = true;
} else {
xfer += iprot->skip(ftype);
}
break;
default:
xfer += iprot->skip(ftype);
break;
}
xfer += iprot->readFieldEnd();
}
xfer += iprot->readStructEnd();
return xfer;
}
uint32_t duplicate_request::write(::apache::thrift::protocol::TProtocol *oprot) const
{
uint32_t xfer = 0;
apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
xfer += oprot->writeStructBegin("duplicate_request");
if (this->__isset.timestamp) {
xfer += oprot->writeFieldBegin("timestamp", ::apache::thrift::protocol::T_I64, 1);
xfer += oprot->writeI64(this->timestamp);
xfer += oprot->writeFieldEnd();
}
if (this->__isset.task_code) {
xfer += oprot->writeFieldBegin("task_code", ::apache::thrift::protocol::T_STRUCT, 2);
xfer += this->task_code.write(oprot);
xfer += oprot->writeFieldEnd();
}
if (this->__isset.raw_message) {
xfer += oprot->writeFieldBegin("raw_message", ::apache::thrift::protocol::T_STRUCT, 3);
xfer += this->raw_message.write(oprot);
xfer += oprot->writeFieldEnd();
}
if (this->__isset.cluster_id) {
xfer += oprot->writeFieldBegin("cluster_id", ::apache::thrift::protocol::T_BYTE, 4);
xfer += oprot->writeByte(this->cluster_id);
xfer += oprot->writeFieldEnd();
}
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
}
void swap(duplicate_request &a, duplicate_request &b)
{
using ::std::swap;
swap(a.timestamp, b.timestamp);
swap(a.task_code, b.task_code);
swap(a.raw_message, b.raw_message);
swap(a.cluster_id, b.cluster_id);
swap(a.__isset, b.__isset);
}
duplicate_request::duplicate_request(const duplicate_request &other126)
{
timestamp = other126.timestamp;
task_code = other126.task_code;
raw_message = other126.raw_message;
cluster_id = other126.cluster_id;
__isset = other126.__isset;
}
duplicate_request::duplicate_request(duplicate_request &&other127)
{
timestamp = std::move(other127.timestamp);
task_code = std::move(other127.task_code);
raw_message = std::move(other127.raw_message);
cluster_id = std::move(other127.cluster_id);
__isset = std::move(other127.__isset);
}
duplicate_request &duplicate_request::operator=(const duplicate_request &other128)
{
timestamp = other128.timestamp;
task_code = other128.task_code;
raw_message = other128.raw_message;
cluster_id = other128.cluster_id;
__isset = other128.__isset;
return *this;
}
duplicate_request &duplicate_request::operator=(duplicate_request &&other129)
{
timestamp = std::move(other129.timestamp);
task_code = std::move(other129.task_code);
raw_message = std::move(other129.raw_message);
cluster_id = std::move(other129.cluster_id);
__isset = std::move(other129.__isset);
return *this;
}
void duplicate_request::printTo(std::ostream &out) const
{
using ::apache::thrift::to_string;
out << "duplicate_request(";
out << "timestamp=";
(__isset.timestamp ? (out << to_string(timestamp)) : (out << "<null>"));
out << ", "
<< "task_code=";
(__isset.task_code ? (out << to_string(task_code)) : (out << "<null>"));
out << ", "
<< "raw_message=";
(__isset.raw_message ? (out << to_string(raw_message)) : (out << "<null>"));
out << ", "
<< "cluster_id=";
(__isset.cluster_id ? (out << to_string(cluster_id)) : (out << "<null>"));
out << ")";
}
duplicate_response::~duplicate_response() throw() {}
void duplicate_response::__set_error(const int32_t val)
{
this->error = val;
__isset.error = true;
}
uint32_t duplicate_response::read(::apache::thrift::protocol::TProtocol *iprot)
{
apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
uint32_t xfer = 0;
std::string fname;
::apache::thrift::protocol::TType ftype;
int16_t fid;
xfer += iprot->readStructBegin(fname);
using ::apache::thrift::protocol::TProtocolException;
while (true) {
xfer += iprot->readFieldBegin(fname, ftype, fid);
if (ftype == ::apache::thrift::protocol::T_STOP) {
break;
}
switch (fid) {
case 1:
if (ftype == ::apache::thrift::protocol::T_I32) {
xfer += iprot->readI32(this->error);
this->__isset.error = true;
} else {
xfer += iprot->skip(ftype);
}
break;
default:
xfer += iprot->skip(ftype);
break;
}
xfer += iprot->readFieldEnd();
}
xfer += iprot->readStructEnd();
return xfer;
}
uint32_t duplicate_response::write(::apache::thrift::protocol::TProtocol *oprot) const
{
uint32_t xfer = 0;
apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
xfer += oprot->writeStructBegin("duplicate_response");
if (this->__isset.error) {
xfer += oprot->writeFieldBegin("error", ::apache::thrift::protocol::T_I32, 1);
xfer += oprot->writeI32(this->error);
xfer += oprot->writeFieldEnd();
}
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
}
void swap(duplicate_response &a, duplicate_response &b)
{
using ::std::swap;
swap(a.error, b.error);
swap(a.__isset, b.__isset);
}
duplicate_response::duplicate_response(const duplicate_response &other130)
{
error = other130.error;
__isset = other130.__isset;
}
duplicate_response::duplicate_response(duplicate_response &&other131)
{
error = std::move(other131.error);
__isset = std::move(other131.__isset);
}
duplicate_response &duplicate_response::operator=(const duplicate_response &other132)
{
error = other132.error;
__isset = other132.__isset;
return *this;
}
duplicate_response &duplicate_response::operator=(duplicate_response &&other133)
{
error = std::move(other133.error);
__isset = std::move(other133.__isset);
return *this;
}
void duplicate_response::printTo(std::ostream &out) const
{
using ::apache::thrift::to_string;
out << "duplicate_response(";
out << "error=";
(__isset.error ? (out << to_string(error)) : (out << "<null>"));
out << ")";
}
}
} // namespace
......@@ -1244,6 +1244,13 @@ int pegasus_client_impl::get_unordered_scanners(int max_split_count,
return ret;
}
void pegasus_client_impl::async_duplicate(dsn::apps::duplicate_rpc rpc,
std::function<void(dsn::error_code)> &&callback,
dsn::task_tracker *tracker)
{
_client->duplicate(rpc, std::move(callback), tracker);
}
const char *pegasus_client_impl::get_error_string(int error_code) const
{
auto it = _client_error_to_string.find(error_code);
......
......@@ -221,6 +221,13 @@ public:
const scan_options &options,
async_get_unordered_scanners_callback_t &&callback) override;
/// \internal
/// This is an internal function for duplication.
/// \see pegasus::server::pegasus_mutation_duplicator
void async_duplicate(dsn::apps::duplicate_rpc rpc,
std::function<void(dsn::error_code)> &&callback,
dsn::task_tracker *tracker);
virtual const char *get_error_string(int error_code) const override;
static void init_error();
......@@ -279,6 +286,9 @@ public:
static const ::dsn::blob _max;
};
static int get_client_error(int server_error);
static int get_rocksdb_server_error(int rocskdb_error);
private:
class pegasus_scanner_impl_wrapper : public abstract_pegasus_scanner
{
......@@ -298,9 +308,6 @@ private:
}
};
static int get_client_error(int server_error);
static int get_rocksdb_server_error(int rocskdb_error);
private:
std::string _cluster_name;
std::string _app_name;
......
......@@ -4,3 +4,7 @@ namespace cpp dsn
struct blob
{
}
struct task_code
{
}
......@@ -251,6 +251,26 @@ struct scan_response
6:string server;
}
struct duplicate_request
{
// The timestamp of this write.
1: optional i64 timestamp
// The code to identify this write.
2: optional dsn.task_code task_code
// The binary form of the write.
3: optional dsn.blob raw_message
// ID of the cluster where this write comes from.
4: optional byte cluster_id
}
struct duplicate_response
{
1: optional i32 error;
}
service rrdb
{
update_response put(1:update_request update);
......
......@@ -8,6 +8,9 @@
namespace dsn {
namespace apps {
typedef rpc_holder<duplicate_request, duplicate_response> duplicate_rpc;
class rrdb_client
{
public:
......@@ -405,6 +408,15 @@ public:
partition_hash);
}
// ---------- call RPC_RRDB_RRDB_DUPLICATE ------------
// - asynchronous with on-stack duplicate_request and duplicate_response
template <typename TCallback>
task_ptr duplicate(duplicate_rpc &rpc, TCallback &&callback, dsn::task_tracker *tracker)
{
return rpc.call(_resolver, tracker, std::forward<TCallback &&>(callback));
}
private:
dsn::replication::partition_resolver_ptr _resolver;
dsn::task_tracker _tracker;
......
......@@ -10,6 +10,7 @@ DEFINE_STORAGE_WRITE_RPC_CODE(RPC_RRDB_RRDB_MULTI_REMOVE, NOT_ALLOW_BATCH, IS_ID
DEFINE_STORAGE_WRITE_RPC_CODE(RPC_RRDB_RRDB_INCR, NOT_ALLOW_BATCH, NOT_IDEMPOTENT)
DEFINE_STORAGE_WRITE_RPC_CODE(RPC_RRDB_RRDB_CHECK_AND_SET, NOT_ALLOW_BATCH, NOT_IDEMPOTENT)
DEFINE_STORAGE_WRITE_RPC_CODE(RPC_RRDB_RRDB_CHECK_AND_MUTATE, NOT_ALLOW_BATCH, NOT_IDEMPOTENT)
DEFINE_STORAGE_WRITE_RPC_CODE(RPC_RRDB_RRDB_DUPLICATE, NOT_ALLOW_BATCH, IS_IDEMPOTENT)
DEFINE_STORAGE_READ_RPC_CODE(RPC_RRDB_RRDB_GET)
DEFINE_STORAGE_READ_RPC_CODE(RPC_RRDB_RRDB_MULTI_GET)
DEFINE_STORAGE_READ_RPC_CODE(RPC_RRDB_RRDB_SORTKEY_COUNT)
......
......@@ -113,6 +113,10 @@ class scan_request;
class scan_response;
class duplicate_request;
class duplicate_response;
typedef struct _update_request__isset
{
_update_request__isset() : key(false), value(false), expire_ts_seconds(false) {}
......@@ -1791,6 +1795,129 @@ inline std::ostream &operator<<(std::ostream &out, const scan_response &obj)
obj.printTo(out);
return out;
}
typedef struct _duplicate_request__isset
{
_duplicate_request__isset()
: timestamp(false), task_code(false), raw_message(false), cluster_id(false)
{
}
bool timestamp : 1;
bool task_code : 1;
bool raw_message : 1;
bool cluster_id : 1;
} _duplicate_request__isset;
class duplicate_request
{
public:
duplicate_request(const duplicate_request &);
duplicate_request(duplicate_request &&);
duplicate_request &operator=(const duplicate_request &);
duplicate_request &operator=(duplicate_request &&);
duplicate_request() : timestamp(0), cluster_id(0) {}
virtual ~duplicate_request() throw();
int64_t timestamp;
::dsn::task_code task_code;
::dsn::blob raw_message;
int8_t cluster_id;
_duplicate_request__isset __isset;
void __set_timestamp(const int64_t val);
void __set_task_code(const ::dsn::task_code &val);
void __set_raw_message(const ::dsn::blob &val);
void __set_cluster_id(const int8_t val);
bool operator==(const duplicate_request &rhs) const
{
if (__isset.timestamp != rhs.__isset.timestamp)
return false;
else if (__isset.timestamp && !(timestamp == rhs.timestamp))
return false;
if (__isset.task_code != rhs.__isset.task_code)
return false;
else if (__isset.task_code && !(task_code == rhs.task_code))
return false;
if (__isset.raw_message != rhs.__isset.raw_message)
return false;
else if (__isset.raw_message && !(raw_message == rhs.raw_message))
return false;
if (__isset.cluster_id != rhs.__isset.cluster_id)
return false;
else if (__isset.cluster_id && !(cluster_id == rhs.cluster_id))
return false;
return true;
}
bool operator!=(const duplicate_request &rhs) const { return !(*this == rhs); }
bool operator<(const duplicate_request &) const;
uint32_t read(::apache::thrift::protocol::TProtocol *iprot);
uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const;
virtual void printTo(std::ostream &out) const;
};
void swap(duplicate_request &a, duplicate_request &b);
inline std::ostream &operator<<(std::ostream &out, const duplicate_request &obj)
{
obj.printTo(out);
return out;
}
typedef struct _duplicate_response__isset
{
_duplicate_response__isset() : error(false) {}
bool error : 1;
} _duplicate_response__isset;
class duplicate_response
{
public:
duplicate_response(const duplicate_response &);
duplicate_response(duplicate_response &&);
duplicate_response &operator=(const duplicate_response &);
duplicate_response &operator=(duplicate_response &&);
duplicate_response() : error(0) {}
virtual ~duplicate_response() throw();
int32_t error;
_duplicate_response__isset __isset;
void __set_error(const int32_t val);
bool operator==(const duplicate_response &rhs) const
{
if (__isset.error != rhs.__isset.error)
return false;
else if (__isset.error && !(error == rhs.error))
return false;
return true;
}
bool operator!=(const duplicate_response &rhs) const { return !(*this == rhs); }
bool operator<(const duplicate_response &) const;
uint32_t read(::apache::thrift::protocol::TProtocol *iprot);
uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const;
virtual void printTo(std::ostream &out) const;
};
void swap(duplicate_response &a, duplicate_response &b);
inline std::ostream &operator<<(std::ostream &out, const duplicate_response &obj)
{
obj.printTo(out);
return out;
}
}
} // namespace
......
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
#include "pegasus_mutation_duplicator.h"
#include "pegasus_server_impl.h"
#include "base/pegasus_rpc_types.h"
#include <dsn/cpp/message_utils.h>
#include <dsn/utility/chrono_literals.h>
#include <dsn/dist/replication/duplication_common.h>
#include <rrdb/rrdb.client.h>
namespace dsn {
namespace replication {
/// static definition of mutation_duplicator::creator.
/*static*/ std::function<std::unique_ptr<mutation_duplicator>(
replica_base *, string_view, string_view)>
mutation_duplicator::creator = [](replica_base *r, string_view remote, string_view app) {
return make_unique<pegasus::server::pegasus_mutation_duplicator>(r, remote, app);
};
} // namespace replication
} // namespace dsn
namespace pegasus {
namespace server {
using namespace dsn::literals::chrono_literals;
/*extern*/ uint64_t get_hash_from_request(dsn::task_code tc, const dsn::blob &data)
{
if (tc == dsn::apps::RPC_RRDB_RRDB_PUT) {
dsn::apps::update_request thrift_request;
dsn::from_blob_to_thrift(data, thrift_request);
return pegasus_key_hash(thrift_request.key);
}
if (tc == dsn::apps::RPC_RRDB_RRDB_REMOVE) {
dsn::blob raw_key;
dsn::from_blob_to_thrift(data, raw_key);
return pegasus_key_hash(raw_key);
}
if (tc == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT) {
dsn::apps::multi_put_request thrift_request;
dsn::from_blob_to_thrift(data, thrift_request);
return pegasus_hash_key_hash(thrift_request.hash_key);
}
if (tc == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE) {
dsn::apps::multi_remove_request thrift_request;
dsn::from_blob_to_thrift(data, thrift_request);
return pegasus_hash_key_hash(thrift_request.hash_key);
}
dfatal("unexpected task code: %s", tc.to_string());
__builtin_unreachable();
}
pegasus_mutation_duplicator::pegasus_mutation_duplicator(dsn::replication::replica_base *r,
dsn::string_view remote_cluster,
dsn::string_view app)
: mutation_duplicator(r), _remote_cluster(remote_cluster)
{
// initialize pegasus-client when this class is first time used.
static __attribute__((unused)) bool _dummy = pegasus_client_factory::initialize(nullptr);
pegasus_client *client = pegasus_client_factory::get_client(remote_cluster.data(), app.data());
_client = static_cast<client::pegasus_client_impl *>(client);
auto ret = dsn::replication::get_duplication_cluster_id(remote_cluster.data());
dassert_replica(ret.is_ok(), // never possible, meta server disallows such remote_cluster.
"invalid remote cluster: {}, err_ret: {}",
remote_cluster,
ret.get_error());
_remote_cluster_id = static_cast<uint8_t>(ret.get_value());
ddebug_replica("initialize mutation duplicator for local cluster [id:{}], "
"remote cluster [id:{}, addr:{}]",
get_current_cluster_id(),
_remote_cluster_id,
remote_cluster);
// never possible to duplicate data to itself
dassert_replica(get_current_cluster_id() != _remote_cluster_id,
"invalid remote cluster: {} {}",
remote_cluster,
_remote_cluster_id);
std::string str_gpid = fmt::format("{}", get_gpid());
_shipped_ops.init_app_counter("app.pegasus",
fmt::format("dup_shipped_ops@{}", str_gpid).c_str(),
COUNTER_TYPE_RATE,
"the total ops of DUPLICATE requests sent from this app");
_failed_shipping_ops.init_app_counter(
"app.pegasus",
fmt::format("dup_failed_shipping_ops@{}", str_gpid).c_str(),
COUNTER_TYPE_RATE,
"the qps of failed DUPLICATE requests sent from this app");
}
void pegasus_mutation_duplicator::send(uint64_t hash, callback cb)
{
duplicate_rpc rpc;
{
dsn::zauto_lock _(_lock);
rpc = _inflights[hash].front();
_inflights[hash].pop_front();
}
_client->async_duplicate(rpc,
[hash, cb, rpc, this](dsn::error_code err) mutable {
on_duplicate_reply(hash, std::move(cb), std::move(rpc), err);
},
_env.__conf.tracker);
}
void pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash,
mutation_duplicator::callback cb,
duplicate_rpc rpc,
dsn::error_code err)
{
int perr = PERR_OK;
if (err == dsn::ERR_OK) {
perr = client::pegasus_client_impl::get_client_error(
client::pegasus_client_impl::get_rocksdb_server_error(rpc.response().error));
}
if (perr != PERR_OK || err != dsn::ERR_OK) {
_failed_shipping_ops->increment();
// randomly log the 1% of the failed duplicate rpc, because minor number of
// errors are acceptable.
// TODO(wutao1): print the entire request for future debugging.
if (dsn::rand::next_double01() <= 0.01) {
derror_replica("duplicate_rpc failed: {} [code:{}, timestamp:{}]",
err == dsn::ERR_OK ? _client->get_error_string(perr) : err.to_string(),
rpc.request().timestamp);
}
} else {
_shipped_ops->increment();
_total_shipped_size +=
rpc.dsn_request()->header->body_length + rpc.dsn_request()->header->hdr_length;
}
{
dsn::zauto_lock _(_lock);
if (perr != PERR_OK || err != dsn::ERR_OK) {
// retry this rpc
_inflights[hash].push_front(rpc);
_env.schedule([hash, cb, this]() { send(hash, cb); }, 1_s);
return;
}
if (_inflights[hash].empty()) {
_inflights.erase(hash);
if (_inflights.empty()) {
// move forward to the next step.
cb(_total_shipped_size);
}
} else {
// start next rpc immediately
_env.schedule([hash, cb, this]() { send(hash, cb); });
return;
}
}
}
void pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb)
{
_total_shipped_size = 0;
for (auto mut : muts) {
// mut: 0=timestamp, 1=rpc_code, 2=raw_message
dsn::task_code rpc_code = std::get<1>(mut);
dsn::blob raw_message = std::get<2>(mut);
auto dreq = dsn::make_unique<dsn::apps::duplicate_request>();
uint64_t hash = get_hash_from_request(rpc_code, raw_message);
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_DUPLICATE) {
// ignore if it is a DUPLICATE
} else {
dreq->__set_raw_message(raw_message);
dreq->__set_task_code(rpc_code);
dreq->__set_timestamp(std::get<0>(mut));
dreq->__set_cluster_id(get_current_cluster_id());
}
duplicate_rpc rpc(std::move(dreq),
dsn::apps::RPC_RRDB_RRDB_DUPLICATE,
10_s, // TODO(wutao1): configurable timeout.
hash);
_inflights[hash].push_back(std::move(rpc));
}
if (_inflights.empty()) {
cb(0);
return;
}
auto inflights = _inflights;
for (const auto &kv : inflights) {
send(kv.first, cb);
}
}
} // namespace server
} // namespace pegasus
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
#pragma once
#include <dsn/dist/replication/mutation_duplicator.h>
#include <dsn/dist/replication/replica_base.h>
#include <rrdb/rrdb.code.definition.h>
#include "client_lib/pegasus_client_factory_impl.h"
namespace pegasus {
namespace server {
using namespace dsn::literals::chrono_literals;
// Duplicates the loaded mutations to the remote pegasus cluster using pegasus client.
class pegasus_mutation_duplicator : public dsn::replication::mutation_duplicator
{
using mutation_tuple_set = dsn::replication::mutation_tuple_set;
using mutation_tuple = dsn::replication::mutation_tuple;
using duplicate_rpc = dsn::apps::duplicate_rpc;
public:
pegasus_mutation_duplicator(dsn::replication::replica_base *r,
dsn::string_view remote_cluster,
dsn::string_view app);
void duplicate(mutation_tuple_set muts, callback cb) override;
~pegasus_mutation_duplicator() override { _env.__conf.tracker->wait_outstanding_tasks(); }
private:
void send(uint64_t hash, callback cb);
void on_duplicate_reply(uint64_t hash, callback, duplicate_rpc, dsn::error_code err);
private:
friend class pegasus_mutation_duplicator_test;
client::pegasus_client_impl *_client{nullptr};
uint8_t _remote_cluster_id{0};
std::string _remote_cluster;
// The duplicate_rpc are isolated by their hash value from hash key.
// Writes with the same hash are duplicated in mutation order to preserve data consistency,
// otherwise they are duplicated concurrently to improve performance.
std::map<uint64_t, std::deque<duplicate_rpc>> _inflights; // hash -> duplicate_rpc
dsn::zlock _lock;
size_t _total_shipped_size{0};
dsn::perf_counter_wrapper _shipped_ops;
dsn::perf_counter_wrapper _failed_shipping_ops;
};
// Decodes the binary `request_data` into write request in thrift struct, and
// calculates the hash value from the write's hash key.
extern uint64_t get_hash_from_request(dsn::task_code rpc_code, const dsn::blob &request_data);
} // namespace server
} // namespace pegasus
......@@ -6,6 +6,7 @@
#include <dsn/perf_counter/perf_counter_wrapper.h>
#include <dsn/dist/replication/replica_base.h>
#include <dsn/dist/replication/duplication_common.h>
#include "base/pegasus_value_schema.h"
#include "base/pegasus_utils.h"
......@@ -14,6 +15,14 @@
namespace pegasus {
namespace server {
inline uint8_t get_current_cluster_id()
{
static const uint8_t cluster_id =
dsn::replication::get_duplication_cluster_id(dsn::replication::get_current_cluster_name())
.get_value();
return cluster_id;
}
class pegasus_server_impl;
class capacity_unit_calculator;
......
......@@ -6,6 +6,7 @@ set(MY_PROJ_SRC "../pegasus_server_impl.cpp"
"../pegasus_write_service.cpp"
"../pegasus_server_write.cpp"
"../capacity_unit_calculator.cpp"
"../pegasus_mutation_duplicator.cpp"
)
set(MY_SRC_SEARCH_MODE "GLOB")
......
......@@ -125,6 +125,7 @@ stateful = true
[replication]
data_dirs_black_list_file = /home/mi/.pegasus_data_dirs_black_list
cluster_name = onebox
deny_client_on_start = false
delay_for_fd_timeout_on_start = false
......@@ -497,5 +498,10 @@ profiler::cancelled = false
[meta_server]
server_list = 0.0.0.0:34701
[duplication-group]
onebox = 1
onebox2 = 2
[pegasus.clusters]
onebox = @LOCAL_IP@:34701,@LOCAL_IP@:34702,@LOCAL_IP@:34703
onebox2 = @LOCAL_IP@:34701,@LOCAL_IP@:34702,@LOCAL_IP@:34703
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
#include "server/pegasus_mutation_duplicator.h"
#include "base/pegasus_rpc_types.h"
#include "pegasus_server_test_base.h"
#include <gtest/gtest.h>
#include <dsn/cpp/message_utils.h>
#include <dsn/dist/replication/replica_base.h>
#include <condition_variable>
namespace pegasus {
namespace server {
using namespace dsn::replication;
class pegasus_mutation_duplicator_test : public pegasus_server_test_base
{
dsn::task_tracker _tracker;
dsn::pipeline::environment _env;
public:
pegasus_mutation_duplicator_test()
{
_env.thread_pool(LPC_REPLICATION_LOW).task_tracker(&_tracker);
}
void test_duplicate()
{
replica_base replica(dsn::gpid(1, 1), "fake_replica");
auto duplicator = new_mutation_duplicator(&replica, "onebox2", "temp");
duplicator->set_task_environment(&_env);
mutation_tuple_set muts;
for (uint64_t i = 0; i < 100; i++) {
uint64_t ts = 200 + i;
dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_PUT;
dsn::apps::update_request request;
pegasus::pegasus_generate_key(request.key, std::string("hash"), std::string("sort"));
dsn::message_ptr msg =
dsn::from_thrift_request_to_received_message(request, dsn::apps::RPC_RRDB_RRDB_PUT);
auto data = dsn::move_message_to_blob(msg.get());
muts.insert(std::make_tuple(ts, code, data));
}
size_t total_shipped_size = 0;
auto duplicator_impl = dynamic_cast<pegasus_mutation_duplicator *>(duplicator.get());
RPC_MOCKING(duplicate_rpc)
{
duplicator->duplicate(muts, [](size_t) {});
size_t total_size = 100;
while (total_size > 0) {
// ensure mutations having the same hash are sending sequentially.
ASSERT_EQ(duplicator_impl->_inflights.size(), 1);
ASSERT_EQ(duplicate_rpc::mail_box().size(), 1);
total_size--;
ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), total_size);
auto rpc = duplicate_rpc::mail_box().back();
duplicate_rpc::mail_box().pop_back();
total_shipped_size +=
rpc.dsn_request()->body_size() + rpc.dsn_request()->header->hdr_length;
duplicator_impl->on_duplicate_reply(get_hash(rpc),
[total_shipped_size](size_t final_size) {
ASSERT_EQ(total_shipped_size, final_size);
},
rpc,
dsn::ERR_OK);
// schedule next round
_tracker.wait_outstanding_tasks();
}
ASSERT_EQ(duplicator_impl->_total_shipped_size, total_shipped_size);
ASSERT_EQ(duplicator_impl->_inflights.size(), 0);
ASSERT_EQ(duplicate_rpc::mail_box().size(), 0);
}
}
void test_duplicate_failed()
{
replica_base replica(dsn::gpid(1, 1), "fake_replica");
auto duplicator = new_mutation_duplicator(&replica, "onebox2", "temp");
duplicator->set_task_environment(&_env);
mutation_tuple_set muts;
for (uint64_t i = 0; i < 10; i++) {
uint64_t ts = 200 + i;
dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_PUT;
dsn::apps::update_request request;
pegasus::pegasus_generate_key(request.key, std::string("hash"), std::string("sort"));
dsn::message_ptr msg =
dsn::from_thrift_request_to_received_message(request, dsn::apps::RPC_RRDB_RRDB_PUT);
auto data = dsn::move_message_to_blob(msg.get());
muts.insert(std::make_tuple(ts, code, data));
}
auto duplicator_impl = dynamic_cast<pegasus_mutation_duplicator *>(duplicator.get());
RPC_MOCKING(duplicate_rpc)
{
duplicator->duplicate(muts, [](size_t) {});
auto rpc = duplicate_rpc::mail_box().back();
duplicate_rpc::mail_box().pop_back();
ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), 9);
// failed
duplicator_impl->on_duplicate_reply(
get_hash(rpc), [](size_t) {}, rpc, dsn::ERR_TIMEOUT);
// schedule next round
_tracker.wait_outstanding_tasks();
// retry infinitely
ASSERT_EQ(duplicator_impl->_inflights.size(), 1);
ASSERT_EQ(duplicate_rpc::mail_box().size(), 1);
ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), 9);
duplicate_rpc::mail_box().clear();
// with other error
rpc.response().error = PERR_INVALID_ARGUMENT;
duplicator_impl->on_duplicate_reply(get_hash(rpc), [](size_t) {}, rpc, dsn::ERR_OK);
_tracker.wait_outstanding_tasks();
ASSERT_EQ(duplicator_impl->_inflights.size(), 1);
ASSERT_EQ(duplicate_rpc::mail_box().size(), 1);
ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), 9);
duplicate_rpc::mail_box().clear();
// with other error
rpc.response().error = PERR_OK;
duplicator_impl->on_duplicate_reply(
get_hash(rpc), [](size_t) {}, rpc, dsn::ERR_IO_PENDING);
_tracker.wait_outstanding_tasks();
ASSERT_EQ(duplicator_impl->_inflights.size(), 1);
ASSERT_EQ(duplicate_rpc::mail_box().size(), 1);
ASSERT_EQ(duplicator_impl->_inflights.begin()->second.size(), 9);
duplicate_rpc::mail_box().clear();
}
}
void test_duplicate_isolated_hashkeys()
{
replica_base replica(dsn::gpid(1, 1), "fake_replica");
auto duplicator = new_mutation_duplicator(&replica, "onebox2", "temp");
duplicator->set_task_environment(&_env);
size_t total_size = 3000;
mutation_tuple_set muts;
for (uint64_t i = 0; i < total_size; i++) {
uint64_t ts = 200 + i;
dsn::task_code code = dsn::apps::RPC_RRDB_RRDB_PUT;
dsn::apps::update_request request;
pegasus::pegasus_generate_key(
request.key, std::string("hash") + std::to_string(i), std::string("sort"));
dsn::message_ptr msg = dsn::from_thrift_request_to_received_message(request, code);
auto data = dsn::move_message_to_blob(msg.get());
muts.insert(std::make_tuple(ts, code, data));
}
auto duplicator_impl = dynamic_cast<pegasus_mutation_duplicator *>(duplicator.get());
RPC_MOCKING(duplicate_rpc)
{
duplicator->duplicate(muts, [](size_t) {});
// ensure each bucket has only 1 request and each request is
// isolated with others.
ASSERT_EQ(duplicator_impl->_inflights.size(), total_size);
ASSERT_EQ(duplicate_rpc::mail_box().size(), total_size);
for (const auto &ents : duplicator_impl->_inflights) {
ASSERT_EQ(ents.second.size(), 0);
}
// reply with success
auto rpc_list = std::move(duplicate_rpc::mail_box());
for (const auto &rpc : rpc_list) {
rpc.response().error = dsn::ERR_OK;
duplicator_impl->on_duplicate_reply(get_hash(rpc), [](size_t) {}, rpc, dsn::ERR_OK);
}
_tracker.wait_outstanding_tasks();
ASSERT_EQ(duplicate_rpc::mail_box().size(), 0);
ASSERT_EQ(duplicator_impl->_inflights.size(), 0);
}
}
void test_create_duplicator()
{
replica_base replica(dsn::gpid(1, 1), "fake_replica");
auto duplicator = new_mutation_duplicator(&replica, "onebox2", "temp");
duplicator->set_task_environment(&_env);
auto duplicator_impl = dynamic_cast<pegasus_mutation_duplicator *>(duplicator.get());
ASSERT_EQ(duplicator_impl->_remote_cluster_id, 2);
ASSERT_EQ(duplicator_impl->_remote_cluster, "onebox2");
ASSERT_EQ(get_current_cluster_id(), 1);
}
private:
static uint64_t get_hash(const duplicate_rpc &rpc)
{
return get_hash_from_request(rpc.request().task_code, rpc.request().raw_message);
}
};
TEST_F(pegasus_mutation_duplicator_test, get_hash_from_request)
{
std::string hash_key("hash");
std::string sort_key("sort");
uint64_t hash =
pegasus::pegasus_hash_key_hash(dsn::blob(hash_key.data(), 0, hash_key.length()));
{
dsn::apps::multi_put_request request;
request.hash_key.assign(hash_key.data(), 0, hash_key.length());
dsn::message_ptr msg = dsn::from_thrift_request_to_received_message(
request, dsn::apps::RPC_RRDB_RRDB_MULTI_PUT);
auto data = dsn::move_message_to_blob(msg.get());
ASSERT_EQ(hash, get_hash_from_request(dsn::apps::RPC_RRDB_RRDB_MULTI_PUT, data));
}
{
dsn::apps::multi_remove_request request;
request.hash_key.assign(hash_key.data(), 0, hash_key.length());
dsn::message_ptr msg = dsn::from_thrift_request_to_received_message(
request, dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE);
auto data = dsn::move_message_to_blob(msg.get());
ASSERT_EQ(hash, get_hash_from_request(dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE, data));
}
{
dsn::apps::update_request request;
pegasus::pegasus_generate_key(request.key, hash_key, sort_key);
dsn::message_ptr msg =
dsn::from_thrift_request_to_received_message(request, dsn::apps::RPC_RRDB_RRDB_PUT);
auto data = dsn::move_message_to_blob(msg.get());
ASSERT_EQ(hash, get_hash_from_request(dsn::apps::RPC_RRDB_RRDB_PUT, data));
}
{
dsn::blob key;
pegasus::pegasus_generate_key(key, hash_key, sort_key);
dsn::message_ptr msg =
dsn::from_thrift_request_to_received_message(key, dsn::apps::RPC_RRDB_RRDB_REMOVE);
auto data = dsn::move_message_to_blob(msg.get());
ASSERT_EQ(hash, get_hash_from_request(dsn::apps::RPC_RRDB_RRDB_REMOVE, data));
}
}
// Verifies that calls on `get_hash_key_from_request` won't make
// message unable to read. (if `get_hash_key_from_request` doesn't
// use copy the message internally, it will.)
TEST_F(pegasus_mutation_duplicator_test, read_after_get_hash_key)
{
std::string hash_key("hash");
std::string sort_key("sort");
uint64_t hash =
pegasus::pegasus_hash_key_hash(dsn::blob(hash_key.data(), 0, hash_key.length()));
dsn::message_ex *msg;
{
dsn::apps::update_request request;
pegasus::pegasus_generate_key(request.key, hash_key, sort_key);
msg = dsn::from_thrift_request_to_received_message(request, dsn::apps::RPC_RRDB_RRDB_PUT);
}
auto data = dsn::move_message_to_blob(msg);
ASSERT_EQ(hash, get_hash_from_request(dsn::apps::RPC_RRDB_RRDB_PUT, data));
pegasus::put_rpc rpc(msg);
dsn::blob raw_key;
pegasus::pegasus_generate_key(raw_key, hash_key, sort_key);
ASSERT_EQ(rpc.request().key.to_string(), raw_key.to_string());
}
TEST_F(pegasus_mutation_duplicator_test, duplicate) { test_duplicate(); }
TEST_F(pegasus_mutation_duplicator_test, duplicate_failed) { test_duplicate_failed(); }
TEST_F(pegasus_mutation_duplicator_test, duplicate_isolated_hashkeys)
{
test_duplicate_isolated_hashkeys();
}
TEST_F(pegasus_mutation_duplicator_test, create_duplicator) { test_create_duplicator(); }
} // namespace server
} // namespace pegasus
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册