Unverified commit 033488ee, authored by QinZuoyan, committed by GitHub

shell: support getting the count of distinct hash keys in command count_data (#223)

Parent commit: b228652b
Subproject commit 254106639febed1da5ff56a9534e4e527799fb17
Subproject commit 178165a022e4a9fa11d02d8e2423def8aad52901
......@@ -655,7 +655,7 @@ function run_start_onebox()
sleep 1
sleeped=$((sleeped+1))
echo "Sleeped for $sleeped seconds"
unhealthy_count=`echo "ls -d" | ./run.sh shell | awk 'f{ if(NF<7){f=0} else if($3!=$4){print} } /fully_healthy/{print;f=1}' | wc -l`
unhealthy_count=`echo "ls -d" | ./run.sh shell | awk 'f{ if(NF<7){f=0} else if($3!=$4){print} } / fully_healthy /{print;f=1}' | wc -l`
if [ $unhealthy_count -eq 1 ]; then
echo "Cluster becomes healthy."
break
......
......@@ -171,7 +171,7 @@ do
echo "Wait cluster to become healthy..."
while true
do
unhealthy_count=`echo "ls -d" | ./run.sh shell --cluster $meta_list | awk 'BEGIN{s=0} f{ if(NF<7){f=0} else if($3!=$4){s=s+$5+$6} } /fully_healthy/{f=1} END{print s}'`
unhealthy_count=`echo "ls -d" | ./run.sh shell --cluster $meta_list | awk 'BEGIN{s=0} f{ if(NF<7){f=0} else if($3!=$4){s=s+$5+$6} } / fully_healthy /{f=1} END{print s}'`
if [ $unhealthy_count -eq 0 ]; then
echo "Cluster becomes healthy"
break
......
......@@ -247,7 +247,7 @@ do
echo "Wait cluster to become healthy..."
while true
do
unhealthy_count=`echo "ls -d" | ./run.sh shell --cluster $meta_list | awk 'f{ if(NF<7){f=0} else if($3!=$4){print} } /fully_healthy/{f=1}' | wc -l`
unhealthy_count=`echo "ls -d" | ./run.sh shell --cluster $meta_list | awk 'f{ if(NF<7){f=0} else if($3!=$4){print} } / fully_healthy /{f=1}' | wc -l`
if [ $unhealthy_count -eq 0 ]; then
echo "Cluster becomes healthy."
break
......
......@@ -115,6 +115,9 @@ struct scan_data_context
rocksdb::HistogramImpl row_size_histogram;
int top_count;
top_container top_rows;
bool count_hash_key;
std::string last_hash_key;
std::atomic_long split_hash_key_count;
scan_data_context(scan_data_operator op_,
int split_id_,
int max_batch_count_,
......@@ -124,7 +127,8 @@ struct scan_data_context
pegasus::geo::geo_client *geoclient_,
std::atomic_bool *error_occurred_,
bool stat_size_ = false,
int top_count_ = 0)
int top_count_ = 0,
bool count_hash_key_ = false)
: op(op_),
split_id(split_id_),
max_batch_count(max_batch_count_),
......@@ -138,8 +142,13 @@ struct scan_data_context
split_completed(false),
stat_size(stat_size_),
top_count(top_count_),
top_rows(top_count_)
top_rows(top_count_),
count_hash_key(count_hash_key_),
split_hash_key_count(0)
{
// max_batch_count should be > 1 because the scan may be terminated
// when split_request_count == 1
dassert(max_batch_count > 1, "");
}
};
inline void update_atomic_max(std::atomic_long &max, long value)
......@@ -232,6 +241,12 @@ inline void scan_data_next(scan_data_context *context)
std::move(hash_key), std::move(sort_key), row_size);
}
}
if (context->count_hash_key) {
if (hash_key != context->last_hash_key) {
context->split_hash_key_count++;
context->last_hash_key = std::move(hash_key);
}
}
scan_data_next(context);
break;
case SCAN_GEN_GEO:
......@@ -278,6 +293,11 @@ inline void scan_data_next(scan_data_context *context)
// to prevent that split_request_count becomes 0 in the middle.
context->split_request_count--;
});
if (context->count_hash_key) {
// disable parallel scan if count_hash_key == true
break;
}
}
}
......
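The hunks above contain the core of the new counting mode: each scan split remembers the hash key of the previous row (last_hash_key) and bumps an atomic counter (split_hash_key_count) whenever the hash key changes, and parallel batches within a split are switched off when count_hash_key is set, presumably so that rows sharing a hash key arrive adjacently and are not counted twice. A minimal standalone sketch of that idea (illustrative names, not the actual shell code):

// Sketch only: count distinct hash keys in a row stream that is grouped by
// hash key, mirroring the last_hash_key / split_hash_key_count fields above.
#include <atomic>
#include <cstdio>
#include <string>
#include <utility>
#include <vector>

struct hash_key_counter
{
    std::string last_hash_key;          // hash key of the previously seen row
    std::atomic_long distinct_count{0}; // number of distinct hash keys so far

    // Rows with the same hash key must arrive adjacently (one scanner per
    // split, no parallel batches), otherwise a key could be counted twice.
    void on_row(const std::string &hash_key)
    {
        if (hash_key != last_hash_key) {
            distinct_count++;
            last_hash_key = hash_key;
        }
    }
};

int main()
{
    hash_key_counter counter;
    std::vector<std::pair<std::string, std::string>> rows = {
        {"user1", "k1"}, {"user1", "k2"}, {"user2", "k1"}, {"user3", "k1"}};
    for (const auto &row : rows)
        counter.on_row(row.first);
    printf("%ld distinct hash keys\n", counter.distinct_count.load()); // prints 3
    return 0;
}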
......@@ -2223,17 +2223,17 @@ inline bool copy_data(command_executor *e, shell_context *sc, arguments args)
}
if (max_split_count <= 0) {
fprintf(stderr, "ERROR: max_split_count should no less than 0\n");
fprintf(stderr, "ERROR: max_split_count should be greater than 0\n");
return false;
}
if (max_batch_count <= 0) {
fprintf(stderr, "ERROR: max_batch_count should no less than 0\n");
if (max_batch_count <= 1) {
fprintf(stderr, "ERROR: max_batch_count should be greater than 1\n");
return false;
}
if (timeout_ms <= 0) {
fprintf(stderr, "ERROR: timeout_ms should no less than 0\n");
fprintf(stderr, "ERROR: timeout_ms should be greater than 0\n");
return false;
}
......@@ -2416,17 +2416,17 @@ inline bool clear_data(command_executor *e, shell_context *sc, arguments args)
}
if (max_split_count <= 0) {
fprintf(stderr, "ERROR: max_split_count should no less than 0\n");
fprintf(stderr, "ERROR: max_split_count should be greater than 0\n");
return false;
}
if (max_batch_count <= 0) {
fprintf(stderr, "ERROR: max_batch_count should no less than 0\n");
if (max_batch_count <= 1) {
fprintf(stderr, "ERROR: max_batch_count should be greater than 1\n");
return false;
}
if (timeout_ms <= 0) {
fprintf(stderr, "ERROR: timeout_ms should no less than 0\n");
fprintf(stderr, "ERROR: timeout_ms should be greater than 0\n");
return false;
}
......@@ -2544,15 +2544,29 @@ static void print_simple_histogram(const std::string &name, const rocksdb::Histo
static void print_current_scan_state(const std::vector<scan_data_context *> &contexts,
const std::string &stop_desc,
bool stat_size)
bool stat_size,
bool count_hash_key)
{
long total_rows = 0;
long total_hash_key_count = 0;
for (const auto &context : contexts) {
total_rows += context->split_rows.load();
fprintf(
stderr, "INFO: split[%d]: %ld rows\n", context->split_id, context->split_rows.load());
long rows = context->split_rows.load();
total_rows += rows;
fprintf(stderr, "INFO: split[%d]: %ld rows", context->split_id, rows);
if (count_hash_key) {
long hash_key_count = context->split_hash_key_count.load();
total_hash_key_count += hash_key_count;
fprintf(stderr, " (%ld hash keys)\n", hash_key_count);
} else {
fprintf(stderr, "\n");
}
}
fprintf(stderr, "Count %s, total %ld rows", stop_desc.c_str(), total_rows);
if (count_hash_key) {
fprintf(stderr, " (%ld hash keys)\n", total_hash_key_count);
} else {
fprintf(stderr, "\n");
}
fprintf(stderr, "Count %s, total %ld rows.\n", stop_desc.c_str(), total_rows);
if (stat_size) {
rocksdb::HistogramImpl hash_key_size_histogram;
......@@ -2577,6 +2591,7 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args)
static struct option long_options[] = {{"max_split_count", required_argument, 0, 's'},
{"max_batch_count", required_argument, 0, 'b'},
{"timeout_ms", required_argument, 0, 't'},
{"count_hash_key", no_argument, 0, 'h'},
{"stat_size", no_argument, 0, 'z'},
{"top_count", required_argument, 0, 'c'},
{"run_seconds", required_argument, 0, 'r'},
......@@ -2585,6 +2600,7 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args)
int max_split_count = 100000000;
int max_batch_count = 500;
int timeout_ms = sc->timeout_ms;
bool count_hash_key = false;
bool stat_size = false;
int top_count = 0;
int run_seconds = 0;
......@@ -2593,7 +2609,7 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args)
while (true) {
int option_index = 0;
int c;
c = getopt_long(args.argc, args.argv, "s:b:t:zc:r:", long_options, &option_index);
c = getopt_long(args.argc, args.argv, "s:b:t:hzc:r:", long_options, &option_index);
if (c == -1)
break;
switch (c) {
......@@ -2615,6 +2631,9 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args)
return false;
}
break;
case 'h':
count_hash_key = true;
break;
case 'z':
stat_size = true;
break;
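For reference, the new option is wired through getopt_long in the usual way: --count_hash_key is declared as a no_argument long option mapped to the short flag -h, and the 'h' case simply flips a bool. A self-contained sketch of that pattern (illustrative program, not part of the diff):

// Sketch of the getopt_long pattern used by count_data for the new flag.
#include <getopt.h>
#include <cstdio>

int main(int argc, char **argv)
{
    static struct option long_options[] = {{"count_hash_key", no_argument, 0, 'h'},
                                           {"stat_size", no_argument, 0, 'z'},
                                           {0, 0, 0, 0}};
    bool count_hash_key = false;
    bool stat_size = false;
    while (true) {
        int option_index = 0;
        int c = getopt_long(argc, argv, "hz", long_options, &option_index);
        if (c == -1)
            break;
        switch (c) {
        case 'h':
            count_hash_key = true;
            break;
        case 'z':
            stat_size = true;
            break;
        default:
            return 1; // unknown option
        }
    }
    printf("count_hash_key = %s\n", count_hash_key ? "true" : "false");
    printf("stat_size = %s\n", stat_size ? "true" : "false");
    return 0;
}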
......@@ -2640,8 +2659,8 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args)
return false;
}
if (max_batch_count <= 0) {
fprintf(stderr, "ERROR: max_batch_count should be greater than 0\n");
if (max_batch_count <= 1) {
fprintf(stderr, "ERROR: max_batch_count should be greater than 1\n");
return false;
}
......@@ -2665,6 +2684,7 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args)
fprintf(stderr, "INFO: max_split_count = %d\n", max_split_count);
fprintf(stderr, "INFO: max_batch_count = %d\n", max_batch_count);
fprintf(stderr, "INFO: timeout_ms = %d\n", timeout_ms);
fprintf(stderr, "INFO: count_hash_key = %s\n", count_hash_key ? "true" : "false");
fprintf(stderr, "INFO: stat_size = %s\n", stat_size ? "true" : "false");
fprintf(stderr, "INFO: top_count = %d\n", top_count);
fprintf(stderr, "INFO: run_seconds = %d\n", run_seconds);
......@@ -2694,7 +2714,8 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args)
nullptr,
&error_occurred,
stat_size,
top_count);
top_count,
count_hash_key);
contexts.push_back(context);
dsn::tasking::enqueue(LPC_SCAN_DATA, nullptr, std::bind(scan_data_next, context));
}
......@@ -2716,35 +2737,45 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args)
}
int completed_split_count = 0;
long cur_total_rows = 0;
long cur_total_hash_key_count = 0;
for (int i = 0; i < scanners.size(); i++) {
cur_total_rows += contexts[i]->split_rows.load();
if (count_hash_key)
cur_total_hash_key_count += contexts[i]->split_hash_key_count.load();
if (contexts[i]->split_request_count.load() == 0)
completed_split_count++;
}
char hash_key_count_str[100];
hash_key_count_str[0] = '\0';
if (count_hash_key) {
sprintf(hash_key_count_str, " (%ld hash keys)", cur_total_hash_key_count);
}
if (!stopped_by_wait_seconds && error_occurred.load()) {
fprintf(stderr,
"INFO: processed for %d seconds, (%d/%d) splits, total %ld rows, last second "
"INFO: processed for %d seconds, (%d/%d) splits, total %ld rows%s, last second "
"%ld rows, error occurred, terminating...\n",
sleep_seconds,
completed_split_count,
split_count,
cur_total_rows,
hash_key_count_str,
cur_total_rows - last_total_rows);
} else {
fprintf(stderr,
"INFO: processed for %d seconds, (%d/%d) splits, total %ld rows, last second "
"INFO: processed for %d seconds, (%d/%d) splits, total %ld rows%s, last second "
"%ld rows\n",
sleep_seconds,
completed_split_count,
split_count,
cur_total_rows,
hash_key_count_str,
cur_total_rows - last_total_rows);
}
if (completed_split_count == scanners.size())
break;
last_total_rows = cur_total_rows;
if (stat_size && sleep_seconds % 10 == 0) {
print_current_scan_state(contexts, "partially", stat_size);
print_current_scan_state(contexts, "partially", stat_size, count_hash_key);
}
}
......@@ -2767,7 +2798,7 @@ inline bool count_data(command_executor *e, shell_context *sc, arguments args)
stop_desc = "done";
}
print_current_scan_state(contexts, stop_desc, stat_size);
print_current_scan_state(contexts, stop_desc, stat_size, count_hash_key);
if (stat_size) {
if (top_count > 0) {
......
......@@ -259,8 +259,8 @@ static command_executor commands[] = {
{
"count_data",
"get app row count",
"[-s|--max_split_count num] [-b|--max_batch_count num] "
"[-t|--timeout_ms num] [-z|--stat_size] [-c|--top_count num] "
"[-s|--max_split_count num] [-b|--max_batch_count num] [-t|--timeout_ms num] "
"[-h|--count_hash_key] [-z|--stat_size] [-c|--top_count num] "
"[-r|--run_seconds num]",
data_operations,
},
......