[libvirt] [PATCH] rpc: RH1026137: Fix slow volume download (virsh vol-download)

Martin Kletzander mkletzan at redhat.com
Tue Jul 14 09:25:55 UTC 2015


On Sat, Jun 06, 2015 at 07:36:48PM +0000, Ossi Herrala wrote:
>Use I/O vector (iovec) instead of one huge memory buffer as suggested
>in https://bugzilla.redhat.com/show_bug.cgi?id=1026137#c7. This avoids
>doing memmove() to big buffers and performance doesn't degrade if
>source (virNetClientStreamQueuePacket()) is faster than sink
>(virNetClientStreamRecvPacket()).

Sorry for missing this mail; it got buried somehow, and I didn't get to
it until now since nobody pinged it.  Sorry for the long wait.

I would remove the 'RH1026137: ' from the commit message and instead
add a 'Resolves: http://bugzilla.redhat.com/1026137' line or something
similar here.

>---
> src/rpc/virnetclientstream.c |  134 +++++++++++++++++++++++++----------------
> 1 files changed, 82 insertions(+), 52 deletions(-)
>
>diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
>index b428f4b..18c6e8b 100644
>--- a/src/rpc/virnetclientstream.c
>+++ b/src/rpc/virnetclientstream.c
>@@ -49,9 +49,9 @@ struct _virNetClientStream {
>      * time by stopping consuming any incoming data
>      * off the socket....
>      */
>-    char *incoming;
>-    size_t incomingOffset;
>-    size_t incomingLength;
>+    struct iovec *incomingVec; /* I/O Vector to hold data */
>+    size_t writeVec;           /* Vectors produced */
>+    size_t readVec;            /* Vectors consumed */
>     bool incomingEOF;
>
>     virNetClientStreamEventCallback cb;
>@@ -86,9 +86,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st)
>     if (!st->cb)
>         return;
>
>-    VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents);
>+    VIR_DEBUG("Check timer readVec %zu writeVec %zu %d", st->readVec, st->writeVec, st->cbEvents);
>
>-    if (((st->incomingOffset || st->incomingEOF) &&
>+    if ((((st->readVec < st->writeVec) || st->incomingEOF) &&
>          (st->cbEvents & VIR_STREAM_EVENT_READABLE)) ||
>         (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) {
>         VIR_DEBUG("Enabling event timer");
>@@ -110,13 +110,14 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
>
>     if (st->cb &&
>         (st->cbEvents & VIR_STREAM_EVENT_READABLE) &&
>-        (st->incomingOffset || st->incomingEOF))
>+        ((st->readVec < st->writeVec) || st->incomingEOF))
>         events |= VIR_STREAM_EVENT_READABLE;
>     if (st->cb &&
>         (st->cbEvents & VIR_STREAM_EVENT_WRITABLE))
>         events |= VIR_STREAM_EVENT_WRITABLE;
>
>-    VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents, st->incomingOffset);
>+    VIR_DEBUG("Got Timer dispatch %d %d readVec %zu writeVec %zu", events, st->cbEvents,
>+              st->readVec, st->writeVec);
>     if (events) {
>         virNetClientStreamEventCallback cb = st->cb;
>         void *cbOpaque = st->cbOpaque;
>@@ -161,7 +162,7 @@ void virNetClientStreamDispose(void *obj)
>     virNetClientStreamPtr st = obj;
>
>     virResetError(&st->err);
>-    VIR_FREE(st->incoming);
>+    VIR_FREE(st->incomingVec);
>     virObjectUnref(st->prog);
> }
>
>@@ -265,38 +266,49 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
>                                   virNetMessagePtr msg)
> {
>     int ret = -1;
>-    size_t need;
>+    struct iovec iov;
>+    char *base;
>+    size_t piece, pieces, length, offset = 0, size = 1024*1024;
>
>     virObjectLock(st);
>-    need = msg->bufferLength - msg->bufferOffset;
>-    if (need) {
>-        size_t avail = st->incomingLength - st->incomingOffset;
>-        if (need > avail) {
>-            size_t extra = need - avail;
>-            if (VIR_REALLOC_N(st->incoming,
>-                              st->incomingLength + extra) < 0) {
>-                VIR_DEBUG("Out of memory handling stream data");
>-                goto cleanup;
>-            }
>-            st->incomingLength += extra;
>-        }
>
>-        memcpy(st->incoming + st->incomingOffset,
>-               msg->buffer + msg->bufferOffset,
>-               msg->bufferLength - msg->bufferOffset);
>-        st->incomingOffset += (msg->bufferLength - msg->bufferOffset);
>-    } else {
>+    length = msg->bufferLength - msg->bufferOffset;
>+
>+    if (length == 0) {
>         st->incomingEOF = true;
>+        goto end;
>     }
>
>-    VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d",
>-              st->incomingOffset, st->incomingLength,
>-              st->incomingEOF);
>+    pieces = (length + size - 1) / size;
>+    for (piece = 0; piece < pieces; piece++) {
>+        if (size > length - offset)
>+            size = length - offset;
>+
>+        if (VIR_ALLOC_N(base, size)) {
>+            VIR_DEBUG("Allocation failed");
>+            goto cleanup;
>+        }
>+
>+        memcpy(base, msg->buffer + msg->bufferOffset + offset, size);
>+        iov.iov_base = base;
>+        iov.iov_len = size;
>+        offset += size;
>+
>+        if (VIR_APPEND_ELEMENT(st->incomingVec, st->writeVec, iov) < 0) {
>+            VIR_DEBUG("Append failed");
>+            VIR_FREE(base);
>+            goto cleanup;
>+        }
>+        VIR_DEBUG("Wrote piece of vector. readVec %zu, writeVec %zu size %zu", st->readVec, st->writeVec, size);

Long line; it should be wrapped.

>+    }
>+
>+ end:
>     virNetClientStreamEventTimerUpdate(st);
>-
>     ret = 0;
>
>  cleanup:
>+    VIR_DEBUG("Stream incoming data readVec %zu writeVec %zu EOF %d",
>+              st->readVec, st->writeVec, st->incomingEOF);
>     virObjectUnlock(st);
>     return ret;
> }
>@@ -361,17 +373,21 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
>                                  size_t nbytes,
>                                  bool nonblock)
> {
>-    int rv = -1;
>+    int ret = -1;
>+    size_t partial, offset;
>+
>+    virObjectLock(st);
>+
>     VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d",
>               st, client, data, nbytes, nonblock);
>-    virObjectLock(st);
>-    if (!st->incomingOffset && !st->incomingEOF) {
>+
>+    if ((st->readVec >= st->writeVec) && !st->incomingEOF) {
>         virNetMessagePtr msg;
>-        int ret;
>+        int rv;
>
>         if (nonblock) {
>             VIR_DEBUG("Non-blocking mode and no data available");
>-            rv = -2;
>+            ret = -2;
>             goto cleanup;
>         }
>
>@@ -387,37 +403,51 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
>
>         VIR_DEBUG("Dummy packet to wait for stream data");
>         virObjectUnlock(st);
>-        ret = virNetClientSendWithReplyStream(client, msg, st);
>+        rv = virNetClientSendWithReplyStream(client, msg, st);
>         virObjectLock(st);
>         virNetMessageFree(msg);
>
>-        if (ret < 0)
>+        if (rv < 0)
>             goto cleanup;
>     }
>
>-    VIR_DEBUG("After IO %zu", st->incomingOffset);
>-    if (st->incomingOffset) {
>-        int want = st->incomingOffset;
>-        if (want > nbytes)
>-            want = nbytes;
>-        memcpy(data, st->incoming, want);
>-        if (want < st->incomingOffset) {
>-            memmove(st->incoming, st->incoming + want, st->incomingOffset - want);
>-            st->incomingOffset -= want;
>+    offset = 0;
>+    partial = nbytes;
>+
>+    while (st->incomingVec && (st->readVec < st->writeVec)) {
>+        struct iovec *iov = st->incomingVec + st->readVec;
>+
>+        if (!iov || !iov->iov_base) {
>+            VIR_DEBUG("NULL pointer");

This should use virReportError(VIR_ERR_INTERNAL_ERROR, ...) or VIR_ERR_RPC
rather than just VIR_DEBUG, since the function returns -1 here without
setting any error.

>+            goto cleanup;
>+        }
>+
>+        if (partial < iov->iov_len) {
>+            memcpy(data+offset, iov->iov_base, partial);
>+            memmove(iov->iov_base, (char*)iov->iov_base+partial, iov->iov_len-partial);

Long line.

>+            iov->iov_len -= partial;
>+            offset += partial;
>+            VIR_DEBUG("Consumed %zu, left %zu", partial, iov->iov_len);
>+            break;
>         } else {

You don't need to enclose this in an else block, thanks to the break
above.

>-            VIR_FREE(st->incoming);
>-            st->incomingOffset = st->incomingLength = 0;
>+            memcpy(data+offset, iov->iov_base, iov->iov_len);
>+            VIR_DEBUG("Consumed %zu. Moving to next piece", iov->iov_len);
>+            partial -= iov->iov_len;
>+            offset += iov->iov_len;
>+            VIR_FREE(iov->iov_base);
>+            iov->iov_len = 0;
>+            st->readVec++;

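The only thing I would mention wrt how it works after this patch is
that it keeps memory that's no longer needed.  Entries for pieces that
were already consumed stay in st->incomingVec (only their buffers are
freed), i.e. sizeof(struct iovec) for every MB already read.  It might
be worth it to do:

  memmove(st->incomingVec, st->incomingVec + st->readVec,
          sizeof(*st->incomingVec) * (st->writeVec - st->readVec));
  VIR_SHRINK_N(st->incomingVec, st->writeVec, st->readVec);
  st->readVec = 0;

(memmove() takes a byte count, not an element count.  VIR_SHRINK_N()
takes the element counter and decrements it itself, so writeVec must
not be reduced again by hand.)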

Apart from that, it's definitely a *way* better approach.  What do you
think of such a modification?  It could go in place of
'st->readVec++', but it would be better placed at the end of this
function, so that it isn't done after every MB read.

>         }
>-        rv = want;
>-    } else {
>-        rv = 0;
>+
>+        VIR_DEBUG("Read piece of vector. read %zu readVec %zu, writeVec %zu", offset, st->readVec, st->writeVec);

Long line.

>     }
>
>+    ret = offset;
>     virNetClientStreamEventTimerUpdate(st);
>
>  cleanup:
>     virObjectUnlock(st);
>-    return rv;
>+    return ret;
> }
>

Apart from the cosmetic changes mentioned above, this looks very nice.

Martin
-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 819 bytes
Desc: not available
URL: <http://listman.redhat.com/archives/libvir-list/attachments/20150714/4f080856/attachment-0001.sig>


More information about the libvir-list mailing list