未验证 提交 3b96b264 编写于 作者: S Smilencer 提交者: GitHub

feat(hotkey): build a fundamental framework of hotkey detection (#603)

上级 5a354292
Subproject commit 69102a786f3b888155bc18b8b6c58031c7d2fd98
Subproject commit ccecd5996a7b5c5b3281860186b30048a2ee4763
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "hotkey_collector.h"
namespace pegasus {
namespace server {
// TODO: (Tangyanzhao) implement these functions
void hotkey_collector::handle_rpc(const dsn::replication::detect_hotkey_request &req,
dsn::replication::detect_hotkey_response &resp)
{
}
void hotkey_collector::capture_raw_key(const dsn::blob &raw_key, int64_t weight)
{
// TODO: (Tangyanzhao) Add a judgment sentence to check if it is a raw key
}
void hotkey_collector::capture_hash_key(const dsn::blob &hash_key, int64_t weight) {}
} // namespace server
} // namespace pegasus
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <dsn/utility/string_view.h>
#include <dsn/dist/replication/replication_types.h>
namespace pegasus {
namespace server {
// hotkey_collector is responsible to find the hot keys after the partition
// was detected to be hot. The two types of hotkey, READ & WRITE, are detected
// separately.
//
// +--------------------+ +----------------------------------------------------+
// | Replcia server | | Hotkey collector |
// | | | +-----------------------------------------------+ |
// | +----------------+ | | | Corase capture | |
// | | | |--> | +----------+ | |
// | | RPC received | || | | | Data | | |
// | | | || | | +-----+----+ | |
// | +-------+--------+ || | | | | |
// | | || | | +---------------+----v--+-------+---------+ | |
// | v || | | | |Hot | | | | | |
// | +-------+--------+ || | | |Bucket |Bucket |Bucket |Bucket |Bucket | | |
// | | Replication | || | | +-----------+-----------------------------+ | |
// | | (only on the | || | | | | |
// | | write path)) | || | +--------------|--------------------------------+ |
// | +-------+--------+ || | +--v---+ |
// | | || | | Data | |
// | v || | +------+ |
// | +-------+--------+ || | +-----|-------+-------------+ |
// | | | || | +------|-------------|-------------|---------+ |
// | | Capture data ---| | | Fine |capture | | | |
// | | | | | | | | | | |
// | +-------+--------+ | | | +----v----+ +----v----+ +----v----+ | |
// | | | | | | queue | | queue | | queue | | |
// | v | | | +----+----+ +----+----+ +----+----+ | |
// | +-------+--------+ | | | | | | | |
// | | | | | | +----v-------------v-------------v------+ | |
// | | Place data | | | | | Analsis pool | | |
// | | to the disk | | | | +-----------------|---------------------+ | |
// | | | | | +-------------------|------------------------+ |
// | +----------------+ | | v |
// | | | Hotkey |
// +--------------------+ +----------------------------------------------------+
class hotkey_collector
{
public:
// TODO: (Tangyanzhao) capture_*_key should be consistent with hotspot detection
// weight: calculate the weight according to the specific situation
void capture_raw_key(const dsn::blob &raw_key, int64_t weight);
void capture_hash_key(const dsn::blob &hash_key, int64_t weight);
void handle_rpc(const dsn::replication::detect_hotkey_request &req,
/*out*/ dsn::replication::detect_hotkey_response &resp);
};
} // namespace server
} // namespace pegasus
......@@ -22,6 +22,7 @@
#include "capacity_unit_calculator.h"
#include "pegasus_server_write.h"
#include "meta_store.h"
#include "hotkey_collector.h"
using namespace dsn::literals::chrono_literals;
......@@ -2798,5 +2799,28 @@ void pegasus_server_impl::set_ingestion_status(dsn::replication::ingestion_statu
_ingestion_status = status;
}
void pegasus_server_impl::on_detect_hotkey(const dsn::replication::detect_hotkey_request &req,
dsn::replication::detect_hotkey_response &resp)
{
if (dsn_unlikely(req.action != dsn::replication::detect_action::START &&
req.action != dsn::replication::detect_action::STOP)) {
resp.err = dsn::ERR_INVALID_PARAMETERS;
resp.__set_err_hint("invalid detect_action");
return;
}
if (dsn_unlikely(req.type != dsn::replication::hotkey_type::READ &&
req.type != dsn::replication::hotkey_type::WRITE)) {
resp.err = dsn::ERR_INVALID_PARAMETERS;
resp.__set_err_hint("invalid hotkey_type");
return;
}
auto collector = req.type == dsn::replication::hotkey_type::READ ? _read_hotkey_collector
: _write_hotkey_collector;
collector->handle_rpc(req, resp);
}
} // namespace server
} // namespace pegasus
......@@ -28,6 +28,7 @@ namespace server {
class meta_store;
class capacity_unit_calculator;
class pegasus_server_write;
class hotkey_collector;
class pegasus_server_impl : public pegasus_read_service
{
......@@ -318,6 +319,9 @@ private:
::dsn::error_code flush_all_family_columns(bool wait);
void on_detect_hotkey(const dsn::replication::detect_hotkey_request &req,
dsn::replication::detect_hotkey_response &resp) override;
private:
static const std::chrono::seconds kServerStatUpdateTimeSec;
static const std::string COMPRESSION_HEADER;
......@@ -382,6 +386,9 @@ private:
dsn::task_tracker _tracker;
std::shared_ptr<hotkey_collector> _read_hotkey_collector;
std::shared_ptr<hotkey_collector> _write_hotkey_collector;
// perf counters
::dsn::perf_counter_wrapper _pfc_get_qps;
::dsn::perf_counter_wrapper _pfc_multi_get_qps;
......
......@@ -12,6 +12,7 @@
#include "meta_store.h"
#include "pegasus_event_listener.h"
#include "pegasus_server_write.h"
#include "hotkey_collector.h"
namespace pegasus {
namespace server {
......@@ -42,6 +43,9 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
_primary_address = dsn::rpc_address(dsn_primary_address()).to_string();
_gpid = get_gpid();
_read_hotkey_collector = std::make_shared<hotkey_collector>();
_write_hotkey_collector = std::make_shared<hotkey_collector>();
_verbose_log = dsn_config_get_value_bool("pegasus.server",
"rocksdb_verbose_log",
false,
......
......@@ -10,6 +10,7 @@ set(MY_PROJ_SRC "../pegasus_server_impl.cpp"
"../pegasus_mutation_duplicator.cpp"
"../hotspot_partition_calculator.cpp"
"../meta_store.cpp"
"../hotkey_collector.cpp"
)
set(MY_SRC_SEARCH_MODE "GLOB")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册