未验证 提交 4c8e64f7 编写于 作者: Q QinZuoyan 提交者: GitHub

server: support check_and_set (#122)

上级 27b0f848
...@@ -31,6 +31,8 @@ ...@@ -31,6 +31,8 @@
"collector*app.pegasus*app.stat.multi_put_qps#_all_/cluster=${cluster.name},job=collector,port=${collector.port},service=pegasus", "collector*app.pegasus*app.stat.multi_put_qps#_all_/cluster=${cluster.name},job=collector,port=${collector.port},service=pegasus",
"collector*app.pegasus*app.stat.remove_qps#_all_/cluster=${cluster.name},job=collector,port=${collector.port},service=pegasus", "collector*app.pegasus*app.stat.remove_qps#_all_/cluster=${cluster.name},job=collector,port=${collector.port},service=pegasus",
"collector*app.pegasus*app.stat.multi_remove_qps#_all_/cluster=${cluster.name},job=collector,port=${collector.port},service=pegasus", "collector*app.pegasus*app.stat.multi_remove_qps#_all_/cluster=${cluster.name},job=collector,port=${collector.port},service=pegasus",
"collector*app.pegasus*app.stat.incr_qps#_all_/cluster=${cluster.name},job=collector,port=${collector.port},service=pegasus",
"collector*app.pegasus*app.stat.check_and_set_qps#_all_/cluster=${cluster.name},job=collector,port=${collector.port},service=pegasus",
"collector*app.pegasus*app.stat.scan_qps#_all_/cluster=${cluster.name},job=collector,port=${collector.port},service=pegasus" "collector*app.pegasus*app.stat.scan_qps#_all_/cluster=${cluster.name},job=collector,port=${collector.port},service=pegasus"
], ],
"graph_type": "a", "graph_type": "a",
...@@ -38,7 +40,7 @@ ...@@ -38,7 +40,7 @@
"timespan": 86400 "timespan": 86400
}, },
{ {
"title": "P99 单条读 服务端延迟(单位:纳秒)", "title": "P99 Get 服务端延迟(单位:纳秒)",
"endpoints": ["cluster=${cluster.name} job=replica service=pegasus"], "endpoints": ["cluster=${cluster.name} job=replica service=pegasus"],
"counters": [ "counters": [
"zion*profiler*RPC_RRDB_RRDB_GET.latency.server/cluster=${cluster.name},job=replica,port=${replica.port},service=pegasus" "zion*profiler*RPC_RRDB_RRDB_GET.latency.server/cluster=${cluster.name},job=replica,port=${replica.port},service=pegasus"
...@@ -48,27 +50,27 @@ ...@@ -48,27 +50,27 @@
"timespan": 86400 "timespan": 86400
}, },
{ {
"title": "P99 单条写 服务端延迟(单位:纳秒)", "title": "P99 MultiGet 服务端延迟(单位:纳秒)",
"endpoints": ["cluster=${cluster.name} job=replica service=pegasus"], "endpoints": ["cluster=${cluster.name} job=replica service=pegasus"],
"counters": [ "counters": [
"zion*profiler*RPC_RRDB_RRDB_PUT.latency.server/cluster=${cluster.name},job=replica,port=${replica.port},service=pegasus" "zion*profiler*RPC_RRDB_RRDB_MULTI_GET.latency.server/cluster=${cluster.name},job=replica,port=${replica.port},service=pegasus"
], ],
"graph_type": "a", "graph_type": "a",
"method": "", "method": "",
"timespan": 86400 "timespan": 86400
}, },
{ {
"title": "P99 多条读 服务端延迟(单位:纳秒)", "title": "P99 Set 服务端延迟(单位:纳秒)",
"endpoints": ["cluster=${cluster.name} job=replica service=pegasus"], "endpoints": ["cluster=${cluster.name} job=replica service=pegasus"],
"counters": [ "counters": [
"zion*profiler*RPC_RRDB_RRDB_MULTI_GET.latency.server/cluster=${cluster.name},job=replica,port=${replica.port},service=pegasus" "zion*profiler*RPC_RRDB_RRDB_PUT.latency.server/cluster=${cluster.name},job=replica,port=${replica.port},service=pegasus"
], ],
"graph_type": "a", "graph_type": "a",
"method": "", "method": "",
"timespan": 86400 "timespan": 86400
}, },
{ {
"title": "P99 多条写 服务端延迟(单位:纳秒)", "title": "P99 MultiSet 服务端延迟(单位:纳秒)",
"endpoints": ["cluster=${cluster.name} job=replica service=pegasus"], "endpoints": ["cluster=${cluster.name} job=replica service=pegasus"],
"counters": [ "counters": [
"zion*profiler*RPC_RRDB_RRDB_MULTI_PUT.latency.server/cluster=${cluster.name},job=replica,port=${replica.port},service=pegasus" "zion*profiler*RPC_RRDB_RRDB_MULTI_PUT.latency.server/cluster=${cluster.name},job=replica,port=${replica.port},service=pegasus"
...@@ -77,6 +79,66 @@ ...@@ -77,6 +79,66 @@
"method": "", "method": "",
"timespan": 86400 "timespan": 86400
}, },
{
"title": "P99 Del 服务端延迟(单位:纳秒)",
"endpoints": ["cluster=${cluster.name} job=replica service=pegasus"],
"counters": [
"zion*profiler*RPC_RRDB_RRDB_REMOVE.latency.server/cluster=${cluster.name},job=replica,port=${replica.port},service=pegasus"
],
"graph_type": "a",
"method": "",
"timespan": 86400
},
{
"title": "P99 MultiDel 服务端延迟(单位:纳秒)",
"endpoints": ["cluster=${cluster.name} job=replica service=pegasus"],
"counters": [
"zion*profiler*RPC_RRDB_RRDB_MULTI_REMOVE.latency.server/cluster=${cluster.name},job=replica,port=${replica.port},service=pegasus"
],
"graph_type": "a",
"method": "",
"timespan": 86400
},
{
"title": "P99 Incr 服务端延迟(单位:纳秒)",
"endpoints": ["cluster=${cluster.name} job=replica service=pegasus"],
"counters": [
"zion*profiler*RPC_RRDB_RRDB_INCR.latency.server/cluster=${cluster.name},job=replica,port=${replica.port},service=pegasus"
],
"graph_type": "a",
"method": "",
"timespan": 86400
},
{
"title": "P99 CheckAndSet 服务端延迟(单位:纳秒)",
"endpoints": ["cluster=${cluster.name} job=replica service=pegasus"],
"counters": [
"zion*profiler*RPC_RRDB_RRDB_CHECK_AND_SET.latency.server/cluster=${cluster.name},job=replica,port=${replica.port},service=pegasus"
],
"graph_type": "a",
"method": "",
"timespan": 86400
},
{
"title": "P99 GetScanner 服务端延迟(单位:纳秒)",
"endpoints": ["cluster=${cluster.name} job=replica service=pegasus"],
"counters": [
"zion*profiler*RPC_RRDB_RRDB_GET_SCANNER.latency.server/cluster=${cluster.name},job=replica,port=${replica.port},service=pegasus"
],
"graph_type": "a",
"method": "",
"timespan": 86400
},
{
"title": "P99 Scan 服务端延迟(单位:纳秒)",
"endpoints": ["cluster=${cluster.name} job=replica service=pegasus"],
"counters": [
"zion*profiler*RPC_RRDB_RRDB_SCAN.latency.server/cluster=${cluster.name},job=replica,port=${replica.port},service=pegasus"
],
"graph_type": "a",
"method": "",
"timespan": 86400
},
{ {
"title": "P99 Prepare 客户端延迟(单位:纳秒)", "title": "P99 Prepare 客户端延迟(单位:纳秒)",
"endpoints": ["cluster=${cluster.name} job=replica service=pegasus"], "endpoints": ["cluster=${cluster.name} job=replica service=pegasus"],
...@@ -485,6 +547,8 @@ ...@@ -485,6 +547,8 @@
"collector*app.pegasus*app.stat.multi_put_qps#${table.name}/cluster=${cluster.name},job=collector,port=${collector.port},service=pegasus", "collector*app.pegasus*app.stat.multi_put_qps#${table.name}/cluster=${cluster.name},job=collector,port=${collector.port},service=pegasus",
"collector*app.pegasus*app.stat.remove_qps#${table.name}/cluster=${cluster.name},job=collector,port=${collector.port},service=pegasus", "collector*app.pegasus*app.stat.remove_qps#${table.name}/cluster=${cluster.name},job=collector,port=${collector.port},service=pegasus",
"collector*app.pegasus*app.stat.multi_remove_qps#${table.name}/cluster=${cluster.name},job=collector,port=${collector.port},service=pegasus", "collector*app.pegasus*app.stat.multi_remove_qps#${table.name}/cluster=${cluster.name},job=collector,port=${collector.port},service=pegasus",
"collector*app.pegasus*app.stat.incr_qps#${table.name}/cluster=${cluster.name},job=collector,port=${collector.port},service=pegasus",
"collector*app.pegasus*app.stat.check_and_set_qps#${table.name}/cluster=${cluster.name},job=collector,port=${collector.port},service=pegasus",
"collector*app.pegasus*app.stat.scan_qps#${table.name}/cluster=${cluster.name},job=collector,port=${collector.port},service=pegasus" "collector*app.pegasus*app.stat.scan_qps#${table.name}/cluster=${cluster.name},job=collector,port=${collector.port},service=pegasus"
], ],
"graph_type": "a", "graph_type": "a",
......
...@@ -80,7 +80,7 @@ do ...@@ -80,7 +80,7 @@ do
count=0 count=0
fi fi
pad_str "$count" $thread_pad_length right pad_str "$count" $thread_pad_length right
replica_count=$((replica_count+1)) replica_count=$((replica_count+count))
if [ $count -gt $max ]; then if [ $count -gt $max ]; then
max=$count max=$count
max_time=1 max_time=1
......
...@@ -19,4 +19,7 @@ using remove_rpc = dsn::rpc_holder<dsn::blob, dsn::apps::update_response>; ...@@ -19,4 +19,7 @@ using remove_rpc = dsn::rpc_holder<dsn::blob, dsn::apps::update_response>;
using incr_rpc = dsn::rpc_holder<dsn::apps::incr_request, dsn::apps::incr_response>; using incr_rpc = dsn::rpc_holder<dsn::apps::incr_request, dsn::apps::incr_response>;
using check_and_set_rpc =
dsn::rpc_holder<dsn::apps::check_and_set_request, dsn::apps::check_and_set_response>;
} // namespace pegasus } // namespace pegasus
...@@ -23,25 +23,6 @@ inline uint32_t epoch_now() { return time(nullptr) - epoch_begin; } ...@@ -23,25 +23,6 @@ inline uint32_t epoch_now() { return time(nullptr) - epoch_begin; }
// extract "host" from rpc_address // extract "host" from rpc_address
void addr2host(const ::dsn::rpc_address &addr, char *str, int len); void addr2host(const ::dsn::rpc_address &addr, char *str, int len);
// three-way comparison, returns value:
// < 0 iff "a" < "b",
// == 0 iff "a" == "b",
// > 0 iff "a" > "b"
// T must support data() and length() method.
template <class T>
int binary_compare(const T &a, const T &b)
{
size_t min_len = (a.length() < b.length()) ? a.length() : b.length();
int r = ::memcmp(a.data(), b.data(), min_len);
if (r == 0) {
if (a.length() < b.length())
r = -1;
else if (a.length() > b.length())
r = +1;
}
return r;
}
template <typename elem_type, typename compare = std::less<elem_type>> template <typename elem_type, typename compare = std::less<elem_type>>
class top_n class top_n
{ {
......
此差异已折叠。
...@@ -824,6 +824,120 @@ void pegasus_client_impl::async_incr(const std::string &hash_key, ...@@ -824,6 +824,120 @@ void pegasus_client_impl::async_incr(const std::string &hash_key,
partition_hash); partition_hash);
} }
int pegasus_client_impl::check_and_set(const std::string &hash_key,
const std::string &check_sort_key,
cas_check_type check_type,
const std::string &check_operand,
const std::string &set_sort_key,
const std::string &set_value,
const check_and_set_options &options,
check_and_set_results &results,
int timeout_milliseconds,
internal_info *info)
{
::dsn::utils::notify_event op_completed;
int ret = -1;
auto callback = [&](int _err, check_and_set_results &&_results, internal_info &&_info) {
ret = _err;
results = std::move(_results);
if (info != nullptr)
(*info) = std::move(_info);
op_completed.notify();
};
async_check_and_set(hash_key,
check_sort_key,
check_type,
check_operand,
set_sort_key,
set_value,
options,
std::move(callback),
timeout_milliseconds);
op_completed.wait();
return ret;
}
void pegasus_client_impl::async_check_and_set(const std::string &hash_key,
const std::string &check_sort_key,
cas_check_type check_type,
const std::string &check_operand,
const std::string &set_sort_key,
const std::string &set_value,
const check_and_set_options &options,
async_check_and_set_callback_t &&callback,
int timeout_milliseconds)
{
// check params
if (hash_key.size() >= UINT16_MAX) {
derror("invalid hash key: hash key length should be less than UINT16_MAX, but %d",
(int)hash_key.size());
if (callback != nullptr)
callback(PERR_INVALID_HASH_KEY, check_and_set_results(), internal_info());
return;
}
::dsn::apps::check_and_set_request req;
req.hash_key.assign(hash_key.c_str(), 0, hash_key.size());
req.check_sort_key.assign(check_sort_key.c_str(), 0, check_sort_key.size());
req.check_type = (dsn::apps::cas_check_type::type)check_type;
req.check_operand.assign(check_operand.c_str(), 0, check_operand.size());
if (check_sort_key != set_sort_key) {
req.set_diff_sort_key = true;
req.set_sort_key.assign(set_sort_key.c_str(), 0, set_sort_key.size());
}
req.set_value.assign(set_value.c_str(), 0, set_value.size());
if (options.set_value_ttl_seconds == 0)
req.set_expire_ts_seconds = 0;
else
req.set_expire_ts_seconds = options.set_value_ttl_seconds + utils::epoch_now();
req.return_check_value = options.return_check_value;
::dsn::blob tmp_key;
pegasus_generate_key(tmp_key, req.hash_key, ::dsn::blob());
auto partition_hash = pegasus_key_hash(tmp_key);
auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn_message_t req, dsn_message_t resp)
{
if (user_callback == nullptr) {
return;
}
check_and_set_results results;
internal_info info;
::dsn::apps::check_and_set_response response;
if (err == ::dsn::ERR_OK) {
::dsn::unmarshall(resp, response);
if (response.error == 0) {
results.set_succeed = true;
} else if (response.error == 13) { // kTryAgain
results.set_succeed = false;
response.error = 0;
} else {
results.set_succeed = false;
}
if (response.check_value_returned) {
results.check_value_returned = true;
if (response.check_value_exist) {
results.check_value_exist = true;
results.check_value.assign(response.check_value.data(),
response.check_value.length());
}
}
info.app_id = response.app_id;
info.partition_index = response.partition_index;
info.decree = response.decree;
info.server = response.server;
}
int ret =
get_client_error(err == ERR_OK ? get_rocksdb_server_error(response.error) : int(err));
user_callback(ret, std::move(results), std::move(info));
};
_client->check_and_set(req,
std::move(new_callback),
std::chrono::milliseconds(timeout_milliseconds),
0,
partition_hash);
}
int pegasus_client_impl::ttl(const std::string &hash_key, int pegasus_client_impl::ttl(const std::string &hash_key,
const std::string &sort_key, const std::string &sort_key,
int &ttl_seconds, int &ttl_seconds,
...@@ -911,12 +1025,12 @@ int pegasus_client_impl::get_scanner(const std::string &hash_key, ...@@ -911,12 +1025,12 @@ int pegasus_client_impl::get_scanner(const std::string &hash_key,
pegasus_generate_key(prefix_start, hash_key, o.sort_key_filter_pattern); pegasus_generate_key(prefix_start, hash_key, o.sort_key_filter_pattern);
pegasus_generate_next_blob(prefix_stop, hash_key, o.sort_key_filter_pattern); pegasus_generate_next_blob(prefix_stop, hash_key, o.sort_key_filter_pattern);
if (::pegasus::utils::binary_compare(prefix_start, start) > 0) { if (::dsn::string_view(prefix_start).compare(start) > 0) {
start = std::move(prefix_start); start = std::move(prefix_start);
o.start_inclusive = true; o.start_inclusive = true;
} }
if (::pegasus::utils::binary_compare(prefix_stop, stop) <= 0) { if (::dsn::string_view(prefix_stop).compare(stop) <= 0) {
stop = std::move(prefix_stop); stop = std::move(prefix_stop);
o.stop_inclusive = false; o.stop_inclusive = false;
} }
...@@ -924,7 +1038,7 @@ int pegasus_client_impl::get_scanner(const std::string &hash_key, ...@@ -924,7 +1038,7 @@ int pegasus_client_impl::get_scanner(const std::string &hash_key,
// check if range is empty // check if range is empty
std::vector<uint64_t> v; std::vector<uint64_t> v;
int c = ::pegasus::utils::binary_compare(start, stop); int c = ::dsn::string_view(start).compare(stop);
if (c < 0 || (c == 0 && o.start_inclusive && o.stop_inclusive)) { if (c < 0 || (c == 0 && o.start_inclusive && o.stop_inclusive)) {
v.push_back(pegasus_key_hash(start)); v.push_back(pegasus_key_hash(start));
} }
......
...@@ -151,6 +151,27 @@ public: ...@@ -151,6 +151,27 @@ public:
async_incr_callback_t &&callback = nullptr, async_incr_callback_t &&callback = nullptr,
int timeout_milliseconds = 5000) override; int timeout_milliseconds = 5000) override;
virtual int check_and_set(const std::string &hash_key,
const std::string &check_sort_key,
cas_check_type check_type,
const std::string &check_operand,
const std::string &set_sort_key,
const std::string &set_value,
const check_and_set_options &options,
check_and_set_results &results,
int timeout_milliseconds = 5000,
internal_info *info = nullptr) override;
virtual void async_check_and_set(const std::string &hash_key,
const std::string &check_sort_key,
cas_check_type check_type,
const std::string &check_operand,
const std::string &set_sort_key,
const std::string &set_value,
const check_and_set_options &options,
async_check_and_set_callback_t &&callback = nullptr,
int timeout_milliseconds = 5000) override;
virtual int ttl(const std::string &hashkey, virtual int ttl(const std::string &hashkey,
const std::string &sortkey, const std::string &sortkey,
int &ttl_seconds, int &ttl_seconds,
......
...@@ -12,12 +12,18 @@ rm -rf $TMP_DIR ...@@ -12,12 +12,18 @@ rm -rf $TMP_DIR
mkdir -p $TMP_DIR mkdir -p $TMP_DIR
sh $DSN_ROOT/bin/dsn.cg.sh rrdb.thrift cpp $TMP_DIR sh $DSN_ROOT/bin/dsn.cg.sh rrdb.thrift cpp $TMP_DIR
cp -v $TMP_DIR/rrdb.types.h ../include/rrdb/ cp -v $TMP_DIR/rrdb.types.h ../include/rrdb/
cp -v $TMP_DIR/rrdb.code.definition.h ../include/rrdb/ #cp -v $TMP_DIR/rrdb.code.definition.h ../include/rrdb/
cp -v $TMP_DIR/rrdb.client.h ../include/rrdb/ #cp -v $TMP_DIR/rrdb.client.h ../include/rrdb/
#sed 's/# include "rrdb.code.definition.h"/# include <rrdb\/rrdb.code.definition.h>/' $TMP_DIR/rrdb.server.h > ../include/rrdb/rrdb.server.h
sed 's/#include "dsn_types.h"/#include <dsn\/service_api_cpp.h>/' $TMP_DIR/rrdb_types.h > ../include/rrdb/rrdb_types.h sed 's/#include "dsn_types.h"/#include <dsn\/service_api_cpp.h>/' $TMP_DIR/rrdb_types.h > ../include/rrdb/rrdb_types.h
sed 's/# include "rrdb.code.definition.h"/# include <rrdb\/rrdb.code.definition.h>/' $TMP_DIR/rrdb.server.h > ../include/rrdb/rrdb.server.h
sed 's/#include "rrdb_types.h"/#include <rrdb\/rrdb_types.h>/' $TMP_DIR/rrdb_types.cpp > ../base/rrdb_types.cpp sed 's/#include "rrdb_types.h"/#include <rrdb\/rrdb_types.h>/' $TMP_DIR/rrdb_types.cpp > ../base/rrdb_types.cpp
rm -rf $TMP_DIR rm -rf $TMP_DIR
echo
echo "You should manually modify these files:"
echo " src/include/rrdb/rrdb.code.definition.h"
echo " src/include/rrdb/rrdb.client.h"
echo " src/include/rrdb/rrdb.server.h"
echo
echo "done" echo "done"
...@@ -10,6 +10,36 @@ enum filter_type ...@@ -10,6 +10,36 @@ enum filter_type
FT_MATCH_POSTFIX FT_MATCH_POSTFIX
} }
enum cas_check_type
{
CT_NO_CHECK,
// (1~4) appearance
CT_VALUE_NOT_EXIST, // value is not exist
CT_VALUE_NOT_EXIST_OR_EMPTY, // value is not exist or value is empty
CT_VALUE_EXIST, // value is exist
CT_VALUE_NOT_EMPTY, // value is exist and not empty
// (5~7) match
CT_VALUE_MATCH_ANYWHERE, // operand matches anywhere in value
CT_VALUE_MATCH_PREFIX, // operand matches prefix in value
CT_VALUE_MATCH_POSTFIX, // operand matches postfix in value
// (8~12) bytes compare
CT_VALUE_BYTES_LESS, // bytes compare: value < operand
CT_VALUE_BYTES_LESS_OR_EQUAL, // bytes compare: value <= operand
CT_VALUE_BYTES_EQUAL, // bytes compare: value == operand
CT_VALUE_BYTES_GREATER_OR_EQUAL, // bytes compare: value >= operand
CT_VALUE_BYTES_GREATER, // bytes compare: value > operand
// (13~17) int compare: first transfer bytes to int64 by atoi(); then compare by int value
CT_VALUE_INT_LESS, // int compare: value < operand
CT_VALUE_INT_LESS_OR_EQUAL, // int compare: value <= operand
CT_VALUE_INT_EQUAL, // int compare: value == operand
CT_VALUE_INT_GREATER_OR_EQUAL, // int compare: value >= operand
CT_VALUE_INT_GREATER // int compare: value > operand
}
struct update_request struct update_request
{ {
1:dsn.blob key; 1:dsn.blob key;
...@@ -124,6 +154,33 @@ struct incr_response ...@@ -124,6 +154,33 @@ struct incr_response
6:string server; 6:string server;
} }
struct check_and_set_request
{
1:dsn.blob hash_key;
2:dsn.blob check_sort_key;
3:cas_check_type check_type;
4:dsn.blob check_operand;
5:bool set_diff_sort_key; // if set different sort key with check_sort_key
6:dsn.blob set_sort_key; // used only if set_diff_sort_key is true
7:dsn.blob set_value;
8:i32 set_expire_ts_seconds;
9:bool return_check_value;
}
struct check_and_set_response
{
1:i32 error; // return kTryAgain if check not passed.
// return kInvalidArgument if check type is int compare and
// check_operand/check_value is not integer or out of range.
2:bool check_value_returned;
3:bool check_value_exist; // used only if check_value_returned is true
4:dsn.blob check_value; // used only if check_value_returned and check_value_exist is true
5:i32 app_id;
6:i32 partition_index;
7:i64 decree;
8:string server;
}
struct get_scanner_request struct get_scanner_request
{ {
1:dsn.blob start_key; 1:dsn.blob start_key;
...@@ -160,6 +217,7 @@ service rrdb ...@@ -160,6 +217,7 @@ service rrdb
update_response remove(1:dsn.blob key); update_response remove(1:dsn.blob key);
multi_remove_response multi_remove(1:multi_remove_request request); multi_remove_response multi_remove(1:multi_remove_request request);
incr_response incr(1:incr_request request); incr_response incr(1:incr_request request);
check_and_set_response check_and_set(1:check_and_set_request request);
read_response get(1:dsn.blob key); read_response get(1:dsn.blob key);
multi_get_response multi_get(1:multi_get_request request); multi_get_response multi_get(1:multi_get_request request);
count_response sortkey_count(1:dsn.blob hash_key); count_response sortkey_count(1:dsn.blob hash_key);
......
...@@ -99,6 +99,66 @@ public: ...@@ -99,6 +99,66 @@ public:
} }
}; };
enum cas_check_type
{
CT_NO_CHECK = 0,
// appearance
CT_VALUE_NOT_EXIST = 1, // value is not exist
CT_VALUE_NOT_EXIST_OR_EMPTY = 2, // value is not exist or value is empty
CT_VALUE_EXIST = 3, // value is exist
CT_VALUE_NOT_EMPTY = 4, // value is exist and not empty
// match
CT_VALUE_MATCH_ANYWHERE = 5, // operand matches anywhere in value
CT_VALUE_MATCH_PREFIX = 6, // operand matches prefix in value
CT_VALUE_MATCH_POSTFIX = 7, // operand matches postfix in value
// bytes compare
CT_VALUE_BYTES_LESS = 8, // bytes compare: value < operand
CT_VALUE_BYTES_LESS_OR_EQUAL = 9, // bytes compare: value <= operand
CT_VALUE_BYTES_EQUAL = 10, // bytes compare: value == operand
CT_VALUE_BYTES_GREATER_OR_EQUAL = 11, // bytes compare: value >= operand
CT_VALUE_BYTES_GREATER = 12, // bytes compare: value > operand
// int compare: first transfer bytes to int64 by atoi(); then compare by int value
CT_VALUE_INT_LESS = 13, // int compare: value < operand
CT_VALUE_INT_LESS_OR_EQUAL = 14, // int compare: value <= operand
CT_VALUE_INT_EQUAL = 15, // int compare: value == operand
CT_VALUE_INT_GREATER_OR_EQUAL = 16, // int compare: value >= operand
CT_VALUE_INT_GREATER = 17 // int compare: value > operand
};
struct check_and_set_options
{
int set_value_ttl_seconds; // time to live in seconds of the set value, 0 means no ttl.
bool return_check_value; // if return the check value in results.
check_and_set_options() : set_value_ttl_seconds(0), return_check_value(false) {}
check_and_set_options(const check_and_set_options &o)
: set_value_ttl_seconds(o.set_value_ttl_seconds),
return_check_value(o.return_check_value)
{
}
};
struct check_and_set_results
{
bool set_succeed; // if set value succeed.
bool check_value_returned; // if the check value is returned.
bool check_value_exist; // can be used only when check_value_returned is true.
std::string check_value; // can be used only when check_value_exist is true.
check_and_set_results()
: set_succeed(false), check_value_returned(false), check_value_exist(false)
{
}
check_and_set_results(const check_and_set_results &o)
: set_succeed(o.set_succeed),
check_value_returned(o.check_value_returned),
check_value_exist(o.check_value_exist)
{
}
};
struct scan_options struct scan_options
{ {
int timeout_ms; // RPC call timeout param, in milliseconds int timeout_ms; // RPC call timeout param, in milliseconds
...@@ -157,6 +217,9 @@ public: ...@@ -157,6 +217,9 @@ public:
typedef std::function<void( typedef std::function<void(
int /*error_code*/, int64_t /*new_value*/, internal_info && /*info*/)> int /*error_code*/, int64_t /*new_value*/, internal_info && /*info*/)>
async_incr_callback_t; async_incr_callback_t;
typedef std::function<void(
int /*error_code*/, check_and_set_results && /*results*/, internal_info && /*info*/)>
async_check_and_set_callback_t;
typedef std::function<void(int /*error_code*/, typedef std::function<void(int /*error_code*/,
std::string && /*hash_key*/, std::string && /*hash_key*/,
std::string && /*sort_key*/, std::string && /*sort_key*/,
...@@ -683,7 +746,7 @@ public: ...@@ -683,7 +746,7 @@ public:
/// ///
/// \brief incr /// \brief incr
/// increment value by key from the cluster. /// atomically increment value by key from the cluster.
/// key is composed of hashkey and sortkey. must provide both to get the value. /// key is composed of hashkey and sortkey. must provide both to get the value.
/// the increment semantic is the same as redis: /// the increment semantic is the same as redis:
/// - if old data is not found or empty, then set initial value to 0. /// - if old data is not found or empty, then set initial value to 0.
...@@ -700,6 +763,8 @@ public: ...@@ -700,6 +763,8 @@ public:
/// all the k-v under hashkey will be sorted by sortkey. /// all the k-v under hashkey will be sorted by sortkey.
/// \param increment /// \param increment
/// the value we want to increment. /// the value we want to increment.
/// \param new_value
/// out param to return the new value if increment succeed.
/// \param timeout_milliseconds /// \param timeout_milliseconds
/// if wait longer than this value, will return time out error /// if wait longer than this value, will return time out error
/// \return /// \return
...@@ -715,12 +780,14 @@ public: ...@@ -715,12 +780,14 @@ public:
/// ///
/// \brief asynchronous incr /// \brief asynchronous incr
/// increment value by key from the cluster. /// atomically increment value by key from the cluster.
/// will not be blocked, return immediately. /// will not be blocked, return immediately.
/// \param hashkey /// \param hashkey
/// used to decide which partition to get this k-v /// used to decide which partition to get this k-v
/// \param sortkey /// \param sortkey
/// all the k-v under hashkey will be sorted by sortkey. /// all the k-v under hashkey will be sorted by sortkey.
/// \param increment
/// the value we want to increment.
/// \param callback /// \param callback
/// the callback function will be invoked after operation finished or error occurred. /// the callback function will be invoked after operation finished or error occurred.
/// \param timeout_milliseconds /// \param timeout_milliseconds
...@@ -734,6 +801,81 @@ public: ...@@ -734,6 +801,81 @@ public:
async_incr_callback_t &&callback = nullptr, async_incr_callback_t &&callback = nullptr,
int timeout_milliseconds = 5000) = 0; int timeout_milliseconds = 5000) = 0;
///
/// \brief check_and_set
/// atomically check and set value by key from the cluster.
/// the value will be set if and only if check passed.
/// the sort key for checking and setting can be the same or different.
/// \param hash_key
/// used to decide which partition to get this k-v
/// \param check_sort_key
/// the sort key to check.
/// \param check_type
/// the check type.
/// \param check_operand
/// the check operand.
/// \param set_sort_key
/// the sort key to set value if check passed.
/// \param set_value
/// the value to set if check passed.
/// \param options
/// the check-and-set options.
/// \param results
/// the check-and-set results.
/// \param timeout_milliseconds
/// if wait longer than this value, will return time out error
/// \return
/// int, the error indicates whether or not the operation is succeeded.
/// this error can be converted to a string using get_error_string().
/// if check type is int compare, and check_operand/check_value is not integer
/// or out of range, then return PERR_INVALID_ARGUMENT.
///
virtual int check_and_set(const std::string &hash_key,
const std::string &check_sort_key,
cas_check_type check_type,
const std::string &check_operand,
const std::string &set_sort_key,
const std::string &set_value,
const check_and_set_options &options,
check_and_set_results &results,
int timeout_milliseconds = 5000,
internal_info *info = nullptr) = 0;
///
/// \brief asynchronous check_and_set
/// atomically check and set value by key from the cluster.
/// will not be blocked, return immediately.
/// \param hash_key
/// used to decide which partition to get this k-v
/// \param check_sort_key
/// the sort key to check.
/// \param check_type
/// the check type.
/// \param check_operand
/// the check operand.
/// \param set_sort_key
/// the sort key to set value if check passed.
/// \param set_value
/// the value to set if check passed.
/// \param options
/// the check-and-set options.
/// \param callback
/// the callback function will be invoked after operation finished or error occurred.
/// \param timeout_milliseconds
/// if wait longer than this value, will return time out error
/// \return
/// void.
///
virtual void async_check_and_set(const std::string &hash_key,
const std::string &check_sort_key,
cas_check_type check_type,
const std::string &check_operand,
const std::string &set_sort_key,
const std::string &set_value,
const check_and_set_options &options,
async_check_and_set_callback_t &&callback = nullptr,
int timeout_milliseconds = 5000) = 0;
/// ///
/// \brief ttl (time to live) /// \brief ttl (time to live)
/// get ttl in seconds of this k-v. /// get ttl in seconds of this k-v.
......
...@@ -233,6 +233,50 @@ public: ...@@ -233,6 +233,50 @@ public:
reply_thread_hash); reply_thread_hash);
} }
// ---------- call RPC_RRDB_RRDB_CHECK_AND_SET ------------
// - synchronous
std::pair<::dsn::error_code, check_and_set_response>
check_and_set_sync(const check_and_set_request &args,
std::chrono::milliseconds timeout = std::chrono::milliseconds(0),
int thread_hash = 0, // if thread_hash == 0 && partition_hash != 0,
// thread_hash is computed from partition_hash
uint64_t partition_hash = 0,
dsn::optional<::dsn::rpc_address> server_addr = dsn::none)
{
return ::dsn::rpc::wait_and_unwrap<check_and_set_response>(
::dsn::rpc::call(server_addr.unwrap_or(_server),
RPC_RRDB_RRDB_CHECK_AND_SET,
args,
&_tracker,
empty_rpc_handler,
timeout,
thread_hash,
partition_hash));
}
// - asynchronous with on-stack check_and_set_request and check_and_set_response
template <typename TCallback>
::dsn::task_ptr check_and_set(const check_and_set_request &args,
TCallback &&callback,
std::chrono::milliseconds timeout = std::chrono::milliseconds(0),
int request_thread_hash = 0, // if thread_hash == 0 &&
// partition_hash != 0, thread_hash
// is computed from partition_hash
uint64_t request_partition_hash = 0,
int reply_thread_hash = 0,
dsn::optional<::dsn::rpc_address> server_addr = dsn::none)
{
return ::dsn::rpc::call(server_addr.unwrap_or(_server),
RPC_RRDB_RRDB_CHECK_AND_SET,
args,
&_tracker,
std::forward<TCallback>(callback),
timeout,
request_thread_hash,
request_partition_hash,
reply_thread_hash);
}
// ---------- call RPC_RRDB_RRDB_GET ------------ // ---------- call RPC_RRDB_RRDB_GET ------------
// - synchronous // - synchronous
std::pair<::dsn::error_code, read_response> std::pair<::dsn::error_code, read_response>
......
...@@ -8,6 +8,7 @@ DEFINE_STORAGE_WRITE_RPC_CODE(RPC_RRDB_RRDB_MULTI_PUT, NOT_ALLOW_BATCH, IS_IDEMP ...@@ -8,6 +8,7 @@ DEFINE_STORAGE_WRITE_RPC_CODE(RPC_RRDB_RRDB_MULTI_PUT, NOT_ALLOW_BATCH, IS_IDEMP
DEFINE_STORAGE_WRITE_RPC_CODE(RPC_RRDB_RRDB_REMOVE, ALLOW_BATCH, IS_IDEMPOTENT) DEFINE_STORAGE_WRITE_RPC_CODE(RPC_RRDB_RRDB_REMOVE, ALLOW_BATCH, IS_IDEMPOTENT)
DEFINE_STORAGE_WRITE_RPC_CODE(RPC_RRDB_RRDB_MULTI_REMOVE, NOT_ALLOW_BATCH, IS_IDEMPOTENT) DEFINE_STORAGE_WRITE_RPC_CODE(RPC_RRDB_RRDB_MULTI_REMOVE, NOT_ALLOW_BATCH, IS_IDEMPOTENT)
DEFINE_STORAGE_WRITE_RPC_CODE(RPC_RRDB_RRDB_INCR, NOT_ALLOW_BATCH, NOT_IDEMPOTENT) DEFINE_STORAGE_WRITE_RPC_CODE(RPC_RRDB_RRDB_INCR, NOT_ALLOW_BATCH, NOT_IDEMPOTENT)
DEFINE_STORAGE_WRITE_RPC_CODE(RPC_RRDB_RRDB_CHECK_AND_SET, NOT_ALLOW_BATCH, NOT_IDEMPOTENT)
DEFINE_STORAGE_READ_RPC_CODE(RPC_RRDB_RRDB_GET) DEFINE_STORAGE_READ_RPC_CODE(RPC_RRDB_RRDB_GET)
DEFINE_STORAGE_READ_RPC_CODE(RPC_RRDB_RRDB_MULTI_GET) DEFINE_STORAGE_READ_RPC_CODE(RPC_RRDB_RRDB_MULTI_GET)
DEFINE_STORAGE_READ_RPC_CODE(RPC_RRDB_RRDB_SORTKEY_COUNT) DEFINE_STORAGE_READ_RPC_CODE(RPC_RRDB_RRDB_SORTKEY_COUNT)
......
...@@ -57,6 +57,14 @@ protected: ...@@ -57,6 +57,14 @@ protected:
incr_response resp; incr_response resp;
reply(resp); reply(resp);
} }
// RPC_RRDB_RRDB_CHECK_AND_SET
virtual void on_check_and_set(const check_and_set_request &args,
::dsn::rpc_replier<check_and_set_response> &reply)
{
std::cout << "... exec RPC_RRDB_RRDB_CHECK_AND_SET ... (not implemented) " << std::endl;
check_and_set_response resp;
reply(resp);
}
// RPC_RRDB_RRDB_GET // RPC_RRDB_RRDB_GET
virtual void on_get(const ::dsn::blob &args, ::dsn::rpc_replier<read_response> &reply) virtual void on_get(const ::dsn::blob &args, ::dsn::rpc_replier<read_response> &reply)
{ {
...@@ -114,6 +122,7 @@ protected: ...@@ -114,6 +122,7 @@ protected:
register_async_rpc_handler(RPC_RRDB_RRDB_MULTI_PUT, "multi_put", on_multi_put); register_async_rpc_handler(RPC_RRDB_RRDB_MULTI_PUT, "multi_put", on_multi_put);
register_async_rpc_handler(RPC_RRDB_RRDB_REMOVE, "remove", on_multi_remove); register_async_rpc_handler(RPC_RRDB_RRDB_REMOVE, "remove", on_multi_remove);
register_async_rpc_handler(RPC_RRDB_RRDB_INCR, "incr", on_incr); register_async_rpc_handler(RPC_RRDB_RRDB_INCR, "incr", on_incr);
register_async_rpc_handler(RPC_RRDB_RRDB_CHECK_AND_SET, "check_and_set", on_check_and_set);
register_async_rpc_handler(RPC_RRDB_RRDB_GET, "get", on_get); register_async_rpc_handler(RPC_RRDB_RRDB_GET, "get", on_get);
register_async_rpc_handler(RPC_RRDB_RRDB_MULTI_GET, "multi_get", on_multi_get); register_async_rpc_handler(RPC_RRDB_RRDB_MULTI_GET, "multi_get", on_multi_get);
register_async_rpc_handler(RPC_RRDB_RRDB_SORTKEY_COUNT, "sortkey_count", on_sortkey_count); register_async_rpc_handler(RPC_RRDB_RRDB_SORTKEY_COUNT, "sortkey_count", on_sortkey_count);
...@@ -153,6 +162,12 @@ private: ...@@ -153,6 +162,12 @@ private:
{ {
svc->on_incr(args, reply); svc->on_incr(args, reply);
} }
static void on_check_and_set(rrdb_service *svc,
const check_and_set_request &args,
::dsn::rpc_replier<check_and_set_response> &reply)
{
svc->on_check_and_set(args, reply);
}
static void static void
on_get(rrdb_service *svc, const ::dsn::blob &args, ::dsn::rpc_replier<read_response> &reply) on_get(rrdb_service *svc, const ::dsn::blob &args, ::dsn::rpc_replier<read_response> &reply)
{ {
......
...@@ -19,6 +19,8 @@ GENERATED_TYPE_SERIALIZATION(multi_get_request, THRIFT) ...@@ -19,6 +19,8 @@ GENERATED_TYPE_SERIALIZATION(multi_get_request, THRIFT)
GENERATED_TYPE_SERIALIZATION(multi_get_response, THRIFT) GENERATED_TYPE_SERIALIZATION(multi_get_response, THRIFT)
GENERATED_TYPE_SERIALIZATION(incr_request, THRIFT) GENERATED_TYPE_SERIALIZATION(incr_request, THRIFT)
GENERATED_TYPE_SERIALIZATION(incr_response, THRIFT) GENERATED_TYPE_SERIALIZATION(incr_response, THRIFT)
GENERATED_TYPE_SERIALIZATION(check_and_set_request, THRIFT)
GENERATED_TYPE_SERIALIZATION(check_and_set_response, THRIFT)
GENERATED_TYPE_SERIALIZATION(get_scanner_request, THRIFT) GENERATED_TYPE_SERIALIZATION(get_scanner_request, THRIFT)
GENERATED_TYPE_SERIALIZATION(scan_request, THRIFT) GENERATED_TYPE_SERIALIZATION(scan_request, THRIFT)
GENERATED_TYPE_SERIALIZATION(scan_response, THRIFT) GENERATED_TYPE_SERIALIZATION(scan_response, THRIFT)
......
...@@ -33,6 +33,33 @@ struct filter_type ...@@ -33,6 +33,33 @@ struct filter_type
extern const std::map<int, const char *> _filter_type_VALUES_TO_NAMES; extern const std::map<int, const char *> _filter_type_VALUES_TO_NAMES;
struct cas_check_type
{
enum type
{
CT_NO_CHECK = 0,
CT_VALUE_NOT_EXIST = 1,
CT_VALUE_NOT_EXIST_OR_EMPTY = 2,
CT_VALUE_EXIST = 3,
CT_VALUE_NOT_EMPTY = 4,
CT_VALUE_MATCH_ANYWHERE = 5,
CT_VALUE_MATCH_PREFIX = 6,
CT_VALUE_MATCH_POSTFIX = 7,
CT_VALUE_BYTES_LESS = 8,
CT_VALUE_BYTES_LESS_OR_EQUAL = 9,
CT_VALUE_BYTES_EQUAL = 10,
CT_VALUE_BYTES_GREATER_OR_EQUAL = 11,
CT_VALUE_BYTES_GREATER = 12,
CT_VALUE_INT_LESS = 13,
CT_VALUE_INT_LESS_OR_EQUAL = 14,
CT_VALUE_INT_EQUAL = 15,
CT_VALUE_INT_GREATER_OR_EQUAL = 16,
CT_VALUE_INT_GREATER = 17
};
};
extern const std::map<int, const char *> _cas_check_type_VALUES_TO_NAMES;
class update_request; class update_request;
class update_response; class update_response;
...@@ -59,6 +86,10 @@ class incr_request; ...@@ -59,6 +86,10 @@ class incr_request;
class incr_response; class incr_response;
class check_and_set_request;
class check_and_set_response;
class get_scanner_request; class get_scanner_request;
class scan_request; class scan_request;
...@@ -1013,6 +1044,224 @@ inline std::ostream &operator<<(std::ostream &out, const incr_response &obj) ...@@ -1013,6 +1044,224 @@ inline std::ostream &operator<<(std::ostream &out, const incr_response &obj)
return out; return out;
} }
typedef struct _check_and_set_request__isset
{
_check_and_set_request__isset()
: hash_key(false),
check_sort_key(false),
check_type(false),
check_operand(false),
set_diff_sort_key(false),
set_sort_key(false),
set_value(false),
set_expire_ts_seconds(false),
return_check_value(false)
{
}
bool hash_key : 1;
bool check_sort_key : 1;
bool check_type : 1;
bool check_operand : 1;
bool set_diff_sort_key : 1;
bool set_sort_key : 1;
bool set_value : 1;
bool set_expire_ts_seconds : 1;
bool return_check_value : 1;
} _check_and_set_request__isset;
class check_and_set_request
{
public:
check_and_set_request(const check_and_set_request &);
check_and_set_request(check_and_set_request &&);
check_and_set_request &operator=(const check_and_set_request &);
check_and_set_request &operator=(check_and_set_request &&);
check_and_set_request()
: check_type((cas_check_type::type)0),
set_diff_sort_key(0),
set_expire_ts_seconds(0),
return_check_value(0)
{
}
virtual ~check_and_set_request() throw();
::dsn::blob hash_key;
::dsn::blob check_sort_key;
cas_check_type::type check_type;
::dsn::blob check_operand;
bool set_diff_sort_key;
::dsn::blob set_sort_key;
::dsn::blob set_value;
int32_t set_expire_ts_seconds;
bool return_check_value;
_check_and_set_request__isset __isset;
void __set_hash_key(const ::dsn::blob &val);
void __set_check_sort_key(const ::dsn::blob &val);
void __set_check_type(const cas_check_type::type val);
void __set_check_operand(const ::dsn::blob &val);
void __set_set_diff_sort_key(const bool val);
void __set_set_sort_key(const ::dsn::blob &val);
void __set_set_value(const ::dsn::blob &val);
void __set_set_expire_ts_seconds(const int32_t val);
void __set_return_check_value(const bool val);
bool operator==(const check_and_set_request &rhs) const
{
if (!(hash_key == rhs.hash_key))
return false;
if (!(check_sort_key == rhs.check_sort_key))
return false;
if (!(check_type == rhs.check_type))
return false;
if (!(check_operand == rhs.check_operand))
return false;
if (!(set_diff_sort_key == rhs.set_diff_sort_key))
return false;
if (!(set_sort_key == rhs.set_sort_key))
return false;
if (!(set_value == rhs.set_value))
return false;
if (!(set_expire_ts_seconds == rhs.set_expire_ts_seconds))
return false;
if (!(return_check_value == rhs.return_check_value))
return false;
return true;
}
bool operator!=(const check_and_set_request &rhs) const { return !(*this == rhs); }
bool operator<(const check_and_set_request &) const;
uint32_t read(::apache::thrift::protocol::TProtocol *iprot);
uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const;
virtual void printTo(std::ostream &out) const;
};
void swap(check_and_set_request &a, check_and_set_request &b);
inline std::ostream &operator<<(std::ostream &out, const check_and_set_request &obj)
{
obj.printTo(out);
return out;
}
typedef struct _check_and_set_response__isset
{
_check_and_set_response__isset()
: error(false),
check_value_returned(false),
check_value_exist(false),
check_value(false),
app_id(false),
partition_index(false),
decree(false),
server(false)
{
}
bool error : 1;
bool check_value_returned : 1;
bool check_value_exist : 1;
bool check_value : 1;
bool app_id : 1;
bool partition_index : 1;
bool decree : 1;
bool server : 1;
} _check_and_set_response__isset;
class check_and_set_response
{
public:
check_and_set_response(const check_and_set_response &);
check_and_set_response(check_and_set_response &&);
check_and_set_response &operator=(const check_and_set_response &);
check_and_set_response &operator=(check_and_set_response &&);
check_and_set_response()
: error(0),
check_value_returned(0),
check_value_exist(0),
app_id(0),
partition_index(0),
decree(0),
server()
{
}
virtual ~check_and_set_response() throw();
int32_t error;
bool check_value_returned;
bool check_value_exist;
::dsn::blob check_value;
int32_t app_id;
int32_t partition_index;
int64_t decree;
std::string server;
_check_and_set_response__isset __isset;
void __set_error(const int32_t val);
void __set_check_value_returned(const bool val);
void __set_check_value_exist(const bool val);
void __set_check_value(const ::dsn::blob &val);
void __set_app_id(const int32_t val);
void __set_partition_index(const int32_t val);
void __set_decree(const int64_t val);
void __set_server(const std::string &val);
bool operator==(const check_and_set_response &rhs) const
{
if (!(error == rhs.error))
return false;
if (!(check_value_returned == rhs.check_value_returned))
return false;
if (!(check_value_exist == rhs.check_value_exist))
return false;
if (!(check_value == rhs.check_value))
return false;
if (!(app_id == rhs.app_id))
return false;
if (!(partition_index == rhs.partition_index))
return false;
if (!(decree == rhs.decree))
return false;
if (!(server == rhs.server))
return false;
return true;
}
bool operator!=(const check_and_set_response &rhs) const { return !(*this == rhs); }
bool operator<(const check_and_set_response &) const;
uint32_t read(::apache::thrift::protocol::TProtocol *iprot);
uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const;
virtual void printTo(std::ostream &out) const;
};
void swap(check_and_set_response &a, check_and_set_response &b);
inline std::ostream &operator<<(std::ostream &out, const check_and_set_response &obj)
{
obj.printTo(out);
return out;
}
typedef struct _get_scanner_request__isset typedef struct _get_scanner_request__isset
{ {
_get_scanner_request__isset() _get_scanner_request__isset()
......
...@@ -89,6 +89,7 @@ void info_collector::on_app_stat() ...@@ -89,6 +89,7 @@ void info_collector::on_app_stat()
all.remove_qps += row.remove_qps; all.remove_qps += row.remove_qps;
all.multi_remove_qps += row.multi_remove_qps; all.multi_remove_qps += row.multi_remove_qps;
all.incr_qps += row.incr_qps; all.incr_qps += row.incr_qps;
all.check_and_set_qps += row.check_and_set_qps;
all.scan_qps += row.scan_qps; all.scan_qps += row.scan_qps;
all.recent_expire_count += row.recent_expire_count; all.recent_expire_count += row.recent_expire_count;
all.recent_filter_count += row.recent_filter_count; all.recent_filter_count += row.recent_filter_count;
...@@ -97,11 +98,12 @@ void info_collector::on_app_stat() ...@@ -97,11 +98,12 @@ void info_collector::on_app_stat()
all.storage_count += row.storage_count; all.storage_count += row.storage_count;
read_qps[i] = row.get_qps + row.multi_get_qps + row.scan_qps; read_qps[i] = row.get_qps + row.multi_get_qps + row.scan_qps;
write_qps[i] = row.put_qps + row.multi_put_qps + row.remove_qps + row.multi_remove_qps + write_qps[i] = row.put_qps + row.multi_put_qps + row.remove_qps + row.multi_remove_qps +
row.incr_qps; row.incr_qps + row.check_and_set_qps;
} }
read_qps[read_qps.size() - 1] = all.get_qps + all.multi_get_qps + all.scan_qps; read_qps[read_qps.size() - 1] = all.get_qps + all.multi_get_qps + all.scan_qps;
write_qps[read_qps.size() - 1] = write_qps[read_qps.size() - 1] = all.put_qps + all.multi_put_qps + all.remove_qps +
all.put_qps + all.multi_put_qps + all.remove_qps + all.multi_remove_qps + all.incr_qps; all.multi_remove_qps + all.incr_qps +
all.check_and_set_qps;
for (int i = 0; i < rows.size(); ++i) { for (int i = 0; i < rows.size(); ++i) {
row_data &row = rows[i]; row_data &row = rows[i];
AppStatCounters *counters = get_app_counters(row.row_name); AppStatCounters *counters = get_app_counters(row.row_name);
...@@ -112,6 +114,7 @@ void info_collector::on_app_stat() ...@@ -112,6 +114,7 @@ void info_collector::on_app_stat()
counters->remove_qps->set(row.remove_qps); counters->remove_qps->set(row.remove_qps);
counters->multi_remove_qps->set(row.multi_remove_qps); counters->multi_remove_qps->set(row.multi_remove_qps);
counters->incr_qps->set(row.incr_qps); counters->incr_qps->set(row.incr_qps);
counters->check_and_set_qps->set(row.check_and_set_qps);
counters->scan_qps->set(row.scan_qps); counters->scan_qps->set(row.scan_qps);
counters->recent_expire_count->set(row.recent_expire_count); counters->recent_expire_count->set(row.recent_expire_count);
counters->recent_filter_count->set(row.recent_filter_count); counters->recent_filter_count->set(row.recent_filter_count);
...@@ -153,6 +156,7 @@ info_collector::AppStatCounters *info_collector::get_app_counters(const std::str ...@@ -153,6 +156,7 @@ info_collector::AppStatCounters *info_collector::get_app_counters(const std::str
INIT_COUNER(remove_qps); INIT_COUNER(remove_qps);
INIT_COUNER(multi_remove_qps); INIT_COUNER(multi_remove_qps);
INIT_COUNER(incr_qps); INIT_COUNER(incr_qps);
INIT_COUNER(check_and_set_qps);
INIT_COUNER(scan_qps); INIT_COUNER(scan_qps);
INIT_COUNER(recent_expire_count); INIT_COUNER(recent_expire_count);
INIT_COUNER(recent_filter_count); INIT_COUNER(recent_filter_count);
......
...@@ -35,6 +35,7 @@ public: ...@@ -35,6 +35,7 @@ public:
::dsn::perf_counter_wrapper remove_qps; ::dsn::perf_counter_wrapper remove_qps;
::dsn::perf_counter_wrapper multi_remove_qps; ::dsn::perf_counter_wrapper multi_remove_qps;
::dsn::perf_counter_wrapper incr_qps; ::dsn::perf_counter_wrapper incr_qps;
::dsn::perf_counter_wrapper check_and_set_qps;
::dsn::perf_counter_wrapper scan_qps; ::dsn::perf_counter_wrapper scan_qps;
::dsn::perf_counter_wrapper recent_expire_count; ::dsn::perf_counter_wrapper recent_expire_count;
::dsn::perf_counter_wrapper recent_filter_count; ::dsn::perf_counter_wrapper recent_filter_count;
......
...@@ -2021,44 +2021,30 @@ pegasus_server_impl::storage_apply_checkpoint(chkpt_apply_mode mode, ...@@ -2021,44 +2021,30 @@ pegasus_server_impl::storage_apply_checkpoint(chkpt_apply_mode mode,
return ::dsn::ERR_OK; return ::dsn::ERR_OK;
} }
bool pegasus_server_impl::is_filter_type_supported(::dsn::apps::filter_type::type filter_type)
{
return filter_type >= ::dsn::apps::filter_type::FT_NO_FILTER &&
filter_type <= ::dsn::apps::filter_type::FT_MATCH_POSTFIX;
}
bool pegasus_server_impl::validate_filter(::dsn::apps::filter_type::type filter_type, bool pegasus_server_impl::validate_filter(::dsn::apps::filter_type::type filter_type,
const ::dsn::blob &filter_pattern, const ::dsn::blob &filter_pattern,
const ::dsn::blob &value) const ::dsn::blob &value)
{ {
if (filter_type == ::dsn::apps::filter_type::FT_NO_FILTER || filter_pattern.length() == 0)
return true;
if (value.length() < filter_pattern.length())
return false;
switch (filter_type) { switch (filter_type) {
case ::dsn::apps::filter_type::FT_MATCH_ANYWHERE: { case ::dsn::apps::filter_type::FT_NO_FILTER:
// brute force search return true;
// TODO: improve it according to case ::dsn::apps::filter_type::FT_MATCH_ANYWHERE:
// http://old.blog.phusion.nl/2010/12/06/efficient-substring-searching/ case ::dsn::apps::filter_type::FT_MATCH_PREFIX:
const char *a1 = value.data(); case ::dsn::apps::filter_type::FT_MATCH_POSTFIX: {
int l1 = value.length(); if (filter_pattern.length() == 0)
const char *a2 = filter_pattern.data(); return true;
int l2 = filter_pattern.length(); if (value.length() < filter_pattern.length())
for (int i = 0; i <= l1 - l2; ++i) { return false;
int j = 0; if (filter_type == ::dsn::apps::filter_type::FT_MATCH_ANYWHERE) {
while (j < l2 && a1[i + j] == a2[j]) return dsn::string_view(value).find(filter_pattern) != dsn::string_view::npos;
++j; } else if (filter_type == ::dsn::apps::filter_type::FT_MATCH_PREFIX) {
if (j == l2) return ::memcmp(value.data(), filter_pattern.data(), filter_pattern.length()) == 0;
return true; } else { // filter_type == ::dsn::apps::filter_type::FT_MATCH_POSTFIX
return ::memcmp(value.data() + value.length() - filter_pattern.length(),
filter_pattern.data(),
filter_pattern.length()) == 0;
} }
return false;
} }
case ::dsn::apps::filter_type::FT_MATCH_PREFIX:
return (memcmp(value.data(), filter_pattern.data(), filter_pattern.length()) == 0);
case ::dsn::apps::filter_type::FT_MATCH_POSTFIX:
return (memcmp(value.data() + value.length() - filter_pattern.length(),
filter_pattern.data(),
filter_pattern.length()) == 0);
default: default:
dassert(false, "unsupported filter type: %d", filter_type); dassert(false, "unsupported filter type: %d", filter_type);
} }
......
...@@ -187,7 +187,11 @@ private: ...@@ -187,7 +187,11 @@ private:
bool no_value); bool no_value);
// return true if the filter type is supported // return true if the filter type is supported
bool is_filter_type_supported(::dsn::apps::filter_type::type filter_type); bool is_filter_type_supported(::dsn::apps::filter_type::type filter_type)
{
return filter_type >= ::dsn::apps::filter_type::FT_NO_FILTER &&
filter_type <= ::dsn::apps::filter_type::FT_MATCH_POSTFIX;
}
// return true if the data is valid for the filter // return true if the data is valid for the filter
bool validate_filter(::dsn::apps::filter_type::type filter_type, bool validate_filter(::dsn::apps::filter_type::type filter_type,
......
...@@ -49,6 +49,11 @@ int pegasus_server_write::on_batched_write_requests(dsn_message_t *requests, ...@@ -49,6 +49,11 @@ int pegasus_server_write::on_batched_write_requests(dsn_message_t *requests,
auto rpc = incr_rpc::auto_reply(requests[0]); auto rpc = incr_rpc::auto_reply(requests[0]);
return _write_svc->incr(_decree, rpc.request(), rpc.response()); return _write_svc->incr(_decree, rpc.request(), rpc.response());
} }
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) {
dassert(count == 1, "count = %d", count);
auto rpc = check_and_set_rpc::auto_reply(requests[0]);
return _write_svc->check_and_set(_decree, rpc.request(), rpc.response());
}
return on_batched_writes(requests, count); return on_batched_writes(requests, count);
} }
......
...@@ -37,6 +37,12 @@ pegasus_write_service::pegasus_write_service(pegasus_server_impl *server) ...@@ -37,6 +37,12 @@ pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
_pfc_incr_qps.init_app_counter( _pfc_incr_qps.init_app_counter(
"app.pegasus", name.c_str(), COUNTER_TYPE_RATE, "statistic the qps of INCR request"); "app.pegasus", name.c_str(), COUNTER_TYPE_RATE, "statistic the qps of INCR request");
name = fmt::format("check_and_set_qps@{}", str_gpid);
_pfc_check_and_set_qps.init_app_counter("app.pegasus",
name.c_str(),
COUNTER_TYPE_RATE,
"statistic the qps of CHECK_AND_SET request");
name = fmt::format("put_latency@{}", str_gpid); name = fmt::format("put_latency@{}", str_gpid);
_pfc_put_latency.init_app_counter("app.pegasus", _pfc_put_latency.init_app_counter("app.pegasus",
name.c_str(), name.c_str(),
...@@ -66,6 +72,12 @@ pegasus_write_service::pegasus_write_service(pegasus_server_impl *server) ...@@ -66,6 +72,12 @@ pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
name.c_str(), name.c_str(),
COUNTER_TYPE_NUMBER_PERCENTILES, COUNTER_TYPE_NUMBER_PERCENTILES,
"statistic the latency of INCR request"); "statistic the latency of INCR request");
name = fmt::format("check_and_set_latency@{}", str_gpid);
_pfc_check_and_set_latency.init_app_counter("app.pegasus",
name.c_str(),
COUNTER_TYPE_NUMBER_PERCENTILES,
"statistic the latency of CHECK_AND_SET request");
} }
pegasus_write_service::~pegasus_write_service() {} pegasus_write_service::~pegasus_write_service() {}
...@@ -105,6 +117,17 @@ int pegasus_write_service::incr(int64_t decree, ...@@ -105,6 +117,17 @@ int pegasus_write_service::incr(int64_t decree,
return err; return err;
} }
int pegasus_write_service::check_and_set(int64_t decree,
const dsn::apps::check_and_set_request &update,
dsn::apps::check_and_set_response &resp)
{
uint64_t start_time = dsn_now_ns();
_pfc_check_and_set_qps->increment();
int err = _impl->check_and_set(decree, update, resp);
_pfc_check_and_set_latency->set(dsn_now_ns() - start_time);
return err;
}
void pegasus_write_service::batch_prepare(int64_t decree) void pegasus_write_service::batch_prepare(int64_t decree)
{ {
dassert(_batch_start_time == 0, dassert(_batch_start_time == 0,
......
...@@ -45,6 +45,11 @@ public: ...@@ -45,6 +45,11 @@ public:
// Write INCR record. // Write INCR record.
int incr(int64_t decree, const dsn::apps::incr_request &update, dsn::apps::incr_response &resp); int incr(int64_t decree, const dsn::apps::incr_request &update, dsn::apps::incr_response &resp);
// Write CHECK_AND_SET record.
int check_and_set(int64_t decree,
const dsn::apps::check_and_set_request &update,
dsn::apps::check_and_set_response &resp);
/// For batch write. /// For batch write.
/// NOTE: A batch write may incur a database read for consistency check of timetag. /// NOTE: A batch write may incur a database read for consistency check of timetag.
/// (see pegasus::pegasus_value_generator::generate_value_v1 for more info about timetag) /// (see pegasus::pegasus_value_generator::generate_value_v1 for more info about timetag)
...@@ -91,12 +96,14 @@ private: ...@@ -91,12 +96,14 @@ private:
::dsn::perf_counter_wrapper _pfc_remove_qps; ::dsn::perf_counter_wrapper _pfc_remove_qps;
::dsn::perf_counter_wrapper _pfc_multi_remove_qps; ::dsn::perf_counter_wrapper _pfc_multi_remove_qps;
::dsn::perf_counter_wrapper _pfc_incr_qps; ::dsn::perf_counter_wrapper _pfc_incr_qps;
::dsn::perf_counter_wrapper _pfc_check_and_set_qps;
::dsn::perf_counter_wrapper _pfc_put_latency; ::dsn::perf_counter_wrapper _pfc_put_latency;
::dsn::perf_counter_wrapper _pfc_multi_put_latency; ::dsn::perf_counter_wrapper _pfc_multi_put_latency;
::dsn::perf_counter_wrapper _pfc_remove_latency; ::dsn::perf_counter_wrapper _pfc_remove_latency;
::dsn::perf_counter_wrapper _pfc_multi_remove_latency; ::dsn::perf_counter_wrapper _pfc_multi_remove_latency;
::dsn::perf_counter_wrapper _pfc_incr_latency; ::dsn::perf_counter_wrapper _pfc_incr_latency;
::dsn::perf_counter_wrapper _pfc_check_and_set_latency;
// Records all requests. // Records all requests.
std::vector<::dsn::perf_counter *> _batch_qps_perfcounters; std::vector<::dsn::perf_counter *> _batch_qps_perfcounters;
......
...@@ -29,8 +29,9 @@ public: ...@@ -29,8 +29,9 @@ public:
_primary_address(server->_primary_address), _primary_address(server->_primary_address),
_value_schema_version(server->_value_schema_version), _value_schema_version(server->_value_schema_version),
_db(server->_db), _db(server->_db),
_wt_opts(&server->_wt_opts), _wt_opts(server->_wt_opts),
_rd_opts(&server->_rd_opts) _rd_opts(server->_rd_opts),
_pfc_recent_expire_count(server->_pfc_recent_expire_count)
{ {
} }
...@@ -130,11 +131,12 @@ public: ...@@ -130,11 +131,12 @@ public:
uint32_t expire_ts = 0; uint32_t expire_ts = 0;
std::string raw_value; std::string raw_value;
int64_t new_value = 0; int64_t new_value = 0;
rocksdb::Status s = _db->Get(*_rd_opts, raw_key, &raw_value); rocksdb::Status s = _db->Get(_rd_opts, raw_key, &raw_value);
if (s.ok()) { if (s.ok()) {
expire_ts = pegasus_extract_expire_ts(_value_schema_version, raw_value); expire_ts = pegasus_extract_expire_ts(_value_schema_version, raw_value);
if (check_if_ts_expired(utils::epoch_now(), expire_ts)) { if (check_if_ts_expired(utils::epoch_now(), expire_ts)) {
// ttl timeout, set to 0 before increment, and set expire_ts to 0 // ttl timeout, set to 0 before increment, and set expire_ts to 0
_pfc_recent_expire_count->increment();
new_value = update.increment; new_value = update.increment;
expire_ts = 0; expire_ts = 0;
} else { } else {
...@@ -176,7 +178,7 @@ public: ...@@ -176,7 +178,7 @@ public:
// read old value failed // read old value failed
::dsn::blob hash_key, sort_key; ::dsn::blob hash_key, sort_key;
pegasus_restore_key(::dsn::blob(raw_key.data(), 0, raw_key.size()), hash_key, sort_key); pegasus_restore_key(::dsn::blob(raw_key.data(), 0, raw_key.size()), hash_key, sort_key);
derror_rocksdb("IncrGet", derror_rocksdb("Get for Incr",
s.ToString(), s.ToString(),
"decree: {}, hash_key: {}, sort_key: {}", "decree: {}, hash_key: {}, sort_key: {}",
decree, decree,
...@@ -201,6 +203,105 @@ public: ...@@ -201,6 +203,105 @@ public:
return resp.error; return resp.error;
} }
int check_and_set(int64_t decree,
const dsn::apps::check_and_set_request &update,
dsn::apps::check_and_set_response &resp)
{
resp.app_id = get_gpid().get_app_id();
resp.partition_index = get_gpid().get_partition_index();
resp.decree = decree;
resp.server = _primary_address;
if (!is_check_type_supported(update.check_type)) {
derror_replica("invalid argument for check_and_set: decree = {}, error = {}",
decree,
"check type {} not supported",
update.check_type);
resp.error = rocksdb::Status::kInvalidArgument;
// we should write empty record to update rocksdb's last flushed decree
return empty_put(decree);
}
::dsn::blob check_key;
pegasus_generate_key(check_key, update.hash_key, update.check_sort_key);
rocksdb::Slice check_raw_key(check_key.data(), check_key.length());
std::string check_raw_value;
rocksdb::Status s = _db->Get(_rd_opts, check_raw_key, &check_raw_value);
if (s.ok()) {
// read check value succeed
if (check_if_record_expired(
_value_schema_version, utils::epoch_now(), check_raw_value)) {
// check value ttl timeout
_pfc_recent_expire_count->increment();
s = rocksdb::Status::NotFound();
}
} else if (!s.IsNotFound()) {
// read check value failed
derror_rocksdb("GetCheckValue for CheckAndSet",
s.ToString(),
"decree: {}, hash_key: {}, check_sort_key: {}",
decree,
utils::c_escape_string(update.hash_key),
utils::c_escape_string(update.check_sort_key));
resp.error = s.code();
return resp.error;
}
dassert(s.ok() || s.IsNotFound(), "status = %s", s.ToString().c_str());
::dsn::blob check_value;
if (s.ok()) {
pegasus_extract_user_data(
_value_schema_version, std::move(check_raw_value), check_value);
}
bool invalid_argument = false;
bool passed = validate_check(
decree, update.check_type, update.check_operand, s.ok(), check_value, invalid_argument);
if (passed) {
// check passed, write new value
::dsn::blob set_key;
if (update.set_diff_sort_key) {
pegasus_generate_key(set_key, update.hash_key, update.set_sort_key);
} else {
set_key = check_key;
}
resp.error = db_write_batch_put(decree,
set_key,
update.set_value,
static_cast<uint32_t>(update.set_expire_ts_seconds));
} else {
// check not passed, write empty record to update rocksdb's last flushed decree
resp.error = db_write_batch_put(decree, dsn::string_view(), dsn::string_view(), 0);
}
if (resp.error) {
clear_up_batch_states(decree, resp.error);
return resp.error;
}
resp.error = db_write(decree);
if (resp.error) {
clear_up_batch_states(decree, resp.error);
return resp.error;
}
if (!passed) {
// check not passed, return proper error code to user
resp.error =
invalid_argument ? rocksdb::Status::kInvalidArgument : rocksdb::Status::kTryAgain;
}
if (update.return_check_value) {
resp.check_value_returned = true;
if (s.ok()) {
resp.check_value_exist = true;
resp.check_value = std::move(check_value);
}
}
clear_up_batch_states(decree, resp.error);
return 0;
}
/// For batch write. /// For batch write.
int batch_put(int64_t decree, int batch_put(int64_t decree,
...@@ -283,8 +384,8 @@ private: ...@@ -283,8 +384,8 @@ private:
FAIL_POINT_INJECT_F("db_write", [](dsn::string_view) -> int { return FAIL_DB_WRITE; }); FAIL_POINT_INJECT_F("db_write", [](dsn::string_view) -> int { return FAIL_DB_WRITE; });
_wt_opts->given_decree = static_cast<uint64_t>(decree); _wt_opts.given_decree = static_cast<uint64_t>(decree);
auto status = _db->Write(*_wt_opts, &_batch); auto status = _db->Write(_wt_opts, &_batch);
if (!status.ok()) { if (!status.ok()) {
derror_rocksdb("Write", status.ToString(), "decree: {}", decree); derror_rocksdb("Write", status.ToString(), "decree: {}", decree);
} }
...@@ -316,6 +417,111 @@ private: ...@@ -316,6 +417,111 @@ private:
return raw_key; return raw_key;
} }
// return true if the check type is supported
bool is_check_type_supported(::dsn::apps::cas_check_type::type check_type)
{
return check_type >= ::dsn::apps::cas_check_type::CT_NO_CHECK &&
check_type <= ::dsn::apps::cas_check_type::CT_VALUE_INT_GREATER;
}
// return true if check passed.
// for int compare, if check operand or value are not valid integer, then return false,
// and set out param `invalid_argument' to false.
bool validate_check(int64_t decree,
::dsn::apps::cas_check_type::type check_type,
const ::dsn::blob &check_operand,
bool value_exist,
const ::dsn::blob &value,
bool &invalid_argument)
{
invalid_argument = false;
switch (check_type) {
case ::dsn::apps::cas_check_type::CT_NO_CHECK:
return true;
case ::dsn::apps::cas_check_type::CT_VALUE_NOT_EXIST:
return !value_exist;
case ::dsn::apps::cas_check_type::CT_VALUE_NOT_EXIST_OR_EMPTY:
return !value_exist || value.length() == 0;
case ::dsn::apps::cas_check_type::CT_VALUE_EXIST:
return value_exist;
case ::dsn::apps::cas_check_type::CT_VALUE_NOT_EMPTY:
return value_exist && value.length() != 0;
case ::dsn::apps::cas_check_type::CT_VALUE_MATCH_ANYWHERE:
case ::dsn::apps::cas_check_type::CT_VALUE_MATCH_PREFIX:
case ::dsn::apps::cas_check_type::CT_VALUE_MATCH_POSTFIX: {
if (!value_exist)
return false;
if (check_operand.length() == 0)
return true;
if (value.length() < check_operand.length())
return false;
if (check_type == ::dsn::apps::cas_check_type::CT_VALUE_MATCH_ANYWHERE) {
return dsn::string_view(value).find(check_operand) != dsn::string_view::npos;
} else if (check_type == ::dsn::apps::cas_check_type::CT_VALUE_MATCH_PREFIX) {
return ::memcmp(value.data(), check_operand.data(), check_operand.length()) == 0;
} else { // check_type == ::dsn::apps::cas_check_type::CT_VALUE_MATCH_POSTFIX
return ::memcmp(value.data() + value.length() - check_operand.length(),
check_operand.data(),
check_operand.length()) == 0;
}
}
case ::dsn::apps::cas_check_type::CT_VALUE_BYTES_LESS:
case ::dsn::apps::cas_check_type::CT_VALUE_BYTES_LESS_OR_EQUAL:
case ::dsn::apps::cas_check_type::CT_VALUE_BYTES_EQUAL:
case ::dsn::apps::cas_check_type::CT_VALUE_BYTES_GREATER_OR_EQUAL:
case ::dsn::apps::cas_check_type::CT_VALUE_BYTES_GREATER: {
if (!value_exist)
return false;
int c = dsn::string_view(value).compare(dsn::string_view(check_operand));
if (c < 0) {
return check_type <= ::dsn::apps::cas_check_type::CT_VALUE_BYTES_LESS_OR_EQUAL;
} else if (c == 0) {
return check_type >= ::dsn::apps::cas_check_type::CT_VALUE_BYTES_LESS_OR_EQUAL &&
check_type <= ::dsn::apps::cas_check_type::CT_VALUE_BYTES_GREATER_OR_EQUAL;
} else { // c > 0
return check_type >= ::dsn::apps::cas_check_type::CT_VALUE_BYTES_GREATER_OR_EQUAL;
}
}
case ::dsn::apps::cas_check_type::CT_VALUE_INT_LESS:
case ::dsn::apps::cas_check_type::CT_VALUE_INT_LESS_OR_EQUAL:
case ::dsn::apps::cas_check_type::CT_VALUE_INT_EQUAL:
case ::dsn::apps::cas_check_type::CT_VALUE_INT_GREATER_OR_EQUAL:
case ::dsn::apps::cas_check_type::CT_VALUE_INT_GREATER: {
if (!value_exist)
return false;
int64_t check_value_int;
if (!dsn::buf2int64(value, check_value_int)) {
// invalid check value
derror_replica("check failed: decree = {}, error = {}",
decree,
"check value is not an integer or out of range");
invalid_argument = true;
return false;
}
int64_t check_operand_int;
if (!dsn::buf2int64(check_operand, check_operand_int)) {
// invalid check operand
derror_replica("check failed: decree = {}, error = {}",
decree,
"check operand is not an integer or out of range");
invalid_argument = true;
return false;
}
if (check_value_int < check_operand_int) {
return check_type <= ::dsn::apps::cas_check_type::CT_VALUE_INT_LESS_OR_EQUAL;
} else if (check_value_int == check_operand_int) {
return check_type >= ::dsn::apps::cas_check_type::CT_VALUE_INT_LESS_OR_EQUAL &&
check_type <= ::dsn::apps::cas_check_type::CT_VALUE_INT_GREATER_OR_EQUAL;
} else { // check_value_int > check_operand_int
return check_type >= ::dsn::apps::cas_check_type::CT_VALUE_INT_GREATER_OR_EQUAL;
}
}
default:
dassert(false, "unsupported check type: %d", check_type);
}
return false;
}
private: private:
friend class pegasus_write_service_test; friend class pegasus_write_service_test;
friend class pegasus_server_write_test; friend class pegasus_server_write_test;
...@@ -325,8 +531,9 @@ private: ...@@ -325,8 +531,9 @@ private:
rocksdb::WriteBatch _batch; rocksdb::WriteBatch _batch;
rocksdb::DB *_db; rocksdb::DB *_db;
rocksdb::WriteOptions *_wt_opts; rocksdb::WriteOptions &_wt_opts;
rocksdb::ReadOptions *_rd_opts; rocksdb::ReadOptions &_rd_opts;
::dsn::perf_counter_wrapper &_pfc_recent_expire_count;
pegasus_value_generator _value_generator; pegasus_value_generator _value_generator;
......
...@@ -375,6 +375,7 @@ struct row_data ...@@ -375,6 +375,7 @@ struct row_data
double remove_qps; double remove_qps;
double multi_remove_qps; double multi_remove_qps;
double incr_qps; double incr_qps;
double check_and_set_qps;
double scan_qps; double scan_qps;
double recent_expire_count; double recent_expire_count;
double recent_filter_count; double recent_filter_count;
...@@ -389,6 +390,7 @@ struct row_data ...@@ -389,6 +390,7 @@ struct row_data
remove_qps(0), remove_qps(0),
multi_remove_qps(0), multi_remove_qps(0),
incr_qps(0), incr_qps(0),
check_and_set_qps(0),
scan_qps(0), scan_qps(0),
recent_expire_count(0), recent_expire_count(0),
recent_filter_count(0), recent_filter_count(0),
...@@ -415,6 +417,8 @@ update_app_pegasus_perf_counter(row_data &row, const std::string &counter_name, ...@@ -415,6 +417,8 @@ update_app_pegasus_perf_counter(row_data &row, const std::string &counter_name,
row.multi_remove_qps += value; row.multi_remove_qps += value;
else if (counter_name == "incr_qps") else if (counter_name == "incr_qps")
row.incr_qps += value; row.incr_qps += value;
else if (counter_name == "check_and_set_qps")
row.check_and_set_qps += value;
else if (counter_name == "scan_qps") else if (counter_name == "scan_qps")
row.scan_qps += value; row.scan_qps += value;
else if (counter_name == "recent.expire.count") else if (counter_name == "recent.expire.count")
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
#include <rocksdb/sst_dump_tool.h> #include <rocksdb/sst_dump_tool.h>
#include <dsn/utility/filesystem.h> #include <dsn/utility/filesystem.h>
#include <dsn/utility/string_conv.h> #include <dsn/utility/string_conv.h>
#include <dsn/utility/string_view.h>
#include <dsn/tool/cli/cli.client.h> #include <dsn/tool/cli/cli.client.h>
#include <dsn/dist/replication/replication_ddl_client.h> #include <dsn/dist/replication/replication_ddl_client.h>
#include <dsn/dist/replication/mutation_log_tool.h> #include <dsn/dist/replication/mutation_log_tool.h>
...@@ -40,24 +41,6 @@ inline bool version(command_executor *e, shell_context *sc, arguments args) ...@@ -40,24 +41,6 @@ inline bool version(command_executor *e, shell_context *sc, arguments args)
return true; return true;
} }
inline bool
buf2filter_type(const char *buffer, int length, pegasus::pegasus_client::filter_type &result)
{
if (length == 8 && strncmp(buffer, "anywhere", 8) == 0) {
result = pegasus::pegasus_client::FT_MATCH_ANYWHERE;
return true;
}
if (length == 6 && strncmp(buffer, "prefix", 6) == 0) {
result = pegasus::pegasus_client::FT_MATCH_PREFIX;
return true;
}
if (length == 7 && strncmp(buffer, "postfix", 7) == 0) {
result = pegasus::pegasus_client::FT_MATCH_POSTFIX;
return true;
}
return false;
}
inline bool query_cluster_info(command_executor *e, shell_context *sc, arguments args) inline bool query_cluster_info(command_executor *e, shell_context *sc, arguments args)
{ {
::dsn::error_code err = sc->ddl_client->cluster_info(""); ::dsn::error_code err = sc->ddl_client->cluster_info("");
...@@ -164,7 +147,7 @@ inline bool ls_apps(command_executor *e, shell_context *sc, arguments args) ...@@ -164,7 +147,7 @@ inline bool ls_apps(command_executor *e, shell_context *sc, arguments args)
::dsn::app_status::type s = ::dsn::app_status::AS_INVALID; ::dsn::app_status::type s = ::dsn::app_status::AS_INVALID;
if (!status.empty() && status != "all") { if (!status.empty() && status != "all") {
s = type_from_string(::dsn::_app_status_VALUES_TO_NAMES, s = type_from_string(::dsn::_app_status_VALUES_TO_NAMES,
std::string("AS_") + status, std::string("as_") + status,
::dsn::app_status::AS_INVALID); ::dsn::app_status::AS_INVALID);
verify_logged(s != ::dsn::app_status::AS_INVALID, verify_logged(s != ::dsn::app_status::AS_INVALID,
"parse %s as app_status::type failed", "parse %s as app_status::type failed",
...@@ -789,7 +772,11 @@ inline bool multi_get_range(command_executor *e, shell_context *sc, arguments ar ...@@ -789,7 +772,11 @@ inline bool multi_get_range(command_executor *e, shell_context *sc, arguments ar
} }
break; break;
case 's': case 's':
if (!buf2filter_type(optarg, strlen(optarg), options.sort_key_filter_type)) { options.sort_key_filter_type = (pegasus::pegasus_client::filter_type)type_from_string(
::dsn::apps::_filter_type_VALUES_TO_NAMES,
std::string("ft_match_") + optarg,
::dsn::apps::filter_type::FT_NO_FILTER);
if (options.sort_key_filter_type == pegasus::pegasus_client::FT_NO_FILTER) {
fprintf(stderr, "invalid sort_key_filter_type param\n"); fprintf(stderr, "invalid sort_key_filter_type param\n");
return false; return false;
} }
...@@ -1127,7 +1114,11 @@ inline bool multi_del_range(command_executor *e, shell_context *sc, arguments ar ...@@ -1127,7 +1114,11 @@ inline bool multi_del_range(command_executor *e, shell_context *sc, arguments ar
} }
break; break;
case 's': case 's':
if (!buf2filter_type(optarg, strlen(optarg), options.sort_key_filter_type)) { options.sort_key_filter_type = (pegasus::pegasus_client::filter_type)type_from_string(
::dsn::apps::_filter_type_VALUES_TO_NAMES,
std::string("ft_match_") + optarg,
::dsn::apps::filter_type::FT_NO_FILTER);
if (options.sort_key_filter_type == pegasus::pegasus_client::FT_NO_FILTER) {
fprintf(stderr, "invalid sort_key_filter_type param\n"); fprintf(stderr, "invalid sort_key_filter_type param\n");
return false; return false;
} }
...@@ -1307,6 +1298,162 @@ inline bool incr(command_executor *e, shell_context *sc, arguments args) ...@@ -1307,6 +1298,162 @@ inline bool incr(command_executor *e, shell_context *sc, arguments args)
return true; return true;
} }
inline bool check_and_set(command_executor *e, shell_context *sc, arguments args)
{
if (args.argc < 2)
return false;
std::string hash_key = sds_to_string(args.argv[1]);
bool check_sort_key_provided = false;
std::string check_sort_key;
::dsn::apps::cas_check_type::type check_type = ::dsn::apps::cas_check_type::CT_NO_CHECK;
std::string check_type_name;
bool check_operand_provided = false;
std::string check_operand;
bool set_sort_key_provided = false;
std::string set_sort_key;
bool set_value_provided = false;
std::string set_value;
pegasus::pegasus_client::check_and_set_options options;
static struct option long_options[] = {{"check_sort_key", required_argument, 0, 'c'},
{"check_type", required_argument, 0, 't'},
{"check_operand", required_argument, 0, 'o'},
{"set_sort_key", required_argument, 0, 's'},
{"set_value", required_argument, 0, 'v'},
{"set_value_ttl_seconds", required_argument, 0, 'l'},
{"return_check_value", no_argument, 0, 'r'},
{0, 0, 0, 0}};
escape_sds_argv(args.argc, args.argv);
optind = 0;
while (true) {
int option_index = 0;
int c;
c = getopt_long(args.argc, args.argv, "c:t:o:s:v:l:r", long_options, &option_index);
if (c == -1)
break;
switch (c) {
case 'c':
check_sort_key_provided = true;
check_sort_key = unescape_str(optarg);
break;
case 't':
check_type = type_from_string(::dsn::apps::_cas_check_type_VALUES_TO_NAMES,
std::string("ct_value_") + optarg,
::dsn::apps::cas_check_type::CT_NO_CHECK);
if (check_type == ::dsn::apps::cas_check_type::CT_NO_CHECK) {
fprintf(stderr, "ERROR: invalid check_type param\n");
return false;
}
check_type_name = optarg;
break;
case 'o':
check_operand_provided = true;
check_operand = unescape_str(optarg);
break;
case 's':
set_sort_key_provided = true;
set_sort_key = unescape_str(optarg);
break;
case 'v':
set_value_provided = true;
set_value = unescape_str(optarg);
break;
case 'l':
if (!dsn::buf2int32(optarg, options.set_value_ttl_seconds)) {
fprintf(stderr, "ERROR: invalid set_value_ttl_seconds param\n");
return false;
}
break;
case 'r':
options.return_check_value = true;
break;
default:
return false;
}
}
if (!check_sort_key_provided) {
fprintf(stderr, "ERROR: check_sort_key not provided\n");
return false;
}
if (check_type == ::dsn::apps::cas_check_type::CT_NO_CHECK) {
fprintf(stderr, "ERROR: check_type not provided\n");
return false;
}
if (!check_operand_provided &&
check_type >= ::dsn::apps::cas_check_type::CT_VALUE_MATCH_ANYWHERE) {
fprintf(stderr, "ERROR: check_operand not provided\n");
return false;
}
if (!set_sort_key_provided) {
fprintf(stderr, "ERROR: set_sort_key not provided\n");
return false;
}
if (!set_value_provided) {
fprintf(stderr, "ERROR: set_value not provided\n");
return false;
}
fprintf(stderr, "hash_key: \"%s\"\n", pegasus::utils::c_escape_string(hash_key).c_str());
fprintf(stderr,
"check_sort_key: \"%s\"\n",
pegasus::utils::c_escape_string(check_sort_key).c_str());
fprintf(stderr, "check_type: %s\n", check_type_name.c_str());
if (check_type >= ::dsn::apps::cas_check_type::CT_VALUE_MATCH_ANYWHERE) {
fprintf(stderr,
"check_operand: \"%s\"\n",
pegasus::utils::c_escape_string(check_operand).c_str());
}
fprintf(
stderr, "set_sort_key: \"%s\"\n", pegasus::utils::c_escape_string(set_sort_key).c_str());
fprintf(stderr, "set_value: \"%s\"\n", pegasus::utils::c_escape_string(set_value).c_str());
fprintf(stderr, "set_value_ttl_seconds: %d\n", options.set_value_ttl_seconds);
fprintf(stderr, "return_check_value: %s\n", options.return_check_value ? "true" : "false");
fprintf(stderr, "\n");
pegasus::pegasus_client::check_and_set_results results;
pegasus::pegasus_client::internal_info info;
int ret = sc->pg_client->check_and_set(hash_key,
check_sort_key,
(pegasus::pegasus_client::cas_check_type)check_type,
check_operand,
set_sort_key,
set_value,
options,
results,
sc->timeout_ms,
&info);
if (ret != pegasus::PERR_OK) {
fprintf(stderr, "ERROR: %s\n", sc->pg_client->get_error_string(ret));
} else {
if (results.set_succeed) {
fprintf(stderr, "Set succeed.\n");
} else {
fprintf(stderr, "Set failed, because check not passed.\n");
}
if (results.check_value_returned) {
fprintf(stderr, "\n");
if (results.check_value_exist) {
fprintf(
stderr,
"Check value: \"%s\"\n",
pegasus::utils::c_escape_string(results.check_value, sc->escape_all).c_str());
} else {
fprintf(stderr, "Check value not exist.\n");
}
}
}
fprintf(stderr, "\n");
fprintf(stderr, "app_id : %d\n", info.app_id);
fprintf(stderr, "partition_index : %d\n", info.partition_index);
fprintf(stderr, "decree : %ld\n", info.decree);
fprintf(stderr, "server : %s\n", info.server.c_str());
return true;
}
inline bool get_ttl(command_executor *e, shell_context *sc, arguments args) inline bool get_ttl(command_executor *e, shell_context *sc, arguments args)
{ {
if (args.argc != 3) { if (args.argc != 3) {
...@@ -1420,7 +1567,11 @@ inline bool hash_scan(command_executor *e, shell_context *sc, arguments args) ...@@ -1420,7 +1567,11 @@ inline bool hash_scan(command_executor *e, shell_context *sc, arguments args)
} }
break; break;
case 's': case 's':
if (!buf2filter_type(optarg, strlen(optarg), options.sort_key_filter_type)) { options.sort_key_filter_type = (pegasus::pegasus_client::filter_type)type_from_string(
::dsn::apps::_filter_type_VALUES_TO_NAMES,
std::string("ft_match_") + optarg,
::dsn::apps::filter_type::FT_NO_FILTER);
if (options.sort_key_filter_type == pegasus::pegasus_client::FT_NO_FILTER) {
fprintf(stderr, "invalid sort_key_filter_type param\n"); fprintf(stderr, "invalid sort_key_filter_type param\n");
return false; return false;
} }
...@@ -1601,7 +1752,11 @@ inline bool full_scan(command_executor *e, shell_context *sc, arguments args) ...@@ -1601,7 +1752,11 @@ inline bool full_scan(command_executor *e, shell_context *sc, arguments args)
} }
break; break;
case 'h': case 'h':
if (!buf2filter_type(optarg, strlen(optarg), options.hash_key_filter_type)) { options.hash_key_filter_type = (pegasus::pegasus_client::filter_type)type_from_string(
::dsn::apps::_filter_type_VALUES_TO_NAMES,
std::string("ft_match_") + optarg,
::dsn::apps::filter_type::FT_NO_FILTER);
if (options.hash_key_filter_type == pegasus::pegasus_client::FT_NO_FILTER) {
fprintf(stderr, "invalid hash_key_filter_type param\n"); fprintf(stderr, "invalid hash_key_filter_type param\n");
return false; return false;
} }
...@@ -1611,7 +1766,11 @@ inline bool full_scan(command_executor *e, shell_context *sc, arguments args) ...@@ -1611,7 +1766,11 @@ inline bool full_scan(command_executor *e, shell_context *sc, arguments args)
options.hash_key_filter_pattern = unescape_str(optarg); options.hash_key_filter_pattern = unescape_str(optarg);
break; break;
case 's': case 's':
if (!buf2filter_type(optarg, strlen(optarg), options.sort_key_filter_type)) { options.sort_key_filter_type = (pegasus::pegasus_client::filter_type)type_from_string(
dsn::apps::_filter_type_VALUES_TO_NAMES,
std::string("ft_match_") + optarg,
::dsn::apps::filter_type::FT_NO_FILTER);
if (options.sort_key_filter_type == pegasus::pegasus_client::FT_NO_FILTER) {
fprintf(stderr, "invalid sort_key_filter_type param\n"); fprintf(stderr, "invalid sort_key_filter_type param\n");
return false; return false;
} }
...@@ -2433,6 +2592,7 @@ inline bool data_operations(command_executor *e, shell_context *sc, arguments ar ...@@ -2433,6 +2592,7 @@ inline bool data_operations(command_executor *e, shell_context *sc, arguments ar
{"multi_del", multi_del_value}, {"multi_del", multi_del_value},
{"multi_del_range", multi_del_range}, {"multi_del_range", multi_del_range},
{"incr", incr}, {"incr", incr},
{"check_and_set", check_and_set},
{"exist", exist}, {"exist", exist},
{"count", sortkey_count}, {"count", sortkey_count},
{"ttl", get_ttl}, {"ttl", get_ttl},
...@@ -3240,10 +3400,11 @@ inline bool app_stat(command_executor *e, shell_context *sc, arguments args) ...@@ -3240,10 +3400,11 @@ inline bool app_stat(command_executor *e, shell_context *sc, arguments args)
out << std::setw(w) << std::right << "GET" << std::setw(w) << std::right << "MULTI_GET" out << std::setw(w) << std::right << "GET" << std::setw(w) << std::right << "MULTI_GET"
<< std::setw(w) << std::right << "PUT" << std::setw(w) << std::right << "MULTI_PUT" << std::setw(w) << std::right << "PUT" << std::setw(w) << std::right << "MULTI_PUT"
<< std::setw(w) << std::right << "DEL" << std::setw(w) << std::right << "MULTI_DEL" << std::setw(w) << std::right << "DEL" << std::setw(w) << std::right << "MULTI_DEL"
<< std::setw(w) << std::right << "INCR" << std::setw(w) << std::right << "SCAN" << std::setw(w) << std::right << "INCR" << std::setw(w) << std::right << "CAS"
<< std::setw(w) << std::right << "expired" << std::setw(w) << std::right << "filtered" << std::setw(w) << std::right << "SCAN" << std::setw(w) << std::right << "expired"
<< std::setw(w) << std::right << "abnormal" << std::setw(w) << std::right << "storage_mb" << std::setw(w) << std::right << "filtered" << std::setw(w) << std::right << "abnormal"
<< std::setw(w) << std::right << "file_count" << std::endl; << std::setw(w) << std::right << "storage_mb" << std::setw(w) << std::right << "file_count"
<< std::endl;
rows.resize(rows.size() + 1); rows.resize(rows.size() + 1);
row_data &sum = rows.back(); row_data &sum = rows.back();
for (int i = 0; i < rows.size() - 1; ++i) { for (int i = 0; i < rows.size() - 1; ++i) {
...@@ -3255,6 +3416,7 @@ inline bool app_stat(command_executor *e, shell_context *sc, arguments args) ...@@ -3255,6 +3416,7 @@ inline bool app_stat(command_executor *e, shell_context *sc, arguments args)
sum.remove_qps += row.remove_qps; sum.remove_qps += row.remove_qps;
sum.multi_remove_qps += row.multi_remove_qps; sum.multi_remove_qps += row.multi_remove_qps;
sum.incr_qps += row.incr_qps; sum.incr_qps += row.incr_qps;
sum.check_and_set_qps += row.check_and_set_qps;
sum.scan_qps += row.scan_qps; sum.scan_qps += row.scan_qps;
sum.recent_expire_count += row.recent_expire_count; sum.recent_expire_count += row.recent_expire_count;
sum.recent_filter_count += row.recent_filter_count; sum.recent_filter_count += row.recent_filter_count;
...@@ -3279,6 +3441,7 @@ inline bool app_stat(command_executor *e, shell_context *sc, arguments args) ...@@ -3279,6 +3441,7 @@ inline bool app_stat(command_executor *e, shell_context *sc, arguments args)
PRINT_QPS(remove_qps); PRINT_QPS(remove_qps);
PRINT_QPS(multi_remove_qps); PRINT_QPS(multi_remove_qps);
PRINT_QPS(incr_qps); PRINT_QPS(incr_qps);
PRINT_QPS(check_and_set_qps);
PRINT_QPS(scan_qps); PRINT_QPS(scan_qps);
out << std::setw(w) << std::right << (int64_t)row.recent_expire_count << std::setw(w) out << std::setw(w) << std::right << (int64_t)row.recent_expire_count << std::setw(w)
<< std::right << (int64_t)row.recent_filter_count << std::setw(w) << std::right << std::right << (int64_t)row.recent_filter_count << std::setw(w) << std::right
......
...@@ -173,7 +173,26 @@ static command_executor commands[] = { ...@@ -173,7 +173,26 @@ static command_executor commands[] = {
data_operations, data_operations,
}, },
{ {
"incr", "increment value of a key", "<hash_key> <sort_key> [increment]", data_operations, "incr",
"atomically increment value of a key",
"<hash_key> <sort_key> [increment]",
data_operations,
},
{
"check_and_set",
"atomically check and set value",
"<hash_key> "
"[-c|--check_sort_key str] "
"[-t|--check_type not_exist|not_exist_or_empty|exist|not_empty] "
"[match_anywhere|match_prefix|match_postfix] "
"[bytes_less|bytes_less_or_equal|bytes_equal|bytes_greater_or_equal|bytes_greater] "
"[int_less|int_less_or_equal|int_equal|int_greater_or_equal|int_greater] "
"[-o|--check_operand str] "
"[-s|--set_sort_key str] "
"[-v|--set_value str] "
"[-l|--set_value_ttl_seconds num] "
"[-r|--return_check_value]",
data_operations,
}, },
{ {
"exist", "check value exist", "<hash_key> <sort_key>", data_operations, "exist", "check value exist", "<hash_key> <sort_key>", data_operations,
...@@ -210,7 +229,8 @@ static command_executor commands[] = { ...@@ -210,7 +229,8 @@ static command_executor commands[] = {
"copy_data", "copy_data",
"copy app data", "copy app data",
"<-c|--target_cluster_name str> <-a|--target_app_name str> " "<-c|--target_cluster_name str> <-a|--target_app_name str> "
"[-s|--max_split_count num] [-b|--max_batch_count num] [-t|--timeout_ms num] [-g|--geo_data]", "[-s|--max_split_count num] [-b|--max_batch_count num] [-t|--timeout_ms num] "
"[-g|--geo_data]",
data_operations, data_operations,
}, },
{ {
......
...@@ -17,8 +17,10 @@ table_name=temp ...@@ -17,8 +17,10 @@ table_name=temp
GTEST_OUTPUT="xml:$REPORT_DIR/basic.xml" GTEST_FILTER="basic.*" ./$test_case $config_file $table_name GTEST_OUTPUT="xml:$REPORT_DIR/basic.xml" GTEST_FILTER="basic.*" ./$test_case $config_file $table_name
exit_if_fail $? "run test basic failed: $test_case $config_file $table_name" exit_if_fail $? "run test basic failed: $test_case $config_file $table_name"
GTEST_OUTPUT="xml:$REPORT_DIR/basic.xml" GTEST_FILTER="incr.*" ./$test_case $config_file $table_name GTEST_OUTPUT="xml:$REPORT_DIR/incr" GTEST_FILTER="incr.*" ./$test_case $config_file $table_name
exit_if_fail $? "run test incr failed: $test_case $config_file $table_name" exit_if_fail $? "run test incr failed: $test_case $config_file $table_name"
GTEST_OUTPUT="xml:$REPORT_DIR/check_and_set.xml" GTEST_FILTER="check_and_set.*" ./$test_case $config_file $table_name
exit_if_fail $? "run test check_and_set failed: $test_case $config_file $table_name"
GTEST_OUTPUT="xml:$REPORT_DIR/scan.xml" GTEST_FILTER="scan.*" ./$test_case $config_file $table_name GTEST_OUTPUT="xml:$REPORT_DIR/scan.xml" GTEST_FILTER="scan.*" ./$test_case $config_file $table_name
exit_if_fail $? "run test scan failed: $test_case $config_file $table_name" exit_if_fail $? "run test scan failed: $test_case $config_file $table_name"
GTEST_OUTPUT="xml:$REPORT_DIR/slog_log.xml" GTEST_FILTER="lost_log.*" ./$test_case $config_file $table_name GTEST_OUTPUT="xml:$REPORT_DIR/slog_log.xml" GTEST_FILTER="lost_log.*" ./$test_case $config_file $table_name
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册