[libvirt] [PATCH v2 06/38] virfdstream: Use messages instead of pipe

Michal Privoznik mprivozn at redhat.com
Fri Apr 28 11:39:15 UTC 2017


On 04/27/2017 07:48 PM, Daniel P. Berrange wrote:
> On Thu, Apr 20, 2017 at 12:01:35PM +0200, Michal Privoznik wrote:
>> One big downside of using the pipe to transfer the data is that
>> we can transfer only bare data. No metadata can be carried
>> through unless some formatted messages are introduced. That would
>> be quite painful to achieve, so let's use a message queue. It's
>> fairly easy to exchange info between threads now that iohelper is
>> no longer used.
>>
>> The reason why we cannot use the FD for plain files directly is
>> that despite our setting the non-blocking flag on the FD, any
>> read()/write() blocks regardless (which is a show stopper since
>> those parts of the code are run from the event loop) and poll()
>> reports such an FD as always readable/writable, even though the
>> subsequent operation might block.
>>
>> The pipe is still not gone though. It is used to signal to the event
>> loop that an event occurred (e.g. data are available for reading
>> in the queue, or vice versa).
>>
>> Signed-off-by: Michal Privoznik <mprivozn at redhat.com>
>> ---
>>  src/util/virfdstream.c | 402 ++++++++++++++++++++++++++++++++++++++++++-------
>>  1 file changed, 350 insertions(+), 52 deletions(-)
>>
> 
>> +static int
>> +virFDStreamMsgQueuePush(virFDStreamDataPtr fdst,
>> +                        virFDStreamMsgPtr msg,
>> +                        int fd,
>> +                        const char *fdname)
>> +{
>> +    virFDStreamMsgPtr *tmp = &fdst->msg;
>> +    char c = '1';
>> +
>> +    while (*tmp)
>> +        tmp = &(*tmp)->next;
>> +
>> +    *tmp = msg;
>> +    virCondSignal(&fdst->threadCond);
>> +
>> +    if (safewrite(fd, &c, sizeof(c)) != sizeof(c)) {
>> +        virReportSystemError(errno,
>> +                             _("Unable to write to %s"),
>> +                             fdname);
>> +        return -1;
>> +    }
>> +
>> +    return 0;
>> +}
>> +
>> +
>> +static virFDStreamMsgPtr
>> +virFDStreamMsgQueuePop(virFDStreamDataPtr fdst,
>> +                       int fd,
>> +                       const char *fdname)
>> +{
>> +    virFDStreamMsgPtr tmp = fdst->msg;
>> +    char c;
>> +
>> +    if (tmp) {
>> +        fdst->msg = tmp->next;
>> +        tmp->next = NULL;
>> +    }
>> +
>> +    virCondSignal(&fdst->threadCond);
>> +
>> +    if (saferead(fd, &c, sizeof(c)) != sizeof(c)) {
>> +        virReportSystemError(errno,
>> +                             _("Unable to read from %s"),
>> +                             fdname);
>> +        return NULL;
>> +    }
>> +
>> +    return tmp;
>> +}
> 
> Both of these methods signal the condition.
> 
> 
> 
>> +static ssize_t
>> +virFDStreamThreadDoRead(virFDStreamDataPtr fdst,
>> +                        const int fdin,
>> +                        const int fdout,
>> +                        const char *fdinname,
>> +                        const char *fdoutname,
>> +                        size_t buflen)
>> +{
>> +    virFDStreamMsgPtr msg = NULL;
>> +    char *buf = NULL;
>> +    ssize_t got;
>> +
>> +    if (VIR_ALLOC(msg) < 0)
>> +        goto error;
>> +
>> +    if (VIR_ALLOC_N(buf, buflen) < 0)
>> +        goto error;
>> +
>> +    if ((got = saferead(fdin, buf, buflen)) < 0) {
>> +        virReportSystemError(errno,
>> +                             _("Unable to read %s"),
>> +                             fdinname);
>> +        goto error;
>> +    }
>> +
>> +    msg->type = VIR_FDSTREAM_MSG_TYPE_DATA;
>> +    msg->stream.data.buf = buf;
>> +    msg->stream.data.len = got;
>> +    buf = NULL;
>> +
>> +    virFDStreamMsgQueuePush(fdst, msg, fdout, fdoutname);
>> +    msg = NULL;
>> +
>> +    return got;
>> +
>> + error:
>> +    VIR_FREE(buf);
>> +    virFDStreamMsgFree(msg);
>> +    return -1;
>> +}
>> +
>> +
>> +static ssize_t
>> +virFDStreamThreadDoWrite(virFDStreamDataPtr fdst,
>> +                         const int fdin,
>> +                         const int fdout,
>> +                         const char *fdinname,
>> +                         const char *fdoutname)
>> +{
>> +    ssize_t got;
>> +    virFDStreamMsgPtr msg = fdst->msg;
>> +    bool pop = false;
>> +
>> +    switch (msg->type) {
>> +    case VIR_FDSTREAM_MSG_TYPE_DATA:
>> +        got = safewrite(fdout,
>> +                        msg->stream.data.buf + msg->stream.data.offset,
>> +                        msg->stream.data.len - msg->stream.data.offset);
>> +        if (got < 0) {
>> +            virReportSystemError(errno,
>> +                                 _("Unable to write %s"),
>> +                                 fdoutname);
>> +            return -1;
>> +        }
>> +
>> +        msg->stream.data.offset += got;
>> +
>> +        pop = msg->stream.data.offset == msg->stream.data.len;
>> +        break;
>> +    }
>> +
>> +    if (pop) {
>> +        virFDStreamMsgQueuePop(fdst, fdin, fdinname);
>> +        virFDStreamMsgFree(msg);
>> +    }
>> +
>> +    return got;
>> +}
> 
> Both these methods call into the Pop/Push functions which
> signal the condition.
> 
>> @@ -304,14 +496,12 @@ virFDStreamThread(void *opaque)
>>      int fdout = data->fdout;
>>      char *fdoutname = data->fdoutname;
>>      virFDStreamDataPtr fdst = st->privateData;
>> -    char *buf = NULL;
>> +    bool doRead = fdst->threadDoRead;
>>      size_t buflen = 256 * 1024;
>>      size_t total = 0;
>>  
>>      virObjectRef(fdst);
>> -
>> -    if (VIR_ALLOC_N(buf, buflen) < 0)
>> -        goto error;
>> +    virObjectLock(fdst);
>>  
>>      while (1) {
>>          ssize_t got;
>> @@ -323,39 +513,56 @@ virFDStreamThread(void *opaque)
>>          if (buflen == 0)
>>              break; /* End of requested data from client */
>>  
>> -        if ((got = saferead(fdin, buf, buflen)) < 0) {
>> -            virReportSystemError(errno,
>> -                                 _("Unable to read %s"),
>> -                                 fdinname);
>> +        while (doRead == (fdst->msg != NULL) &&
>> +               !fdst->threadQuit) {
>> +            if (virCondWait(&fdst->threadCond, &fdst->parent.lock)) {
>> +                virReportSystemError(errno, "%s",
>> +                                     _("failed to wait on condition"));
>> +                goto error;
>> +            }
>> +        }
>> +
>> +        if (fdst->threadQuit) {
>> +            /* If stream abort was requested, quit early. */
>> +            if (fdst->threadAbort)
>> +                goto cleanup;
>> +
>> +            /* Otherwise flush buffers and quit gracefully. */
>> +            if (doRead == (fdst->msg != NULL))
>> +                break;
>> +        }
>> +
>> +        if (doRead)
>> +            got = virFDStreamThreadDoRead(fdst,
>> +                                          fdin, fdout,
>> +                                          fdinname, fdoutname,
>> +                                          buflen);
>> +        else
>> +            got = virFDStreamThreadDoWrite(fdst,
>> +                                           fdin, fdout,
>> +                                           fdinname, fdoutname);
>> +
>> +        if (got < 0)
> 
> 
> This method waits on the condition.
> 
> So unless I'm mistaken, this thread both signals and waits on
> the same condition, which feels wrong. Generally, one thread
> signals and a different thread waits.
> 

I hear what you're saying, but I don't think this is a problem, precisely
because of the way we wait on the condition. Assume this thread is doing
reads, that is, it reads from fdin (an actual file on disk) and feeds
the data into the message queue. It reads some data, pushes it to the
end of the message queue (so fdst->msg != NULL at that point) and goes
back to the while loop above. Effectively, it swallows all the spurious
wakeups for as long as fdst->msg != NULL.
Now assume the thread is doing writes (it reads data from the queue and
writes it into a file on disk). Again, as long as there are messages to
be written (that is, fdst->msg != NULL), control won't even reach
virCondWait(). And if it does, it's because fdst->msg == NULL, in which
case there is no data to be written.

But something has just come to mind while writing these lines: I
wonder if we can ditch the condition entirely and rely on the pipe +
poll(). I mean, this worker would use the pipe to signal to the event
loop that there is a message waiting for it in the queue. The question is
how thread-safe this approach would be.

Michal



