Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
Incubator Pegasus
提交
95ac8491
Incubator Pegasus
项目概览
apache
/
Incubator Pegasus
通知
9
Star
5
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
Incubator Pegasus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
95ac8491
编写于
9月 25, 2020
作者:
W
Wu Tao
提交者:
GitHub
9月 25, 2020
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix: bugfix on incr (#608)
上级
49b85c23
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
76 addition
and
48 deletion
+76
-48
src/server/pegasus_write_service_impl.h
src/server/pegasus_write_service_impl.h
+50
-48
src/server/test/pegasus_write_service_impl_test.cpp
src/server/test/pegasus_write_service_impl_test.cpp
+26
-0
未找到文件。
src/server/pegasus_write_service_impl.h
浏览文件 @
95ac8491
...
...
@@ -182,59 +182,61 @@ public:
uint32_t
new_expire_ts
=
0
;
db_get_context
get_ctx
;
int
err
=
db_get
(
raw_key
,
&
get_ctx
);
if
(
err
==
0
)
{
if
(
!
get_ctx
.
found
)
{
// old value is not found, set to 0 before increment
new_value
=
update
.
increment
;
new_expire_ts
=
update
.
expire_ts_seconds
>
0
?
update
.
expire_ts_seconds
:
0
;
}
else
if
(
get_ctx
.
expired
)
{
// ttl timeout, set to 0 before increment
_pfc_recent_expire_count
->
increment
();
if
(
err
!=
0
)
{
resp
.
error
=
err
;
return
err
;
}
if
(
!
get_ctx
.
found
)
{
// old value is not found, set to 0 before increment
new_value
=
update
.
increment
;
new_expire_ts
=
update
.
expire_ts_seconds
>
0
?
update
.
expire_ts_seconds
:
0
;
}
else
if
(
get_ctx
.
expired
)
{
// ttl timeout, set to 0 before increment
_pfc_recent_expire_count
->
increment
();
new_value
=
update
.
increment
;
new_expire_ts
=
update
.
expire_ts_seconds
>
0
?
update
.
expire_ts_seconds
:
0
;
}
else
{
::
dsn
::
blob
old_value
;
pegasus_extract_user_data
(
_pegasus_data_version
,
std
::
move
(
get_ctx
.
raw_value
),
old_value
);
if
(
old_value
.
length
()
==
0
)
{
// empty old value, set to 0 before increment
new_value
=
update
.
increment
;
new_expire_ts
=
update
.
expire_ts_seconds
>
0
?
update
.
expire_ts_seconds
:
0
;
}
else
{
::
dsn
::
blob
old_value
;
pegasus_extract_user_data
(
_pegasus_data_version
,
std
::
move
(
get_ctx
.
raw_value
),
old_value
);
if
(
old_value
.
length
()
==
0
)
{
// empty old value, set to 0 before increment
new_value
=
update
.
increment
;
}
else
{
int64_t
old_value_int
;
if
(
!
dsn
::
buf2int64
(
old_value
,
old_value_int
))
{
// invalid old value
derror_replica
(
"incr failed: decree = {}, error = "
"old value
\"
{}
\"
is not an integer or out of range"
,
decree
,
utils
::
c_escape_string
(
old_value
));
resp
.
error
=
rocksdb
::
Status
::
kInvalidArgument
;
// we should write empty record to update rocksdb's last flushed decree
return
empty_put
(
decree
);
}
new_value
=
old_value_int
+
update
.
increment
;
if
((
update
.
increment
>
0
&&
new_value
<
old_value_int
)
||
(
update
.
increment
<
0
&&
new_value
>
old_value_int
))
{
// new value is out of range, return old value by 'new_value'
derror_replica
(
"incr failed: decree = {}, error = "
"new value is out of range, old_value = {}, increment = {}"
,
decree
,
old_value_int
,
update
.
increment
);
resp
.
error
=
rocksdb
::
Status
::
kInvalidArgument
;
resp
.
new_value
=
old_value_int
;
// we should write empty record to update rocksdb's last flushed decree
return
empty_put
(
decree
);
}
int64_t
old_value_int
;
if
(
!
dsn
::
buf2int64
(
old_value
,
old_value_int
))
{
// invalid old value
derror_replica
(
"incr failed: decree = {}, error = "
"old value
\"
{}
\"
is not an integer or out of range"
,
decree
,
utils
::
c_escape_string
(
old_value
));
resp
.
error
=
rocksdb
::
Status
::
kInvalidArgument
;
// we should write empty record to update rocksdb's last flushed decree
return
empty_put
(
decree
);
}
// set new ttl
if
(
update
.
expire_ts_seconds
==
0
)
{
new_expire_ts
=
get_ctx
.
expire_ts
;
}
else
if
(
update
.
expire_ts_seconds
<
0
)
{
new_expire_ts
=
0
;
}
else
{
// update.expire_ts_seconds > 0
new_expire_ts
=
update
.
expire_ts_seconds
;
new_value
=
old_value_int
+
update
.
increment
;
if
((
update
.
increment
>
0
&&
new_value
<
old_value_int
)
||
(
update
.
increment
<
0
&&
new_value
>
old_value_int
))
{
// new value is out of range, return old value by 'new_value'
derror_replica
(
"incr failed: decree = {}, error = "
"new value is out of range, old_value = {}, increment = {}"
,
decree
,
old_value_int
,
update
.
increment
);
resp
.
error
=
rocksdb
::
Status
::
kInvalidArgument
;
resp
.
new_value
=
old_value_int
;
// we should write empty record to update rocksdb's last flushed decree
return
empty_put
(
decree
);
}
}
// set new ttl
if
(
update
.
expire_ts_seconds
==
0
)
{
new_expire_ts
=
get_ctx
.
expire_ts
;
}
else
if
(
update
.
expire_ts_seconds
<
0
)
{
new_expire_ts
=
0
;
}
else
{
// update.expire_ts_seconds > 0
new_expire_ts
=
update
.
expire_ts_seconds
;
}
}
resp
.
error
=
...
...
src/server/test/pegasus_write_service_impl_test.cpp
浏览文件 @
95ac8491
...
...
@@ -217,5 +217,31 @@ TEST_F(incr_test, invalid_incr)
ASSERT_EQ
(
resp
.
new_value
,
100
);
}
TEST_F
(
incr_test
,
fail_on_get
)
{
dsn
::
fail
::
setup
();
dsn
::
fail
::
cfg
(
"db_get"
,
"100%1*return()"
);
// when db_get failed, incr should return an error.
req
.
increment
=
10
;
_write_impl
->
incr
(
1
,
req
,
resp
);
ASSERT_EQ
(
resp
.
error
,
FAIL_DB_GET
);
dsn
::
fail
::
teardown
();
}
TEST_F
(
incr_test
,
fail_on_put
)
{
dsn
::
fail
::
setup
();
dsn
::
fail
::
cfg
(
"db_write_batch_put"
,
"100%1*return()"
);
// when rocksdb put failed, incr should return an error.
req
.
increment
=
10
;
_write_impl
->
incr
(
1
,
req
,
resp
);
ASSERT_EQ
(
resp
.
error
,
FAIL_DB_WRITE_BATCH_PUT
);
dsn
::
fail
::
teardown
();
}
}
// namespace server
}
// namespace pegasus
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录