[libvirt] [PATCH 8/9] virnetclientstream: Process stream messages later
John Ferlan
jferlan at redhat.com
Tue Apr 26 17:16:04 UTC 2016
On 04/21/2016 10:28 AM, Michal Privoznik wrote:
> On 20.04.2016 15:57, John Ferlan wrote:
>>
>>
>> On 04/15/2016 09:51 AM, Michal Privoznik wrote:
>>> Currently we have two separate functions for handling read from
>>> a stream. One is supposed to be low level and reads data in this
>>> self allocating chunk of memory. The other read function then
>>> copies data over from the chunk into a user buffer. There are two
>>> memcpy() involved even though a single would be sufficient.
>>> Moreover, since we are copying just data, we can't process
>>> alternative stream packets in the latter function, like stream
>>> seeks.
>>>
>>> In my testing, this proved two times faster then implementation
>>
>> s/then/than the/
>>
>>> which uses IO vectors.
>>
>> Can I "assume" this testing covers the reverted patch scenario. IOW: I
>> think this needs https://bugzilla.redhat.com/show_bug.cgi?id=1026136 to
>> be reopened...
>
> This showed two times faster than even IO vectors implementation.
>
>>
>> Might have been "nice" to indicate/summarize what this algorithm does as
>> opposed to the other. I think you started at the end of the first
>> paragraph, but I'm not 100% sure - I guess it's easier for me if it's
>> explicitly said, such as:
>>
>> In virNetClientStreamQueuePacket instead of ...
>>
>> In virNetClientStreamRecvPacket instead of ...
>>
>> instead of implicitly said if you know the code.
>
> Something like this?
>
> There are two functions on the client that handle incoming stream data.
> The first one virNetClientStreamQueuePacket() is a low level function
> that just process the incoming stream data from the socket and store it
...just processes ... and stores ...
> into an internal structure. This happens in the client event loop
> therefore the shorter the callbacks are, the better. The second function
> virNetClientStreamRecvPacket() then handles copying data from internal
> structure into a client provided buffer. Change introduced in this
New paragraph before "Change"
> commit makes just that: new queue for incoming stream packets is
...a new receive (rx) queue...
> introduced. Then instead of copying data into intermediate internal
> buffer and then copying them into user buffer, incoming stream messages
> are enqueued into the queue and data are copied just once - in the upper
... are queued ... data is copied ...
> layer function virNetClientStreamRecvPacket(). In the end, there's just
> one copying of data and therefore shorter event loop callback. This
> should boost the performance which has proven to be the case in my testing.
>
> Having said that, I don't think there's any need for reopening the bug
> since we are not hurting performance here.
>
The only reason I suggested it is that I think technically the revert
makes the previous changes essentially null and void. Since that commit
was connected with a bug #, I just wanted to be sure "process wise" we're
covered... It's not that important though.

OK... I see, instead of allocating and copying data from the incoming
stream socket into a buffer only to be copied again into the client
buffer, we'll "steal" the entire buffer destined for the client from the
incoming stream socket and then create a queue for the client to copy
from - seems OK to me...

ACK for 7-8

John

BTW:
I know it's existing, but virNetMessageQueueServe caused me to go look
and see it's really a virNetMessageQueuePop to complement the
virNetMessageQueuePush (sigh)
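
For anyone else reading along, the buffer "steal" and the Push/Serve
pairing can be sketched roughly as below. This is only an illustration:
struct demo_msg and the demo_* functions are hypothetical stand-ins that
I made up for this sketch, not the real virNetMessage code, and I'm
assuming a plain singly linked FIFO.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for a stream message; not libvirt's virNetMessage. */
struct demo_msg {
    char *buffer;           /* payload owned by this message */
    size_t bufferLength;    /* number of valid bytes in buffer */
    size_t bufferOffset;    /* number of bytes already consumed */
    struct demo_msg *next;  /* next message in the queue */
};

/* Move the payload from src to dst without copying a single byte.
 * src is left empty, so the caller may clear or reuse it safely. */
void demo_msg_steal(struct demo_msg *dst, struct demo_msg *src)
{
    assert(dst && src && !dst->buffer);
    dst->buffer = src->buffer;
    dst->bufferLength = src->bufferLength;
    dst->bufferOffset = src->bufferOffset;
    src->buffer = NULL;
    src->bufferLength = src->bufferOffset = 0;
}

/* "Push": append msg at the tail of the queue. */
void demo_queue_push(struct demo_msg **queue, struct demo_msg *msg)
{
    assert(queue && msg);
    msg->next = NULL;
    while (*queue)
        queue = &(*queue)->next;
    *queue = msg;
}

/* "Serve" (really a pop): detach and return the head, or NULL if empty. */
struct demo_msg *demo_queue_serve(struct demo_msg **queue)
{
    struct demo_msg *head;

    assert(queue);
    head = *queue;
    if (head) {
        *queue = head->next;
        head->next = NULL;
    }
    return head;
}
```

The point of the steal is that only pointers and lengths change hands, so
the event loop callback stays short no matter how large the payload is.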
>>
>> The functions are just tough to read without (more) knowledge (than I
>> have about them) of how they are designed to function. Since he had a
>> hand in the above bug, hopefully Martin can take a look at this patch.
>>
>>>
>>> Signed-off-by: Michal Privoznik <mprivozn at redhat.com>
>>> ---
>>> src/rpc/virnetclientstream.c | 106 ++++++++++++++++++++++---------------------
>>> 1 file changed, 54 insertions(+), 52 deletions(-)
>>>
>>> diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
>>> index b428f4b..34989a9 100644
>>> --- a/src/rpc/virnetclientstream.c
>>> +++ b/src/rpc/virnetclientstream.c
>>> @@ -49,9 +49,7 @@ struct _virNetClientStream {
>>> * time by stopping consuming any incoming data
>>> * off the socket....
>>> */
>>> - char *incoming;
>>> - size_t incomingOffset;
>>> - size_t incomingLength;
>>> + virNetMessagePtr rx;
>>> bool incomingEOF;
>>>
>>> virNetClientStreamEventCallback cb;
>>> @@ -86,9 +84,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st)
>>> if (!st->cb)
>>> return;
>>>
>>> - VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents);
>>> + VIR_DEBUG("Check timer rx=%p cbEvents=%d", st->rx, st->cbEvents);
>>>
>>> - if (((st->incomingOffset || st->incomingEOF) &&
>>> + if (((st->rx || st->incomingEOF) &&
>>> (st->cbEvents & VIR_STREAM_EVENT_READABLE)) ||
>>> (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) {
>>> VIR_DEBUG("Enabling event timer");
>>> @@ -110,13 +108,13 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
>>>
>>> if (st->cb &&
>>> (st->cbEvents & VIR_STREAM_EVENT_READABLE) &&
>>> - (st->incomingOffset || st->incomingEOF))
>>> + (st->rx || st->incomingEOF))
>>> events |= VIR_STREAM_EVENT_READABLE;
>>> if (st->cb &&
>>> (st->cbEvents & VIR_STREAM_EVENT_WRITABLE))
>>> events |= VIR_STREAM_EVENT_WRITABLE;
>>>
>>> - VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents, st->incomingOffset);
>>> + VIR_DEBUG("Got Timer dispatch events=%d cbEvents=%d rx=%p", events, st->cbEvents, st->rx);
>>> if (events) {
>>> virNetClientStreamEventCallback cb = st->cb;
>>> void *cbOpaque = st->cbOpaque;
>>> @@ -161,7 +159,11 @@ void virNetClientStreamDispose(void *obj)
>>> virNetClientStreamPtr st = obj;
>>>
>>> virResetError(&st->err);
>>> - VIR_FREE(st->incoming);
>>> + while (st->rx) {
>>> + virNetMessagePtr msg = st->rx;
>>> + virNetMessageQueueServe(&st->rx);
>>> + virNetMessageFree(msg);
>>> + }
>>> virObjectUnref(st->prog);
>>> }
>>>
>>> @@ -264,41 +266,34 @@ int virNetClientStreamSetError(virNetClientStreamPtr st,
>>> int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
>>> virNetMessagePtr msg)
>>> {
>>> - int ret = -1;
>>> - size_t need;
>>> + virNetMessagePtr tmp_msg;
>>> +
>>> + VIR_DEBUG("Incoming stream message: stream=%p message=%p", st, msg);
>>> +
>>> + /* Unfortunately, we must allocate new message as the one we
>>> + * get in @msg is going to be cleared later in the process. */
>>> +
>>> + if (!(tmp_msg = virNetMessageNew(false)))
>>> + return -1;
>>> +
>>> + /* Copy header */
>>> + memcpy(&tmp_msg->header, &msg->header, sizeof(msg->header));
>>> +
>>> + /* Steal message buffer */
>>> + tmp_msg->buffer = msg->buffer;
>>> + tmp_msg->bufferLength = msg->bufferLength;
>>> + tmp_msg->bufferOffset = msg->bufferOffset;
>>> + msg->buffer = NULL;
>>> + msg->bufferLength = msg->bufferOffset = 0;
>>>
>>> virObjectLock(st);
>>> - need = msg->bufferLength - msg->bufferOffset;
>>> - if (need) {
>>> - size_t avail = st->incomingLength - st->incomingOffset;
>>> - if (need > avail) {
>>> - size_t extra = need - avail;
>>> - if (VIR_REALLOC_N(st->incoming,
>>> - st->incomingLength + extra) < 0) {
>>> - VIR_DEBUG("Out of memory handling stream data");
>>> - goto cleanup;
>>> - }
>>> - st->incomingLength += extra;
>>> - }
>>>
>>> - memcpy(st->incoming + st->incomingOffset,
>>> - msg->buffer + msg->bufferOffset,
>>> - msg->bufferLength - msg->bufferOffset);
>>> - st->incomingOffset += (msg->bufferLength - msg->bufferOffset);
>>> - } else {
>>> - st->incomingEOF = true;
>>> - }
>>> + virNetMessageQueuePush(&st->rx, tmp_msg);
>>>
>>> - VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d",
>>> - st->incomingOffset, st->incomingLength,
>>> - st->incomingEOF);
>>> virNetClientStreamEventTimerUpdate(st);
>>>
>>> - ret = 0;
>>> -
>>> - cleanup:
>>> virObjectUnlock(st);
>>> - return ret;
>>> + return 0;
>>> }
>>>
>>>
>>> @@ -362,10 +357,12 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
>>> bool nonblock)
>>> {
>>> int rv = -1;
>>> + size_t want;
>>> +
>>> VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d",
>>> st, client, data, nbytes, nonblock);
>>> virObjectLock(st);
>>> - if (!st->incomingOffset && !st->incomingEOF) {
>>> + if (!st->rx && !st->incomingEOF) {
>>> virNetMessagePtr msg;
>>> int ret;
>>>
>>> @@ -395,23 +392,28 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
>>> goto cleanup;
>>> }
>>>
>>> - VIR_DEBUG("After IO %zu", st->incomingOffset);
>>> - if (st->incomingOffset) {
>>> - int want = st->incomingOffset;
>>> - if (want > nbytes)
>>> - want = nbytes;
>>> - memcpy(data, st->incoming, want);
>>> - if (want < st->incomingOffset) {
>>> - memmove(st->incoming, st->incoming + want, st->incomingOffset - want);
>>> - st->incomingOffset -= want;
>>> - } else {
>>> - VIR_FREE(st->incoming);
>>> - st->incomingOffset = st->incomingLength = 0;
>>> + VIR_DEBUG("After IO rx=%p", st->rx);
>>> + want = nbytes;
>>> + while (want && st->rx) {
>>
>> So if 'st->rx == NULL', then 'rv = nbytes - want;' or 0 - I assume that
>> is 'expected'...
>
> Yes. Calling virStreamRecv() on client side will basically boil down to
> calling this function. And return value of this function will become
> return value of the wrapper. As described in the docs, virStreamRecv()
> and this virNetClientStreamRecvPacket() returns number of bytes read
> from stream. In case there's no incoming data, there's nothing we can
> read from and therefore we should return 0.
>
>>
>>> + virNetMessagePtr msg = st->rx;
>>> + size_t len = want;
>>> +
>>> + if (len > msg->bufferLength - msg->bufferOffset)
>>> + len = msg->bufferLength - msg->bufferOffset;
>>> +
>>> + if (!len)
>>> + break;
>>> +
>>> + memcpy(data + (nbytes - want), msg->buffer + msg->bufferOffset, len);
>>> + want -= len;
>>> + msg->bufferOffset += len;
>>> +
>>> + if (msg->bufferOffset == msg->bufferLength) {
>>> + virNetMessageQueueServe(&st->rx);
>>> + virNetMessageFree(msg);
>>
>> Nothing needs to be done with want here? I guess this shows my lack of
>> depth of understanding of these algorithms... Big black box that I hope
>> works without me needing to intervene!
>
> No. This does nothing more than: if the head of the linked list of incoming
> stream messages is fully read (*), then pop the message at the head and
> move to the next message in the queue (list). In that case, I haven't
> copied any data to the user, therefore I should not change @want.
>
> (*) - It may happen that users will read fewer bytes than there are in the
> incoming message. For instance, an incoming stream packet (message) can be
> 1024 bytes in size, but the user will read 1 byte at a time from the stream.
>
> Hope my explanation makes it clear(-er) to you.
>
> Michal
>
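
To convince myself about the partial read case, here is a minimal sketch
of the drain loop as I understand it from the explanation above. The
struct rx_msg type and rx_drain() are hypothetical names for this
sketch only; it does no locking and leaves freeing of popped messages
to the caller, and unlike the patch it skips an empty message instead of
breaking out of the loop.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical queued message; field names mirror the patch for readability. */
struct rx_msg {
    const char *buffer;
    size_t bufferLength;
    size_t bufferOffset;
    struct rx_msg *next;
};

/* Copy up to nbytes from the queue into data and return the number of
 * bytes copied. Returns 0 when the queue is empty. Fully consumed
 * messages are unlinked; freeing them is left to the caller here. */
size_t rx_drain(struct rx_msg **rx, char *data, size_t nbytes)
{
    size_t want = nbytes;

    assert(rx && (data || !nbytes));
    while (want && *rx) {
        struct rx_msg *msg = *rx;
        size_t len = msg->bufferLength - msg->bufferOffset;

        if (len > want)
            len = want;
        if (len)
            memcpy(data + (nbytes - want), msg->buffer + msg->bufferOffset, len);
        want -= len;
        msg->bufferOffset += len;

        /* Popping the head copies nothing, so want is left untouched. */
        if (msg->bufferOffset == msg->bufferLength) {
            *rx = msg->next;
            msg->next = NULL;
        }
    }
    return nbytes - want;
}
```

With a 3 byte message followed by a 2 byte message queued, a 2 byte read
leaves the first message at the head with its offset advanced, and a
later larger read finishes it, pops it, and continues into the second.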