提交 dc0771cf 编写于 作者: D Daniel P. Berrangé

tools: rewrite interactive job monitoring logic

For long running jobs (save, managed save, dump & live migrate)
virsh runs a background thread for executing the job and then
has the main thread catch Ctrl-C for graceful shutdown, as well
as displaying progress info.

The monitoring code is written using poll, with a pipe used
to get the completion status from the thread. Using a pipe
and poll is problematic for Windows portability. This rewrites
the code to use a GMainLoop instance for monitoring stdin and
doing progress updates. The use of a pipe is entirely eliminated,
instead there is just a shared variable between both threads
containing the job completion status.

No mutex locking is used because the background thread writes
to the variable only when the main loop is still running,
while the foreground thread only reads it after the main loop
has exited.
Reviewed-by: NPavel Hrdina <phrdina@redhat.com>
Signed-off-by: NDaniel P. Berrangé <berrange@redhat.com>
上级 29c4a3c7
......@@ -23,7 +23,6 @@
#include "virsh-util.h"
#include <fcntl.h>
#include <poll.h>
#include <signal.h>
#include <sys/time.h>
......@@ -4224,7 +4223,6 @@ doSave(void *opaque)
virshCtrlData *data = opaque;
vshControl *ctl = data->ctl;
const vshCmd *cmd = data->cmd;
char ret = '1';
virDomainPtr dom = NULL;
const char *name = NULL;
const char *to = NULL;
......@@ -4269,7 +4267,7 @@ doSave(void *opaque)
goto out;
}
ret = '0';
data->ret = 0;
out:
#ifndef WIN32
......@@ -4278,18 +4276,126 @@ doSave(void *opaque)
#endif /* !WIN32 */
virshDomainFree(dom);
VIR_FREE(xml);
ignore_value(safewrite(data->writefd, &ret, sizeof(ret)));
g_main_loop_quit(data->eventLoop);
}
typedef void (*jobWatchTimeoutFunc)(vshControl *ctl, virDomainPtr dom,
void *opaque);
static bool
struct virshWatchData {
vshControl *ctl;
virDomainPtr dom;
jobWatchTimeoutFunc timeout_func;
void *opaque;
const char *label;
GIOChannel *stdin_ioc;
bool jobStarted;
bool verbose;
};
static gboolean
virshWatchTimeout(gpointer opaque)
{
struct virshWatchData *data = opaque;
/* suspend the domain when migration timeouts. */
vshDebug(data->ctl, VSH_ERR_DEBUG, "watchJob: timeout\n");
if (data->timeout_func)
(data->timeout_func)(data->ctl, data->dom, data->opaque);
return G_SOURCE_REMOVE;
}
static gboolean
virshWatchProgress(gpointer opaque)
{
struct virshWatchData *data = opaque;
virDomainJobInfo jobinfo;
int ret;
#ifndef WIN32
sigset_t sigmask, oldsigmask;
sigemptyset(&sigmask);
sigaddset(&sigmask, SIGINT);
pthread_sigmask(SIG_BLOCK, &sigmask, &oldsigmask);
#endif /* !WIN32 */
vshDebug(data->ctl, VSH_ERR_DEBUG, "%s",
"watchJob: progress update\n");
ret = virDomainGetJobInfo(data->dom, &jobinfo);
#ifndef WIN32
pthread_sigmask(SIG_SETMASK, &oldsigmask, NULL);
#endif /* !WIN32 */
if (ret == 0) {
if (data->verbose && jobinfo.dataTotal > 0)
virshPrintJobProgress(data->label, jobinfo.dataRemaining,
jobinfo.dataTotal);
if (!data->jobStarted &&
(jobinfo.type == VIR_DOMAIN_JOB_BOUNDED ||
jobinfo.type == VIR_DOMAIN_JOB_UNBOUNDED)) {
vshTTYDisableInterrupt(data->ctl);
data->jobStarted = true;
if (!data->verbose) {
vshDebug(data->ctl, VSH_ERR_DEBUG,
"watchJob: job started, disabling callback\n");
return G_SOURCE_REMOVE;
}
}
} else {
vshResetLibvirtError();
}
return G_SOURCE_CONTINUE;
}
static gboolean
virshWatchInterrupt(GIOChannel *source G_GNUC_UNUSED,
GIOCondition condition,
gpointer opaque)
{
struct virshWatchData *data = opaque;
char retchar;
gsize nread = 0;
vshDebug(data->ctl, VSH_ERR_DEBUG,
"watchJob: stdin data %d\n", condition);
if (condition & G_IO_IN) {
g_io_channel_read_chars(data->stdin_ioc,
&retchar,
sizeof(retchar),
&nread,
NULL);
vshDebug(data->ctl, VSH_ERR_DEBUG,
"watchJob: got %zu characters\n", nread);
if (nread == 1 &&
vshTTYIsInterruptCharacter(data->ctl, retchar)) {
virDomainAbortJob(data->dom);
return G_SOURCE_REMOVE;
}
}
if (condition & (G_IO_ERR | G_IO_HUP)) {
virDomainAbortJob(data->dom);
return G_SOURCE_REMOVE;
}
return G_SOURCE_CONTINUE;
}
static void
virshWatchJob(vshControl *ctl,
virDomainPtr dom,
bool verbose,
int pipe_fd,
int timeout_ms,
GMainLoop *eventLoop,
int *job_err,
int timeout_secs,
jobWatchTimeoutFunc timeout_func,
void *opaque,
const char *label)
......@@ -4297,22 +4403,22 @@ virshWatchJob(vshControl *ctl,
#ifndef WIN32
struct sigaction sig_action;
struct sigaction old_sig_action;
sigset_t sigmask, oldsigmask;
#endif /* !WIN32 */
struct pollfd pollfd[2] = {{.fd = pipe_fd, .events = POLLIN, .revents = 0},
{.fd = STDIN_FILENO, .events = POLLIN, .revents = 0}};
unsigned long long start_us, curr_us;
virDomainJobInfo jobinfo;
int ret = -1;
char retchar;
bool functionReturn = false;
bool jobStarted = false;
nfds_t npollfd = 2;
g_autoptr(GSource) timeout_src = NULL;
g_autoptr(GSource) progress_src = NULL;
g_autoptr(GSource) stdin_src = NULL;
struct virshWatchData data = {
.ctl = ctl,
.dom = dom,
.timeout_func = timeout_func,
.opaque = opaque,
.label = label,
.stdin_ioc = NULL,
.jobStarted = false,
.verbose = verbose,
};
#ifndef WIN32
sigemptyset(&sigmask);
sigaddset(&sigmask, SIGINT);
intCaught = 0;
sig_action.sa_sigaction = virshCatchInt;
sig_action.sa_flags = SA_SIGINFO;
......@@ -4321,98 +4427,77 @@ virshWatchJob(vshControl *ctl,
#endif /* !WIN32 */
/* don't poll on STDIN if we are not using a terminal */
if (!vshTTYAvailable(ctl))
npollfd = 1;
start_us = g_get_real_time();
while (1) {
ret = poll((struct pollfd *)&pollfd, npollfd, 500);
if (ret > 0) {
if (pollfd[1].revents & POLLIN &&
saferead(STDIN_FILENO, &retchar, sizeof(retchar)) > 0) {
if (vshTTYIsInterruptCharacter(ctl, retchar))
virDomainAbortJob(dom);
continue;
}
if (pollfd[0].revents & POLLIN &&
saferead(pipe_fd, &retchar, sizeof(retchar)) > 0 &&
retchar == '0') {
if (verbose) {
/* print [100 %] */
virshPrintJobProgress(label, 0, 1);
}
break;
}
goto cleanup;
}
if (ret < 0) {
if (errno == EINTR) {
if (intCaught) {
virDomainAbortJob(dom);
intCaught = 0;
}
continue;
}
goto cleanup;
}
curr_us = g_get_real_time();
if (timeout_ms && ((curr_us - start_us)/1000) > timeout_ms) {
/* suspend the domain when migration timeouts. */
vshDebug(ctl, VSH_ERR_DEBUG, "%s timeout", label);
if (timeout_func)
(timeout_func)(ctl, dom, opaque);
timeout_ms = 0;
}
if (verbose || !jobStarted) {
#ifndef WIN32
pthread_sigmask(SIG_BLOCK, &sigmask, &oldsigmask);
#endif /* !WIN32 */
ret = virDomainGetJobInfo(dom, &jobinfo);
#ifndef WIN32
pthread_sigmask(SIG_SETMASK, &oldsigmask, NULL);
#endif /* !WIN32 */
if (ret == 0) {
if (verbose && jobinfo.dataTotal > 0)
virshPrintJobProgress(label, jobinfo.dataRemaining,
jobinfo.dataTotal);
if (!jobStarted &&
(jobinfo.type == VIR_DOMAIN_JOB_BOUNDED ||
jobinfo.type == VIR_DOMAIN_JOB_UNBOUNDED)) {
vshTTYDisableInterrupt(ctl);
jobStarted = true;
}
} else {
vshResetLibvirtError();
}
}
if (vshTTYAvailable(ctl)) {
vshDebug(ctl, VSH_ERR_DEBUG, "%s",
"watchJob: on TTY, enabling Ctrl-c processing\n");
#ifdef WIN32
data.stdin_ioc = g_io_channel_win32_new_fd(STDIN_FILENO);
#else
data.stdin_ioc = g_io_channel_unix_new(STDIN_FILENO);
#endif
stdin_src = g_io_create_watch(data.stdin_ioc, G_IO_IN);
g_source_set_callback(stdin_src,
(GSourceFunc)virshWatchInterrupt,
&data, NULL);
g_source_attach(stdin_src,
g_main_loop_get_context(eventLoop));
}
functionReturn = true;
if (timeout_secs) {
vshDebug(ctl, VSH_ERR_DEBUG,
"watchJob: setting timeout of %d secs\n", timeout_secs);
timeout_src = g_timeout_source_new_seconds(timeout_secs);
g_source_set_callback(timeout_src,
virshWatchTimeout,
&data, NULL);
g_source_attach(timeout_src,
g_main_loop_get_context(eventLoop));
}
progress_src = g_timeout_source_new(500);
g_source_set_callback(progress_src,
virshWatchProgress,
&data, NULL);
g_source_attach(progress_src,
g_main_loop_get_context(eventLoop));
g_main_loop_run(eventLoop);
vshDebug(ctl, VSH_ERR_DEBUG,
"watchJob: job done, status %d\n", *job_err);
if (*job_err == 0 && verbose) /* print [100 %] */
virshPrintJobProgress(label, 0, 1);
if (timeout_src)
g_source_destroy(timeout_src);
g_source_destroy(progress_src);
if (stdin_src)
g_source_destroy(stdin_src);
cleanup:
#ifndef WIN32
sigaction(SIGINT, &old_sig_action, NULL);
#endif /* !WIN32 */
vshTTYRestore(ctl);
return functionReturn;
if (data.stdin_ioc)
g_io_channel_unref(data.stdin_ioc);
}
static bool
cmdSave(vshControl *ctl, const vshCmd *cmd)
{
bool ret = false;
virDomainPtr dom = NULL;
int p[2] = {-1. -1};
virThread workerThread;
bool verbose = false;
virshCtrlData data;
const char *to = NULL;
const char *name = NULL;
g_autoptr(GMainContext) eventCtxt = g_main_context_new();
g_autoptr(GMainLoop) eventLoop = g_main_loop_new(eventCtxt, FALSE);
virshCtrlData data = {
.ctl = ctl,
.cmd = cmd,
.eventLoop = eventLoop,
.ret = -1,
};
if (!(dom = virshCommandOptDomain(ctl, cmd, &name)))
return false;
......@@ -4423,29 +4508,23 @@ cmdSave(vshControl *ctl, const vshCmd *cmd)
if (vshCommandOptBool(cmd, "verbose"))
verbose = true;
if (virPipeQuiet(p) < 0)
goto cleanup;
data.ctl = ctl;
data.cmd = cmd;
data.writefd = p[1];
if (virThreadCreate(&workerThread,
true,
doSave,
&data) < 0)
goto cleanup;
ret = virshWatchJob(ctl, dom, verbose, p[0], 0, NULL, NULL, _("Save"));
virshWatchJob(ctl, dom, verbose, eventLoop,
&data.ret, 0, NULL, NULL, _("Save"));
virThreadJoin(&workerThread);
if (ret)
if (!data.ret)
vshPrintExtra(ctl, _("\nDomain %s saved to %s\n"), name, to);
cleanup:
virshDomainFree(dom);
return ret;
return !data.ret;
}
/*
......@@ -4674,7 +4753,6 @@ static const vshCmdOptDef opts_managedsave[] = {
static void
doManagedsave(void *opaque)
{
char ret = '1';
virshCtrlData *data = opaque;
vshControl *ctl = data->ctl;
const vshCmd *cmd = data->cmd;
......@@ -4705,26 +4783,31 @@ doManagedsave(void *opaque)
goto out;
}
ret = '0';
data->ret = 0;
out:
#ifndef WIN32
pthread_sigmask(SIG_SETMASK, &oldsigmask, NULL);
out_sig:
#endif /* !WIN32 */
virshDomainFree(dom);
ignore_value(safewrite(data->writefd, &ret, sizeof(ret)));
g_main_loop_quit(data->eventLoop);
}
static bool
cmdManagedSave(vshControl *ctl, const vshCmd *cmd)
{
virDomainPtr dom;
int p[2] = { -1, -1};
bool ret = false;
bool verbose = false;
const char *name = NULL;
virshCtrlData data;
virThread workerThread;
g_autoptr(GMainContext) eventCtxt = g_main_context_new();
g_autoptr(GMainLoop) eventLoop = g_main_loop_new(eventCtxt, FALSE);
virshCtrlData data = {
.ctl = ctl,
.cmd = cmd,
.eventLoop = eventLoop,
.ret = -1,
};
if (!(dom = virshCommandOptDomain(ctl, cmd, &name)))
return false;
......@@ -4732,32 +4815,23 @@ cmdManagedSave(vshControl *ctl, const vshCmd *cmd)
if (vshCommandOptBool(cmd, "verbose"))
verbose = true;
if (virPipeQuiet(p) < 0)
goto cleanup;
data.ctl = ctl;
data.cmd = cmd;
data.writefd = p[1];
if (virThreadCreate(&workerThread,
true,
doManagedsave,
&data) < 0)
goto cleanup;
ret = virshWatchJob(ctl, dom, verbose, p[0], 0,
NULL, NULL, _("Managedsave"));
virshWatchJob(ctl, dom, verbose, eventLoop,
&data.ret, 0, NULL, NULL, _("Managedsave"));
virThreadJoin(&workerThread);
if (ret)
if (!data.ret)
vshPrintExtra(ctl, _("\nDomain %s state saved by libvirt\n"), name);
cleanup:
virshDomainFree(dom);
VIR_FORCE_CLOSE(p[0]);
VIR_FORCE_CLOSE(p[1]);
return ret;
return !data.ret;
}
/*
......@@ -5438,20 +5512,27 @@ doDump(void *opaque)
#endif /* !WIN32 */
if (dom)
virshDomainFree(dom);
ignore_value(safewrite(data->writefd, &ret, sizeof(ret)));
data->ret = ret;
g_main_loop_quit(data->eventLoop);
}
static bool
cmdDump(vshControl *ctl, const vshCmd *cmd)
{
virDomainPtr dom;
int p[2] = { -1, -1};
bool ret = false;
bool verbose = false;
const char *name = NULL;
const char *to = NULL;
virshCtrlData data;
virThread workerThread;
g_autoptr(GMainContext) eventCtxt = g_main_context_new();
g_autoptr(GMainLoop) eventLoop = g_main_loop_new(eventCtxt, FALSE);
virshCtrlData data = {
.ctl = ctl,
.cmd = cmd,
.eventLoop = eventLoop,
.ret = -1,
};
if (!(dom = virshCommandOptDomain(ctl, cmd, &name)))
return false;
......@@ -5462,31 +5543,23 @@ cmdDump(vshControl *ctl, const vshCmd *cmd)
if (vshCommandOptBool(cmd, "verbose"))
verbose = true;
if (virPipeQuiet(p) < 0)
goto cleanup;
data.ctl = ctl;
data.cmd = cmd;
data.writefd = p[1];
if (virThreadCreate(&workerThread,
true,
doDump,
&data) < 0)
goto cleanup;
ret = virshWatchJob(ctl, dom, verbose, p[0], 0, NULL, NULL, _("Dump"));
virshWatchJob(ctl, dom, verbose, eventLoop,
&data.ret, 0, NULL, NULL, _("Dump"));
virThreadJoin(&workerThread);
if (ret)
if (!ret)
vshPrintExtra(ctl, _("\nDomain %s dumped to %s\n"), name, to);
cleanup:
virshDomainFree(dom);
VIR_FORCE_CLOSE(p[0]);
VIR_FORCE_CLOSE(p[1]);
return ret;
return !ret;
}
static const vshCmdInfo info_screenshot[] = {
......@@ -10916,7 +10989,8 @@ doMigrate(void *opaque)
#endif /* !WIN32 */
virTypedParamsFree(params, nparams);
virshDomainFree(dom);
ignore_value(safewrite(data->writefd, &ret, sizeof(ret)));
data->ret = ret;
g_main_loop_quit(data->eventLoop);
return;
save_error:
......@@ -10976,16 +11050,22 @@ static bool
cmdMigrate(vshControl *ctl, const vshCmd *cmd)
{
virDomainPtr dom = NULL;
int p[2] = {-1, -1};
virThread workerThread;
bool verbose = false;
bool functionReturn = false;
int timeout = 0;
unsigned int timeout = 0;
virshMigrateTimeoutAction timeoutAction = VIRSH_MIGRATE_TIMEOUT_DEFAULT;
bool live_flag = false;
virshCtrlData data = { .dconn = NULL };
virshControlPtr priv = ctl->privData;
int iterEvent = -1;
g_autoptr(GMainContext) eventCtxt = g_main_context_new();
g_autoptr(GMainLoop) eventLoop = g_main_loop_new(eventCtxt, FALSE);
virshCtrlData data = {
.dconn = NULL,
.ctl = ctl,
.cmd = cmd,
.eventLoop = eventLoop,
.ret = -1,
};
VSH_EXCLUSIVE_OPTIONS("live", "offline");
VSH_EXCLUSIVE_OPTIONS("timeout-suspend", "timeout-postcopy");
......@@ -11002,7 +11082,7 @@ cmdMigrate(vshControl *ctl, const vshCmd *cmd)
if (vshCommandOptBool(cmd, "live"))
live_flag = true;
if (vshCommandOptTimeoutToMs(ctl, cmd, &timeout) < 0) {
if (vshCommandOptUInt(ctl, cmd, "timeout", &timeout) < 0) {
goto cleanup;
} else if (timeout > 0 && !live_flag) {
vshError(ctl, "%s",
......@@ -11033,13 +11113,6 @@ cmdMigrate(vshControl *ctl, const vshCmd *cmd)
goto cleanup;
}
if (virPipeQuiet(p) < 0)
goto cleanup;
data.ctl = ctl;
data.cmd = cmd;
data.writefd = p[1];
if (vshCommandOptBool(cmd, "p2p") || vshCommandOptBool(cmd, "direct")) {
data.dconn = NULL;
} else {
......@@ -11062,9 +11135,10 @@ cmdMigrate(vshControl *ctl, const vshCmd *cmd)
doMigrate,
&data) < 0)
goto cleanup;
functionReturn = virshWatchJob(ctl, dom, verbose, p[0], timeout,
virshMigrateTimeout,
&timeoutAction, _("Migration"));
virshWatchJob(ctl, dom, verbose, eventLoop,
&data.ret, timeout,
virshMigrateTimeout,
&timeoutAction, _("Migration"));
virThreadJoin(&workerThread);
......@@ -11074,9 +11148,7 @@ cmdMigrate(vshControl *ctl, const vshCmd *cmd)
if (iterEvent != -1)
virConnectDomainEventDeregisterAny(priv->conn, iterEvent);
virshDomainFree(dom);
VIR_FORCE_CLOSE(p[0]);
VIR_FORCE_CLOSE(p[1]);
return functionReturn;
return !data.ret;
}
/*
......
......@@ -159,7 +159,8 @@ struct _virshControl {
struct _virshCtrlData {
vshControl *ctl;
const vshCmd *cmd;
int writefd;
GMainLoop *eventLoop;
int ret;
virConnectPtr dconn;
};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册