Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
Incubator Pegasus
提交
86979768
Incubator Pegasus
项目概览
apache
/
Incubator Pegasus
通知
9
Star
5
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
Incubator Pegasus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
86979768
编写于
9月 17, 2020
作者:
S
Smilencer
提交者:
GitHub
9月 17, 2020
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor: split read/write hotspot of hotspot_partition_calculator (#592)
上级
412492d3
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
202 addition
and
85 deletion
+202
-85
src/server/hotspot_partition_calculator.cpp
src/server/hotspot_partition_calculator.cpp
+66
-41
src/server/hotspot_partition_calculator.h
src/server/hotspot_partition_calculator.h
+17
-5
src/server/hotspot_partition_data.h
src/server/hotspot_partition_data.h
+0
-20
src/server/hotspot_partition_stat.h
src/server/hotspot_partition_stat.h
+43
-0
src/server/test/hotspot_partition_test.cpp
src/server/test/hotspot_partition_test.cpp
+68
-19
src/shell/command_helper.h
src/shell/command_helper.h
+8
-0
未找到文件。
src/server/hotspot_partition_calculator.cpp
浏览文件 @
86979768
...
...
@@ -20,6 +20,7 @@
#include <algorithm>
#include <math.h>
#include <dsn/dist/fmt_logging.h>
#include <rrdb/rrdb_types.h>
#include <dsn/utility/flags.h>
#include <dsn/tool-api/rpc_address.h>
#include <dsn/tool-api/group_address.h>
...
...
@@ -41,67 +42,91 @@ DSN_DEFINE_int64("pegasus.collector",
"eliminate outdated historical "
"data"
);
void
hotspot_partition_calculator
::
data_aggregate
(
const
std
::
vector
<
row_data
>
&
partitions
)
void
hotspot_partition_calculator
::
data_aggregate
(
const
std
::
vector
<
row_data
>
&
partition
_stat
s
)
{
while
(
_partition
_stat_histories
.
size
()
>
FLAGS_max_hotspot_store_size
-
1
)
{
_partition
_stat_histories
.
pop
();
while
(
_partition
s_stat_histories
.
size
()
>=
FLAGS_max_hotspot_store_size
)
{
_partition
s_stat_histories
.
pop_front
();
}
std
::
vector
<
hotspot_partition_data
>
temp
(
partitions
.
size
());
// TODO refactor the data structure
for
(
int
i
=
0
;
i
<
partitions
.
size
();
i
++
)
{
temp
[
i
]
=
std
::
move
(
hotspot_partition_data
(
partitions
[
i
]));
std
::
vector
<
hotspot_partition_stat
>
temp
;
for
(
const
auto
&
partition_stat
:
partition_stats
)
{
temp
.
emplace_back
(
hotspot_partition_stat
(
partition_stat
));
}
_partition
_stat_histories
.
emplace
(
temp
);
_partition
s_stat_histories
.
emplace_back
(
temp
);
}
void
hotspot_partition_calculator
::
init_perf_counter
(
int
partition_count
)
{
std
::
string
counter_name
;
std
::
string
counter_desc
;
for
(
int
i
=
0
;
i
<
partition_count
;
i
++
)
{
string
partition_desc
=
_app_name
+
'.'
+
std
::
to_string
(
i
);
counter_name
=
fmt
::
format
(
"app.stat.hotspots@{}"
,
partition_desc
);
counter_desc
=
fmt
::
format
(
"statistic the hotspots of app {}"
,
partition_desc
);
_hot_points
[
i
].
init_app_counter
(
"app.pegasus"
,
counter_name
.
c_str
(),
COUNTER_TYPE_NUMBER
,
counter_desc
.
c_str
());
for
(
int
data_type
=
0
;
data_type
<=
1
;
data_type
++
)
{
for
(
int
i
=
0
;
i
<
partition_count
;
i
++
)
{
string
partition_desc
=
_app_name
+
'.'
+
(
data_type
==
partition_qps_type
::
WRITE_HOTSPOT_DATA
?
"write."
:
"read."
)
+
std
::
to_string
(
i
);
std
::
string
counter_name
=
fmt
::
format
(
"app.stat.hotspots@{}"
,
partition_desc
);
std
::
string
counter_desc
=
fmt
::
format
(
"statistic the hotspots of app {}"
,
partition_desc
);
_hot_points
[
i
][
data_type
].
init_app_counter
(
"app.pegasus"
,
counter_name
.
c_str
(),
COUNTER_TYPE_NUMBER
,
counter_desc
.
c_str
());
}
}
}
void
hotspot_partition_calculator
::
data_analyse
()
void
hotspot_partition_calculator
::
stat_histories_analyse
(
int
data_type
,
std
::
vector
<
int
>
&
hot_points
)
{
dassert
(
_partition_stat_histories
.
back
().
size
()
==
_hot_points
.
size
(),
"partition counts error, please check"
);
std
::
vector
<
double
>
data_samples
;
data_samples
.
reserve
(
_partition_stat_histories
.
size
()
*
_hot_points
.
size
());
auto
temp_data
=
_partition_stat_histories
;
double
table_qps_sum
=
0
,
standard_deviation
=
0
,
table_qps_avg
=
0
;
int
sample_count
=
0
;
while
(
!
temp_data
.
empty
())
{
for
(
const
auto
&
partition_data
:
temp_data
.
front
())
{
if
(
partition_data
.
total_qps
-
1.00
>
0
)
{
data_samples
.
push_back
(
partition_data
.
total_qps
);
table_qps_sum
+=
partition_data
.
total_qps
;
sample_count
++
;
}
for
(
const
auto
&
one_partition_stat_histories
:
_partitions_stat_histories
)
{
for
(
const
auto
&
partition_stat
:
one_partition_stat_histories
)
{
table_qps_sum
+=
partition_stat
.
total_qps
[
data_type
];
sample_count
++
;
}
temp_data
.
pop
();
}
if
(
sample_count
==
0
)
{
ddebug
(
"_partition
_stat_histories size == 0
"
);
if
(
sample_count
<=
1
)
{
ddebug
(
"_partition
s_stat_histories size <= 1, not enough data for calculation
"
);
return
;
}
table_qps_avg
=
table_qps_sum
/
sample_count
;
for
(
const
auto
&
data_sample
:
data_samples
)
{
standard_deviation
+=
pow
((
data_sample
-
table_qps_avg
),
2
);
for
(
const
auto
&
one_partition_stat_histories
:
_partitions_stat_histories
)
{
for
(
const
auto
&
partition_stat
:
one_partition_stat_histories
)
{
standard_deviation
+=
pow
((
partition_stat
.
total_qps
[
data_type
]
-
table_qps_avg
),
2
);
}
}
standard_deviation
=
sqrt
(
standard_deviation
/
sample_count
);
const
auto
&
anly_data
=
_partition_stat_histories
.
back
();
for
(
int
i
=
0
;
i
<
_hot_points
.
size
();
i
++
)
{
double
hot_point
=
(
anly_data
[
i
].
total_qps
-
table_qps_avg
)
/
standard_deviation
;
// perf_counter->set can only be unsigned __int64
standard_deviation
=
sqrt
(
standard_deviation
/
(
sample_count
-
1
));
const
auto
&
anly_data
=
_partitions_stat_histories
.
back
();
int
hot_point_size
=
_hot_points
.
size
();
hot_points
.
resize
(
hot_point_size
);
for
(
int
i
=
0
;
i
<
hot_point_size
;
i
++
)
{
double
hot_point
=
0
;
if
(
standard_deviation
!=
0
)
{
hot_point
=
(
anly_data
[
i
].
total_qps
[
data_type
]
-
table_qps_avg
)
/
standard_deviation
;
}
// perf_counter->set can only be unsigned uint64_t
// use ceil to guarantee conversion results
hot_point
=
ceil
(
std
::
max
(
hot_point
,
double
(
0
)));
_hot_points
[
i
]
->
set
(
hot_point
);
hot_points
[
i
]
=
ceil
(
std
::
max
(
hot_point
,
double
(
0
)));
}
}
void
hotspot_partition_calculator
::
update_hot_point
(
int
data_type
,
std
::
vector
<
int
>
&
hot_points
)
{
dcheck_eq
(
_hot_points
.
size
(),
hot_points
.
size
());
int
size
=
hot_points
.
size
();
for
(
int
i
=
0
;
i
<
size
;
i
++
)
{
_hot_points
[
i
][
data_type
].
get
()
->
set
(
hot_points
[
i
]);
}
}
void
hotspot_partition_calculator
::
data_analyse
()
{
dassert
(
_partitions_stat_histories
.
back
().
size
()
==
_hot_points
.
size
(),
"The number of partitions in this table has changed, and hotspot analysis cannot be "
"performed,in %s"
,
_app_name
.
c_str
());
for
(
int
data_type
=
0
;
data_type
<=
1
;
data_type
++
)
{
// data_type 0: READ_HOTSPOT_DATA; 1: WRITE_HOTSPOT_DATA
std
::
vector
<
int
>
hot_points
;
stat_histories_analyse
(
data_type
,
hot_points
);
update_hot_point
(
data_type
,
hot_points
);
}
}
...
...
src/server/hotspot_partition_calculator.h
浏览文件 @
86979768
...
...
@@ -17,13 +17,20 @@
#pragma once
#include "hotspot_partition_
data
.h"
#include "hotspot_partition_
stat
.h"
#include <gtest/gtest_prod.h>
#include <dsn/perf_counter/perf_counter.h>
namespace
pegasus
{
namespace
server
{
// stores the whole histories of all partitions in one table
typedef
std
::
list
<
std
::
vector
<
hotspot_partition_stat
>>
stat_histories
;
// hot_partition_counters c[index_of_partitions][type_of_read(0)/write(1)_stat]
// so if we have n partitions, we will get 2*n hot_partition_counters, to demonstrate both
// read/write hotspot value
typedef
std
::
vector
<
std
::
array
<
dsn
::
perf_counter_wrapper
,
2
>>
hot_partition_counters
;
// hotspot_partition_calculator is used to find the hot partition in a table.
class
hotspot_partition_calculator
{
...
...
@@ -43,15 +50,20 @@ public:
const
dsn
::
apps
::
hotkey_detect_action
::
type
action
);
private:
const
std
::
string
_app_name
;
// empirical rule to calculate hot point of each partition
// ref: https://en.wikipedia.org/wiki/68%E2%80%9395%E2%80%9399.7_rule
void
stat_histories_analyse
(
int
data_type
,
std
::
vector
<
int
>
&
hot_points
);
// set hot_point to corresponding perf_counter
void
update_hot_point
(
int
data_type
,
std
::
vector
<
int
>
&
hot_points
);
const
std
::
string
_app_name
;
void
init_perf_counter
(
int
perf_counter_count
);
// usually a partition with "hot-point value" >= 3 can be considered as a hotspot partition.
std
::
vector
<
dsn
::
perf_counter_wrapper
>
_hot_points
;
hot_partition_counters
_hot_points
;
// saving historical data can improve accuracy
st
d
::
queue
<
std
::
vector
<
hotspot_partition_data
>>
_partition
_stat_histories
;
st
at_histories
_partitions
_stat_histories
;
FRIEND_TEST
(
hotspot_partition_calculator
,
hotspot_partition_policy
)
;
friend
class
hotspot_partition_test
;
};
}
// namespace server
...
...
src/server/hotspot_partition_data.h
已删除
100644 → 0
浏览文件 @
412492d3
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
#pragma once
#include "shell/command_helper.h"
namespace
pegasus
{
namespace
server
{
struct
hotspot_partition_data
{
hotspot_partition_data
(
const
row_data
&
row
)
:
total_qps
(
row
.
get_total_qps
()){};
hotspot_partition_data
()
{}
double
total_qps
;
};
}
// namespace server
}
// namespace pegasus
src/server/hotspot_partition_stat.h
0 → 100644
浏览文件 @
86979768
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "shell/command_helper.h"
namespace
pegasus
{
namespace
server
{
enum
partition_qps_type
{
READ_HOTSPOT_DATA
=
0
,
WRITE_HOTSPOT_DATA
};
struct
hotspot_partition_stat
{
hotspot_partition_stat
(
const
row_data
&
row
)
{
total_qps
[
READ_HOTSPOT_DATA
]
=
row
.
get_total_read_qps
();
total_qps
[
WRITE_HOTSPOT_DATA
]
=
row
.
get_total_write_qps
();
}
hotspot_partition_stat
()
{}
double
total_qps
[
2
];
};
}
// namespace server
}
// namespace pegasus
src/server/test/hotspot_partition_test.cpp
浏览文件 @
86979768
...
...
@@ -17,32 +17,81 @@
#include "server/hotspot_partition_calculator.h"
#include "pegasus_server_test_base.h"
#include <gtest/gtest.h>
namespace
pegasus
{
namespace
server
{
TEST
(
hotspot_partition_calculator
,
hotspot_partition_policy
)
class
hotspot_partition_test
:
public
pegasus_server_test_base
{
// TODO: refactor the unit test
std
::
vector
<
row_data
>
test_rows
(
8
);
test_rows
[
0
].
get_qps
=
1000.0
;
test_rows
[
1
].
get_qps
=
1000.0
;
test_rows
[
2
].
get_qps
=
1000.0
;
test_rows
[
3
].
get_qps
=
1000.0
;
test_rows
[
4
].
get_qps
=
1000.0
;
test_rows
[
5
].
get_qps
=
1000.0
;
test_rows
[
6
].
get_qps
=
1000.0
;
test_rows
[
7
].
get_qps
=
5000.0
;
hotspot_partition_calculator
test_hotspot_calculator
(
"TEST"
,
8
);
test_hotspot_calculator
.
data_aggregate
(
test_rows
);
test_hotspot_calculator
.
data_analyse
();
std
::
vector
<
double
>
result
(
8
);
for
(
int
i
=
0
;
i
<
test_hotspot_calculator
.
_hot_points
.
size
();
i
++
)
{
result
[
i
]
=
test_hotspot_calculator
.
_hot_points
[
i
]
->
get_value
();
public:
hotspot_partition_test
()
:
calculator
(
"TEST"
,
8
){};
hotspot_partition_calculator
calculator
;
std
::
vector
<
row_data
>
generate_row_data
()
{
std
::
vector
<
row_data
>
test_rows
;
test_rows
.
resize
(
8
);
for
(
int
i
=
0
;
i
<
8
;
i
++
)
{
test_rows
[
i
].
get_qps
=
1000.0
;
test_rows
[
i
].
put_qps
=
1000.0
;
}
return
test_rows
;
}
std
::
vector
<
double
>
expect_vector
{
0
,
0
,
0
,
0
,
0
,
0
,
0
,
3
};
ASSERT_EQ
(
expect_vector
,
result
);
std
::
vector
<
std
::
vector
<
double
>>
get_calculator_result
(
const
hot_partition_counters
&
counters
)
{
std
::
vector
<
std
::
vector
<
double
>>
result
;
result
.
resize
(
2
);
for
(
int
i
=
0
;
i
<
counters
.
size
();
i
++
)
{
result
[
READ_HOTSPOT_DATA
].
push_back
(
counters
[
i
][
READ_HOTSPOT_DATA
].
get
()
->
get_value
());
result
[
WRITE_HOTSPOT_DATA
].
push_back
(
counters
[
i
][
WRITE_HOTSPOT_DATA
].
get
()
->
get_value
());
}
return
result
;
}
void
test_policy_in_scenarios
(
std
::
vector
<
row_data
>
scenario
,
std
::
vector
<
std
::
vector
<
double
>>
&
expect_result
,
hotspot_partition_calculator
&
calculator
)
{
calculator
.
data_aggregate
(
std
::
move
(
scenario
));
calculator
.
data_analyse
();
std
::
vector
<
std
::
vector
<
double
>>
result
=
get_calculator_result
(
calculator
.
_hot_points
);
ASSERT_EQ
(
result
,
expect_result
);
}
};
TEST_F
(
hotspot_partition_test
,
hotspot_partition_policy
)
{
// Insert normal scenario data to test
std
::
vector
<
row_data
>
test_rows
=
generate_row_data
();
std
::
vector
<
std
::
vector
<
double
>>
expect_vector
=
{{
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
},
{
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
}};
test_policy_in_scenarios
(
test_rows
,
expect_vector
,
calculator
);
// Insert hotspot scenario_0 data to test
test_rows
=
generate_row_data
();
const
int
HOT_SCENARIO_0_READ_HOT_PARTITION
=
7
;
const
int
HOT_SCENARIO_0_WRITE_HOT_PARTITION
=
0
;
test_rows
[
HOT_SCENARIO_0_READ_HOT_PARTITION
].
get_qps
=
5000.0
;
test_rows
[
HOT_SCENARIO_0_WRITE_HOT_PARTITION
].
put_qps
=
5000.0
;
expect_vector
=
{{
0
,
0
,
0
,
0
,
0
,
0
,
0
,
4
},
{
4
,
0
,
0
,
0
,
0
,
0
,
0
,
0
}};
test_policy_in_scenarios
(
test_rows
,
expect_vector
,
calculator
);
// Insert hotspot scenario_0 data to test again
test_rows
=
generate_row_data
();
test_rows
[
HOT_SCENARIO_0_READ_HOT_PARTITION
].
get_qps
=
5000.0
;
test_rows
[
HOT_SCENARIO_0_WRITE_HOT_PARTITION
].
put_qps
=
5000.0
;
expect_vector
=
{{
0
,
0
,
0
,
0
,
0
,
0
,
0
,
4
},
{
4
,
0
,
0
,
0
,
0
,
0
,
0
,
0
}};
test_policy_in_scenarios
(
test_rows
,
expect_vector
,
calculator
);
// Insert hotspot scenario_1 data to test again
test_rows
=
generate_row_data
();
const
int
HOT_SCENARIO_1_READ_HOT_PARTITION
=
3
;
const
int
HOT_SCENARIO_1_WRITE_HOT_PARTITION
=
2
;
test_rows
[
HOT_SCENARIO_1_READ_HOT_PARTITION
].
get_qps
=
5000.0
;
test_rows
[
HOT_SCENARIO_1_WRITE_HOT_PARTITION
].
put_qps
=
5000.0
;
expect_vector
=
{{
0
,
0
,
0
,
4
,
0
,
0
,
0
,
0
},
{
0
,
0
,
4
,
0
,
0
,
0
,
0
,
0
}};
test_policy_in_scenarios
(
test_rows
,
expect_vector
,
calculator
);
}
}
// namespace server
...
...
src/shell/command_helper.h
浏览文件 @
86979768
...
...
@@ -549,6 +549,14 @@ struct row_data
double
get_total_cu
()
const
{
return
recent_read_cu
+
recent_write_cu
;
}
double
get_total_read_qps
()
const
{
return
get_qps
+
multi_get_qps
+
scan_qps
;
}
double
get_total_write_qps
()
const
{
return
put_qps
+
remove_qps
+
multi_put_qps
+
multi_remove_qps
+
check_and_set_qps
+
check_and_mutate_qps
;
}
std
::
string
row_name
;
int32_t
app_id
=
0
;
int32_t
partition_count
=
0
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录