[libvirt] [PATCH] rpc: RH1026137: Fix slow volume download (virsh vol-download)
Martin Kletzander
mkletzan at redhat.com
Tue Jul 14 09:25:55 UTC 2015
On Sat, Jun 06, 2015 at 07:36:48PM +0000, Ossi Herrala wrote:
>Use I/O vector (iovec) instead of one huge memory buffer as suggested
>in https://bugzilla.redhat.com/show_bug.cgi?id=1026137#c7. This avoids
>doing memmove() to big buffers and performance doesn't degrade if
>source (virNetClientStreamQueuePacket()) is faster than sink
>(virNetClientStreamRecvPacket()).
Sorry to miss this mail, it got buried somehow and I haven't got to it
until now since nobody pinged it. Sorry for the long wait then.
I would remove the 'RH1026137: ' from the commit message and instead
added a 'Resolves: http://bugzilla.redhat.com/1026137' or something
similar here.
>---
> src/rpc/virnetclientstream.c | 134 +++++++++++++++++++++++++----------------
> 1 files changed, 82 insertions(+), 52 deletions(-)
>
>diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
>index b428f4b..18c6e8b 100644
>--- a/src/rpc/virnetclientstream.c
>+++ b/src/rpc/virnetclientstream.c
>@@ -49,9 +49,9 @@ struct _virNetClientStream {
> * time by stopping consuming any incoming data
> * off the socket....
> */
>- char *incoming;
>- size_t incomingOffset;
>- size_t incomingLength;
>+ struct iovec *incomingVec; /* I/O Vector to hold data */
>+ size_t writeVec; /* Vectors produced */
>+ size_t readVec; /* Vectors consumed */
> bool incomingEOF;
>
> virNetClientStreamEventCallback cb;
>@@ -86,9 +86,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st)
> if (!st->cb)
> return;
>
>- VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents);
>+ VIR_DEBUG("Check timer readVec %zu writeVec %zu %d", st->readVec, st->writeVec, st->cbEvents);
>
>- if (((st->incomingOffset || st->incomingEOF) &&
>+ if ((((st->readVec < st->writeVec) || st->incomingEOF) &&
> (st->cbEvents & VIR_STREAM_EVENT_READABLE)) ||
> (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) {
> VIR_DEBUG("Enabling event timer");
>@@ -110,13 +110,14 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
>
> if (st->cb &&
> (st->cbEvents & VIR_STREAM_EVENT_READABLE) &&
>- (st->incomingOffset || st->incomingEOF))
>+ ((st->readVec < st->writeVec) || st->incomingEOF))
> events |= VIR_STREAM_EVENT_READABLE;
> if (st->cb &&
> (st->cbEvents & VIR_STREAM_EVENT_WRITABLE))
> events |= VIR_STREAM_EVENT_WRITABLE;
>
>- VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents, st->incomingOffset);
>+ VIR_DEBUG("Got Timer dispatch %d %d readVec %zu writeVec %zu", events, st->cbEvents,
>+ st->readVec, st->writeVec);
> if (events) {
> virNetClientStreamEventCallback cb = st->cb;
> void *cbOpaque = st->cbOpaque;
>@@ -161,7 +162,7 @@ void virNetClientStreamDispose(void *obj)
> virNetClientStreamPtr st = obj;
>
> virResetError(&st->err);
>- VIR_FREE(st->incoming);
>+ VIR_FREE(st->incomingVec);
> virObjectUnref(st->prog);
> }
>
>@@ -265,38 +266,49 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
> virNetMessagePtr msg)
> {
> int ret = -1;
>- size_t need;
>+ struct iovec iov;
>+ char *base;
>+ size_t piece, pieces, length, offset = 0, size = 1024*1024;
>
> virObjectLock(st);
>- need = msg->bufferLength - msg->bufferOffset;
>- if (need) {
>- size_t avail = st->incomingLength - st->incomingOffset;
>- if (need > avail) {
>- size_t extra = need - avail;
>- if (VIR_REALLOC_N(st->incoming,
>- st->incomingLength + extra) < 0) {
>- VIR_DEBUG("Out of memory handling stream data");
>- goto cleanup;
>- }
>- st->incomingLength += extra;
>- }
>
>- memcpy(st->incoming + st->incomingOffset,
>- msg->buffer + msg->bufferOffset,
>- msg->bufferLength - msg->bufferOffset);
>- st->incomingOffset += (msg->bufferLength - msg->bufferOffset);
>- } else {
>+ length = msg->bufferLength - msg->bufferOffset;
>+
>+ if (length == 0) {
> st->incomingEOF = true;
>+ goto end;
> }
>
>- VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d",
>- st->incomingOffset, st->incomingLength,
>- st->incomingEOF);
>+ pieces = (length + size - 1) / size;
>+ for (piece = 0; piece < pieces; piece++) {
>+ if (size > length - offset)
>+ size = length - offset;
>+
>+ if (VIR_ALLOC_N(base, size)) {
>+ VIR_DEBUG("Allocation failed");
>+ goto cleanup;
>+ }
>+
>+ memcpy(base, msg->buffer + msg->bufferOffset + offset, size);
>+ iov.iov_base = base;
>+ iov.iov_len = size;
>+ offset += size;
>+
>+ if (VIR_APPEND_ELEMENT(st->incomingVec, st->writeVec, iov) < 0) {
>+ VIR_DEBUG("Append failed");
>+ VIR_FREE(base);
>+ goto cleanup;
>+ }
>+ VIR_DEBUG("Wrote piece of vector. readVec %zu, writeVec %zu size %zu", st->readVec, st->writeVec, size);
Long line, should be wrapped.
>+ }
>+
>+ end:
> virNetClientStreamEventTimerUpdate(st);
>-
> ret = 0;
>
> cleanup:
>+ VIR_DEBUG("Stream incoming data readVec %zu writeVec %zu EOF %d",
>+ st->readVec, st->writeVec, st->incomingEOF);
> virObjectUnlock(st);
> return ret;
> }
>@@ -361,17 +373,21 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
> size_t nbytes,
> bool nonblock)
> {
>- int rv = -1;
>+ int ret = -1;
>+ size_t partial, offset;
>+
>+ virObjectLock(st);
>+
> VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d",
> st, client, data, nbytes, nonblock);
>- virObjectLock(st);
>- if (!st->incomingOffset && !st->incomingEOF) {
>+
>+ if ((st->readVec >= st->writeVec) && !st->incomingEOF) {
> virNetMessagePtr msg;
>- int ret;
>+ int rv;
>
> if (nonblock) {
> VIR_DEBUG("Non-blocking mode and no data available");
>- rv = -2;
>+ ret = -2;
> goto cleanup;
> }
>
>@@ -387,37 +403,51 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
>
> VIR_DEBUG("Dummy packet to wait for stream data");
> virObjectUnlock(st);
>- ret = virNetClientSendWithReplyStream(client, msg, st);
>+ rv = virNetClientSendWithReplyStream(client, msg, st);
> virObjectLock(st);
> virNetMessageFree(msg);
>
>- if (ret < 0)
>+ if (rv < 0)
> goto cleanup;
> }
>
>- VIR_DEBUG("After IO %zu", st->incomingOffset);
>- if (st->incomingOffset) {
>- int want = st->incomingOffset;
>- if (want > nbytes)
>- want = nbytes;
>- memcpy(data, st->incoming, want);
>- if (want < st->incomingOffset) {
>- memmove(st->incoming, st->incoming + want, st->incomingOffset - want);
>- st->incomingOffset -= want;
>+ offset = 0;
>+ partial = nbytes;
>+
>+ while (st->incomingVec && (st->readVec < st->writeVec)) {
>+ struct iovec *iov = st->incomingVec + st->readVec;
>+
>+ if (!iov || !iov->iov_base) {
>+ VIR_DEBUG("NULL pointer");
This should be virReportError(VIR_ERR_INTERNAL_ERROR, ...) or VIR_ERR_RPC.
>+ goto cleanup;
>+ }
>+
>+ if (partial < iov->iov_len) {
>+ memcpy(data+offset, iov->iov_base, partial);
>+ memmove(iov->iov_base, (char*)iov->iov_base+partial, iov->iov_len-partial);
Long line.
>+ iov->iov_len -= partial;
>+ offset += partial;
>+ VIR_DEBUG("Consumed %zu, left %zu", partial, iov->iov_len);
>+ break;
> } else {
You don't need to enclose this in an else body thanks to the break
above.
>- VIR_FREE(st->incoming);
>- st->incomingOffset = st->incomingLength = 0;
>+ memcpy(data+offset, iov->iov_base, iov->iov_len);
>+ VIR_DEBUG("Consumed %zu. Moving to next piece", iov->iov_len);
>+ partial -= iov->iov_len;
>+ offset += iov->iov_len;
>+ VIR_FREE(iov->iov_base);
>+ iov->iov_len = 0;
>+ st->readVec++;
The only thing I would mention wrt to how it works after this patch is
that it will consume some memory that's not needed, precisely
(sizeof(struct iovec) + sizeof(void *)) * unreadMBs. It might be
worth it to do:
memmove(st->incomingVec, st->incomingVec + st->readVec,
st->writeVec - st->readVec);
VIR_SHRINK_N(st->incomingVec, st->readVec);
st->writeVec -= st->readVec;
st->readVec = 0;
Apart from that it's definitely *way* better approach. What do you
think of such modification? This would go either instead
'st->readVec++', but rather at the end of this function, so it's not
done after each MB read.
> }
>- rv = want;
>- } else {
>- rv = 0;
>+
>+ VIR_DEBUG("Read piece of vector. read %zu readVec %zu, writeVec %zu", offset, st->readVec, st->writeVec);
Long line.
> }
>
>+ ret = offset;
> virNetClientStreamEventTimerUpdate(st);
>
> cleanup:
> virObjectUnlock(st);
>- return rv;
>+ return ret;
> }
>
Apart from mentioned cosmetic changes this looks very nice.
Martin
-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 819 bytes
Desc: not available
URL: <http://listman.redhat.com/archives/libvir-list/attachments/20150714/4f080856/attachment-0001.sig>
More information about the libvir-list
mailing list