[libvirt] [PATCH v2 05/38] virfdstream: Drop iohelper in favour of a thread

John Ferlan jferlan at redhat.com
Thu Apr 27 17:42:51 UTC 2017



On 04/20/2017 06:01 AM, Michal Privoznik wrote:
> Currently we use iohelper for virFDStream implementation. This is
> because UNIX I/O can lie sometimes: even though a FD for a
> file/block device is set as unblocking, actual read()/write() can
> block. To avoid this, a pipe is created and one end is kept for
> read/write while the other is handed over to iohelper to
> write/read the data for us. Thus it's iohelper which gets blocked
> and not our event loop.
> 
> This approach has two problems:
> 1) we are spawning a new process.
> 2) any exchange of information between daemon and iohelper can be
> done only through the pipe.
> 
> Therefore, iohelper is replaced with an implementation in thread
> which is created just for the stream lifetime. The data are still
> transferred through pipe (for now), but both problems described
> above are solved.
> 
> Signed-off-by: Michal Privoznik <mprivozn at redhat.com>
> ---
>  src/util/virfdstream.c | 245 +++++++++++++++++++++++++++++++------------------
>  src/util/virfdstream.h |   1 -
>  2 files changed, 158 insertions(+), 88 deletions(-)
> 
> diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c
> index 9a4a7ff..7a8d65d 100644
> --- a/src/util/virfdstream.c
> +++ b/src/util/virfdstream.c
> @@ -56,8 +56,6 @@ struct virFDStreamData {
>      virObjectLockable parent;
>  
>      int fd;
> -    int errfd;
> -    virCommandPtr cmd;
>      unsigned long long offset;
>      unsigned long long length;
>  
> @@ -79,6 +77,11 @@ struct virFDStreamData {
>      virFDStreamInternalCloseCb icbCb;
>      virFDStreamInternalCloseCbFreeOpaque icbFreeOpaque;
>      void *icbOpaque;
> +
> +    /* Thread data */
> +    virThreadPtr thread;
> +    int threadErr;
> +    bool threadQuit;
>  };
>  
>  static virClassPtr virFDStreamDataClass;
> @@ -264,57 +267,123 @@ virFDStreamAddCallback(virStreamPtr st,
>      return ret;
>  }
>  
> +
> +typedef struct _virFDStreamThreadData virFDStreamThreadData;
> +typedef virFDStreamThreadData *virFDStreamThreadDataPtr;
> +struct _virFDStreamThreadData {
> +    virStreamPtr st;
> +    size_t length;
> +    int fdin;
> +    char *fdinname;
> +    int fdout;
> +    char *fdoutname;
> +};
> +
> +
> +static void
> +virFDStreamThreadDataFree(virFDStreamThreadDataPtr data)
> +{
> +    if (!data)
> +        return;
> +
> +    virObjectUnref(data->st);
> +    VIR_FREE(data->fdinname);
> +    VIR_FREE(data->fdoutname);
> +    VIR_FREE(data);
> +}
> +
> +
> +static void
> +virFDStreamThread(void *opaque)
> +{
> +    virFDStreamThreadDataPtr data = opaque;
> +    virStreamPtr st = data->st;
> +    size_t length = data->length;
> +    int fdin = data->fdin;
> +    char *fdinname = data->fdinname;
> +    int fdout = data->fdout;
> +    char *fdoutname = data->fdoutname;
> +    virFDStreamDataPtr fdst = st->privateData;
> +    char *buf = NULL;
> +    size_t buflen = 256 * 1024;
> +    size_t total = 0;
> +
> +    virObjectRef(fdst);
> +
> +    if (VIR_ALLOC_N(buf, buflen) < 0)
> +        goto error;
> +
> +    while (1) {
> +        ssize_t got;
> +
> +        if (length &&
> +            (length - total) < buflen)
> +            buflen = length - total;
> +
> +        if (buflen == 0)
> +            break; /* End of requested data from client */
> +
> +        if ((got = saferead(fdin, buf, buflen)) < 0) {
> +            virReportSystemError(errno,
> +                                 _("Unable to read %s"),
> +                                 fdinname);
> +            goto error;
> +        }
> +
> +        if (got == 0)
> +            break;
> +
> +        total += got;
> +
> +        if (safewrite(fdout, buf, got) < 0) {
> +            virReportSystemError(errno,
> +                                 _("Unable to write %s"),
> +                                 fdoutname);
> +            goto error;
> +        }
> +    }
> +
> + cleanup:
> +    if (!virObjectUnref(fdst))
> +        st->privateData = NULL;
> +    VIR_FORCE_CLOSE(fdin);
> +    VIR_FORCE_CLOSE(fdout);
> +    virFDStreamThreadDataFree(data);
> +    VIR_FREE(buf);
> +    return;
> +
> + error:
> +    virObjectLock(fdst);
> +    fdst->threadErr = errno;
> +    virObjectUnlock(fdst);
> +    goto cleanup;
> +}
> +
> +
>  static int
> -virFDStreamCloseCommand(virFDStreamDataPtr fdst, bool streamAbort)
> +virFDStreamJoinWorker(virFDStreamDataPtr fdst, bool streamAbort)

Since you're touching this function signature anyway, each argument should be on its own line...

>  {
> -    char buf[1024];
> -    ssize_t len;
> -    int status;
>      int ret = -1;
> -
> -    if (!fdst->cmd)
> +    if (!fdst->thread)
>          return 0;
>  
> -    if ((len = saferead(fdst->errfd, buf, sizeof(buf)-1)) < 0)
> -        buf[0] = '\0';
> -    else
> -        buf[len] = '\0';
> +    /* Give the thread a chance to lock the FD stream object. */
> +    virObjectUnlock(fdst);
> +    virThreadJoin(fdst->thread);
> +    virObjectLock(fdst);
>  
> -    virCommandRawStatus(fdst->cmd);
> -    if (virCommandWait(fdst->cmd, &status) < 0)
> -        goto cleanup;
> -
> -    if (status != 0) {
> -        if (buf[0] != '\0') {
> -            virReportError(VIR_ERR_INTERNAL_ERROR, "%s", buf);
> -        } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGPIPE) {
> -            if (streamAbort) {
> -                /* Explicit abort request means the caller doesn't care
> -                   if there's data left over, so skip the error */
> -                goto out;
> -            }
> -
> -            virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
> -                           _("I/O helper exited "
> -                             "before all data was processed"));
> -        } else {
> -            char *str = virProcessTranslateStatus(status);
> -            virReportError(VIR_ERR_INTERNAL_ERROR,
> -                           _("I/O helper exited with %s"),
> -                           NULLSTR(str));
> -            VIR_FREE(str);
> -        }
> +    if (fdst->threadErr && !streamAbort) {
> +        /* errors are expected on streamAbort */
>          goto cleanup;
>      }
>  
> - out:
>      ret = 0;
>   cleanup:
> -    virCommandFree(fdst->cmd);
> -    fdst->cmd = NULL;
> +    VIR_FREE(fdst->thread);
>      return ret;
>  }
>  
> +

Should I say anything about this extra blank line being added? I'm fine with it... :-)

>  static int
>  virFDStreamCloseInt(virStreamPtr st, bool streamAbort)
>  {
> @@ -359,12 +428,9 @@ virFDStreamCloseInt(virStreamPtr st, bool streamAbort)
>  
>      /* mutex locked */
>      ret = VIR_CLOSE(fdst->fd);
> -    if (virFDStreamCloseCommand(fdst, streamAbort) < 0)
> +    if (virFDStreamJoinWorker(fdst, streamAbort) < 0)
>          ret = -1;
>  
> -    if (VIR_CLOSE(fdst->errfd) < 0)
> -        VIR_DEBUG("ignoring failed close on fd %d", fdst->errfd);
> -
>      st->privateData = NULL;
>  
>      /* call the internal stream closing callback */
> @@ -516,14 +582,13 @@ static virStreamDriver virFDStreamDrv = {
>  
>  static int virFDStreamOpenInternal(virStreamPtr st,
>                                     int fd,
> -                                   virCommandPtr cmd,
> -                                   int errfd,
> +                                   virFDStreamThreadDataPtr threadData,
>                                     unsigned long long length)
>  {
>      virFDStreamDataPtr fdst;
>  
> -    VIR_DEBUG("st=%p fd=%d cmd=%p errfd=%d length=%llu",
> -              st, fd, cmd, errfd, length);
> +    VIR_DEBUG("st=%p fd=%d threadData=%p length=%llu",
> +              st, fd, threadData, length);
>  
>      if (virFDStreamDataInitialize() < 0)
>          return -1;
> @@ -538,21 +603,39 @@ static int virFDStreamOpenInternal(virStreamPtr st,
>          return -1;
>  
>      fdst->fd = fd;
> -    fdst->cmd = cmd;
> -    fdst->errfd = errfd;
>      fdst->length = length;
>  
>      st->driver = &virFDStreamDrv;
>      st->privateData = fdst;
>  
> +    if (threadData) {
> +        /* Create the thread after fdst and st were initialized.
> +         * The thread worker expects them to be that way. */
> +        if (VIR_ALLOC(fdst->thread) < 0)
> +            goto error;
> +
> +        if (virThreadCreate(fdst->thread,
> +                            true,
> +                            virFDStreamThread,
> +                            threadData) < 0)
> +            goto error;
> +    }
> +
>      return 0;
> +
> + error:
> +    VIR_FREE(fdst->thread);
> +    st->driver = NULL;
> +    st->privateData = NULL;
> +    virObjectUnref(fdst);
> +    return -1;
>  }
>  
>  
>  int virFDStreamOpen(virStreamPtr st,
>                      int fd)
>  {
> -    return virFDStreamOpenInternal(st, fd, NULL, -1, 0);
> +    return virFDStreamOpenInternal(st, fd, NULL, 0);
>  }
>  
>  
> @@ -598,7 +681,7 @@ int virFDStreamConnectUNIX(virStreamPtr st,
>          goto error;
>      }
>  
> -    if (virFDStreamOpenInternal(st, fd, NULL, -1, 0) < 0)
> +    if (virFDStreamOpenInternal(st, fd, NULL, 0) < 0)
>          goto error;
>      return 0;
>  
> @@ -627,11 +710,10 @@ virFDStreamOpenFileInternal(virStreamPtr st,
>                              bool forceIOHelper)
>  {
>      int fd = -1;
> -    int childfd = -1;
> +    int pipefds[2] = { -1, -1 };
> +    int tmpfd = -1;
>      struct stat sb;
> -    virCommandPtr cmd = NULL;
> -    int errfd = -1;
> -    char *iohelper_path = NULL;
> +    virFDStreamThreadDataPtr threadData = NULL;
>  
>      VIR_DEBUG("st=%p path=%s oflags=%x offset=%llu length=%llu mode=%o",
>                st, path, oflags, offset, length, mode);
> @@ -648,6 +730,7 @@ virFDStreamOpenFileInternal(virStreamPtr st,
>                               path);
>          return -1;
>      }
> +    tmpfd = fd;
>  
>      if (fstat(fd, &sb) < 0) {
>          virReportSystemError(errno,
> @@ -672,7 +755,6 @@ virFDStreamOpenFileInternal(virStreamPtr st,
>      if ((st->flags & VIR_STREAM_NONBLOCK) &&
>          ((!S_ISCHR(sb.st_mode) &&
>            !S_ISFIFO(sb.st_mode)) || forceIOHelper)) {
> -        int fds[2] = { -1, -1 };
>  
>          if ((oflags & O_ACCMODE) == O_RDWR) {
>              virReportError(VIR_ERR_INTERNAL_ERROR,
> @@ -681,58 +763,47 @@ virFDStreamOpenFileInternal(virStreamPtr st,
>              goto error;
>          }
>  

There's a comment above here that talks about forking a helper; it should
be updated, since a thread is now used instead...

ACK w/ a couple of minor adjustments,

John

> -        if (pipe(fds) < 0) {
> +        if (pipe(pipefds) < 0) {
>              virReportSystemError(errno, "%s",
>                                   _("Unable to create pipe"));
>              goto error;
>          }
>  
> -        if (!(iohelper_path = virFileFindResource("libvirt_iohelper",
> -                                                  abs_topbuilddir "/src",
> -                                                  LIBEXECDIR)))
> +        if (VIR_ALLOC(threadData) < 0)
>              goto error;
>  
> -        cmd = virCommandNewArgList(iohelper_path,
> -                                   path,
> -                                   NULL);
> -
> -        VIR_FREE(iohelper_path);
> -
> -        virCommandAddArgFormat(cmd, "%llu", length);
> -        virCommandPassFD(cmd, fd,
> -                         VIR_COMMAND_PASS_FD_CLOSE_PARENT);
> -        virCommandAddArgFormat(cmd, "%d", fd);
> +        threadData->st = virObjectRef(st);
> +        threadData->length = length;
>  
>          if ((oflags & O_ACCMODE) == O_RDONLY) {
> -            childfd = fds[1];
> -            fd = fds[0];
> -            virCommandSetOutputFD(cmd, &childfd);
> +            threadData->fdin = fd;
> +            threadData->fdout = pipefds[1];
> +            if (VIR_STRDUP(threadData->fdinname, path) < 0 ||
> +                VIR_STRDUP(threadData->fdoutname, "pipe") < 0)
> +                goto error;
> +            tmpfd = pipefds[0];
>          } else {
> -            childfd = fds[0];
> -            fd = fds[1];
> -            virCommandSetInputFD(cmd, childfd);
> +            threadData->fdin = pipefds[0];
> +            threadData->fdout = fd;
> +            if (VIR_STRDUP(threadData->fdinname, "pipe") < 0 ||
> +                VIR_STRDUP(threadData->fdoutname, path) < 0)
> +                goto error;
> +            tmpfd = pipefds[1];
>          }
> -        virCommandSetErrorFD(cmd, &errfd);
> -
> -        if (virCommandRunAsync(cmd, NULL) < 0)
> -            goto error;
> -
> -        VIR_FORCE_CLOSE(childfd);
>      }
>  
> -    if (virFDStreamOpenInternal(st, fd, cmd, errfd, length) < 0)
> +    if (virFDStreamOpenInternal(st, tmpfd, threadData, length) < 0)
>          goto error;
>  
>      return 0;
>  
>   error:
> -    virCommandFree(cmd);
>      VIR_FORCE_CLOSE(fd);
> -    VIR_FORCE_CLOSE(childfd);
> -    VIR_FORCE_CLOSE(errfd);
> -    VIR_FREE(iohelper_path);
> +    VIR_FORCE_CLOSE(pipefds[0]);
> +    VIR_FORCE_CLOSE(pipefds[1]);
>      if (oflags & O_CREAT)
>          unlink(path);
> +    virFDStreamThreadDataFree(threadData);
>      return -1;
>  }
>  
> diff --git a/src/util/virfdstream.h b/src/util/virfdstream.h
> index 32a741e..34c4c3f 100644
> --- a/src/util/virfdstream.h
> +++ b/src/util/virfdstream.h
> @@ -24,7 +24,6 @@
>  # define __VIR_FDSTREAM_H_
>  
>  # include "internal.h"
> -# include "vircommand.h"
>  
>  /* internal callback, the generic one is used up by daemon stream driver */
>  /* the close callback is called with fdstream private data locked */
> 




More information about the libvir-list mailing list