[libvirt] [PATCH v2 05/38] virfdstream: Drop iohelper in favour of a thread
John Ferlan
jferlan at redhat.com
Thu Apr 27 17:42:51 UTC 2017
On 04/20/2017 06:01 AM, Michal Privoznik wrote:
> Currently we use iohelper for virFDStream implementation. This is
> because UNIX I/O can lie sometimes: even though a FD for a
> file/block device is set as unblocking, actual read()/write() can
> block. To avoid this, a pipe is created and one end is kept for
> read/write while the other is handed over to iohelper to
> write/read the data for us. Thus it's iohelper which gets blocked
> and not our event loop.
>
> This approach has two problems:
> 1) we are spawning a new process.
> 2) any exchange of information between daemon and iohelper can be
> done only through the pipe.
>
> Therefore, iohelper is replaced with an implementation in thread
> which is created just for the stream lifetime. The data are still
> transferred through pipe (for now), but both problems described
> above are solved.
>
> Signed-off-by: Michal Privoznik <mprivozn at redhat.com>
> ---
> src/util/virfdstream.c | 245 +++++++++++++++++++++++++++++++------------------
> src/util/virfdstream.h | 1 -
> 2 files changed, 158 insertions(+), 88 deletions(-)
>
> diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c
> index 9a4a7ff..7a8d65d 100644
> --- a/src/util/virfdstream.c
> +++ b/src/util/virfdstream.c
> @@ -56,8 +56,6 @@ struct virFDStreamData {
> virObjectLockable parent;
>
> int fd;
> - int errfd;
> - virCommandPtr cmd;
> unsigned long long offset;
> unsigned long long length;
>
> @@ -79,6 +77,11 @@ struct virFDStreamData {
> virFDStreamInternalCloseCb icbCb;
> virFDStreamInternalCloseCbFreeOpaque icbFreeOpaque;
> void *icbOpaque;
> +
> + /* Thread data */
> + virThreadPtr thread;
> + int threadErr;
> + bool threadQuit;
> };
>
> static virClassPtr virFDStreamDataClass;
> @@ -264,57 +267,123 @@ virFDStreamAddCallback(virStreamPtr st,
> return ret;
> }
>
> +
> +typedef struct _virFDStreamThreadData virFDStreamThreadData;
> +typedef virFDStreamThreadData *virFDStreamThreadDataPtr;
> +struct _virFDStreamThreadData {
> + virStreamPtr st;
> + size_t length;
> + int fdin;
> + char *fdinname;
> + int fdout;
> + char *fdoutname;
> +};
> +
> +
> +static void
> +virFDStreamThreadDataFree(virFDStreamThreadDataPtr data)
> +{
> + if (!data)
> + return;
> +
> + virObjectUnref(data->st);
> + VIR_FREE(data->fdinname);
> + VIR_FREE(data->fdoutname);
> + VIR_FREE(data);
> +}
> +
> +
> +static void
> +virFDStreamThread(void *opaque)
> +{
> + virFDStreamThreadDataPtr data = opaque;
> + virStreamPtr st = data->st;
> + size_t length = data->length;
> + int fdin = data->fdin;
> + char *fdinname = data->fdinname;
> + int fdout = data->fdout;
> + char *fdoutname = data->fdoutname;
> + virFDStreamDataPtr fdst = st->privateData;
> + char *buf = NULL;
> + size_t buflen = 256 * 1024;
> + size_t total = 0;
> +
> + virObjectRef(fdst);
> +
> + if (VIR_ALLOC_N(buf, buflen) < 0)
> + goto error;
> +
> + while (1) {
> + ssize_t got;
> +
> + if (length &&
> + (length - total) < buflen)
> + buflen = length - total;
> +
> + if (buflen == 0)
> + break; /* End of requested data from client */
> +
> + if ((got = saferead(fdin, buf, buflen)) < 0) {
> + virReportSystemError(errno,
> + _("Unable to read %s"),
> + fdinname);
> + goto error;
> + }
> +
> + if (got == 0)
> + break;
> +
> + total += got;
> +
> + if (safewrite(fdout, buf, got) < 0) {
> + virReportSystemError(errno,
> + _("Unable to write %s"),
> + fdoutname);
> + goto error;
> + }
> + }
> +
> + cleanup:
> + if (!virObjectUnref(fdst))
> + st->privateData = NULL;
> + VIR_FORCE_CLOSE(fdin);
> + VIR_FORCE_CLOSE(fdout);
> + virFDStreamThreadDataFree(data);
> + VIR_FREE(buf);
> + return;
> +
> + error:
> + virObjectLock(fdst);
> + fdst->threadErr = errno;
> + virObjectUnlock(fdst);
> + goto cleanup;
> +}
> +
> +
> static int
> -virFDStreamCloseCommand(virFDStreamDataPtr fdst, bool streamAbort)
> +virFDStreamJoinWorker(virFDStreamDataPtr fdst, bool streamAbort)
Since you're touching this function signature anyway - its arguments should each be on a separate line...
> {
> - char buf[1024];
> - ssize_t len;
> - int status;
> int ret = -1;
> -
> - if (!fdst->cmd)
> + if (!fdst->thread)
> return 0;
>
> - if ((len = saferead(fdst->errfd, buf, sizeof(buf)-1)) < 0)
> - buf[0] = '\0';
> - else
> - buf[len] = '\0';
> + /* Give the thread a chance to lock the FD stream object. */
> + virObjectUnlock(fdst);
> + virThreadJoin(fdst->thread);
> + virObjectLock(fdst);
>
> - virCommandRawStatus(fdst->cmd);
> - if (virCommandWait(fdst->cmd, &status) < 0)
> - goto cleanup;
> -
> - if (status != 0) {
> - if (buf[0] != '\0') {
> - virReportError(VIR_ERR_INTERNAL_ERROR, "%s", buf);
> - } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGPIPE) {
> - if (streamAbort) {
> - /* Explicit abort request means the caller doesn't care
> - if there's data left over, so skip the error */
> - goto out;
> - }
> -
> - virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
> - _("I/O helper exited "
> - "before all data was processed"));
> - } else {
> - char *str = virProcessTranslateStatus(status);
> - virReportError(VIR_ERR_INTERNAL_ERROR,
> - _("I/O helper exited with %s"),
> - NULLSTR(str));
> - VIR_FREE(str);
> - }
> + if (fdst->threadErr && !streamAbort) {
> + /* errors are expected on streamAbort */
> goto cleanup;
> }
>
> - out:
> ret = 0;
> cleanup:
> - virCommandFree(fdst->cmd);
> - fdst->cmd = NULL;
> + VIR_FREE(fdst->thread);
> return ret;
> }
>
> +
Should I say anything? I'm fine with it... :-)
> static int
> virFDStreamCloseInt(virStreamPtr st, bool streamAbort)
> {
> @@ -359,12 +428,9 @@ virFDStreamCloseInt(virStreamPtr st, bool streamAbort)
>
> /* mutex locked */
> ret = VIR_CLOSE(fdst->fd);
> - if (virFDStreamCloseCommand(fdst, streamAbort) < 0)
> + if (virFDStreamJoinWorker(fdst, streamAbort) < 0)
> ret = -1;
>
> - if (VIR_CLOSE(fdst->errfd) < 0)
> - VIR_DEBUG("ignoring failed close on fd %d", fdst->errfd);
> -
> st->privateData = NULL;
>
> /* call the internal stream closing callback */
> @@ -516,14 +582,13 @@ static virStreamDriver virFDStreamDrv = {
>
> static int virFDStreamOpenInternal(virStreamPtr st,
> int fd,
> - virCommandPtr cmd,
> - int errfd,
> + virFDStreamThreadDataPtr threadData,
> unsigned long long length)
> {
> virFDStreamDataPtr fdst;
>
> - VIR_DEBUG("st=%p fd=%d cmd=%p errfd=%d length=%llu",
> - st, fd, cmd, errfd, length);
> + VIR_DEBUG("st=%p fd=%d threadData=%p length=%llu",
> + st, fd, threadData, length);
>
> if (virFDStreamDataInitialize() < 0)
> return -1;
> @@ -538,21 +603,39 @@ static int virFDStreamOpenInternal(virStreamPtr st,
> return -1;
>
> fdst->fd = fd;
> - fdst->cmd = cmd;
> - fdst->errfd = errfd;
> fdst->length = length;
>
> st->driver = &virFDStreamDrv;
> st->privateData = fdst;
>
> + if (threadData) {
> + /* Create the thread after fdst and st were initialized.
> + * The thread worker expects them to be that way. */
> + if (VIR_ALLOC(fdst->thread) < 0)
> + goto error;
> +
> + if (virThreadCreate(fdst->thread,
> + true,
> + virFDStreamThread,
> + threadData) < 0)
> + goto error;
> + }
> +
> return 0;
> +
> + error:
> + VIR_FREE(fdst->thread);
> + st->driver = NULL;
> + st->privateData = NULL;
> + virObjectUnref(fdst);
> + return -1;
> }
>
>
> int virFDStreamOpen(virStreamPtr st,
> int fd)
> {
> - return virFDStreamOpenInternal(st, fd, NULL, -1, 0);
> + return virFDStreamOpenInternal(st, fd, NULL, 0);
> }
>
>
> @@ -598,7 +681,7 @@ int virFDStreamConnectUNIX(virStreamPtr st,
> goto error;
> }
>
> - if (virFDStreamOpenInternal(st, fd, NULL, -1, 0) < 0)
> + if (virFDStreamOpenInternal(st, fd, NULL, 0) < 0)
> goto error;
> return 0;
>
> @@ -627,11 +710,10 @@ virFDStreamOpenFileInternal(virStreamPtr st,
> bool forceIOHelper)
> {
> int fd = -1;
> - int childfd = -1;
> + int pipefds[2] = { -1, -1 };
> + int tmpfd = -1;
> struct stat sb;
> - virCommandPtr cmd = NULL;
> - int errfd = -1;
> - char *iohelper_path = NULL;
> + virFDStreamThreadDataPtr threadData = NULL;
>
> VIR_DEBUG("st=%p path=%s oflags=%x offset=%llu length=%llu mode=%o",
> st, path, oflags, offset, length, mode);
> @@ -648,6 +730,7 @@ virFDStreamOpenFileInternal(virStreamPtr st,
> path);
> return -1;
> }
> + tmpfd = fd;
>
> if (fstat(fd, &sb) < 0) {
> virReportSystemError(errno,
> @@ -672,7 +755,6 @@ virFDStreamOpenFileInternal(virStreamPtr st,
> if ((st->flags & VIR_STREAM_NONBLOCK) &&
> ((!S_ISCHR(sb.st_mode) &&
> !S_ISFIFO(sb.st_mode)) || forceIOHelper)) {
> - int fds[2] = { -1, -1 };
>
> if ((oflags & O_ACCMODE) == O_RDWR) {
> virReportError(VIR_ERR_INTERNAL_ERROR,
> @@ -681,58 +763,47 @@ virFDStreamOpenFileInternal(virStreamPtr st,
> goto error;
> }
>
There's a comment above here that mentions forking a helper; that comment
should change now that a thread is used instead...
ACK w/ a couple of minor adjustments,
John
> - if (pipe(fds) < 0) {
> + if (pipe(pipefds) < 0) {
> virReportSystemError(errno, "%s",
> _("Unable to create pipe"));
> goto error;
> }
>
> - if (!(iohelper_path = virFileFindResource("libvirt_iohelper",
> - abs_topbuilddir "/src",
> - LIBEXECDIR)))
> + if (VIR_ALLOC(threadData) < 0)
> goto error;
>
> - cmd = virCommandNewArgList(iohelper_path,
> - path,
> - NULL);
> -
> - VIR_FREE(iohelper_path);
> -
> - virCommandAddArgFormat(cmd, "%llu", length);
> - virCommandPassFD(cmd, fd,
> - VIR_COMMAND_PASS_FD_CLOSE_PARENT);
> - virCommandAddArgFormat(cmd, "%d", fd);
> + threadData->st = virObjectRef(st);
> + threadData->length = length;
>
> if ((oflags & O_ACCMODE) == O_RDONLY) {
> - childfd = fds[1];
> - fd = fds[0];
> - virCommandSetOutputFD(cmd, &childfd);
> + threadData->fdin = fd;
> + threadData->fdout = pipefds[1];
> + if (VIR_STRDUP(threadData->fdinname, path) < 0 ||
> + VIR_STRDUP(threadData->fdoutname, "pipe") < 0)
> + goto error;
> + tmpfd = pipefds[0];
> } else {
> - childfd = fds[0];
> - fd = fds[1];
> - virCommandSetInputFD(cmd, childfd);
> + threadData->fdin = pipefds[0];
> + threadData->fdout = fd;
> + if (VIR_STRDUP(threadData->fdinname, "pipe") < 0 ||
> + VIR_STRDUP(threadData->fdoutname, path) < 0)
> + goto error;
> + tmpfd = pipefds[1];
> }
> - virCommandSetErrorFD(cmd, &errfd);
> -
> - if (virCommandRunAsync(cmd, NULL) < 0)
> - goto error;
> -
> - VIR_FORCE_CLOSE(childfd);
> }
>
> - if (virFDStreamOpenInternal(st, fd, cmd, errfd, length) < 0)
> + if (virFDStreamOpenInternal(st, tmpfd, threadData, length) < 0)
> goto error;
>
> return 0;
>
> error:
> - virCommandFree(cmd);
> VIR_FORCE_CLOSE(fd);
> - VIR_FORCE_CLOSE(childfd);
> - VIR_FORCE_CLOSE(errfd);
> - VIR_FREE(iohelper_path);
> + VIR_FORCE_CLOSE(pipefds[0]);
> + VIR_FORCE_CLOSE(pipefds[1]);
> if (oflags & O_CREAT)
> unlink(path);
> + virFDStreamThreadDataFree(threadData);
> return -1;
> }
>
> diff --git a/src/util/virfdstream.h b/src/util/virfdstream.h
> index 32a741e..34c4c3f 100644
> --- a/src/util/virfdstream.h
> +++ b/src/util/virfdstream.h
> @@ -24,7 +24,6 @@
> # define __VIR_FDSTREAM_H_
>
> # include "internal.h"
> -# include "vircommand.h"
>
> /* internal callback, the generic one is used up by daemon stream driver */
> /* the close callback is called with fdstream private data locked */
>
More information about the libvir-list
mailing list