Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
Incubator Pegasus
提交
9e58867d
Incubator Pegasus
项目概览
apache
/
Incubator Pegasus
通知
9
Star
5
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
Incubator Pegasus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
9e58867d
编写于
11月 13, 2018
作者:
L
Lai Yingchun
提交者:
QinZuoyan
11月 13, 2018
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
monitor: expose rocksdb statistics to pegasus (#212)
上级
7ba3b707
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
239 addition
and
136 deletion
+239
-136
rdsn
rdsn
+1
-1
src/server/brief_stat.cpp
src/server/brief_stat.cpp
+1
-0
src/server/info_collector.cpp
src/server/info_collector.cpp
+15
-0
src/server/info_collector.h
src/server/info_collector.h
+4
-0
src/server/pegasus_server_impl.cpp
src/server/pegasus_server_impl.cpp
+150
-92
src/server/pegasus_server_impl.h
src/server/pegasus_server_impl.h
+20
-7
src/shell/command_helper.h
src/shell/command_helper.h
+31
-33
src/shell/commands.h
src/shell/commands.h
+17
-3
未找到文件。
rdsn
@
bb2fb74b
比较
a9479d62
...
bb2fb74b
Subproject commit
a9479d629d2dc36ebe0c2d25230dd4713f9cd085
Subproject commit
bb2fb74bee731128e8ca88ceba1804cef967a16a
src/server/brief_stat.cpp
浏览文件 @
9e58867d
...
...
@@ -26,6 +26,7 @@ static std::map<std::string, std::string> s_brief_stat_map = {
{
"replica*eon.replica_stub*replicas.learning.count"
,
"learning_count"
},
{
"replica*app.pegasus*manual.compact.running.count"
,
"manual_compact_running_count"
},
{
"replica*app.pegasus*manual.compact.enqueue.count"
,
"manual_compact_enqueue_count"
},
{
"replica*app.pegasus*rdb.block_cache.memory_usage"
,
"rdb_block_cache_memory_usage"
},
{
"replica*eon.replica_stub*shared.log.size(MB)"
,
"shared_log_size(MB)"
},
{
"replica*server*memused.virt(MB)"
,
"memused_virt(MB)"
},
{
"replica*server*memused.res(MB)"
,
"memused_res(MB)"
},
...
...
src/server/info_collector.cpp
浏览文件 @
9e58867d
...
...
@@ -97,6 +97,11 @@ void info_collector::on_app_stat()
all
.
recent_abnormal_count
+=
row
.
recent_abnormal_count
;
all
.
storage_mb
+=
row
.
storage_mb
;
all
.
storage_count
+=
row
.
storage_count
;
all
.
rdb_block_cache_hit_count
+=
row
.
rdb_block_cache_hit_count
;
all
.
rdb_block_cache_total_count
+=
row
.
rdb_block_cache_total_count
;
all
.
rdb_block_cache_mem_usage
+=
row
.
rdb_block_cache_mem_usage
;
all
.
rdb_index_and_filter_blocks_mem_usage
+=
row
.
rdb_index_and_filter_blocks_mem_usage
;
all
.
rdb_memtable_mem_usage
+=
row
.
rdb_memtable_mem_usage
;
read_qps
[
i
]
=
row
.
get_qps
+
row
.
multi_get_qps
+
row
.
scan_qps
;
write_qps
[
i
]
=
row
.
put_qps
+
row
.
multi_put_qps
+
row
.
remove_qps
+
row
.
multi_remove_qps
+
row
.
incr_qps
+
row
.
check_and_set_qps
+
row
.
check_and_mutate_qps
;
...
...
@@ -123,6 +128,12 @@ void info_collector::on_app_stat()
counters
->
recent_abnormal_count
->
set
(
row
.
recent_abnormal_count
);
counters
->
storage_mb
->
set
(
row
.
storage_mb
);
counters
->
storage_count
->
set
(
row
.
storage_count
);
counters
->
rdb_block_cache_hit_rate
->
set
(
row
.
rdb_block_cache_hit_count
/
row
.
rdb_block_cache_total_count
);
counters
->
rdb_block_cache_mem_usage
->
set
(
row
.
rdb_block_cache_mem_usage
);
counters
->
rdb_index_and_filter_blocks_mem_usage
->
set
(
row
.
rdb_index_and_filter_blocks_mem_usage
);
counters
->
rdb_memtable_mem_usage
->
set
(
row
.
rdb_memtable_mem_usage
);
counters
->
read_qps
->
set
(
read_qps
[
i
]);
counters
->
write_qps
->
set
(
write_qps
[
i
]);
}
...
...
@@ -166,6 +177,10 @@ info_collector::AppStatCounters *info_collector::get_app_counters(const std::str
INIT_COUNER
(
recent_abnormal_count
);
INIT_COUNER
(
storage_mb
);
INIT_COUNER
(
storage_count
);
INIT_COUNER
(
rdb_block_cache_hit_rate
);
INIT_COUNER
(
rdb_block_cache_mem_usage
);
INIT_COUNER
(
rdb_index_and_filter_blocks_mem_usage
);
INIT_COUNER
(
rdb_memtable_mem_usage
);
INIT_COUNER
(
read_qps
);
INIT_COUNER
(
write_qps
);
_app_stat_counters
[
app_name
]
=
counters
;
...
...
src/server/info_collector.h
浏览文件 @
9e58867d
...
...
@@ -43,6 +43,10 @@ public:
::
dsn
::
perf_counter_wrapper
recent_abnormal_count
;
::
dsn
::
perf_counter_wrapper
storage_mb
;
::
dsn
::
perf_counter_wrapper
storage_count
;
::
dsn
::
perf_counter_wrapper
rdb_block_cache_hit_rate
;
::
dsn
::
perf_counter_wrapper
rdb_block_cache_mem_usage
;
::
dsn
::
perf_counter_wrapper
rdb_index_and_filter_blocks_mem_usage
;
::
dsn
::
perf_counter_wrapper
rdb_memtable_mem_usage
;
::
dsn
::
perf_counter_wrapper
read_qps
;
::
dsn
::
perf_counter_wrapper
write_qps
;
};
...
...
src/server/pegasus_server_impl.cpp
浏览文件 @
9e58867d
...
...
@@ -7,13 +7,14 @@
#include <algorithm>
#include <boost/lexical_cast.hpp>
#include <rocksdb/convenience.h>
#include <rocksdb/table.h>
#include <rocksdb/utilities/checkpoint.h>
#include <rocksdb/filter_policy.h>
#include <dsn/utility/chrono_literals.h>
#include <dsn/utility/utils.h>
#include <dsn/utility/filesystem.h>
#include <dsn/utility/string_conv.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/dist/replication/replication.codes.h>
#include "base/pegasus_key_schema.h"
#include "base/pegasus_value_schema.h"
...
...
@@ -21,13 +22,13 @@
#include "pegasus_event_listener.h"
#include "pegasus_server_write.h"
using
namespace
dsn
::
literals
::
chrono_literals
;
namespace
pegasus
{
namespace
server
{
DEFINE_TASK_CODE
(
LPC_PEGASUS_SERVER_DELAY
,
TASK_PRIORITY_COMMON
,
::
dsn
::
THREAD_POOL_DEFAULT
)
DEFINE_TASK_CODE
(
LPC_UPDATING_ROCKSDB_SSTSIZE
,
TASK_PRIORITY_COMMON
,
THREAD_POOL_REPLICATION_LONG
)
static
std
::
string
chkpt_get_dir_name
(
int64_t
decree
)
{
char
buffer
[
256
];
...
...
@@ -41,6 +42,10 @@ static bool chkpt_init_from_dir(const char *name, int64_t &decree)
std
::
string
(
name
)
==
chkpt_get_dir_name
(
decree
);
}
std
::
shared_ptr
<
rocksdb
::
Cache
>
pegasus_server_impl
::
_block_cache
;
::
dsn
::
task_ptr
pegasus_server_impl
::
_update_server_rdb_stat
;
::
dsn
::
perf_counter_wrapper
pegasus_server_impl
::
_pfc_rdb_block_cache_mem_usage
;
pegasus_server_impl
::
pegasus_server_impl
(
dsn
::
replication
::
replica
*
r
)
:
dsn
::
apps
::
rrdb_service
(
r
),
_db
(
nullptr
),
...
...
@@ -198,34 +203,39 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
}
}
// read rocksdb::BlockBasedTableOptions configurations
rocksdb
::
BlockBasedTableOptions
tbl_opts
;
// disable table block cache, default: false
if
(
dsn_config_get_value_bool
(
"pegasus.server"
,
"rocksdb_disable_table_block_cache"
,
false
,
"rocksdb tbl_opts.no_block_cache, default false"
))
{
tbl_opts
.
no_block_cache
=
true
;
tbl_opts
.
block_restart_interval
=
4
;
_
tbl_opts
.
no_block_cache
=
true
;
_
tbl_opts
.
block_restart_interval
=
4
;
}
else
{
// block cache capacity, default 10G
static
uint64_t
capacity
=
dsn_config_get_value_uint64
(
"pegasus.server"
,
"rocksdb_block_cache_capacity"
,
10
*
1024
*
1024
*
1024ULL
,
"block cache capacity for one pegasus server, shared by all rocksdb instances"
);
// block cache num shard bits, default -1(auto)
static
int
num_shard_bits
=
(
int
)
dsn_config_get_value_int64
(
"pegasus.server"
,
"rocksdb_block_cache_num_shard_bits"
,
-
1
,
"block cache will be sharded into 2^num_shard_bits shards"
);
// init block cache
static
std
::
shared_ptr
<
rocksdb
::
Cache
>
cache
=
rocksdb
::
NewLRUCache
(
capacity
,
num_shard_bits
);
tbl_opts
.
block_cache
=
cache
;
// If block cache is enabled, all replicas on this server will share the same block cache
// object. It's convenient to control the total memory used by this server, and the LRU
// algorithm used by the block cache object can be more efficient in this way.
static
std
::
once_flag
flag
;
std
::
call_once
(
flag
,
[
&
]()
{
// block cache capacity, default 10G
uint64_t
capacity
=
dsn_config_get_value_uint64
(
"pegasus.server"
,
"rocksdb_block_cache_capacity"
,
10
*
1024
*
1024
*
1024ULL
,
"block cache capacity for one pegasus server, shared by all rocksdb instances"
);
// block cache num shard bits, default -1(auto)
int
num_shard_bits
=
(
int
)
dsn_config_get_value_int64
(
"pegasus.server"
,
"rocksdb_block_cache_num_shard_bits"
,
-
1
,
"block cache will be sharded into 2^num_shard_bits shards"
);
// init block cache
_block_cache
=
rocksdb
::
NewLRUCache
(
capacity
,
num_shard_bits
);
});
// every replica has the same block cache
_tbl_opts
.
block_cache
=
_block_cache
;
}
// disable bloom filter, default: false
...
...
@@ -233,13 +243,17 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
"rocksdb_disable_bloom_filter"
,
false
,
"rocksdb tbl_opts.filter_policy, default nullptr"
))
{
tbl_opts
.
filter_policy
.
reset
(
rocksdb
::
NewBloomFilterPolicy
(
10
,
false
));
_
tbl_opts
.
filter_policy
.
reset
(
rocksdb
::
NewBloomFilterPolicy
(
10
,
false
));
}
_db_opts
.
table_factory
.
reset
(
NewBlockBasedTableFactory
(
tbl_opts
));
_db_opts
.
table_factory
.
reset
(
NewBlockBasedTableFactory
(
_
tbl_opts
));
_key_ttl_compaction_filter_factory
=
std
::
make_shared
<
KeyWithTTLCompactionFilterFactory
>
();
_db_opts
.
compaction_filter_factory
=
_key_ttl_compaction_filter_factory
;
_statistics
=
rocksdb
::
CreateDBStatistics
();
_statistics
->
stats_level_
=
rocksdb
::
kExceptDetailedTimers
;
_db_opts
.
statistics
=
_statistics
;
_db_opts
.
listeners
.
emplace_back
(
new
pegasus_event_listener
());
// disable write ahead logging as replication handles logging instead now
...
...
@@ -254,12 +268,8 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
0
,
"checkpoint_reserve_time_seconds, 0 means no check"
);
// get the _updating_sstsize_inteval_seconds.
_updating_rocksdb_sstsize_interval_seconds
=
(
uint32_t
)
dsn_config_get_value_uint64
(
"pegasus.server"
,
"updating_rocksdb_sstsize_interval_seconds"
,
600
,
"updating_rocksdb_sstsize_interval_seconds"
);
_update_rdb_stat_interval
=
std
::
chrono
::
seconds
(
dsn_config_get_value_uint64
(
"pegasus.server"
,
"update_rdb_stat_interval"
,
600
,
"update_rdb_stat_interval, in seconds"
));
// TODO: move the qps/latency counters and it's statistics to replication_app_base layer
char
str_gpid
[
128
],
buf
[
256
];
...
...
@@ -315,14 +325,46 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
"statistic the recent abnormal read count"
);
snprintf
(
buf
,
255
,
"disk.storage.sst.count@%s"
,
str_gpid
);
_pfc_sst_count
.
init_app_counter
(
_pfc_
rdb_
sst_count
.
init_app_counter
(
"app.pegasus"
,
buf
,
COUNTER_TYPE_NUMBER
,
"statistic the count of sstable files"
);
snprintf
(
buf
,
255
,
"disk.storage.sst(MB)@%s"
,
str_gpid
);
_pfc_sst_size
.
init_app_counter
(
_pfc_
rdb_
sst_size
.
init_app_counter
(
"app.pegasus"
,
buf
,
COUNTER_TYPE_NUMBER
,
"statistic the size of sstable files"
);
updating_rocksdb_sstsize
();
snprintf
(
buf
,
255
,
"rdb.block_cache.hit_count@%s"
,
str_gpid
);
_pfc_rdb_block_cache_hit_count
.
init_app_counter
(
"app.pegasus"
,
buf
,
COUNTER_TYPE_NUMBER
,
"statistic the hit count of rocksdb block cache"
);
snprintf
(
buf
,
255
,
"rdb.block_cache.total_count@%s"
,
str_gpid
);
_pfc_rdb_block_cache_total_count
.
init_app_counter
(
"app.pegasus"
,
buf
,
COUNTER_TYPE_NUMBER
,
"statistic the total count of rocksdb block cache"
);
// Block cache is a singleton on this server shared by all replicas, so we initialize
// `_pfc_rdb_block_cache_mem_usage` only once.
static
std
::
once_flag
flag
;
std
::
call_once
(
flag
,
[
&
]()
{
_pfc_rdb_block_cache_mem_usage
.
init_global_counter
(
"replica"
,
"app.pegasus"
,
"rdb.block_cache.memory_usage"
,
COUNTER_TYPE_NUMBER
,
"statistic the memory usage of rocksdb block cache"
);
});
snprintf
(
buf
,
255
,
"rdb.index_and_filter_blocks.memory_usage@%s"
,
str_gpid
);
_pfc_rdb_index_and_filter_blocks_mem_usage
.
init_app_counter
(
"app.pegasus"
,
buf
,
COUNTER_TYPE_NUMBER
,
"statistic the memory usage of rocksdb index and filter blocks"
);
snprintf
(
buf
,
255
,
"rdb.memtable.memory_usage@%s"
,
str_gpid
);
_pfc_rdb_memtable_mem_usage
.
init_app_counter
(
"app.pegasus"
,
buf
,
COUNTER_TYPE_NUMBER
,
"statistic the memory usage of rocksdb memtable"
);
}
void
pegasus_server_impl
::
parse_checkpoints
()
...
...
@@ -1523,14 +1565,24 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
_is_open
=
true
;
dinfo
(
"%s: start the updating sstsize timer task"
,
replica_name
());
_updating_rocksdb_sstsize_timer_task
=
::
dsn
::
tasking
::
enqueue_timer
(
LPC_UPDATING_ROCKSDB_SSTSIZE
,
&
_tracker
,
[
this
]()
{
this
->
updating_rocksdb_sstsize
();
},
std
::
chrono
::
seconds
(
_updating_rocksdb_sstsize_interval_seconds
),
0
,
std
::
chrono
::
seconds
(
30
));
dinfo
(
"%s: start the update rocksdb statistics timer task"
,
replica_name
());
_update_replica_rdb_stat
=
::
dsn
::
tasking
::
enqueue_timer
(
LPC_REPLICATION_LONG_COMMON
,
&
_tracker
,
[
this
]()
{
this
->
update_replica_rocksdb_statistics
();
},
_update_rdb_stat_interval
);
// Block cache is a singleton on this server shared by all replicas, its metrics update task
// should be scheduled once an interval on the server view.
static
std
::
once_flag
flag
;
std
::
call_once
(
flag
,
[
&
]()
{
// The timer task will always running even though there is no replicas
_update_server_rdb_stat
=
::
dsn
::
tasking
::
enqueue_timer
(
LPC_REPLICATION_LONG_COMMON
,
nullptr
,
// TODO: the tracker is nullptr, we will fix it later
[
this
]()
{
update_server_rocksdb_statistics
();
},
_update_rdb_stat_interval
);
});
// initialize write service after server being initialized.
_server_write
=
dsn
::
make_unique
<
pegasus_server_write
>
(
this
,
_verbose_log
);
...
...
@@ -1566,9 +1618,9 @@ void pegasus_server_impl::cancel_background_work(bool wait)
}
// stop all tracked tasks when pegasus server is stopped.
if
(
_updat
ing_rocksdb_sstsize_timer_task
!=
nullptr
)
{
_updat
ing_rocksdb_sstsize_timer_task
->
cancel
(
true
);
_updat
ing_rocksdb_sstsize_timer_task
=
nullptr
;
if
(
_updat
e_replica_rdb_stat
!=
nullptr
)
{
_updat
e_replica_rdb_stat
->
cancel
(
true
);
_updat
e_replica_rdb_stat
=
nullptr
;
}
_tracker
.
cancel_outstanding_tasks
();
...
...
@@ -1600,8 +1652,13 @@ void pegasus_server_impl::cancel_background_work(bool wait)
derror
(
"%s: rmdir %s failed when stop app"
,
replica_name
(),
data_dir
().
c_str
());
return
::
dsn
::
ERR_FILE_OPERATION_FAILED
;
}
_pfc_sst_count
->
set
(
0
);
_pfc_sst_size
->
set
(
0
);
_pfc_rdb_sst_count
->
set
(
0
);
_pfc_rdb_sst_size
->
set
(
0
);
_pfc_rdb_block_cache_hit_count
->
set
(
0
);
_pfc_rdb_block_cache_total_count
->
set
(
0
);
_pfc_rdb_block_cache_mem_usage
->
set
(
0
);
_pfc_rdb_index_and_filter_blocks_mem_usage
->
set
(
0
);
_pfc_rdb_memtable_mem_usage
->
set
(
0
);
}
ddebug
(
...
...
@@ -2161,54 +2218,55 @@ int pegasus_server_impl::append_key_value_for_multi_get(
return
1
;
}
// statistic the count and size of files of this type. return (-1,-1) if failed.
static
std
::
pair
<
int64_t
,
int64_t
>
get_type_file_size
(
const
std
::
string
&
path
,
const
std
::
string
&
type
)
void
pegasus_server_impl
::
update_replica_rocksdb_statistics
()
{
std
::
vector
<
std
::
string
>
files
;
if
(
!::
dsn
::
utils
::
filesystem
::
get_subfiles
(
path
,
files
,
false
))
{
dwarn
(
"get subfiles of dir %s failed"
,
path
.
c_str
());
return
std
::
pair
<
int64_t
,
int64_t
>
(
-
1
,
-
1
);
}
int64_t
res
=
0
;
int64_t
cnt
=
0
;
for
(
auto
&
f
:
files
)
{
if
(
f
.
length
()
>
type
.
length
()
&&
f
.
substr
(
f
.
length
()
-
type
.
length
())
==
type
)
{
int64_t
tsize
=
0
;
if
(
::
dsn
::
utils
::
filesystem
::
file_size
(
f
,
tsize
))
{
res
+=
tsize
;
cnt
++
;
}
else
{
dwarn
(
"get size of file %s failed"
,
f
.
c_str
());
return
std
::
pair
<
int64_t
,
int64_t
>
(
-
1
,
-
1
);
}
std
::
string
str_val
;
uint64_t
val
=
0
;
for
(
int
i
=
0
;
i
<
_db_opts
.
num_levels
;
++
i
)
{
int
cur_level_count
=
0
;
if
(
_db
->
GetProperty
(
rocksdb
::
DB
::
Properties
::
kNumFilesAtLevelPrefix
+
std
::
to_string
(
i
),
&
str_val
)
&&
dsn
::
buf2int32
(
str_val
,
cur_level_count
))
{
val
+=
cur_level_count
;
}
}
return
std
::
pair
<
int64_t
,
int64_t
>
(
cnt
,
res
);
}
_pfc_rdb_sst_count
->
set
(
val
);
ddebug_replica
(
"_pfc_rdb_sst_count: {}"
,
val
);
std
::
pair
<
int64_t
,
int64_t
>
pegasus_server_impl
::
statistic_sst_size
()
{
// dir = data_dir()/rdb
return
get_type_file_size
(
::
dsn
::
utils
::
filesystem
::
path_combine
(
data_dir
(),
"rdb"
),
".sst"
);
if
(
_db
->
GetProperty
(
rocksdb
::
DB
::
Properties
::
kTotalSstFilesSize
,
&
str_val
)
&&
dsn
::
buf2uint64
(
str_val
,
val
))
{
static
uint64_t
bytes_per_mb
=
1U
<<
20U
;
_pfc_rdb_sst_size
->
set
(
val
/
bytes_per_mb
);
ddebug_replica
(
"_pfc_rdb_sst_size: {} bytes"
,
val
);
}
uint64_t
block_cache_hit
=
_statistics
->
getTickerCount
(
rocksdb
::
BLOCK_CACHE_HIT
);
_pfc_rdb_block_cache_hit_count
->
set
(
block_cache_hit
);
ddebug_replica
(
"_pfc_rdb_block_cache_hit_count: {}"
,
block_cache_hit
);
uint64_t
block_cache_miss
=
_statistics
->
getTickerCount
(
rocksdb
::
BLOCK_CACHE_MISS
);
uint64_t
block_cache_total
=
block_cache_hit
+
block_cache_miss
;
_pfc_rdb_block_cache_total_count
->
set
(
block_cache_total
);
ddebug_replica
(
"_pfc_rdb_block_cache_total_count: {}"
,
block_cache_total
);
if
(
_db
->
GetProperty
(
rocksdb
::
DB
::
Properties
::
kEstimateTableReadersMem
,
&
str_val
)
&&
dsn
::
buf2uint64
(
str_val
,
val
))
{
_pfc_rdb_index_and_filter_blocks_mem_usage
->
set
(
val
);
ddebug_replica
(
"_pfc_rdb_index_and_filter_blocks_mem_usage: {} bytes"
,
val
);
}
if
(
_db
->
GetProperty
(
rocksdb
::
DB
::
Properties
::
kCurSizeAllMemTables
,
&
str_val
)
&&
dsn
::
buf2uint64
(
str_val
,
val
))
{
_pfc_rdb_memtable_mem_usage
->
set
(
val
);
ddebug_replica
(
"_pfc_rdb_memtable_mem_usage: {} bytes"
,
val
);
}
}
void
pegasus_server_impl
::
updat
ing_rocksdb_sstsize
()
void
pegasus_server_impl
::
updat
e_server_rocksdb_statistics
()
{
std
::
pair
<
int64_t
,
int64_t
>
sst_size
=
statistic_sst_size
();
if
(
sst_size
.
first
==
-
1
)
{
dwarn
(
"%s: statistic sst file size failed"
,
replica_name
());
}
else
{
int64_t
sst_size_mb
=
sst_size
.
second
/
1048576
;
ddebug
(
"%s: statistic sst file size succeed, sst_count = %"
PRId64
", sst_size = %"
PRId64
"(%"
PRId64
"MB)"
,
replica_name
(),
sst_size
.
first
,
sst_size
.
second
,
sst_size_mb
);
_pfc_sst_count
->
set
(
sst_size
.
first
);
_pfc_sst_size
->
set
(
sst_size_mb
);
}
uint64_t
val
=
_block_cache
->
GetUsage
();
_pfc_rdb_block_cache_mem_usage
->
set
(
val
);
ddebug_f
(
"_pfc_rdb_block_cache_mem_usage: {} bytes"
,
val
);
}
std
::
pair
<
std
::
string
,
bool
>
...
...
@@ -2438,8 +2496,8 @@ uint64_t pegasus_server_impl::do_manual_compact(const rocksdb::CompactRangeOptio
status
.
ToString
().
c_str
(),
dsn_now_ms
()
-
start_time
);
// update
size
immediately
updat
ing_rocksdb_sstsize
();
// update
rocksdb statistics
immediately
updat
e_replica_rocksdb_statistics
();
return
_db
->
GetLastManualCompactFinishTime
();
}
...
...
src/server/pegasus_server_impl.h
浏览文件 @
9e58867d
...
...
@@ -6,6 +6,7 @@
#include <vector>
#include <rocksdb/db.h>
#include <rocksdb/table.h>
#include <rocksdb/listener.h>
#include <dsn/perf_counter/perf_counter_wrapper.h>
#include <dsn/dist/replication/replication.codes.h>
...
...
@@ -198,10 +199,9 @@ private:
const
::
dsn
::
blob
&
filter_pattern
,
const
::
dsn
::
blob
&
value
);
// statistic the sst file info for this replica. return (-1,-1) if failed.
std
::
pair
<
int64_t
,
int64_t
>
statistic_sst_size
();
void
update_replica_rocksdb_statistics
();
void
updating_rocksdb_sstsize
();
static
void
update_server_rocksdb_statistics
();
// get the absolute path of restore directory and the flag whether force restore from env
// return
...
...
@@ -250,12 +250,15 @@ private:
uint64_t
_abnormal_multi_get_iterate_count_threshold
;
std
::
shared_ptr
<
KeyWithTTLCompactionFilterFactory
>
_key_ttl_compaction_filter_factory
;
std
::
shared_ptr
<
rocksdb
::
Statistics
>
_statistics
;
rocksdb
::
BlockBasedTableOptions
_tbl_opts
;
rocksdb
::
Options
_db_opts
;
rocksdb
::
WriteOptions
_wt_opts
;
rocksdb
::
ReadOptions
_rd_opts
;
std
::
string
_usage_scenario
;
rocksdb
::
DB
*
_db
;
static
std
::
shared_ptr
<
rocksdb
::
Cache
>
_block_cache
;
volatile
bool
_is_open
;
uint32_t
_value_schema_version
;
std
::
atomic
<
int64_t
>
_last_durable_decree
;
...
...
@@ -270,8 +273,9 @@ private:
pegasus_context_cache
_context_cache
;
::
dsn
::
task_ptr
_updating_rocksdb_sstsize_timer_task
;
uint32_t
_updating_rocksdb_sstsize_interval_seconds
;
std
::
chrono
::
seconds
_update_rdb_stat_interval
;
::
dsn
::
task_ptr
_update_replica_rdb_stat
;
static
::
dsn
::
task_ptr
_update_server_rdb_stat
;
pegasus_manual_compact_service
_manual_compact_svc
;
...
...
@@ -289,8 +293,17 @@ private:
::
dsn
::
perf_counter_wrapper
_pfc_recent_expire_count
;
::
dsn
::
perf_counter_wrapper
_pfc_recent_filter_count
;
::
dsn
::
perf_counter_wrapper
_pfc_recent_abnormal_count
;
::
dsn
::
perf_counter_wrapper
_pfc_sst_count
;
::
dsn
::
perf_counter_wrapper
_pfc_sst_size
;
// rocksdb internal statistics
// server level
static
::
dsn
::
perf_counter_wrapper
_pfc_rdb_block_cache_mem_usage
;
// replica level
::
dsn
::
perf_counter_wrapper
_pfc_rdb_sst_count
;
::
dsn
::
perf_counter_wrapper
_pfc_rdb_sst_size
;
::
dsn
::
perf_counter_wrapper
_pfc_rdb_block_cache_hit_count
;
::
dsn
::
perf_counter_wrapper
_pfc_rdb_block_cache_total_count
;
::
dsn
::
perf_counter_wrapper
_pfc_rdb_index_and_filter_blocks_mem_usage
;
::
dsn
::
perf_counter_wrapper
_pfc_rdb_memtable_mem_usage
;
};
}
// namespace server
...
...
src/shell/command_helper.h
浏览文件 @
9e58867d
...
...
@@ -368,40 +368,28 @@ inline bool parse_app_pegasus_perf_counter_name(const std::string &name,
struct
row_data
{
std
::
string
row_name
;
double
get_qps
;
double
multi_get_qps
;
double
put_qps
;
double
multi_put_qps
;
double
remove_qps
;
double
multi_remove_qps
;
double
incr_qps
;
double
check_and_set_qps
;
double
check_and_mutate_qps
;
double
scan_qps
;
double
recent_expire_count
;
double
recent_filter_count
;
double
recent_abnormal_count
;
double
storage_mb
;
double
storage_count
;
row_data
()
:
get_qps
(
0
),
multi_get_qps
(
0
),
put_qps
(
0
),
multi_put_qps
(
0
),
remove_qps
(
0
),
multi_remove_qps
(
0
),
incr_qps
(
0
),
check_and_set_qps
(
0
),
check_and_mutate_qps
(
0
),
scan_qps
(
0
),
recent_expire_count
(
0
),
recent_filter_count
(
0
),
recent_abnormal_count
(
0
),
storage_mb
(
0
),
storage_count
(
0
)
{
}
double
get_qps
=
0
;
double
multi_get_qps
=
0
;
double
put_qps
=
0
;
double
multi_put_qps
=
0
;
double
remove_qps
=
0
;
double
multi_remove_qps
=
0
;
double
incr_qps
=
0
;
double
check_and_set_qps
=
0
;
double
check_and_mutate_qps
=
0
;
double
scan_qps
=
0
;
double
recent_expire_count
=
0
;
double
recent_filter_count
=
0
;
double
recent_abnormal_count
=
0
;
double
storage_mb
=
0
;
double
storage_count
=
0
;
double
rdb_block_cache_hit_count
=
0
;
double
rdb_block_cache_total_count
=
0
;
double
rdb_block_cache_mem_usage
=
0
;
double
rdb_index_and_filter_blocks_mem_usage
=
0
;
double
rdb_memtable_mem_usage
=
0
;
};
inline
bool
update_app_pegasus_perf_counter
(
row_data
&
row
,
const
std
::
string
&
counter_name
,
double
value
)
{
...
...
@@ -435,6 +423,16 @@ update_app_pegasus_perf_counter(row_data &row, const std::string &counter_name,
row
.
storage_mb
+=
value
;
else
if
(
counter_name
==
"disk.storage.sst.count"
)
row
.
storage_count
+=
value
;
else
if
(
counter_name
==
"rdb.block_cache.hit_count"
)
row
.
rdb_block_cache_hit_count
+=
value
;
else
if
(
counter_name
==
"rdb.block_cache.total_count"
)
row
.
rdb_block_cache_total_count
+=
value
;
else
if
(
counter_name
==
"rdb.block_cache.memory_usage"
)
row
.
rdb_block_cache_mem_usage
+=
value
;
else
if
(
counter_name
==
"rdb.index_and_filter_blocks.memory_usage"
)
row
.
rdb_index_and_filter_blocks_mem_usage
+=
value
;
else
if
(
counter_name
==
"rdb.memtable.memory_usage"
)
row
.
rdb_memtable_mem_usage
+=
value
;
else
return
false
;
return
true
;
...
...
src/shell/commands.h
浏览文件 @
9e58867d
...
...
@@ -3652,7 +3652,7 @@ inline bool app_stat(command_executor *e, shell_context *sc, arguments args)
}
std
::
ostream
out
(
buf
);
size_t
w
=
10
;
s
tatic
const
s
ize_t
w
=
10
;
size_t
first_column_width
=
w
;
if
(
app_name
.
empty
())
{
for
(
row_data
&
row
:
rows
)
{
...
...
@@ -3670,7 +3670,8 @@ inline bool app_stat(command_executor *e, shell_context *sc, arguments args)
if
(
!
only_qps
)
{
out
<<
std
::
setw
(
w
)
<<
std
::
right
<<
"expired"
<<
std
::
setw
(
w
)
<<
std
::
right
<<
"filtered"
<<
std
::
setw
(
w
)
<<
std
::
right
<<
"abnormal"
<<
std
::
setw
(
w
)
<<
std
::
right
<<
"file_mb"
<<
std
::
setw
(
w
)
<<
std
::
right
<<
"file_num"
;
<<
std
::
setw
(
w
)
<<
std
::
right
<<
"file_num"
<<
std
::
setw
(
w
)
<<
std
::
right
<<
"hit_rate"
<<
std
::
setw
(
w
)
<<
std
::
right
<<
"rdbmem_mb"
;
}
out
<<
std
::
endl
;
rows
.
resize
(
rows
.
size
()
+
1
);
...
...
@@ -3692,6 +3693,11 @@ inline bool app_stat(command_executor *e, shell_context *sc, arguments args)
sum
.
recent_abnormal_count
+=
row
.
recent_abnormal_count
;
sum
.
storage_mb
+=
row
.
storage_mb
;
sum
.
storage_count
+=
row
.
storage_count
;
sum
.
rdb_block_cache_hit_count
+=
row
.
rdb_block_cache_hit_count
;
sum
.
rdb_block_cache_total_count
+=
row
.
rdb_block_cache_total_count
;
sum
.
rdb_block_cache_mem_usage
+=
row
.
rdb_block_cache_mem_usage
;
sum
.
rdb_index_and_filter_blocks_mem_usage
+=
row
.
rdb_index_and_filter_blocks_mem_usage
;
sum
.
rdb_memtable_mem_usage
+=
row
.
rdb_memtable_mem_usage
;
}
#define PRINT_QPS(field) \
do { \
...
...
@@ -3714,11 +3720,19 @@ inline bool app_stat(command_executor *e, shell_context *sc, arguments args)
PRINT_QPS
(
check_and_mutate_qps
);
PRINT_QPS
(
scan_qps
);
if
(
!
only_qps
)
{
double
block_cache_hit_rate
=
abs
(
row
.
rdb_block_cache_total_count
)
<
1e-6
?
0.0
:
row
.
rdb_block_cache_hit_count
/
row
.
rdb_block_cache_total_count
;
out
<<
std
::
setw
(
w
)
<<
std
::
right
<<
(
int64_t
)
row
.
recent_expire_count
<<
std
::
setw
(
w
)
<<
std
::
right
<<
(
int64_t
)
row
.
recent_filter_count
<<
std
::
setw
(
w
)
<<
std
::
right
<<
(
int64_t
)
row
.
recent_abnormal_count
<<
std
::
setw
(
w
)
<<
std
::
right
<<
(
int64_t
)
row
.
storage_mb
<<
std
::
setw
(
w
)
<<
std
::
right
<<
(
int64_t
)
row
.
storage_count
;
<<
(
int64_t
)
row
.
storage_count
<<
std
::
setw
(
w
)
<<
std
::
right
<<
block_cache_hit_rate
<<
std
::
setw
(
w
)
<<
std
::
right
<<
(
row
.
rdb_block_cache_mem_usage
+
row
.
rdb_index_and_filter_blocks_mem_usage
+
row
.
rdb_memtable_mem_usage
)
/
(
1
<<
20U
);
}
out
<<
std
::
endl
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录