未验证 提交 e56690c2 编写于 作者: W Wu Tao 提交者: GitHub

refactor: use db_get to implement incr (#598)

上级 b0578482
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <iostream>
#include <dsn/dist/replication/replication_app_base.h>
......@@ -177,21 +177,25 @@ public:
resp.decree = decree;
resp.server = _primary_address;
rocksdb::Slice raw_key(update.key.data(), update.key.length());
std::string raw_value;
dsn::string_view raw_key(update.key.data(), update.key.length());
int64_t new_value = 0;
uint32_t new_expire_ts = 0;
rocksdb::Status s = _db->Get(_rd_opts, raw_key, &raw_value);
if (s.ok()) {
uint32_t old_expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, raw_value);
if (check_if_ts_expired(utils::epoch_now(), old_expire_ts)) {
db_get_context get_ctx;
int err = db_get(raw_key, &get_ctx);
if (err == 0) {
if (!get_ctx.found) {
// old value is not found, set to 0 before increment
new_value = update.increment;
new_expire_ts = update.expire_ts_seconds > 0 ? update.expire_ts_seconds : 0;
} else if (get_ctx.expired) {
// ttl timeout, set to 0 before increment
new_value = update.increment;
new_expire_ts = update.expire_ts_seconds > 0 ? update.expire_ts_seconds : 0;
} else {
::dsn::blob old_value;
pegasus_extract_user_data(_pegasus_data_version, std::move(raw_value), old_value);
_pegasus_data_version, std::move(get_ctx.raw_value), old_value);
if (old_value.length() == 0) {
// empty old value, set to 0 before increment
new_value = update.increment;
......@@ -223,29 +227,14 @@ public:
// set new ttl
if (update.expire_ts_seconds == 0)
new_expire_ts = old_expire_ts;
else if (update.expire_ts_seconds < 0)
if (update.expire_ts_seconds == 0) {
new_expire_ts = get_ctx.expire_ts;
} else if (update.expire_ts_seconds < 0) {
new_expire_ts = 0;
else // update.expire_ts_seconds > 0
} else { // update.expire_ts_seconds > 0
new_expire_ts = update.expire_ts_seconds;
} else if (s.IsNotFound()) {
// old value is not found, set to 0 before increment
new_value = update.increment;
new_expire_ts = update.expire_ts_seconds > 0 ? update.expire_ts_seconds : 0;
} else {
// read old value failed
::dsn::blob hash_key, sort_key;
pegasus_restore_key(::dsn::blob(raw_key.data(), 0, raw_key.size()), hash_key, sort_key);
derror_rocksdb("Get for Incr",
"decree: {}, hash_key: {}, sort_key: {}",
resp.error = s.code();
return resp.error;
resp.error =
......@@ -675,7 +664,10 @@ private:
return status.code();
// The resulted `expire_ts` is -1 if record is expired.
/// Calls RocksDB Get and store the result into `db_get_context`.
/// \returns 0 if Get succeeded. On failure, a non-zero rocksdb status code is returned.
/// \result ctx.expired=true if record expired. Still 0 is returned.
/// \result ctx.found=false if record is not found. Still 0 is returned.
int db_get(dsn::string_view raw_key,
/*out*/ db_get_context *ctx)
......@@ -52,6 +52,22 @@ public:
int db_get(dsn::string_view raw_key, db_get_context *get_ctx)
return _write_impl->db_get(raw_key, get_ctx);
void single_set(dsn::blob raw_key, dsn::blob user_value)
dsn::apps::update_request put;
put.key = raw_key;
put.value = user_value;
db_write_context write_ctx;
dsn::apps::update_response put_resp;
_write_impl->batch_put(write_ctx, put, put_resp);
ASSERT_EQ(_write_impl->batch_commit(0), 0);
TEST_F(pegasus_write_service_impl_test, put_verify_timetag)
......@@ -140,5 +156,66 @@ TEST_F(pegasus_write_service_impl_test, verify_timetag_compatible_with_version_0
class incr_test : public pegasus_write_service_impl_test
void SetUp() override
req.key, dsn::string_view("hash_key"), dsn::string_view("sort_key"));
dsn::apps::incr_request req;
dsn::apps::incr_response resp;
TEST_F(incr_test, incr_on_absent_record)
// ensure key is absent
db_get_context get_ctx;
db_get(req.key, &get_ctx);
req.increment = 100;
_write_impl->incr(0, req, resp);
ASSERT_EQ(resp.new_value, 100);
db_get(req.key, &get_ctx);
TEST_F(incr_test, negative_incr_and_zero_incr)
req.increment = -100;
ASSERT_EQ(0, _write_impl->incr(0, req, resp));
ASSERT_EQ(resp.new_value, -100);
req.increment = -1;
ASSERT_EQ(0, _write_impl->incr(0, req, resp));
ASSERT_EQ(resp.new_value, -101);
req.increment = 0;
ASSERT_EQ(0, _write_impl->incr(0, req, resp));
ASSERT_EQ(resp.new_value, -101);
TEST_F(incr_test, invalid_incr)
single_set(req.key, dsn::blob::create_from_bytes("abc"));
req.increment = 10;
_write_impl->incr(1, req, resp);
ASSERT_EQ(resp.error, rocksdb::Status::kInvalidArgument);
ASSERT_EQ(resp.new_value, 0);
single_set(req.key, dsn::blob::create_from_bytes("100"));
req.increment = std::numeric_limits<int64_t>::max();
_write_impl->incr(1, req, resp);
ASSERT_EQ(resp.error, rocksdb::Status::kInvalidArgument);
ASSERT_EQ(resp.new_value, 100);
} // namespace server
} // namespace pegasus
