[libvirt] [PATCH 9/9] Ensure that EOF is dispatched to the stream callback

Daniel P. Berrange berrange at redhat.com
Tue Jun 28 17:01:59 UTC 2011


When the remote client receives end of file on the stream
it never invokes the stream callback. Applications relying
on async event driven I/O will thus never see the EOF
condition on the stream

* src/rpc/virnetclient.c, src/rpc/virnetclientstream.c:
  Ensure EOF is dispatched
---
 src/rpc/virnetclient.c       |    3 --
 src/rpc/virnetclientstream.c |   43 ++++++++++++++++++++++++-----------------
 2 files changed, 25 insertions(+), 21 deletions(-)

diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
index dc0ce51..39bdf14 100644
--- a/src/rpc/virnetclient.c
+++ b/src/rpc/virnetclient.c
@@ -580,9 +580,6 @@ static int virNetClientCallDispatchStream(virNetClientPtr client)
         if (thecall && thecall->expectReply) {
             VIR_DEBUG("Got sync data packet completion");
             thecall->mode = VIR_NET_CLIENT_MODE_COMPLETE;
-        } else {
-            // XXX
-            //remoteStreamEventTimerUpdate(privst);
         }
         return 0;
     }
diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
index 9da5aee..d5efab1 100644
--- a/src/rpc/virnetclientstream.c
+++ b/src/rpc/virnetclientstream.c
@@ -55,6 +55,7 @@ struct _virNetClientStream {
     char *incoming;
     size_t incomingOffset;
     size_t incomingLength;
+    bool incomingEOF;
 
     virNetClientStreamEventCallback cb;
     void *cbOpaque;
@@ -73,7 +74,7 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st)
 
     VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents);
 
-    if ((st->incomingOffset &&
+    if (((st->incomingOffset || st->incomingEOF) &&
          (st->cbEvents & VIR_STREAM_EVENT_READABLE)) ||
         (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) {
         VIR_DEBUG("Enabling event timer");
@@ -96,7 +97,7 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
 
     if (st->cb &&
         (st->cbEvents & VIR_STREAM_EVENT_READABLE) &&
-        st->incomingOffset)
+        (st->incomingOffset || st->incomingEOF))
         events |= VIR_STREAM_EVENT_READABLE;
     if (st->cb &&
         (st->cbEvents & VIR_STREAM_EVENT_WRITABLE))
@@ -284,24 +285,30 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
 
     virMutexLock(&st->lock);
     need = msg->bufferLength - msg->bufferOffset;
-    size_t avail = st->incomingLength - st->incomingOffset;
-    if (need > avail) {
-        size_t extra = need - avail;
-        if (VIR_REALLOC_N(st->incoming,
-                          st->incomingLength + extra) < 0) {
-            VIR_DEBUG("Out of memory handling stream data");
-            goto cleanup;
+    if (need) {
+        size_t avail = st->incomingLength - st->incomingOffset;
+        if (need > avail) {
+            size_t extra = need - avail;
+            if (VIR_REALLOC_N(st->incoming,
+                              st->incomingLength + extra) < 0) {
+                VIR_DEBUG("Out of memory handling stream data");
+                goto cleanup;
+            }
+            st->incomingLength += extra;
         }
-        st->incomingLength += extra;
-    }
 
-    memcpy(st->incoming + st->incomingOffset,
-           msg->buffer + msg->bufferOffset,
-           msg->bufferLength - msg->bufferOffset);
-    st->incomingOffset += (msg->bufferLength - msg->bufferOffset);
+        memcpy(st->incoming + st->incomingOffset,
+               msg->buffer + msg->bufferOffset,
+               msg->bufferLength - msg->bufferOffset);
+        st->incomingOffset += (msg->bufferLength - msg->bufferOffset);
+    } else {
+        st->incomingEOF = true;
+    }
 
-    VIR_DEBUG("Stream incoming data offset %zu length %zu",
-              st->incomingOffset, st->incomingLength);
+    VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d",
+              st->incomingOffset, st->incomingLength,
+              st->incomingEOF);
+    virNetClientStreamEventTimerUpdate(st);
 
     ret = 0;
 
@@ -372,7 +379,7 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
     VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d",
               st, client, data, nbytes, nonblock);
     virMutexLock(&st->lock);
-    if (!st->incomingOffset) {
+    if (!st->incomingOffset && !st->incomingEOF) {
         virNetMessagePtr msg;
         int ret;
 
-- 
1.7.4.4




More information about the libvir-list mailing list