[libvirt] [PATCH v2 06/38] virfdstream: Use messages instead of pipe

Daniel P. Berrange berrange at redhat.com
Thu Apr 27 17:48:59 UTC 2017


On Thu, Apr 20, 2017 at 12:01:35PM +0200, Michal Privoznik wrote:
> One big downside of using the pipe to transfer the data is that
> we can really transfer just bare data. No metadata can be carried
> through unless some formatted messages are introduced. That would
> be quite painful to achieve so let's use a message queue. It's
> fairly easy to exchange info between threads now that iohelper is
> no longer used.
> 
> The reason why we cannot use the FD for plain files directly is
> that despite us setting noblock flag on the FD, any
> read()/write() blocks regardless (which is a show stopper since
> those parts of the code are run from the event loop) and poll()
> reports such FD as always readable/writable - even though the
> subsequent operation might block.
> 
> The pipe is still not gone though. It is used to signal to the event
> loop that an event occurred (e.g. data are available for reading
> in the queue, or vice versa).
> 
> Signed-off-by: Michal Privoznik <mprivozn at redhat.com>
> ---
>  src/util/virfdstream.c | 402 ++++++++++++++++++++++++++++++++++++++++++-------
>  1 file changed, 350 insertions(+), 52 deletions(-)
> 

> +static int
> +virFDStreamMsgQueuePush(virFDStreamDataPtr fdst,
> +                        virFDStreamMsgPtr msg,
> +                        int fd,
> +                        const char *fdname)
> +{
> +    virFDStreamMsgPtr *tmp = &fdst->msg;
> +    char c = '1';
> +
> +    while (*tmp)
> +        tmp = &(*tmp)->next;
> +
> +    *tmp = msg;
> +    virCondSignal(&fdst->threadCond);
> +
> +    if (safewrite(fd, &c, sizeof(c)) != sizeof(c)) {
> +        virReportSystemError(errno,
> +                             _("Unable to write to %s"),
> +                             fdname);
> +        return -1;
> +    }
> +
> +    return 0;
> +}
> +
> +
> +static virFDStreamMsgPtr
> +virFDStreamMsgQueuePop(virFDStreamDataPtr fdst,
> +                       int fd,
> +                       const char *fdname)
> +{
> +    virFDStreamMsgPtr tmp = fdst->msg;
> +    char c;
> +
> +    if (tmp) {
> +        fdst->msg = tmp->next;
> +        tmp->next = NULL;
> +    }
> +
> +    virCondSignal(&fdst->threadCond);
> +
> +    if (saferead(fd, &c, sizeof(c)) != sizeof(c)) {
> +        virReportSystemError(errno,
> +                             _("Unable to read from %s"),
> +                             fdname);
> +        return NULL;
> +    }
> +
> +    return tmp;
> +}

Both of these methods signal the condition (fdst->threadCond).



> +static ssize_t
> +virFDStreamThreadDoRead(virFDStreamDataPtr fdst,
> +                        const int fdin,
> +                        const int fdout,
> +                        const char *fdinname,
> +                        const char *fdoutname,
> +                        size_t buflen)
> +{
> +    virFDStreamMsgPtr msg = NULL;
> +    char *buf = NULL;
> +    ssize_t got;
> +
> +    if (VIR_ALLOC(msg) < 0)
> +        goto error;
> +
> +    if (VIR_ALLOC_N(buf, buflen) < 0)
> +        goto error;
> +
> +    if ((got = saferead(fdin, buf, buflen)) < 0) {
> +        virReportSystemError(errno,
> +                             _("Unable to read %s"),
> +                             fdinname);
> +        goto error;
> +    }
> +
> +    msg->type = VIR_FDSTREAM_MSG_TYPE_DATA;
> +    msg->stream.data.buf = buf;
> +    msg->stream.data.len = got;
> +    buf = NULL;
> +
> +    virFDStreamMsgQueuePush(fdst, msg, fdout, fdoutname);
> +    msg = NULL;
> +
> +    return got;
> +
> + error:
> +    VIR_FREE(buf);
> +    virFDStreamMsgFree(msg);
> +    return -1;
> +}
> +
> +
> +static ssize_t
> +virFDStreamThreadDoWrite(virFDStreamDataPtr fdst,
> +                         const int fdin,
> +                         const int fdout,
> +                         const char *fdinname,
> +                         const char *fdoutname)
> +{
> +    ssize_t got;
> +    virFDStreamMsgPtr msg = fdst->msg;
> +    bool pop = false;
> +
> +    switch (msg->type) {
> +    case VIR_FDSTREAM_MSG_TYPE_DATA:
> +        got = safewrite(fdout,
> +                        msg->stream.data.buf + msg->stream.data.offset,
> +                        msg->stream.data.len - msg->stream.data.offset);
> +        if (got < 0) {
> +            virReportSystemError(errno,
> +                                 _("Unable to write %s"),
> +                                 fdoutname);
> +            return -1;
> +        }
> +
> +        msg->stream.data.offset += got;
> +
> +        pop = msg->stream.data.offset == msg->stream.data.len;
> +        break;
> +    }
> +
> +    if (pop) {
> +        virFDStreamMsgQueuePop(fdst, fdin, fdinname);
> +        virFDStreamMsgFree(msg);
> +    }
> +
> +    return got;
> +}

Both of these methods call into the Push/Pop functions, which
signal the condition.

> @@ -304,14 +496,12 @@ virFDStreamThread(void *opaque)
>      int fdout = data->fdout;
>      char *fdoutname = data->fdoutname;
>      virFDStreamDataPtr fdst = st->privateData;
> -    char *buf = NULL;
> +    bool doRead = fdst->threadDoRead;
>      size_t buflen = 256 * 1024;
>      size_t total = 0;
>  
>      virObjectRef(fdst);
> -
> -    if (VIR_ALLOC_N(buf, buflen) < 0)
> -        goto error;
> +    virObjectLock(fdst);
>  
>      while (1) {
>          ssize_t got;
> @@ -323,39 +513,56 @@ virFDStreamThread(void *opaque)
>          if (buflen == 0)
>              break; /* End of requested data from client */
>  
> -        if ((got = saferead(fdin, buf, buflen)) < 0) {
> -            virReportSystemError(errno,
> -                                 _("Unable to read %s"),
> -                                 fdinname);
> +        while (doRead == (fdst->msg != NULL) &&
> +               !fdst->threadQuit) {
> +            if (virCondWait(&fdst->threadCond, &fdst->parent.lock)) {
> +                virReportSystemError(errno, "%s",
> +                                     _("failed to wait on condition"));
> +                goto error;
> +            }
> +        }
> +
> +        if (fdst->threadQuit) {
> +            /* If stream abort was requested, quit early. */
> +            if (fdst->threadAbort)
> +                goto cleanup;
> +
> +            /* Otherwise flush buffers and quit gracefully. */
> +            if (doRead == (fdst->msg != NULL))
> +                break;
> +        }
> +
> +        if (doRead)
> +            got = virFDStreamThreadDoRead(fdst,
> +                                          fdin, fdout,
> +                                          fdinname, fdoutname,
> +                                          buflen);
> +        else
> +            got = virFDStreamThreadDoWrite(fdst,
> +                                           fdin, fdout,
> +                                           fdinname, fdoutname);
> +
> +        if (got < 0)


This method waits on the condition.

So unless I'm mistaken, this thread is both signaling and waiting on
the same condition, which feels wrong. Generally, one thread would
signal the condition and a different thread would wait on it.


Regards,
Daniel
-- 
|: https://berrange.com      -o-    https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org         -o-            https://fstop138.berrange.com :|
|: https://entangle-photo.org    -o-    https://www.instagram.com/dberrange :|




More information about the libvir-list mailing list