[libvirt] [PATCH v2 06/38] virfdstream: Use messages instead of pipe
Michal Privoznik
mprivozn at redhat.com
Fri Apr 28 11:39:15 UTC 2017
On 04/27/2017 07:48 PM, Daniel P. Berrange wrote:
> On Thu, Apr 20, 2017 at 12:01:35PM +0200, Michal Privoznik wrote:
>> One big downside of using the pipe to transfer the data is that
>> we can really transfer just bare data. No metadata can be carried
>> through unless some formatted messages are introduced. That would
>> be quite painful to achieve so let's use a message queue. It's
>> fairly easy to exchange info between threads now that iohelper is
>> no longer used.
>>
>> The reason why we cannot use the FD for plain files directly is
>> that despite us setting noblock flag on the FD, any
>> read()/write() blocks regardless (which is a show stopper since
>> those parts of the code are run from the event loop) and poll()
>> reports such FD as always readable/writable - even though the
>> subsequent operation might block.
>>
>> The pipe is still not gone though. It is used to signal to the
>> event loop that an event occurred (e.g. data are available for
>> reading in the queue, or vice versa).
>>
>> Signed-off-by: Michal Privoznik <mprivozn at redhat.com>
>> ---
>> src/util/virfdstream.c | 402 ++++++++++++++++++++++++++++++++++++++++++-------
>> 1 file changed, 350 insertions(+), 52 deletions(-)
>>
>
>> +static int
>> +virFDStreamMsgQueuePush(virFDStreamDataPtr fdst,
>> + virFDStreamMsgPtr msg,
>> + int fd,
>> + const char *fdname)
>> +{
>> + virFDStreamMsgPtr *tmp = &fdst->msg;
>> + char c = '1';
>> +
>> + while (*tmp)
>> + tmp = &(*tmp)->next;
>> +
>> + *tmp = msg;
>> + virCondSignal(&fdst->threadCond);
>> +
>> + if (safewrite(fd, &c, sizeof(c)) != sizeof(c)) {
>> + virReportSystemError(errno,
>> + _("Unable to write to %s"),
>> + fdname);
>> + return -1;
>> + }
>> +
>> + return 0;
>> +}
>> +
>> +
>> +static virFDStreamMsgPtr
>> +virFDStreamMsgQueuePop(virFDStreamDataPtr fdst,
>> + int fd,
>> + const char *fdname)
>> +{
>> + virFDStreamMsgPtr tmp = fdst->msg;
>> + char c;
>> +
>> + if (tmp) {
>> + fdst->msg = tmp->next;
>> + tmp->next = NULL;
>> + }
>> +
>> + virCondSignal(&fdst->threadCond);
>> +
>> + if (saferead(fd, &c, sizeof(c)) != sizeof(c)) {
>> + virReportSystemError(errno,
>> + _("Unable to read from %s"),
>> + fdname);
>> + return NULL;
>> + }
>> +
>> + return tmp;
>> +}
>
> Both these methods signal the condition
>
>
>
>> +static ssize_t
>> +virFDStreamThreadDoRead(virFDStreamDataPtr fdst,
>> + const int fdin,
>> + const int fdout,
>> + const char *fdinname,
>> + const char *fdoutname,
>> + size_t buflen)
>> +{
>> + virFDStreamMsgPtr msg = NULL;
>> + char *buf = NULL;
>> + ssize_t got;
>> +
>> + if (VIR_ALLOC(msg) < 0)
>> + goto error;
>> +
>> + if (VIR_ALLOC_N(buf, buflen) < 0)
>> + goto error;
>> +
>> + if ((got = saferead(fdin, buf, buflen)) < 0) {
>> + virReportSystemError(errno,
>> + _("Unable to read %s"),
>> + fdinname);
>> + goto error;
>> + }
>> +
>> + msg->type = VIR_FDSTREAM_MSG_TYPE_DATA;
>> + msg->stream.data.buf = buf;
>> + msg->stream.data.len = got;
>> + buf = NULL;
>> +
>> + virFDStreamMsgQueuePush(fdst, msg, fdout, fdoutname);
>> + msg = NULL;
>> +
>> + return got;
>> +
>> + error:
>> + VIR_FREE(buf);
>> + virFDStreamMsgFree(msg);
>> + return -1;
>> +}
>> +
>> +
>> +static ssize_t
>> +virFDStreamThreadDoWrite(virFDStreamDataPtr fdst,
>> + const int fdin,
>> + const int fdout,
>> + const char *fdinname,
>> + const char *fdoutname)
>> +{
>> + ssize_t got;
>> + virFDStreamMsgPtr msg = fdst->msg;
>> + bool pop = false;
>> +
>> + switch (msg->type) {
>> + case VIR_FDSTREAM_MSG_TYPE_DATA:
>> + got = safewrite(fdout,
>> + msg->stream.data.buf + msg->stream.data.offset,
>> + msg->stream.data.len - msg->stream.data.offset);
>> + if (got < 0) {
>> + virReportSystemError(errno,
>> + _("Unable to write %s"),
>> + fdoutname);
>> + return -1;
>> + }
>> +
>> + msg->stream.data.offset += got;
>> +
>> + pop = msg->stream.data.offset == msg->stream.data.len;
>> + break;
>> + }
>> +
>> + if (pop) {
>> + virFDStreamMsgQueuePop(fdst, fdin, fdinname);
>> + virFDStreamMsgFree(msg);
>> + }
>> +
>> + return got;
>> +}
>
> Both these methods call into the Pop/Push functions which
> signal the condition.
>
>> @@ -304,14 +496,12 @@ virFDStreamThread(void *opaque)
>> int fdout = data->fdout;
>> char *fdoutname = data->fdoutname;
>> virFDStreamDataPtr fdst = st->privateData;
>> - char *buf = NULL;
>> + bool doRead = fdst->threadDoRead;
>> size_t buflen = 256 * 1024;
>> size_t total = 0;
>>
>> virObjectRef(fdst);
>> -
>> - if (VIR_ALLOC_N(buf, buflen) < 0)
>> - goto error;
>> + virObjectLock(fdst);
>>
>> while (1) {
>> ssize_t got;
>> @@ -323,39 +513,56 @@ virFDStreamThread(void *opaque)
>> if (buflen == 0)
>> break; /* End of requested data from client */
>>
>> - if ((got = saferead(fdin, buf, buflen)) < 0) {
>> - virReportSystemError(errno,
>> - _("Unable to read %s"),
>> - fdinname);
>> + while (doRead == (fdst->msg != NULL) &&
>> + !fdst->threadQuit) {
>> + if (virCondWait(&fdst->threadCond, &fdst->parent.lock)) {
>> + virReportSystemError(errno, "%s",
>> + _("failed to wait on condition"));
>> + goto error;
>> + }
>> + }
>> +
>> + if (fdst->threadQuit) {
>> + /* If stream abort was requested, quit early. */
>> + if (fdst->threadAbort)
>> + goto cleanup;
>> +
>> + /* Otherwise flush buffers and quit gracefully. */
>> + if (doRead == (fdst->msg != NULL))
>> + break;
>> + }
>> +
>> + if (doRead)
>> + got = virFDStreamThreadDoRead(fdst,
>> + fdin, fdout,
>> + fdinname, fdoutname,
>> + buflen);
>> + else
>> + got = virFDStreamThreadDoWrite(fdst,
>> + fdin, fdout,
>> + fdinname, fdoutname);
>> +
>> + if (got < 0)
>
>
> This method waits on the condition.
>
> So unless I'm mistaken, this thread is signaling & waiting on
> the same condition, which feels wrong. Generally different
> threads would signal vs wait.
>
I hear what you're saying, but I don't think this is a problem, exactly
because of the way we wait on the condition. Assume this thread is doing
reads. That is, it reads from fdin (an actual file on a disk) and feeds
the message queue with the data. Now, it reads some data, pushes it onto
the end of the message queue (fdst->msg != NULL at that point) and goes to
the while loop above. Effectively it eats all the spurious wake-ups for
as long as fdst->msg != NULL.
Now assume the thread is doing writes (it reads data from the queue and
writes it into a file on a disk). Again, as long as there are some messages
to be written (that is, fdst->msg != NULL) the control won't even reach
virCondWait. And if it does, it's because fdst->msg == NULL, in which
case there is no data to be written.
But something has just come to my mind whilst writing these lines - I
wonder if we can ditch the condition entirely and rely on the pipe +
poll(). I mean, this worker would use the pipe to signal to the event
loop that there is a message waiting for it in the queue. The question is
how thread-safe this approach would be.
Michal
More information about the libvir-list
mailing list