[libvirt] [PATCH 8/9] virnetclientstream: Process stream messages later

John Ferlan jferlan at redhat.com
Tue Apr 26 17:16:04 UTC 2016



On 04/21/2016 10:28 AM, Michal Privoznik wrote:
> On 20.04.2016 15:57, John Ferlan wrote:
>>
>>
>> On 04/15/2016 09:51 AM, Michal Privoznik wrote:
>>> Currently we have two separate functions for handling read from
>>> a stream. One is supposed to be low level and reads data in this
>>> self allocating chunk of memory. The other read function then
>>> copies data over from the chunk into a user buffer. There are two
>>> memcpy() involved even though a single would be sufficient.
>>> Moreover, since we are copying just data, we can't process
>>> alternative stream packets in the latter function, like stream
>>> seeks.
>>>
>>> In my testing, this proved two times faster then implementation
>>
>> s/then/than the/
>>
>>> which uses IO vectors.
>>
>> Can I "assume" this testing covers the reverted patch scenario. IOW: I
>> think this needs https://bugzilla.redhat.com/show_bug.cgi?id=1026136 to
>> be reopened...
> 
> This showed two times faster than even IO vectors implementation.
> 
>>
>> Might have been "nice" to indicate/summarize what this algorithm does as
>> opposed to the other. I think you started at the end of the first
>> paragraph, but I'm not 100% sure - I guess it's easier for me if it's
>> explicitly said, such as:
>>
>> In virNetClientStreamQueuePacket instead of ...
>>
>> In virNetClientStreamRecvPacket instead of ...
>>
>> instead of implicitly said if you know the code.
> 
> Something like this?
> 
> There are two functions on the client that handle incoming stream data.
> The first one virNetClientStreamQueuePacket() is a low level function
> that just process the incoming stream data from the socket and store it

...just processes ... and stores ...

> into an internal structure. This happens in the client event loop
> therefore the shorter the callbacks are, the better. The second function
> virNetClientStreamRecvPacket() then handles copying data from internal
> structure into a client provided buffer. Change introduced in this

New paragraph before "Change"

> commit makes just that: new queue for incoming stream packets is

...a new receive (rx) queue...

> introduced. Then instead of copying data into intermediate internal
> buffer and then copying them into user buffer, incoming stream messages
> are enqueued into the queue and data are copied just once - in the upper

... are queued ...  ... data is copied ...

> layer function virNetClientStreamRecvPacket(). In the end, there's just
> one copying of data and therefore shorter event loop callback. This
> should boost the performance which has proven to be the case in my testing.
> 
> Having said that, I don't think there's any need for reopening the bug
> since we are not hurting performance here.
> 
The only reason I suggested it is that, technically, the revert makes the
previous changes essentially null and void. Since that commit was
connected with a bug #, I just wanted to be sure we're covered
"process wise"...  It's not that important though.

OK... I see: instead of allocating a buffer and copying data from the
incoming stream socket into it, only to have it copied again into the
client buffer, we'll "steal" the entire buffer of the incoming message
and put it on a queue from which the client then copies -
seems OK to me...
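
The "steal" step summarized above can be sketched roughly as follows. This
is only an illustration under stated assumptions: struct demo_msg,
demo_msg_steal() and demo_msg_free() are made-up names standing in for
virNetMessage and friends, and only the buffer fields touched by the patch
are modelled.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-in for virNetMessage; only the fields used here. */
struct demo_msg {
    char *buffer;
    size_t bufferLength;
    size_t bufferOffset;
    struct demo_msg *next;
};

/* Move the payload out of @src into a freshly allocated message.
 * No payload bytes are copied; only the pointer changes owner.
 * Returns NULL on allocation failure, leaving @src untouched. */
struct demo_msg *
demo_msg_steal(struct demo_msg *src)
{
    struct demo_msg *dst;

    assert(src != NULL);

    if (!(dst = calloc(1, sizeof(*dst))))
        return NULL;

    dst->buffer = src->buffer;
    dst->bufferLength = src->bufferLength;
    dst->bufferOffset = src->bufferOffset;

    /* The source no longer owns the payload, so clearing or
     * freeing it later cannot touch the stolen buffer. */
    src->buffer = NULL;
    src->bufferLength = src->bufferOffset = 0;

    return dst;
}

/* Release a message together with the payload it owns. */
void
demo_msg_free(struct demo_msg *msg)
{
    if (!msg)
        return;
    free(msg->buffer);
    free(msg);
}
```

The point of the sketch is that the event loop callback only pays for one
small allocation and a few pointer assignments, regardless of payload size.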

ACK for 7-8

John

BTW:

I know it's existing code, but virNetMessageQueueServe caused me to go
look, and it's really a virNetMessageQueuePop to complement
virNetMessageQueuePush (sigh)
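
For readers who don't know the code, the push/pop pairing meant here looks
roughly like the sketch below. It is a simplified FIFO on a singly linked
list with made-up names (demo_node, demo_queue_push, demo_queue_pop), not
the actual libvirt implementation.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical list node, not the real virNetMessage. */
struct demo_node {
    int id;
    struct demo_node *next;
};

/* Append @node to the tail of @queue (FIFO order). */
void
demo_queue_push(struct demo_node **queue, struct demo_node *node)
{
    struct demo_node *tmp;

    assert(queue != NULL && node != NULL);

    node->next = NULL;
    if (!*queue) {
        *queue = node;
        return;
    }
    for (tmp = *queue; tmp->next; tmp = tmp->next)
        ;
    tmp->next = node;
}

/* Detach and return the head of @queue, or NULL if it is empty.
 * This is the "pop" half of the pair. */
struct demo_node *
demo_queue_pop(struct demo_node **queue)
{
    struct demo_node *head;

    assert(queue != NULL);

    if ((head = *queue)) {
        *queue = head->next;
        head->next = NULL;
    }
    return head;
}
```

Pop only detaches the head; freeing it stays the caller's job, which matches
how the patch calls virNetMessageFree() right after virNetMessageQueueServe().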


>>
>> The functions are just tough to read without (more) knowledge (than I
>> have about them) of how they are designed to function. Since he had a
>> hand in the above bug, hopefully Martin can take a look at this patch.
>>
>>>
>>> Signed-off-by: Michal Privoznik <mprivozn at redhat.com>
>>> ---
>>>  src/rpc/virnetclientstream.c | 106 ++++++++++++++++++++++---------------------
>>>  1 file changed, 54 insertions(+), 52 deletions(-)
>>>
>>> diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
>>> index b428f4b..34989a9 100644
>>> --- a/src/rpc/virnetclientstream.c
>>> +++ b/src/rpc/virnetclientstream.c
>>> @@ -49,9 +49,7 @@ struct _virNetClientStream {
>>>       * time by stopping consuming any incoming data
>>>       * off the socket....
>>>       */
>>> -    char *incoming;
>>> -    size_t incomingOffset;
>>> -    size_t incomingLength;
>>> +    virNetMessagePtr rx;
>>>      bool incomingEOF;
>>>  
>>>      virNetClientStreamEventCallback cb;
>>> @@ -86,9 +84,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st)
>>>      if (!st->cb)
>>>          return;
>>>  
>>> -    VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents);
>>> +    VIR_DEBUG("Check timer rx=%p cbEvents=%d", st->rx, st->cbEvents);
>>>  
>>> -    if (((st->incomingOffset || st->incomingEOF) &&
>>> +    if (((st->rx || st->incomingEOF) &&
>>>           (st->cbEvents & VIR_STREAM_EVENT_READABLE)) ||
>>>          (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) {
>>>          VIR_DEBUG("Enabling event timer");
>>> @@ -110,13 +108,13 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
>>>  
>>>      if (st->cb &&
>>>          (st->cbEvents & VIR_STREAM_EVENT_READABLE) &&
>>> -        (st->incomingOffset || st->incomingEOF))
>>> +        (st->rx || st->incomingEOF))
>>>          events |= VIR_STREAM_EVENT_READABLE;
>>>      if (st->cb &&
>>>          (st->cbEvents & VIR_STREAM_EVENT_WRITABLE))
>>>          events |= VIR_STREAM_EVENT_WRITABLE;
>>>  
>>> -    VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents, st->incomingOffset);
>>> +    VIR_DEBUG("Got Timer dispatch events=%d cbEvents=%d rx=%p", events, st->cbEvents, st->rx);
>>>      if (events) {
>>>          virNetClientStreamEventCallback cb = st->cb;
>>>          void *cbOpaque = st->cbOpaque;
>>> @@ -161,7 +159,11 @@ void virNetClientStreamDispose(void *obj)
>>>      virNetClientStreamPtr st = obj;
>>>  
>>>      virResetError(&st->err);
>>> -    VIR_FREE(st->incoming);
>>> +    while (st->rx) {
>>> +        virNetMessagePtr msg = st->rx;
>>> +        virNetMessageQueueServe(&st->rx);
>>> +        virNetMessageFree(msg);
>>> +    }
>>>      virObjectUnref(st->prog);
>>>  }
>>>  
>>> @@ -264,41 +266,34 @@ int virNetClientStreamSetError(virNetClientStreamPtr st,
>>>  int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
>>>                                    virNetMessagePtr msg)
>>>  {
>>> -    int ret = -1;
>>> -    size_t need;
>>> +    virNetMessagePtr tmp_msg;
>>> +
>>> +    VIR_DEBUG("Incoming stream message: stream=%p message=%p", st, msg);
>>> +
>>> +    /* Unfortunately, we must allocate new message as the one we
>>> +     * get in @msg is going to be cleared later in the process. */
>>> +
>>> +    if (!(tmp_msg = virNetMessageNew(false)))
>>> +        return -1;
>>> +
>>> +    /* Copy header */
>>> +    memcpy(&tmp_msg->header, &msg->header, sizeof(msg->header));
>>> +
>>> +    /* Steal message buffer */
>>> +    tmp_msg->buffer = msg->buffer;
>>> +    tmp_msg->bufferLength = msg->bufferLength;
>>> +    tmp_msg->bufferOffset = msg->bufferOffset;
>>> +    msg->buffer = NULL;
>>> +    msg->bufferLength = msg->bufferOffset = 0;
>>>  
>>>      virObjectLock(st);
>>> -    need = msg->bufferLength - msg->bufferOffset;
>>> -    if (need) {
>>> -        size_t avail = st->incomingLength - st->incomingOffset;
>>> -        if (need > avail) {
>>> -            size_t extra = need - avail;
>>> -            if (VIR_REALLOC_N(st->incoming,
>>> -                              st->incomingLength + extra) < 0) {
>>> -                VIR_DEBUG("Out of memory handling stream data");
>>> -                goto cleanup;
>>> -            }
>>> -            st->incomingLength += extra;
>>> -        }
>>>  
>>> -        memcpy(st->incoming + st->incomingOffset,
>>> -               msg->buffer + msg->bufferOffset,
>>> -               msg->bufferLength - msg->bufferOffset);
>>> -        st->incomingOffset += (msg->bufferLength - msg->bufferOffset);
>>> -    } else {
>>> -        st->incomingEOF = true;
>>> -    }
>>> +    virNetMessageQueuePush(&st->rx, tmp_msg);
>>>  
>>> -    VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d",
>>> -              st->incomingOffset, st->incomingLength,
>>> -              st->incomingEOF);
>>>      virNetClientStreamEventTimerUpdate(st);
>>>  
>>> -    ret = 0;
>>> -
>>> - cleanup:
>>>      virObjectUnlock(st);
>>> -    return ret;
>>> +    return 0;
>>>  }
>>>  
>>>  
>>> @@ -362,10 +357,12 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
>>>                                   bool nonblock)
>>>  {
>>>      int rv = -1;
>>> +    size_t want;
>>> +
>>>      VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d",
>>>                st, client, data, nbytes, nonblock);
>>>      virObjectLock(st);
>>> -    if (!st->incomingOffset && !st->incomingEOF) {
>>> +    if (!st->rx && !st->incomingEOF) {
>>>          virNetMessagePtr msg;
>>>          int ret;
>>>  
>>> @@ -395,23 +392,28 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
>>>              goto cleanup;
>>>      }
>>>  
>>> -    VIR_DEBUG("After IO %zu", st->incomingOffset);
>>> -    if (st->incomingOffset) {
>>> -        int want = st->incomingOffset;
>>> -        if (want > nbytes)
>>> -            want = nbytes;
>>> -        memcpy(data, st->incoming, want);
>>> -        if (want < st->incomingOffset) {
>>> -            memmove(st->incoming, st->incoming + want, st->incomingOffset - want);
>>> -            st->incomingOffset -= want;
>>> -        } else {
>>> -            VIR_FREE(st->incoming);
>>> -            st->incomingOffset = st->incomingLength = 0;
>>> +    VIR_DEBUG("After IO rx=%p", st->rx);
>>> +    want = nbytes;
>>> +    while (want && st->rx) {
>>
>> So if 'st->rx == NULL', then 'rv = nbytes - want;' or 0 - I assume that
>> is 'expected'...
> 
> Yes. Calling virStreamRecv() on the client side basically boils down to
> calling this function, and the return value of this function becomes the
> return value of the wrapper. As described in the docs, virStreamRecv()
> and this virNetClientStreamRecvPacket() return the number of bytes read
> from the stream. If there's no incoming data, there's nothing we can
> read from and therefore we should return 0.
> 
>>
>>> +        virNetMessagePtr msg = st->rx;
>>> +        size_t len = want;
>>> +
>>> +        if (len > msg->bufferLength - msg->bufferOffset)
>>> +            len = msg->bufferLength - msg->bufferOffset;
>>> +
>>> +        if (!len)
>>> +            break;
>>> +
>>> +        memcpy(data + (nbytes - want), msg->buffer + msg->bufferOffset, len);
>>> +        want -= len;
>>> +        msg->bufferOffset += len;
>>> +
>>> +        if (msg->bufferOffset == msg->bufferLength) {
>>> +            virNetMessageQueueServe(&st->rx);
>>> +            virNetMessageFree(msg);
>>
>> Nothing needs to be done with want here?   I guess this shows my lack of
>> depth of understanding of these algorithms...  Big black box that I hope
>> works without me needing to intervene!
> 
> No. This does nothing more than: if the head of linked list of incoming
> stream messages is fully read (*), then pop the message at the head and
> move to the other message in the queue (list). In that case, I haven't
> copied any data to user, therefore I should not change @want.
> 
> (*) - It may happen that users will read fewer bytes than there are in
> the incoming message. For instance, an incoming stream packet (message)
> can be 1024 bytes in size, but the user will read 1 byte at a time from
> the stream.
> 
> Hope my explanation makes it clear(-er) to you.
> 
> Michal
> 
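
The partial-read behaviour described above can be sketched as follows. This
is a minimal illustration, not the patch itself: struct demo_pkt,
demo_pkt_append() and demo_stream_recv() are hypothetical names, locking
and EOF handling are left out, and, unlike the patch, an empty packet at
the head is simply dropped rather than stopping the loop.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for a queued stream message. */
struct demo_pkt {
    char *buffer;
    size_t bufferLength;
    size_t bufferOffset;
    struct demo_pkt *next;
};

/* Build a packet holding a copy of @len bytes and append it to @rx. */
int
demo_pkt_append(struct demo_pkt **rx, const char *bytes, size_t len)
{
    struct demo_pkt *pkt = calloc(1, sizeof(*pkt));
    struct demo_pkt **tail = rx;

    if (!pkt)
        return -1;
    if (!(pkt->buffer = malloc(len ? len : 1))) {
        free(pkt);
        return -1;
    }
    if (len)
        memcpy(pkt->buffer, bytes, len);
    pkt->bufferLength = len;

    while (*tail)
        tail = &(*tail)->next;
    *tail = pkt;
    return 0;
}

/* Copy up to @nbytes from the queued packets into @data.
 * Returns the number of bytes copied; 0 if the queue is empty. */
size_t
demo_stream_recv(struct demo_pkt **rx, char *data, size_t nbytes)
{
    size_t want = nbytes;

    assert(rx != NULL && data != NULL);

    while (want && *rx) {
        struct demo_pkt *pkt = *rx;
        size_t len = pkt->bufferLength - pkt->bufferOffset;

        if (len > want)
            len = want;

        if (len)
            memcpy(data + (nbytes - want),
                   pkt->buffer + pkt->bufferOffset, len);
        want -= len;
        pkt->bufferOffset += len;

        /* Head fully consumed: pop and free it. @want stays as it
         * is, since popping copies nothing to the caller. */
        if (pkt->bufferOffset == pkt->bufferLength) {
            *rx = pkt->next;
            free(pkt->buffer);
            free(pkt);
        }
    }
    return nbytes - want;
}
```

A 1-byte read from a 4-byte packet leaves that packet at the head with its
offset advanced; a later, larger read finishes it and carries on into the
next packet in the queue.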



