[libvirt] [PATCH 06/38] virfdstream: Use messages instead of pipe

Michal Privoznik mprivozn at redhat.com
Tue Apr 18 12:00:09 UTC 2017


On 04/13/2017 07:13 PM, Daniel P. Berrange wrote:
> On Thu, Apr 13, 2017 at 06:52:31PM +0200, Michal Privoznik wrote:
>> On 04/13/2017 03:55 PM, Daniel P. Berrange wrote:
>>> On Thu, Apr 13, 2017 at 03:31:14PM +0200, Michal Privoznik wrote:
>>>> One big downside of using the pipe to transfer the data is that
>>>> we can really transfer just bare data. No metadata can be carried
>>>> through unless some formatted messages are introduced. That would
>>>> be quite painful to achieve so let's use a message queue. It's
>>>> fairly easy to exchange info between threads now that iohelper is
>>>> no longer used.
>>>
>>> I'm not seeing how this works correctly with the event loop.
>>>
>>>> @@ -752,8 +1014,6 @@ virFDStreamOpenFileInternal(virStreamPtr st,
>>>>      if ((st->flags & VIR_STREAM_NONBLOCK) &&
>>>>          ((!S_ISCHR(sb.st_mode) &&
>>>>            !S_ISFIFO(sb.st_mode)) || forceIOHelper)) {
>>>> -        int fds[2] = { -1, -1 };
>>>> -
>>>>          if ((oflags & O_ACCMODE) == O_RDWR) {
>>>>              virReportError(VIR_ERR_INTERNAL_ERROR,
>>>>                             _("%s: Cannot request read and write flags together"),
>>>> @@ -761,12 +1021,6 @@ virFDStreamOpenFileInternal(virStreamPtr st,
>>>>              goto error;
>>>>          }
>>>>
>>>> -        if (pipe(fds) < 0) {
>>>> -            virReportSystemError(errno, "%s",
>>>> -                                 _("Unable to create pipe"));
>>>> -            goto error;
>>>> -        }
>>>
>>> Here we previously created the pipe....
>>>
>>>> @@ -775,18 +1029,14 @@ virFDStreamOpenFileInternal(virStreamPtr st,
>>>>
>>>>          if ((oflags & O_ACCMODE) == O_RDONLY) {
>>>>              threadData->fdin = fd;
>>>> -            threadData->fdout = fds[1];
>>>> -            if (VIR_STRDUP(threadData->fdinname, path) < 0 ||
>>>> -                VIR_STRDUP(threadData->fdoutname, "pipe") < 0)
>>>> +            threadData->fdout = -1;
>>>> +            if (VIR_STRDUP(threadData->fdinname, path) < 0)
>>>>                  goto error;
>>>> -            fd = fds[0];
>>>
>>> And here we set 'fd' to be the pipe
>>>
>>>>          } else {
>>>> -            threadData->fdin = fds[0];
>>>> +            threadData->fdin = -1;
>>>>              threadData->fdout = fd;
>>>> -            if (VIR_STRDUP(threadData->fdinname, "pipe") < 0 ||
>>>> -                VIR_STRDUP(threadData->fdoutname, path) < 0)
>>>> +            if (VIR_STRDUP(threadData->fdoutname, path) < 0)
>>>>                  goto error;
>>>> -            fd = fds[1];
>>>
>>> Likewise here
>>>
>>>>          }
>>>>      }
>>>
>>> ...now here 'fd' is passed to virFDStreamOpenInternal() and is the thing
>>> that the event loop watches are registered against by virFDStreamAddCallback
>>>
>>>
>>> With this change 'fd' is the actual plain file the thread is reading to/from,
>>> so the callbacks are being registered against the plain file, not the pipe.
>>>
>>> poll/select on POSIX always reports plain files as readable/writable even
>>> when they would block.  So with this change we're just going to busy loop
>>> in the main event thread even when we'll block, which defeats the whole
>>> purpose of having a iohelper and/or thread.
>>
>> Oh, I've misunderstood what we've discussed on IRC then. The way I've
>> understood it was that if an FD is set to nonblock mode and poll()
>> claims there are some data available, subsequent read() might block. If
>> that was the case we would be safe with this code. However, I didn't
>> expect poll() to lie.
>
> This code wouldn't be safe - anytime poll claims data available, we *must*
> be able to read without blocking.
>
>> Any link for further reading on this? I guess it's not only us who has
>> to deal with this problem. Basically any application with poll() and
>> disk read()/write() has to suffer from this.
>
> Yes, that's correct - QEMU has the same issue for example - it is why there
> is no 'file:' protocol for migration for example - it would block the QEMU
> main loop.
>
>> So what are our options here? Because I don't see any right now.
>
> IIUC, you didn't want to use a pipe because you want to send structured
> messages, not just plain data. If we just have a linked list of messages
> there's nothing we can poll on, so we need to keep the pipe in use, but
> find a way to get the special messages in the flow.
>
> I think we could do a trick where we have two pipes in use, one for
> monitoring the readability, and one for monitoring writability.
>
>
> When the I/O thread has data on the queue ready for read by the main
> thread, it can write a single byte to the read-monitor pipe.
>
> When the I/O thread is ready to accept more data to write from the
> main thread, it can write a single byte to the write-monitor pipe.
>
> The main thread would monitor for POLLIN condition on both the
> read-monitor pipe and write-monitor pipe.

Ah, indeed. This could work. But I also thought over different approach. 
What I need really is transfer "you're in a data/hole X bytes long" 
besides actual data. So I can use pipe for transferring the data as is 
currently, and store the metadata into a structured message that would 
the thread write/read and event loop read/write.

>
> BTW, we also need to make sure the I/O thread doesn't proactively
> queue too much data on the message queue when reading it, in case
> the main thread is being slow at consuming this read data and
> sending it to the TCP client.

Sure. Currently, with this implementation there's always one message 
with 4MiB buffer in the queue. Even though it's prepared for a queue of 
messages, there is no more than 1 message in the queue really.

Michal




More information about the libvir-list mailing list