Unverified commit 6da3de33, authored by H HuangWei, committed by GitHub

shell & bench: use rocksdb::Statistics to calc histogram (#367)

Parent 08459be4
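This commit replaces the ad-hoc rocksdb::HistogramImpl instances with a shared rocksdb::Statistics object: samples are recorded with Statistics::measureTime() under small enum indices and rendered with Statistics::getHistogramString(). A minimal standalone sketch of that pattern follows; the enum names and sample values are illustrative only, it assumes a RocksDB version that still exposes measureTime() (newer releases deprecate it in favor of recordInHistogram()), and the custom indices simply alias RocksDB's built-in histogram slots, which is acceptable for a statistics object dedicated to this purpose.

#include <cstdint>
#include <iostream>
#include <memory>
#include <rocksdb/statistics.h>

// Illustrative histogram slots; any small uint32_t can serve as an index,
// at the cost of aliasing RocksDB's own histogram enum values.
enum class histogram_type : uint32_t
{
    LATENCY,
    RESULT_COUNT
};

int main()
{
    // The object returned by CreateDBStatistics() supports concurrent recording,
    // so one instance can be shared across threads.
    std::shared_ptr<rocksdb::Statistics> statistics = rocksdb::CreateDBStatistics();

    // Record a few samples into each histogram slot.
    statistics->measureTime(static_cast<uint32_t>(histogram_type::LATENCY), 120);
    statistics->measureTime(static_cast<uint32_t>(histogram_type::LATENCY), 450);
    statistics->measureTime(static_cast<uint32_t>(histogram_type::RESULT_COUNT), 7);

    // Print the formatted summary (count, average, percentiles, bucket distribution).
    std::cout << statistics->getHistogramString(static_cast<uint32_t>(histogram_type::LATENCY))
              << std::endl;
    return 0;
}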
@@ -10,7 +10,6 @@ set(MY_PROJ_SRC "")
# "GLOB" for non-recursive search
set(MY_SRC_SEARCH_MODE "GLOB")
set(MY_PROJ_INC_PATH "../../../rocksdb")
set(MY_PROJ_LIB_PATH "../../../rocksdb/build")
set(MY_PROJ_LIBS
@@ -8,7 +8,7 @@
#include <s2/s2testing.h>
#include <s2/s2cell.h>
#include <monitoring/histogram.h>
#include <rocksdb/statistics.h>
#include <rocksdb/env.h>
#include <dsn/utility/errors.h>
@@ -79,12 +79,17 @@ int main(int argc, char **argv)
}
}
rocksdb::HistogramImpl latency_histogram;
rocksdb::HistogramImpl result_count_histogram;
enum class histogram_type : uint32_t
{
LATENCY,
RESULT_COUNT
};
auto statistics = rocksdb::CreateDBStatistics();
rocksdb::Env *env = rocksdb::Env::Default();
uint64_t start = env->NowNanos();
std::atomic<uint64_t> count(test_count);
dsn::utils::notify_event get_completed;
// test search_radial by lat & lng
for (int i = 0; i < test_count; ++i) {
S2LatLng latlng(S2Testing::SamplePoint(rect));
@@ -98,8 +103,10 @@ int main(int argc, char **argv)
pegasus::geo::geo_client::SortType::random,
500,
[&, start_nanos](int error_code, std::list<pegasus::geo::SearchResult> &&results) {
latency_histogram.Add(env->NowNanos() - start_nanos);
result_count_histogram.Add(results.size());
statistics->measureTime(static_cast<uint32_t>(histogram_type::LATENCY),
env->NowNanos() - start_nanos);
statistics->measureTime(static_cast<uint32_t>(histogram_type::RESULT_COUNT),
results.size());
uint64_t left = count.fetch_sub(1);
if (left == 1) {
get_completed.notify();
@@ -113,9 +120,11 @@ int main(int argc, char **argv)
std::cout << "start time: " << start << ", end time: " << end
<< ", QPS: " << test_count / ((end - start) / 1e9) << std::endl;
std::cout << "latency_histogram: " << std::endl;
std::cout << latency_histogram.ToString() << std::endl;
std::cout << statistics->getHistogramString(static_cast<uint32_t>(histogram_type::LATENCY))
<< std::endl;
std::cout << "result_count_histogram: " << std::endl;
std::cout << result_count_histogram.ToString() << std::endl;
std::cout << statistics->getHistogramString(static_cast<uint32_t>(histogram_type::RESULT_COUNT))
<< std::endl;
return 0;
}
@@ -12,8 +12,7 @@ set(MY_SRC_SEARCH_MODE "GLOB_RECURSE")
set(MY_PROJ_INC_PATH
"${PEGASUS_PROJECT_DIR}/src/include"
"${PEGASUS_PROJECT_DIR}/src/base"
"${PEGASUS_PROJECT_DIR}/rocksdb")
"${PEGASUS_PROJECT_DIR}/src/base")
set(MY_PROJ_LIB_PATH
"${PEGASUS_PROJECT_DIR}/rocksdb/build")
@@ -14,7 +14,7 @@
#include <rocksdb/db.h>
#include <rocksdb/sst_dump_tool.h>
#include <rocksdb/env.h>
#include <monitoring/histogram.h>
#include <rocksdb/statistics.h>
#include <dsn/cpp/json_helper.h>
#include <dsn/dist/cli/cli.client.h>
#include <dsn/dist/replication/replication_ddl_client.h>
@@ -98,6 +98,14 @@ private:
dsn::utils::ex_lock_nr _lock;
};
enum class histogram_type
{
HASH_KEY_SIZE,
SORT_KEY_SIZE,
VALUE_SIZE,
ROW_SIZE
};
struct scan_data_context
{
scan_data_operator op;
@@ -119,10 +127,7 @@ struct scan_data_context
std::atomic_long split_request_count;
std::atomic_bool split_completed;
bool stat_size;
rocksdb::HistogramImpl hash_key_size_histogram;
rocksdb::HistogramImpl sort_key_size_histogram;
rocksdb::HistogramImpl value_size_histogram;
rocksdb::HistogramImpl row_size_histogram;
std::shared_ptr<rocksdb::Statistics> statistics;
int top_count;
top_container top_rows;
bool count_hash_key;
@@ -137,6 +142,7 @@ struct scan_data_context
pegasus::geo::geo_client *geoclient_,
std::atomic_bool *error_occurred_,
bool stat_size_ = false,
std::shared_ptr<rocksdb::Statistics> statistics_ = nullptr,
int top_count_ = 0,
bool count_hash_key_ = false)
: op(op_),
@@ -154,6 +160,7 @@ struct scan_data_context
split_request_count(0),
split_completed(false),
stat_size(stat_size_),
statistics(statistics_),
top_count(top_count_),
top_rows(top_count_),
count_hash_key(count_hash_key_),
@@ -339,18 +346,24 @@ inline void scan_data_next(scan_data_context *context)
break;
case SCAN_COUNT:
context->split_rows++;
if (context->stat_size) {
if (context->stat_size && context->statistics) {
long hash_key_size = hash_key.size();
context->hash_key_size_histogram.Add(hash_key_size);
context->statistics->measureTime(
static_cast<uint32_t>(histogram_type::HASH_KEY_SIZE),
hash_key_size);
long sort_key_size = sort_key.size();
context->sort_key_size_histogram.Add(sort_key_size);
context->statistics->measureTime(
static_cast<uint32_t>(histogram_type::SORT_KEY_SIZE),
sort_key_size);
long value_size = value.size();
context->value_size_histogram.Add(value_size);
context->statistics->measureTime(
static_cast<uint32_t>(histogram_type::VALUE_SIZE), value_size);
long row_size = hash_key_size + sort_key_size + value_size;
context->row_size_histogram.Add(row_size);
context->statistics->measureTime(
static_cast<uint32_t>(histogram_type::ROW_SIZE), row_size);
if (context->top_count > 0) {
context->top_rows.push(
@@ -8,9 +8,8 @@ static void
print_current_scan_state(const std::vector<std::unique_ptr<scan_data_context>> &contexts,
const std::string &stop_desc,
bool stat_size,
std::shared_ptr<rocksdb::Statistics> statistics,
bool count_hash_key);
static void print_simple_histogram(const std::string &name,
const rocksdb::HistogramImpl &histogram);
void escape_sds_argv(int argc, sds *argv);
int mutation_check(int args_count, sds *args);
@@ -2291,6 +2290,7 @@ bool count_data(command_executor *e, shell_context *sc, arguments args)
std::atomic_bool error_occurred(false);
std::vector<std::unique_ptr<scan_data_context>> contexts;
std::shared_ptr<rocksdb::Statistics> statistics = rocksdb::CreateDBStatistics();
for (int i = 0; i < split_count; i++) {
scan_data_context *context = new scan_data_context(SCAN_COUNT,
i,
@@ -2301,6 +2301,7 @@ bool count_data(command_executor *e, shell_context *sc, arguments args)
nullptr,
&error_occurred,
stat_size,
statistics,
top_count,
diff_hash_key);
context->set_sort_key_filter(sort_key_filter_type, sort_key_filter_pattern);
@@ -2364,7 +2365,7 @@ bool count_data(command_executor *e, shell_context *sc, arguments args)
break;
last_total_rows = cur_total_rows;
if (stat_size && sleep_seconds % 10 == 0) {
print_current_scan_state(contexts, "partially", stat_size, diff_hash_key);
print_current_scan_state(contexts, "partially", stat_size, statistics, diff_hash_key);
}
}
@@ -2387,7 +2388,7 @@ bool count_data(command_executor *e, shell_context *sc, arguments args)
stop_desc = "done";
}
print_current_scan_state(contexts, stop_desc, stat_size, diff_hash_key);
print_current_scan_state(contexts, stop_desc, stat_size, statistics, diff_hash_key);
if (stat_size) {
if (top_count > 0) {
@@ -2515,22 +2516,11 @@ int mutation_check(int args_count, sds *args)
return ret;
}
static void print_simple_histogram(const std::string &name, const rocksdb::HistogramImpl &histogram)
{
fprintf(stderr, "[%s]\n", name.c_str());
fprintf(stderr, " max = %ld\n", histogram.max());
fprintf(stderr, " med = %.2f\n", histogram.Median());
fprintf(stderr, " avg = %.2f\n", histogram.Average());
fprintf(stderr, " min = %ld\n", histogram.min());
fprintf(stderr, " P99 = %.2f\n", histogram.Percentile(99.0));
fprintf(stderr, " P95 = %.2f\n", histogram.Percentile(95.0));
fprintf(stderr, " P90 = %.2f\n", histogram.Percentile(90.0));
}
static void
print_current_scan_state(const std::vector<std::unique_ptr<scan_data_context>> &contexts,
const std::string &stop_desc,
bool stat_size,
std::shared_ptr<rocksdb::Statistics> statistics,
bool count_hash_key)
{
long total_rows = 0;
@@ -2555,20 +2545,22 @@ print_current_scan_state(const std::vector<std::unique_ptr<scan_data_context>> &
}
if (stat_size) {
rocksdb::HistogramImpl hash_key_size_histogram;
rocksdb::HistogramImpl sort_key_size_histogram;
rocksdb::HistogramImpl value_size_histogram;
rocksdb::HistogramImpl row_size_histogram;
for (const auto &context : contexts) {
hash_key_size_histogram.Merge(context->hash_key_size_histogram);
sort_key_size_histogram.Merge(context->sort_key_size_histogram);
value_size_histogram.Merge(context->value_size_histogram);
row_size_histogram.Merge(context->row_size_histogram);
}
print_simple_histogram("hash_key_size", hash_key_size_histogram);
print_simple_histogram("sort_key_size", sort_key_size_histogram);
print_simple_histogram("value_size", value_size_histogram);
print_simple_histogram("row_size", row_size_histogram);
fprintf(stderr,
"\n============================[hash_key_size]============================\n"
"%s=======================================================================",
statistics->getHistogramString(static_cast<uint32_t>(histogram_type::HASH_KEY_SIZE)).c_str());
fprintf(stderr,
"\n============================[sort_key_size]============================\n"
"%s=======================================================================",
statistics->getHistogramString(static_cast<uint32_t>(histogram_type::SORT_KEY_SIZE)).c_str());
fprintf(stderr,
"\n==============================[value_size]=============================\n"
"%s=======================================================================",
statistics->getHistogramString(static_cast<uint32_t>(histogram_type::VALUE_SIZE)).c_str());
fprintf(stderr,
"\n===============================[row_size]==============================\n"
"%s=======================================================================\n\n",
statistics->getHistogramString(static_cast<uint32_t>(histogram_type::ROW_SIZE)).c_str());
}
}
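Worth noting: because the single rocksdb::Statistics object is shared by every scan_data_context, the per-split HistogramImpl members and the Merge() pass that previously preceded printing are no longer needed. If the compact numeric summary produced by the removed print_simple_histogram() is ever wanted again, it can be recovered from the same Statistics object via histogramData(). A minimal sketch under that assumption (the helper name print_histogram_summary is hypothetical, and the exact rocksdb::HistogramData field set varies slightly between RocksDB versions):

#include <cstdint>
#include <cstdio>
#include <memory>
#include <string>
#include <rocksdb/statistics.h>

// Hypothetical replacement for the removed print_simple_histogram(): pull the
// aggregated values out of the shared Statistics object instead of a HistogramImpl.
static void print_histogram_summary(const std::string &name,
                                    const std::shared_ptr<rocksdb::Statistics> &statistics,
                                    uint32_t type)
{
    rocksdb::HistogramData data;
    statistics->histogramData(type, &data);
    fprintf(stderr, "[%s]\n", name.c_str());
    fprintf(stderr, "  avg = %.2f\n", data.average);
    fprintf(stderr, "  med = %.2f\n", data.median);
    fprintf(stderr, "  P95 = %.2f\n", data.percentile95);
    fprintf(stderr, "  P99 = %.2f\n", data.percentile99);
}

The commit itself takes the simpler route and prints the full formatted histogram via getHistogramString(), as shown above.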
@@ -27,7 +27,7 @@
#include <util/random.h>
#include <port/port_posix.h>
#include <util/string_util.h>
#include <monitoring/histogram.h>
#include <rocksdb/statistics.h>
#include <rocksdb/rate_limiter.h>
#include <util/mutexlock.h>
@@ -355,8 +355,7 @@ private:
uint64_t bytes_;
uint64_t last_op_finish_;
uint64_t last_report_finish_;
std::unordered_map<OperationType, std::shared_ptr<HistogramImpl>, std::hash<unsigned char>>
hist_;
std::shared_ptr<rocksdb::Statistics> hist_stats;
std::string message_;
bool exclude_from_merge_;
ReporterAgent *reporter_agent_; // does not own
@@ -366,13 +365,13 @@ public:
Stats() { Start(-1); }
void SetReporterAgent(ReporterAgent *reporter_agent) { reporter_agent_ = reporter_agent; }
void SetHistStats(std::shared_ptr<Statistics> hist_stats_) { hist_stats = hist_stats_; }
void Start(int id)
{
id_ = id;
next_report_ = FLAGS_stats_interval ? FLAGS_stats_interval : 100;
last_op_finish_ = start_;
hist_.clear();
done_ = 0;
last_report_done_ = 0;
bytes_ = 0;
@@ -390,15 +389,6 @@ public:
if (other.exclude_from_merge_)
return;
for (auto it = other.hist_.begin(); it != other.hist_.end(); ++it) {
auto this_it = hist_.find(it->first);
if (this_it != hist_.end()) {
this_it->second->Merge(*(other.hist_.at(it->first)));
} else {
hist_.insert({it->first, it->second});
}
}
done_ += other.done_;
bytes_ += other.bytes_;
seconds_ += other.seconds_;
@@ -472,15 +462,10 @@ public:
if (reporter_agent_) {
reporter_agent_->ReportFinishedOps(num_ops);
}
if (FLAGS_histogram) {
if (FLAGS_histogram && hist_stats) {
uint64_t now = FLAGS_env->NowMicros();
uint64_t micros = now - last_op_finish_;
if (hist_.find(op_type) == hist_.end()) {
auto hist_temp = std::make_shared<HistogramImpl>();
hist_.insert({op_type, std::move(hist_temp)});
}
hist_[op_type]->Add(micros);
hist_stats->measureTime(op_type, micros);
if (micros > 20000 && !FLAGS_stats_interval) {
fprintf(stderr, "long op: %" PRIu64 " micros%30s\r", micros, "");
@@ -574,14 +559,6 @@ public:
(long)throughput,
(extra.empty() ? "" : " "),
extra.c_str());
if (FLAGS_histogram) {
for (auto it = hist_.begin(); it != hist_.end(); ++it) {
fprintf(stdout,
"Microseconds per %s:\n%s\n",
OperationTypeString[it->first].c_str(),
it->second->ToString().c_str());
}
}
fflush(stdout);
}
};
@@ -1037,13 +1014,24 @@ public:
}
CombinedStats combined_stats;
std::shared_ptr<Statistics> hist_stats =
FLAGS_histogram ? CreateDBStatistics() : nullptr;
for (int i = 0; i < num_repeat; i++) {
Stats stats = RunBenchmark(num_threads, name, method);
Stats stats = RunBenchmark(num_threads, name, method, hist_stats);
combined_stats.AddStats(stats);
}
if (num_repeat > 1) {
combined_stats.Report(name);
}
if (FLAGS_histogram) {
for (auto type : OperationTypeString) {
fprintf(stdout,
"Microseconds per %s:\n%s\n",
OperationTypeString[type.first].c_str(),
hist_stats->getHistogramString(type.first).c_str());
}
}
}
if (post_process_method != nullptr) {
(this->*post_process_method)();
@@ -1089,7 +1077,10 @@ private:
}
}
Stats RunBenchmark(int n, Slice name, void (Benchmark::*method)(ThreadState *))
Stats RunBenchmark(int n,
Slice name,
void (Benchmark::*method)(ThreadState *),
std::shared_ptr<Statistics> hist_stats = nullptr)
{
SharedState shared;
shared.total = n;
@@ -1138,6 +1129,7 @@ private:
arg[i].shared = &shared;
arg[i].thread = new ThreadState(i);
arg[i].thread->stats.SetReporterAgent(reporter_agent.get());
arg[i].thread->stats.SetHistStats(hist_stats);
arg[i].thread->shared = &shared;
FLAGS_env->StartThread(ThreadBody, &arg[i]);
}
@@ -1557,7 +1549,7 @@ int db_bench_tool(int argc, char **argv)
rocksdb::Benchmark benchmark;
benchmark.Run();
sleep(1); // Sleep a while to exit gracefully.
return 0;
}