未验证 提交 f2d850bf 编写于 作者: S Smilencer 提交者: GitHub

feat(hotkey): capture data part2 - declare coarse collector (#624)

上级 ab285d77
......@@ -19,6 +19,8 @@
#include <dsn/dist/replication/replication_enums.h>
#include <dsn/utility/smart_pointers.h>
#include <dsn/utility/flags.h>
#include <boost/functional/hash.hpp>
#include "base/pegasus_key_schema.h"
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/flags.h>
......@@ -26,18 +28,88 @@
namespace pegasus {
namespace server {
DSN_DEFINE_int32("pegasus.server",
coarse_data_variance_threshold,
3,
"the threshold of variance calculate to find the outliers");
DSN_DEFINE_validator(coarse_data_variance_threshold,
[](int32_t threshold) -> bool { return (threshold >= 0); });
// TODO: (Tangyanzhao) add a limit to avoid changing when detecting
DSN_DEFINE_int32("pegasus.server",
data_capture_hash_bucket_num,
37,
"the number of data capture hash buckets");
DSN_DEFINE_validator(data_capture_hash_bucket_num, [](int32_t bucket_num) -> bool {
if (bucket_num < 3) {
return false;
}
// data_capture_hash_bucket_num should be a prime number
for (int i = 2; i <= bucket_num / i; i++) {
if (bucket_num % i == 0) {
return false;
}
}
return true;
});
DSN_DEFINE_int32(
"pegasus.server",
max_seconds_to_detect_hotkey,
150,
"the max time (in seconds) allowed to capture hotkey, will stop if hotkey's not found");
// 68–95–99.7 rule, same algorithm as hotspot_partition_calculator::stat_histories_analyse
static bool
find_outlier_index(const std::vector<uint64_t> &captured_keys, int threshold, int &hot_index)
{
dcheck_gt(captured_keys.size(), 2);
int data_size = captured_keys.size();
// empirical rule to calculate hot point of each partition
// same algorithm as hotspot_partition_calculator::stat_histories_analyse
double table_captured_key_sum = 0;
int hot_value = 0;
for (int i = 0; i < data_size; i++) {
table_captured_key_sum += captured_keys[i];
if (captured_keys[i] > hot_value) {
hot_index = i;
hot_value = captured_keys[i];
}
}
// TODO: (Tangyanzhao) increase a judgment of table_captured_key_sum
double captured_keys_avg_count =
(table_captured_key_sum - captured_keys[hot_index]) / (data_size - 1);
double standard_deviation = 0;
for (int i = 0; i < data_size; i++) {
if (i != hot_index) {
standard_deviation += pow((captured_keys[i] - captured_keys_avg_count), 2);
}
}
standard_deviation = sqrt(standard_deviation / (data_size - 2));
double hot_point = (hot_value - captured_keys_avg_count) / standard_deviation;
if (hot_point >= threshold) {
return true;
} else {
hot_index = -1;
return false;
}
}
// TODO: (Tangyanzhao) replace it to xxhash
static int get_bucket_id(dsn::string_view data)
{
size_t hash_value = boost::hash_range(data.begin(), data.end());
return static_cast<int>(hash_value % FLAGS_data_capture_hash_bucket_num);
}
hotkey_collector::hotkey_collector(dsn::replication::hotkey_type::type hotkey_type,
dsn::replication::replica_base *r_base)
: replica_base(r_base),
_state(hotkey_collector_state::STOPPED),
_hotkey_type(hotkey_type),
_internal_collector(std::make_shared<hotkey_empty_data_collector>()),
_internal_collector(std::make_shared<hotkey_empty_data_collector>(this)),
_collector_start_time_second(0)
{
}
......@@ -78,7 +150,11 @@ void hotkey_collector::analyse_data()
switch (_state.load()) {
case hotkey_collector_state::COARSE_DETECTING:
if (!terminate_if_timeout()) {
_internal_collector->analyse_data();
_internal_collector->analyse_data(_result);
if (_result.coarse_bucket_index != -1) {
// TODO: (Tangyanzhao) reset _internal_collector to hotkey_fine_data_collector
_state.store(hotkey_collector_state::FINE_DETECTING);
}
}
return;
default:
......@@ -108,7 +184,7 @@ void hotkey_collector::on_start_detect(dsn::replication::detect_hotkey_response
return;
case hotkey_collector_state::STOPPED:
_collector_start_time_second = dsn_now_s();
// TODO: (Tangyanzhao) start coarse detecting
_internal_collector.reset(new hotkey_coarse_data_collector(this));
_state.store(hotkey_collector_state::COARSE_DETECTING);
resp.err = dsn::ERR_OK;
hint = fmt::format("starting to detect {} hotkey", dsn::enum_to_string(_hotkey_type));
......@@ -149,5 +225,31 @@ bool hotkey_collector::terminate_if_timeout()
return false;
}
hotkey_coarse_data_collector::hotkey_coarse_data_collector(replica_base *base)
: internal_collector_base(base), _hash_buckets(FLAGS_data_capture_hash_bucket_num)
{
for (auto &bucket : _hash_buckets) {
bucket.store(0);
}
}
void hotkey_coarse_data_collector::capture_data(const dsn::blob &hash_key, uint64_t weight)
{
_hash_buckets[get_bucket_id(hash_key)].fetch_add(weight);
}
void hotkey_coarse_data_collector::analyse_data(detect_hotkey_result &result)
{
std::vector<uint64_t> buckets(FLAGS_data_capture_hash_bucket_num);
for (int i = 0; i < buckets.size(); i++) {
buckets[i] = _hash_buckets[i].load();
_hash_buckets[i].store(0);
}
if (!find_outlier_index(
buckets, FLAGS_coarse_data_variance_threshold, result.coarse_bucket_index)) {
result.coarse_bucket_index = -1;
}
}
} // namespace server
} // namespace pegasus
......@@ -26,6 +26,11 @@ namespace server {
class internal_collector_base;
struct detect_hotkey_result
{
int coarse_bucket_index = -1;
};
// hotkey_collector is responsible to find the hot keys after the partition
// was detected to be hot. The two types of hotkey, READ & WRITE, are detected
// separately.
......@@ -82,25 +87,40 @@ private:
void terminate();
bool terminate_if_timeout();
detect_hotkey_result _result;
std::atomic<hotkey_collector_state> _state;
const dsn::replication::hotkey_type::type _hotkey_type;
std::shared_ptr<internal_collector_base> _internal_collector;
uint64_t _collector_start_time_second;
};
class internal_collector_base
class internal_collector_base : public dsn::replication::replica_base
{
public:
explicit internal_collector_base(replica_base *base) : replica_base(base){};
virtual void capture_data(const dsn::blob &hash_key, uint64_t weight) = 0;
virtual void analyse_data() = 0;
virtual void analyse_data(detect_hotkey_result &result) = 0;
};
// used in hotkey_collector_state::STOPPED and hotkey_collector_state::FINISHED, avoid null pointers
class hotkey_empty_data_collector : public internal_collector_base
{
public:
void capture_data(const dsn::blob &hash_key, uint64_t size) {}
void analyse_data() {}
explicit hotkey_empty_data_collector(replica_base *base) : internal_collector_base(base) {}
void capture_data(const dsn::blob &hash_key, uint64_t weight) override {}
void analyse_data(detect_hotkey_result &result) override {}
};
// TODO: (Tangyanzhao) add a unit test of hotkey_coarse_data_collector
class hotkey_coarse_data_collector : public internal_collector_base
{
public:
explicit hotkey_coarse_data_collector(replica_base *base);
void capture_data(const dsn::blob &hash_key, uint64_t weight) override;
void analyse_data(detect_hotkey_result &result) override;
private:
std::vector<std::atomic<uint64_t>> _hash_buckets;
};
} // namespace server
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册