[libvirt] [PATCH 8/9] virnetclientstream: Process stream messages later
John Ferlan
jferlan at redhat.com
Wed Apr 20 13:57:03 UTC 2016
On 04/15/2016 09:51 AM, Michal Privoznik wrote:
> Currently we have two separate functions for handling read from
> a stream. One is supposed to be low level and reads data in this
> self allocating chunk of memory. The other read function then
> copies data over from the chunk into a user buffer. There are two
> memcpy() involved even though a single would be sufficient.
> Moreover, since we are copying just data, we can't process
> alternative stream packets in the latter function, like stream
> seeks.
>
> In my testing, this proved two times faster then implementation
s/then/than the/
> which uses IO vectors.
Can I "assume" this testing covers the reverted patch scenario? IOW: I
think this needs https://bugzilla.redhat.com/show_bug.cgi?id=1026136 to
be reopened...
It might have been "nice" to indicate/summarize what this algorithm does
as opposed to the other one. I think you started to do so at the end of
the first paragraph, but I'm not 100% sure - I guess it's easier for me
if it's explicitly said, such as:
In virNetClientStreamQueuePacket instead of ...
In virNetClientStreamRecvPacket instead of ...
rather than implicitly said, which only works if you know the code.
The functions are just tough to read without more knowledge than I have
of how they are designed to function. Since he had a hand in the above
bug, hopefully Martin can take a look at this patch.
>
> Signed-off-by: Michal Privoznik <mprivozn at redhat.com>
> ---
> src/rpc/virnetclientstream.c | 106 ++++++++++++++++++++++---------------------
> 1 file changed, 54 insertions(+), 52 deletions(-)
>
> diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
> index b428f4b..34989a9 100644
> --- a/src/rpc/virnetclientstream.c
> +++ b/src/rpc/virnetclientstream.c
> @@ -49,9 +49,7 @@ struct _virNetClientStream {
> * time by stopping consuming any incoming data
> * off the socket....
> */
> - char *incoming;
> - size_t incomingOffset;
> - size_t incomingLength;
> + virNetMessagePtr rx;
> bool incomingEOF;
>
> virNetClientStreamEventCallback cb;
> @@ -86,9 +84,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st)
> if (!st->cb)
> return;
>
> - VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents);
> + VIR_DEBUG("Check timer rx=%p cbEvents=%d", st->rx, st->cbEvents);
>
> - if (((st->incomingOffset || st->incomingEOF) &&
> + if (((st->rx || st->incomingEOF) &&
> (st->cbEvents & VIR_STREAM_EVENT_READABLE)) ||
> (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) {
> VIR_DEBUG("Enabling event timer");
> @@ -110,13 +108,13 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
>
> if (st->cb &&
> (st->cbEvents & VIR_STREAM_EVENT_READABLE) &&
> - (st->incomingOffset || st->incomingEOF))
> + (st->rx || st->incomingEOF))
> events |= VIR_STREAM_EVENT_READABLE;
> if (st->cb &&
> (st->cbEvents & VIR_STREAM_EVENT_WRITABLE))
> events |= VIR_STREAM_EVENT_WRITABLE;
>
> - VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents, st->incomingOffset);
> + VIR_DEBUG("Got Timer dispatch events=%d cbEvents=%d rx=%p", events, st->cbEvents, st->rx);
> if (events) {
> virNetClientStreamEventCallback cb = st->cb;
> void *cbOpaque = st->cbOpaque;
> @@ -161,7 +159,11 @@ void virNetClientStreamDispose(void *obj)
> virNetClientStreamPtr st = obj;
>
> virResetError(&st->err);
> - VIR_FREE(st->incoming);
> + while (st->rx) {
> + virNetMessagePtr msg = st->rx;
> + virNetMessageQueueServe(&st->rx);
> + virNetMessageFree(msg);
> + }
> virObjectUnref(st->prog);
> }
>
> @@ -264,41 +266,34 @@ int virNetClientStreamSetError(virNetClientStreamPtr st,
> int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
> virNetMessagePtr msg)
> {
> - int ret = -1;
> - size_t need;
> + virNetMessagePtr tmp_msg;
> +
> + VIR_DEBUG("Incoming stream message: stream=%p message=%p", st, msg);
> +
> + /* Unfortunately, we must allocate new message as the one we
> + * get in @msg is going to be cleared later in the process. */
> +
> + if (!(tmp_msg = virNetMessageNew(false)))
> + return -1;
> +
> + /* Copy header */
> + memcpy(&tmp_msg->header, &msg->header, sizeof(msg->header));
> +
> + /* Steal message buffer */
> + tmp_msg->buffer = msg->buffer;
> + tmp_msg->bufferLength = msg->bufferLength;
> + tmp_msg->bufferOffset = msg->bufferOffset;
> + msg->buffer = NULL;
> + msg->bufferLength = msg->bufferOffset = 0;
>
> virObjectLock(st);
> - need = msg->bufferLength - msg->bufferOffset;
> - if (need) {
> - size_t avail = st->incomingLength - st->incomingOffset;
> - if (need > avail) {
> - size_t extra = need - avail;
> - if (VIR_REALLOC_N(st->incoming,
> - st->incomingLength + extra) < 0) {
> - VIR_DEBUG("Out of memory handling stream data");
> - goto cleanup;
> - }
> - st->incomingLength += extra;
> - }
>
> - memcpy(st->incoming + st->incomingOffset,
> - msg->buffer + msg->bufferOffset,
> - msg->bufferLength - msg->bufferOffset);
> - st->incomingOffset += (msg->bufferLength - msg->bufferOffset);
> - } else {
> - st->incomingEOF = true;
> - }
> + virNetMessageQueuePush(&st->rx, tmp_msg);
>
> - VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d",
> - st->incomingOffset, st->incomingLength,
> - st->incomingEOF);
> virNetClientStreamEventTimerUpdate(st);
>
> - ret = 0;
> -
> - cleanup:
> virObjectUnlock(st);
> - return ret;
> + return 0;
> }
>
>
> @@ -362,10 +357,12 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
> bool nonblock)
> {
> int rv = -1;
> + size_t want;
> +
> VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d",
> st, client, data, nbytes, nonblock);
> virObjectLock(st);
> - if (!st->incomingOffset && !st->incomingEOF) {
> + if (!st->rx && !st->incomingEOF) {
> virNetMessagePtr msg;
> int ret;
>
> @@ -395,23 +392,28 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
> goto cleanup;
> }
>
> - VIR_DEBUG("After IO %zu", st->incomingOffset);
> - if (st->incomingOffset) {
> - int want = st->incomingOffset;
> - if (want > nbytes)
> - want = nbytes;
> - memcpy(data, st->incoming, want);
> - if (want < st->incomingOffset) {
> - memmove(st->incoming, st->incoming + want, st->incomingOffset - want);
> - st->incomingOffset -= want;
> - } else {
> - VIR_FREE(st->incoming);
> - st->incomingOffset = st->incomingLength = 0;
> + VIR_DEBUG("After IO rx=%p", st->rx);
> + want = nbytes;
> + while (want && st->rx) {
So if 'st->rx == NULL', the loop is skipped, 'want' is still 'nbytes',
and 'rv = nbytes - want;' ends up as 0 - I assume that is 'expected'...
> + virNetMessagePtr msg = st->rx;
> + size_t len = want;
> +
> + if (len > msg->bufferLength - msg->bufferOffset)
> + len = msg->bufferLength - msg->bufferOffset;
> +
> + if (!len)
> + break;
> +
> + memcpy(data + (nbytes - want), msg->buffer + msg->bufferOffset, len);
> + want -= len;
> + msg->bufferOffset += len;
> +
> + if (msg->bufferOffset == msg->bufferLength) {
> + virNetMessageQueueServe(&st->rx);
> + virNetMessageFree(msg);
Does nothing need to be done with 'want' here? I guess this shows my lack
of depth of understanding of these algorithms... Big black box that I hope
works without me needing to intervene!
John
> }
> - rv = want;
> - } else {
> - rv = 0;
> }
> + rv = nbytes - want;
>
> virNetClientStreamEventTimerUpdate(st);
>
>
More information about the libvir-list
mailing list