未验证 提交 1f066c60 编写于 作者: 小地鼠家的小松鼠's avatar 小地鼠家的小松鼠 提交者: GitHub

feat(bulk-load): add bulk load shell command (#561)

上级 418548da
Subproject commit 31637897ebb4774415a94c89c3475a03a4f120e4
Subproject commit e32ce21c00b9492991533d1b545faf79076abc55
......@@ -250,3 +250,15 @@ bool set_dup_fail_mode(command_executor *e, shell_context *sc, arguments args);
bool query_disk_capacity(command_executor *e, shell_context *sc, arguments args);
bool query_disk_replica(command_executor *e, shell_context *sc, arguments args);
// == bulk load (see 'commands/bulk_load.cpp') == //
bool start_bulk_load(command_executor *e, shell_context *sc, arguments args);
bool query_bulk_load_status(command_executor *e, shell_context *sc, arguments args);
bool pause_bulk_load(command_executor *e, shell_context *sc, arguments args);
bool restart_bulk_load(command_executor *e, shell_context *sc, arguments args);
bool cancel_bulk_load(command_executor *e, shell_context *sc, arguments args);
// Copyright (c) 2019, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
#include "shell/commands.h"
bool start_bulk_load(command_executor *e, shell_context *sc, arguments args)
{
static struct option long_options[] = {{"app_name", required_argument, 0, 'a'},
{"cluster_name", required_argument, 0, 'c'},
{"file_provider_type", required_argument, 0, 'p'},
{0, 0, 0, 0}};
std::string app_name;
std::string cluster_name;
std::string file_provider_type;
optind = 0;
while (true) {
int option_index = 0;
int c;
c = getopt_long(args.argc, args.argv, "a:c:p:", long_options, &option_index);
if (c == -1)
break;
switch (c) {
case 'a':
app_name = optarg;
break;
case 'c':
cluster_name = optarg;
break;
case 'p':
file_provider_type = optarg;
break;
default:
return false;
}
}
if (app_name.empty()) {
fprintf(stderr, "app_name should not be empty\n");
return false;
}
if (cluster_name.empty()) {
fprintf(stderr, "cluster_name should not be empty\n");
return false;
}
if (file_provider_type.empty()) {
fprintf(stderr, "file_provider_type should not be empty\n");
return false;
}
auto err_resp = sc->ddl_client->start_bulk_load(app_name, cluster_name, file_provider_type);
dsn::error_s err = err_resp.get_error();
std::string hint_msg;
if (err.is_ok()) {
err = dsn::error_s::make(err_resp.get_value().err);
hint_msg = err_resp.get_value().hint_msg;
}
if (!err.is_ok()) {
fmt::print(stderr, "start bulk load failed, error={} [hint:\"{}\"]\n", err, hint_msg);
} else {
fmt::print(stdout, "start bulk load succeed\n");
}
return true;
}
// helper function for pause/restart bulk load
bool control_bulk_load_helper(command_executor *e,
shell_context *sc,
arguments args,
dsn::replication::bulk_load_control_type::type type)
{
if (type != dsn::replication::bulk_load_control_type::BLC_PAUSE &&
type != dsn::replication::bulk_load_control_type::BLC_RESTART) {
return false;
}
static struct option long_options[] = {{"app_name", required_argument, 0, 'a'}, {0, 0, 0, 0}};
std::string app_name;
optind = 0;
while (true) {
int option_index = 0;
int c;
c = getopt_long(args.argc, args.argv, "a:", long_options, &option_index);
if (c == -1)
break;
switch (c) {
case 'a':
app_name = optarg;
break;
default:
return false;
}
}
if (app_name.empty()) {
fprintf(stderr, "app_name should not be empty\n");
return false;
}
auto err_resp = sc->ddl_client->control_bulk_load(app_name, type);
dsn::error_s err = err_resp.get_error();
std::string hint_msg;
if (err.is_ok()) {
err = dsn::error_s::make(err_resp.get_value().err);
hint_msg = err_resp.get_value().hint_msg;
}
std::string type_str =
type == dsn::replication::bulk_load_control_type::BLC_PAUSE ? "pause" : "restart";
if (!err.is_ok()) {
fmt::print(
stderr, "{} bulk load failed, error={} [hint:\"{}\"]\n", type_str, err, hint_msg);
} else {
fmt::print(stdout, "{} bulk load succeed\n", type_str);
}
return true;
}
bool pause_bulk_load(command_executor *e, shell_context *sc, arguments args)
{
return control_bulk_load_helper(
e, sc, args, dsn::replication::bulk_load_control_type::BLC_PAUSE);
}
bool restart_bulk_load(command_executor *e, shell_context *sc, arguments args)
{
return control_bulk_load_helper(
e, sc, args, dsn::replication::bulk_load_control_type::BLC_RESTART);
}
bool cancel_bulk_load(command_executor *e, shell_context *sc, arguments args)
{
static struct option long_options[] = {
{"app_name", required_argument, 0, 'a'}, {"forced", no_argument, 0, 'f'}, {0, 0, 0, 0}};
std::string app_name;
bool forced = false;
optind = 0;
while (true) {
int option_index = 0;
int c;
c = getopt_long(args.argc, args.argv, "a:f", long_options, &option_index);
if (c == -1)
break;
switch (c) {
case 'a':
app_name = optarg;
break;
case 'f':
forced = true;
break;
default:
return false;
}
}
if (app_name.empty()) {
fprintf(stderr, "app_name should not be empty\n");
return false;
}
auto type = forced ? dsn::replication::bulk_load_control_type::BLC_FORCE_CANCEL
: dsn::replication::bulk_load_control_type::BLC_CANCEL;
auto err_resp = sc->ddl_client->control_bulk_load(app_name, type);
dsn::error_s err = err_resp.get_error();
std::string hint_msg;
if (err.is_ok()) {
err = dsn::error_s::make(err_resp.get_value().err);
hint_msg = err_resp.get_value().hint_msg;
}
if (!err.is_ok()) {
fmt::print(stderr, "cancel bulk load failed, error={} [hint:\"{}\"]\n", err, hint_msg);
if (err.code() == dsn::ERR_INVALID_STATE &&
type == dsn::replication::bulk_load_control_type::BLC_CANCEL) {
fmt::print(stderr, "you can force cancel bulk load by using \"-f\"\n");
}
} else {
fmt::print(stdout, "cancel bulk load succeed\n");
}
return true;
}
bool query_bulk_load_status(command_executor *e, shell_context *sc, arguments args)
{
// TODO(heyuchen): TBD
return true;
}
......@@ -450,6 +450,30 @@ static command_executor commands[] = {
"set fail_mode of duplication",
"<app_name> <dup_id> <slow|skip>",
set_dup_fail_mode},
{
"start_bulk_load",
"start app bulk load",
"<-a --app_name str> <-c --cluster_name str> <-p --file_provider_type str>",
start_bulk_load,
},
{
"query_bulk_load_status",
"query app bulk load status",
"<-a --app_name str> [-i --partition_index num] [-d --detailed]",
query_bulk_load_status,
},
{
"pause_bulk_load", "pause app bulk load", "<-a --app_name str>", pause_bulk_load,
},
{
"restart_bulk_load", "restart app bulk load", "<-a --app_name str>", restart_bulk_load,
},
{
"cancel_bulk_load",
"cancel app bulk load",
"<-a --app_name str> [-f --forced]",
cancel_bulk_load,
},
{
"exit", "exit shell", "", exit_shell,
},
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册