未验证 提交 f8624d5a 编写于 作者: W Wu Tao 提交者: GitHub

server: refine write process (#107)

上级 a70eb218
Subproject commit 44d32959746301d1289bf2110f1c68cc81c266a1
Subproject commit dab2ea5027939b64bc51c3511e46c567eafa8859
......@@ -37,14 +37,12 @@ int pegasus_server_write::on_batched_write_requests(dsn_message_t *requests,
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT) {
dassert(count == 1, "count = %d", count);
auto rpc = multi_put_rpc::auto_reply(requests[0]);
on_multi_put(rpc);
return rpc.response().error;
return on_multi_put(rpc);
}
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE) {
dassert(count == 1, "count = %d", count);
auto rpc = multi_remove_rpc::auto_reply(requests[0]);
on_multi_remove(rpc);
return rpc.response().error;
return on_multi_remove(rpc);
}
return on_batched_writes(requests, count, decree);
......@@ -52,7 +50,7 @@ int pegasus_server_write::on_batched_write_requests(dsn_message_t *requests,
int pegasus_server_write::on_batched_writes(dsn_message_t *requests, int count, int64_t decree)
{
int err;
int err = 0;
{
_write_svc->batch_prepare();
......@@ -62,11 +60,11 @@ int pegasus_server_write::on_batched_writes(dsn_message_t *requests, int count,
dsn::task_code rpc_code(dsn_msg_task_code(requests[i]));
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_PUT) {
auto rpc = put_rpc::auto_reply(requests[i]);
on_single_put_in_batch(rpc);
err = on_single_put_in_batch(rpc);
_put_rpc_batch.emplace_back(std::move(rpc));
} else if (rpc_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) {
auto rpc = remove_rpc::auto_reply(requests[i]);
on_single_remove_in_batch(rpc);
err = on_single_remove_in_batch(rpc);
_remove_rpc_batch.emplace_back(std::move(rpc));
} else {
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT ||
......@@ -76,6 +74,7 @@ int pegasus_server_write::on_batched_writes(dsn_message_t *requests, int count,
dfatal("rpc code not handled: %s", rpc_code.to_string());
}
}
RETURN_NOT_ZERO(err);
}
err = _write_svc->batch_commit(decree);
......
......@@ -18,35 +18,42 @@ class pegasus_server_write : public dsn::replication::replica_base
public:
pegasus_server_write(pegasus_server_impl *server, bool verbose_log);
/// \return the error code returned by rocksdb, i.e. rocksdb::Status::code().
/// **NOTE**
/// A returned error is regarded as a failure of the replica and thus triggers
/// cluster membership changes. Make sure no error is returned merely because of
/// an invalid user argument.
int on_batched_write_requests(dsn_message_t *requests,
int count,
int64_t decree,
uint64_t timestamp);
private:
void on_multi_put(multi_put_rpc &rpc)
int on_multi_put(multi_put_rpc &rpc)
{
_write_svc->multi_put(_decree, rpc.request(), rpc.response());
return _write_svc->multi_put(_decree, rpc.request(), rpc.response());
}
void on_multi_remove(multi_remove_rpc &rpc)
int on_multi_remove(multi_remove_rpc &rpc)
{
_write_svc->multi_remove(_decree, rpc.request(), rpc.response());
return _write_svc->multi_remove(_decree, rpc.request(), rpc.response());
}
/// Delay replying for the batched requests until all of them complete.
int on_batched_writes(dsn_message_t *requests, int count, int64_t decree);
void on_single_put_in_batch(put_rpc &rpc)
int on_single_put_in_batch(put_rpc &rpc)
{
_write_svc->batch_put(rpc.request(), rpc.response());
int err = _write_svc->batch_put(rpc.request(), rpc.response());
request_key_check(_decree, rpc.dsn_request(), rpc.request().key);
return err;
}
void on_single_remove_in_batch(remove_rpc &rpc)
int on_single_remove_in_batch(remove_rpc &rpc)
{
_write_svc->batch_remove(rpc.request(), rpc.response());
int err = _write_svc->batch_remove(rpc.request(), rpc.response());
request_key_check(_decree, rpc.dsn_request(), rpc.request());
return err;
}
// Ensure that the write request is directed to the right partition.
......
......@@ -60,41 +60,43 @@ pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
pegasus_write_service::~pegasus_write_service() = default;
void pegasus_write_service::multi_put(int64_t decree,
const dsn::apps::multi_put_request &update,
dsn::apps::update_response &resp)
int pegasus_write_service::multi_put(int64_t decree,
const dsn::apps::multi_put_request &update,
dsn::apps::update_response &resp)
{
uint64_t start_time = dsn_now_ns();
_pfc_multi_put_qps->increment();
_impl->multi_put(decree, update, resp);
int err = _impl->multi_put(decree, update, resp);
_pfc_multi_put_latency->set(dsn_now_ns() - start_time);
return err;
}
void pegasus_write_service::multi_remove(int64_t decree,
const dsn::apps::multi_remove_request &update,
dsn::apps::multi_remove_response &resp)
int pegasus_write_service::multi_remove(int64_t decree,
const dsn::apps::multi_remove_request &update,
dsn::apps::multi_remove_response &resp)
{
uint64_t start_time = dsn_now_ns();
_pfc_multi_remove_qps->increment();
_impl->multi_remove(decree, update, resp);
int err = _impl->multi_remove(decree, update, resp);
_pfc_multi_remove_latency->set(dsn_now_ns() - start_time);
return err;
}
void pegasus_write_service::batch_put(const dsn::apps::update_request &update,
dsn::apps::update_response &resp)
int pegasus_write_service::batch_put(const dsn::apps::update_request &update,
dsn::apps::update_response &resp)
{
_pfc_put_qps->increment();
_batch_perfcounters.push_back(_pfc_put_latency.get());
_impl->batch_put(update, resp);
return _impl->batch_put(update, resp);
}
void pegasus_write_service::batch_remove(const dsn::blob &key, dsn::apps::update_response &resp)
int pegasus_write_service::batch_remove(const dsn::blob &key, dsn::apps::update_response &resp)
{
_pfc_remove_qps->increment();
_batch_perfcounters.push_back(_pfc_remove_latency.get());
_impl->batch_remove(key, resp);
return _impl->batch_remove(key, resp);
}
int pegasus_write_service::batch_commit(int64_t decree)
......@@ -124,8 +126,11 @@ void pegasus_write_service::batch_prepare()
int pegasus_write_service::empty_put(int64_t decree)
{
std::string empty_key, empty_value;
_impl->db_write_batch_put(empty_key, empty_value, 0);
return _impl->db_write(decree);
int err = _impl->db_write_batch_put(empty_key, empty_value, 0);
if (!err) {
err = _impl->db_write(decree);
}
return err;
}
} // namespace server
......
......@@ -5,6 +5,7 @@
#pragma once
#include <dsn/cpp/perf_counter_wrapper.h>
#include <dsn/dist/replication/replica_base.h>
#include "base/pegasus_value_schema.h"
#include "base/pegasus_utils.h"
......@@ -15,6 +16,10 @@ namespace server {
class pegasus_server_impl;
/// Returns from the enclosing function with the value of `err` when it is non-zero;
/// otherwise falls through to the next statement.
///
/// The body is wrapped in `do { ... } while (0)` so that `RETURN_NOT_ZERO(x);` is a
/// single statement and stays correct as the unbraced body of an `if`/`else`.
/// `err` is evaluated exactly once, so an argument with side effects (e.g. a function
/// call) is not executed a second time by the `return`.
///
/// Must be invoked with a trailing semicolon, inside a function whose return type
/// is convertible from the type of `err`.
#define RETURN_NOT_ZERO(err)                                                                       \
    do {                                                                                           \
        const auto return_not_zero_code_ = (err);                                                  \
        if (dsn_unlikely(return_not_zero_code_))                                                   \
            return return_not_zero_code_;                                                          \
    } while (0)
/// Handles the write requests.
/// As the signatures imply, this class is not responsible for replying to the rpc;
/// the caller (pegasus_server_write) should do that.
......@@ -26,13 +31,13 @@ public:
~pegasus_write_service();
void multi_put(int64_t decree,
const dsn::apps::multi_put_request &update,
dsn::apps::update_response &resp);
int multi_put(int64_t decree,
const dsn::apps::multi_put_request &update,
dsn::apps::update_response &resp);
void multi_remove(int64_t decree,
const dsn::apps::multi_remove_request &update,
dsn::apps::multi_remove_response &resp);
int multi_remove(int64_t decree,
const dsn::apps::multi_remove_request &update,
dsn::apps::multi_remove_response &resp);
/// Prepare for batch write.
void batch_prepare();
......@@ -44,9 +49,9 @@ public:
/// NOTE that `resp` should not be moved or freed while
/// the batch is not committed.
void batch_put(const dsn::apps::update_request &update, dsn::apps::update_response &resp);
int batch_put(const dsn::apps::update_request &update, dsn::apps::update_response &resp);
void batch_remove(const dsn::blob &key, dsn::apps::update_response &resp);
int batch_remove(const dsn::blob &key, dsn::apps::update_response &resp);
/// \returns 0 if success, non-0 if failure.
/// If the batch contains no updates, 0 is returned.
......
......@@ -25,9 +25,9 @@ public:
{
}
void multi_put(int64_t decree,
const dsn::apps::multi_put_request &update,
dsn::apps::update_response &resp)
int multi_put(int64_t decree,
const dsn::apps::multi_put_request &update,
dsn::apps::update_response &resp)
{
resp.app_id = get_gpid().get_app_id();
resp.partition_index = get_gpid().get_partition_index();
......@@ -41,24 +41,23 @@ public:
// an invalid operation shouldn't be added to latency calculation
resp.error = rocksdb::Status::kInvalidArgument;
return;
return 0;
}
for (auto &kv : update.kvs) {
resp.error = db_write_batch_put(composite_raw_key(update.hash_key, kv.key),
kv.value,
static_cast<uint32_t>(update.expire_ts_seconds));
if (resp.error != 0) {
return;
}
RETURN_NOT_ZERO(resp.error);
}
resp.error = db_write(decree);
return resp.error;
}
void multi_remove(int64_t decree,
const dsn::apps::multi_remove_request &update,
dsn::apps::multi_remove_response &resp)
int multi_remove(int64_t decree,
const dsn::apps::multi_remove_request &update,
dsn::apps::multi_remove_response &resp)
{
resp.app_id = get_gpid().get_app_id();
resp.partition_index = get_gpid().get_partition_index();
......@@ -73,12 +72,13 @@ public:
// an invalid operation shouldn't be added to latency calculation
resp.error = rocksdb::Status::kInvalidArgument;
resp.count = 0;
return;
return 0;
}
for (auto &sort_key : update.sort_keys) {
// TODO(wutao1): check returned error
db_write_batch_delete(composite_raw_key(update.hash_key, sort_key));
resp.error = db_write_batch_delete(composite_raw_key(update.hash_key, sort_key));
RETURN_NOT_ZERO(resp.error);
}
resp.error = db_write(decree);
......@@ -87,19 +87,22 @@ public:
} else {
resp.count = update.sort_keys.size();
}
return resp.error;
}
void batch_put(const dsn::apps::update_request &update, dsn::apps::update_response &resp)
int batch_put(const dsn::apps::update_request &update, dsn::apps::update_response &resp)
{
resp.error = db_write_batch_put(
update.key, update.value, static_cast<uint32_t>(update.expire_ts_seconds));
_update_responses.emplace_back(&resp);
return resp.error;
}
void batch_remove(const dsn::blob &key, dsn::apps::update_response &resp)
int batch_remove(const dsn::blob &key, dsn::apps::update_response &resp)
{
resp.error = db_write_batch_delete(key);
_update_responses.emplace_back(&resp);
return resp.error;
}
int batch_commit(int64_t decree)
......@@ -125,14 +128,25 @@ public:
rocksdb::SliceParts skey_parts(&skey, 1);
rocksdb::SliceParts svalue =
_value_generator.generate_value(_value_schema_version, value, expire_sec);
_batch.Put(skey_parts, svalue);
return 0;
rocksdb::Status s = _batch.Put(skey_parts, svalue);
if (dsn_unlikely(!s.ok())) {
derror_rocksdb("WriteBatchPut",
s.ToString(),
"raw_key: {}, expire_sec: {}",
utils::c_escape_string(raw_key),
expire_sec);
}
return s.code();
}
int db_write_batch_delete(dsn::string_view raw_key)
{
_batch.Delete(utils::to_rocksdb_slice(raw_key));
return 0;
rocksdb::Status s = _batch.Delete(utils::to_rocksdb_slice(raw_key));
if (dsn_unlikely(!s.ok())) {
derror_rocksdb(
"WriteBatchDelete", s.ToString(), "raw_key: {}", utils::c_escape_string(raw_key));
}
return s.code();
}
// Apply the write batch into rocksdb.
......@@ -145,7 +159,7 @@ public:
_wt_opts->given_decree = static_cast<uint64_t>(decree);
auto status = _db->Write(*_wt_opts, &_batch);
if (!status.ok()) {
derror_rocksdb("write", status.ToString(), "decree: {}", decree);
derror_rocksdb("Write", status.ToString(), "decree: {}", decree);
}
_batch.Clear();
return status.code();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册