[libvirt] [PATCH 1/7] virCommand: Introduce virCommandDoAsyncIO

Peter Krempa pkrempa at redhat.com
Fri Jan 25 18:26:15 UTC 2013


On 01/23/13 10:41, Michal Privoznik wrote:
> Currently, if we want to feed stdin, or catch stdout or stderr of a
> virCommand we have to use virCommandRun(). When using virCommandRunAsync()
> we have to register FD handles by hand. This may lead to code duplication.
> Hence, introduce an internal API, which does this automatically within
> virCommandRunAsync(). The intended usage looks like this:
>
>      virCommandPtr cmd = virCommandNew*(...);
>      char *buf = NULL;
>
>      ...
>
>      virCommandSetOutputBuffer(cmd, &buf);
>      virCommandDoAsyncIO(cmd);
>
>      if (virCommandRunAsync(cmd, NULL) < 0)
>          goto cleanup;
>
>      ...
>
>      if (virCommandWait(cmd, NULL) < 0)
>          goto cleanup;
>
>      /* @buf now contains @cmd's stdout */
>      VIR_DEBUG("STDOUT: %s", NULLSTR(buf));
>
>      ...
>
> cleanup:
>      VIR_FREE(buf);
>      virCommandFree(cmd);
>
> Note, that both stdout and stderr buffers may change until virCommandWait()
> returns.
> ---
>   src/libvirt_private.syms |   1 +
>   src/util/vircommand.c    | 214 +++++++++++++++++++++++++++++++++++++++++++++--
>   src/util/vircommand.h    |   1 +
>   3 files changed, 208 insertions(+), 8 deletions(-)
>
> diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms
> index fc23adc..f89d1aa 100644
> --- a/src/libvirt_private.syms
> +++ b/src/libvirt_private.syms
> @@ -142,6 +142,7 @@ virCommandAddEnvString;
>   virCommandAllowCap;
>   virCommandClearCaps;
>   virCommandDaemonize;
> +virCommandDoAsyncIO;
>   virCommandExec;
>   virCommandFree;
>   virCommandHandshakeNotify;
> diff --git a/src/util/vircommand.c b/src/util/vircommand.c
> index 8566d1a..5d67bd2 100644
> --- a/src/util/vircommand.c
> +++ b/src/util/vircommand.c
> @@ -47,11 +47,12 @@
>
>   /* Flags for virExecWithHook */
>   enum {
> -    VIR_EXEC_NONE   = 0,
> -    VIR_EXEC_NONBLOCK = (1 << 0),
> -    VIR_EXEC_DAEMON = (1 << 1),
> +    VIR_EXEC_NONE       = 0,
> +    VIR_EXEC_NONBLOCK   = (1 << 0),
> +    VIR_EXEC_DAEMON     = (1 << 1),
>       VIR_EXEC_CLEAR_CAPS = (1 << 2),
> -    VIR_EXEC_RUN_SYNC = (1 << 3),
> +    VIR_EXEC_RUN_SYNC   = (1 << 3),
> +    VIR_EXEC_ASYNC_IO   = (1 << 4),
>   };
>
>   struct _virCommand {
> @@ -84,6 +85,11 @@ struct _virCommand {
>       int *outfdptr;
>       int *errfdptr;
>
> +    size_t inbufOffset;
> +    int inWatch;
> +    int outWatch;
> +    int errWatch;
> +
>       bool handshake;
>       int handshakeWait[2];
>       int handshakeNotify[2];
> @@ -779,6 +785,7 @@ virCommandNewArgs(const char *const*args)
>       cmd->handshakeNotify[1] = -1;
>
>       cmd->infd = cmd->outfd = cmd->errfd = -1;
> +    cmd->inWatch = cmd->outWatch = cmd->errWatch = -1;
>       cmd->pid = -1;
>
>       virCommandAddArgSet(cmd, args);
> @@ -2122,6 +2129,152 @@ virCommandHook(void *data)
>   }
>
>
> +static void
> +virCommandHandleReadWrite(int watch, int fd, int events, void *opaque)
> +{
> +    virCommandPtr cmd = (virCommandPtr) opaque;
> +    char **bufptr = NULL;
> +    char buf[1024];
> +    ssize_t nread;
> +    size_t len = 0;
> +    int *watchPtr = NULL;
> +
> +    VIR_DEBUG("watch=%d fd=%d events=%d", watch, fd, events);
> +
> +    if (watch == cmd->inWatch) {
> +        watchPtr = &cmd->inWatch;
> +
> +        if (events & VIR_EVENT_HANDLE_WRITABLE) {
> +            len = strlen(cmd->inbuf);

I suppose this isn't intended to work on non-string data. It is worth 
documenting this so that it doesn't surprise someone.

> +
> +            while ((nread = write(fd, cmd->inbuf + cmd->inbufOffset,
> +                                  len - cmd->inbufOffset))) {

Hm, what's the difference between len and inbuffOffset here?

> +                if (nread < 0) {
> +                    if (errno != EAGAIN)
> +                        virReportSystemError(errno,
> +                                             _("Unable to write command's "
> +                                               "input to FD %d"),
> +                                             fd);
> +                    break;
> +                }
> +
> +                cmd->inbufOffset += nread;
> +                if (cmd->inbufOffset == len)
> +                    VIR_FORCE_CLOSE(cmd->infd);
> +            }
> +        }
> +    } else {
> +        if (watch == cmd->outWatch) {
> +            watchPtr = &cmd->outWatch;
> +            bufptr = cmd->outbuf;
> +        }
> +
> +        if (watch == cmd->errWatch) {
> +            watchPtr = &cmd->errWatch;
> +            bufptr = cmd->errbuf;
> +        }
> +
> +        if (bufptr && events & VIR_EVENT_HANDLE_READABLE) {
> +            if (*bufptr)
> +                len = strlen(*bufptr);
> +
> +            while ((nread = read(fd, buf, sizeof(buf)))) {
> +                if (nread < 0) {
> +                    if (errno != EAGAIN)
> +                        virReportSystemError(errno,
> +                                             _("unable to read command's "
> +                                               "output from FD %d"),
> +                                             fd);
> +                    break;
> +                }
> +
> +                if (VIR_REALLOC_N(*bufptr, len + nread + 1) < 0) {
> +                    virReportOOMError();
> +                    break;
> +                }
> +
> +                memcpy(*bufptr + len, buf, nread);
> +                (*bufptr)[len + nread] = '\0';
> +            }
> +        }
> +    }
> +
> +    if (events & VIR_EVENT_HANDLE_HANGUP) {
> +        *watchPtr = -1;
> +        virEventRemoveHandle(watch);
> +    }
> +}
> +
> +
> +static int
> +virCommandRegisterEventLoop(virCommandPtr cmd)
> +{
> +    int ret = -1;
> +
> +    if (cmd->inbuf) {
> +        if (virSetNonBlock(cmd->infd) < 0) {
> +            virReportSystemError(errno,
> +                                 _("Failed to set non-blocking flag on FD %d"),
> +                                 cmd->infd);
> +            goto cleanup;
> +        }
> +
> +        if ((cmd->inWatch = virEventAddHandle(cmd->infd,
> +                                              VIR_EVENT_HANDLE_WRITABLE,
> +                                              virCommandHandleReadWrite,
> +                                              cmd, NULL)) < 0) {
> +            virReportError(VIR_ERR_INTERNAL_ERROR,
> +                           _("Unable to register infd %d in the event loop"),
> +                           cmd->infd);
> +            goto cleanup;
> +        }
> +    }
> +
> +    if (cmd->outbuf && cmd->outfdptr == &cmd->outfd) {
> +        if (virSetNonBlock(cmd->outfd) < 0) {
> +            virReportSystemError(errno,
> +                                 _("Failed to set non-blocking flag on FD %d"),
> +                                 cmd->outfd);
> +            goto cleanup;
> +        }
> +
> +        if ((cmd->outWatch = virEventAddHandle(cmd->outfd,
> +                                               VIR_EVENT_HANDLE_READABLE,
> +                                               virCommandHandleReadWrite,
> +                                               cmd, NULL)) < 0) {
> +            virReportError(VIR_ERR_INTERNAL_ERROR,
> +                           _("Unable to register outfd %d in the event loop"),
> +                           cmd->outfd);
> +            goto cleanup;
> +        }
> +    }
> +
> +    if (cmd->errbuf && cmd->errfdptr == &cmd->errfd) {
> +        if (virSetNonBlock(cmd->errfd) < 0) {
> +            virReportSystemError(errno,
> +                                 _("Failed to set non-blocking flag on FD %d"),
> +                                 cmd->errfd);
> +            goto cleanup;
> +        }
> +
> +        if ((cmd->errWatch = virEventAddHandle(cmd->errfd,
> +                                               VIR_EVENT_HANDLE_READABLE,
> +                                               virCommandHandleReadWrite,
> +                                               cmd, NULL)) < 0) {
> +            virReportError(VIR_ERR_INTERNAL_ERROR,
> +                           _("Unable to register errfd %d in the event loop"),
> +                           cmd->errfd);
> +            goto cleanup;
> +        }
> +    }
> +
> +    ret = 0;
> +
> +cleanup:
> +    return ret;
> +}
> +
> +
>   /**
>    * virCommandRunAsync:
>    * @cmd: command to start
> @@ -2149,6 +2302,7 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid)
>       char *str;
>       int i;
>       bool synchronous = false;
> +    int infd[2] = {-1, -1};
>
>       if (!cmd || cmd->has_error == ENOMEM) {
>           virReportOOMError();
> @@ -2163,10 +2317,23 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid)
>       synchronous = cmd->flags & VIR_EXEC_RUN_SYNC;
>       cmd->flags &= ~VIR_EXEC_RUN_SYNC;
>
> -    /* Buffer management can only be requested via virCommandRun.  */
> -    if ((cmd->inbuf && cmd->infd == -1) ||
> -        (cmd->outbuf && cmd->outfdptr != &cmd->outfd) ||
> -        (cmd->errbuf && cmd->errfdptr != &cmd->errfd)) {
> +    /* Buffer management can only be requested via virCommandRun, unless help
> +     * from the event loop has been requested via virCommandDoAsyncIO. */
> +    if (cmd->flags & VIR_EXEC_ASYNC_IO) {
> +        /* If we have an input buffer, we need
> +         * a pipe to feed the data to the child */
> +        if (cmd->inbuf && cmd->infd == -1) {
> +            if (pipe2(infd, O_CLOEXEC) < 0) {
> +                virReportSystemError(errno, "%s",
> +                                     _("unable to open pipe"));
> +                cmd->has_error = -1;
> +                return -1;
> +            }
> +            cmd->infd = infd[0];
> +        }
> +    } else if ((cmd->inbuf && cmd->infd == -1) ||
> +         (cmd->outbuf && cmd->outfdptr != &cmd->outfd) ||
> +         (cmd->errbuf && cmd->errfdptr != &cmd->errfd)) {
>           virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
>                          _("cannot mix string I/O with asynchronous command"));
>           return -1;
> @@ -2228,6 +2395,16 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid)
>       else
>           cmd->reap = true;
>
> +    if (ret == 0 && cmd->flags & VIR_EXEC_ASYNC_IO) {
> +        cmd->flags &= ~VIR_EXEC_ASYNC_IO;
> +        if (cmd->inbuf && cmd->infd != -1) {
> +            /* close the read end of infd and replace it with the write end */
> +            VIR_FORCE_CLOSE(cmd->infd);
> +            cmd->infd = infd[1];
> +        }
> +        ret = virCommandRegisterEventLoop(cmd);
> +    }
> +
>       return ret;
>   }
>
> @@ -2265,6 +2442,11 @@ virCommandWait(virCommandPtr cmd, int *exitstatus)
>           return -1;
>       }
>
> +    while (cmd->inWatch != -1 &&
> +           cmd->outWatch != -1 &&
> +           cmd->errWatch != -1)
> +        usleep(100);

Hm, is there a possibility to avoid this active wait loop?

> +
>       /* If virProcessWait reaps pid but then returns failure because
>        * exitstatus was NULL, then a second virCommandWait would risk
>        * calling waitpid on an unrelated process.  Besides, that error
> @@ -2516,8 +2698,24 @@ virCommandFree(virCommandPtr cmd)
>       if (cmd->reap)
>           virCommandAbort(cmd);
>
> +    if (cmd->inWatch != -1)
> +        virEventRemoveHandle(cmd->inWatch);
> +    if (cmd->outWatch != -1)
> +        virEventRemoveHandle(cmd->outWatch);
> +    if (cmd->errWatch != -1)
> +        virEventRemoveHandle(cmd->errWatch);
> +
>       VIR_FREE(cmd->transfer);
>       VIR_FREE(cmd->preserve);
>
>       VIR_FREE(cmd);
>   }
> +
> +void
> +virCommandDoAsyncIO(virCommandPtr cmd)
> +{
> +   if (!cmd || cmd->has_error)
> +       return;
> +
> +   cmd->flags |= VIR_EXEC_ASYNC_IO;
> +}
> diff --git a/src/util/vircommand.h b/src/util/vircommand.h
> index 9b7117d..c1a2e24 100644
> --- a/src/util/vircommand.h
> +++ b/src/util/vircommand.h
> @@ -163,4 +163,5 @@ void virCommandAbort(virCommandPtr cmd);
>
>   void virCommandFree(virCommandPtr cmd);
>
> +void virCommandDoAsyncIO(virCommandPtr cmd);
>   #endif /* __VIR_COMMAND_H__ */
>

Peter




More information about the libvir-list mailing list