[libvirt] [PATCH RFC 44/48] iohelper_message: Add support for sparse streams

Michal Privoznik mprivozn at redhat.com
Wed Jun 22 14:44:01 UTC 2016


Now that we have formatted messages flying through pipe back and
forth, we can start introducing support for other types of
messages. For instance, a type to represent a hole in
file/stream.

Signed-off-by: Michal Privoznik <mprivozn at redhat.com>
---
 daemon/stream.c                 |  21 ++++---
 po/POTFILES.in                  |   1 -
 src/iohelper/iohelper_message.c | 131 ++++++++++++++++++++++++++++++++++------
 src/libvirt-stream.c            |   1 +
 tests/Makefile.am               |   2 +-
 5 files changed, 129 insertions(+), 27 deletions(-)

diff --git a/daemon/stream.c b/daemon/stream.c
index 22d7cf7..83f7310 100644
--- a/daemon/stream.c
+++ b/daemon/stream.c
@@ -833,14 +833,19 @@ daemonStreamHandleRead(virNetServerClientPtr client,
         VIR_DEBUG("rv=%d inData=%d length=%llu", rv, inData, length);
 
         if (rv < 0) {
-            if (virNetServerProgramSendStreamError(remoteProgram,
-                                                   client,
-                                                   msg,
-                                                   &rerr,
-                                                   stream->procedure,
-                                                   stream->serial) < 0)
-                goto cleanup;
-            msg = NULL;
+            if (rv == -2) {
+                /* Unable to determine yet. Claim success. */
+            } else {
+                /* Proper error. */
+                if (virNetServerProgramSendStreamError(remoteProgram,
+                                                       client,
+                                                       msg,
+                                                       &rerr,
+                                                       stream->procedure,
+                                                       stream->serial) < 0)
+                    goto cleanup;
+                msg = NULL;
+            }
 
             /* We're done with this call */
             goto done;
diff --git a/po/POTFILES.in b/po/POTFILES.in
index 7f40200..9f4866c 100644
--- a/po/POTFILES.in
+++ b/po/POTFILES.in
@@ -64,7 +64,6 @@ src/interface/interface_backend_netcf.c
 src/interface/interface_backend_udev.c
 src/internal.h
 src/iohelper/iohelper.c
-src/iohelper/iohelper_message.c
 src/libvirt-admin.c
 src/libvirt-domain-snapshot.c
 src/libvirt-domain.c
diff --git a/src/iohelper/iohelper_message.c b/src/iohelper/iohelper_message.c
index d900c2f..02c0283 100644
--- a/src/iohelper/iohelper_message.c
+++ b/src/iohelper/iohelper_message.c
@@ -40,6 +40,7 @@ struct iohelperCtl {
     virNetMessagePtr msg;
     bool msgReadyRead;
     bool msgReadyWrite;
+    unsigned long long skipLength;
 };
 
 typedef ssize_t (*readfunc)(int fd, void *buf, size_t count);
@@ -122,19 +123,20 @@ messageRecv(iohelperCtlPtr ctl)
 {
     virNetMessagePtr msg = ctl->msg;
     readfunc readF = ctl->blocking ? saferead : read;
+    virNetStreamSkip data;
 
     ctl->msgReadyRead = false;
 
-    if (!msg->bufferLength) {
-        msg->bufferLength = 4;
-        if (VIR_ALLOC_N(msg->buffer, msg->bufferLength) < 0)
-            return -1;
-    }
-
     while (true) {
         ssize_t nread;
         size_t want;
 
+        if (!msg->bufferLength) {
+            msg->bufferLength = 4;
+            if (VIR_ALLOC_N(msg->buffer, msg->bufferLength) < 0)
+                return -1;
+        }
+
         want = msg->bufferLength - msg->bufferOffset;
 
      reread:
@@ -164,7 +166,17 @@ messageRecv(iohelperCtlPtr ctl)
                 if (virNetMessageDecodeHeader(msg) < 0)
                     return -1;
 
-                /* Here we would decode the payload someday */
+                if (msg->header.type == VIR_NET_STREAM_SKIP) {
+                    if (virNetMessageDecodePayload(msg,
+                                                   (xdrproc_t) xdr_virNetStreamSkip,
+                                                   &data) < 0) {
+                        return -1;
+                    }
+
+                    ctl->skipLength += data.length;
+                    messageClear(ctl);
+                    continue;
+                }
 
                 ctl->msgReadyRead = true;
                 return msg->bufferLength - msg->bufferOffset;
@@ -239,6 +251,12 @@ iohelperRead(iohelperCtlPtr ctl,
         }
     }
 
+    /* Should never happen, but things change. */
+    if (msg->header.type != VIR_NET_STREAM) {
+        errno = EAGAIN;
+        return -1;
+    }
+
     if (want > msg->bufferLength - msg->bufferOffset)
         want = msg->bufferLength - msg->bufferOffset;
 
@@ -312,21 +330,100 @@ iohelperWrite(iohelperCtlPtr ctl,
 
 
 int
-iohelperSkip(iohelperCtlPtr ctl ATTRIBUTE_UNUSED,
-             unsigned long long length ATTRIBUTE_UNUSED)
+iohelperSkip(iohelperCtlPtr ctl,
+             unsigned long long length)
 {
-    virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
-                   _("sparse stream not supported"));
+    virNetMessagePtr msg = ctl->msg;
+    virNetStreamSkip data;
+
+    if (messageReadyRead(ctl)) {
+        /* This stream is used for reading. */
+        return 0;
+    }
+
+    if (!messageReadyWrite(ctl)) {
+        ssize_t nwritten;
+        /* Okay, the outgoing message is not fully sent. Try to
+         * finish the sending and recheck. */
+        if ((nwritten = messageSend(ctl)) < 0)
+            return -1;
+
+        if (!nwritten && errno != EAGAIN)
+            return 0;
+
+        if (!messageReadyWrite(ctl)) {
+            errno = EAGAIN;
+            return -2;
+        }
+    }
+
+    memset(&msg->header, 0, sizeof(msg->header));
+    msg->header.type = VIR_NET_STREAM_SKIP;
+    msg->header.status = VIR_NET_CONTINUE;
+
+    memset(&data, 0, sizeof(data));
+    data.length = length;
+
+    /* Encoding a message is fatal and we should discard any
+     * partially encoded message. */
+    if (virNetMessageEncodeHeader(msg) < 0)
+        goto error;
+
+    if (virNetMessageEncodePayload(msg,
+                                   (xdrproc_t) xdr_virNetStreamSkip,
+                                   &data) < 0)
+        goto error;
+
+    /* At this point, the message is successfully encoded. Don't
+     * discard it if something below fails. */
+    if (messageSend(ctl) < 0)
+        return -1;
+
+    return 0;
+
+ error:
+    messageClear(ctl);
     return -1;
 }
 
 
 int
-iohelperInData(iohelperCtlPtr ctl ATTRIBUTE_UNUSED,
-               int *inData ATTRIBUTE_UNUSED,
-               unsigned long long *length ATTRIBUTE_UNUSED)
+iohelperInData(iohelperCtlPtr ctl,
+               int *inData,
+               unsigned long long *length)
 {
-    virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
-                   _("sparse stream not supported"));
-    return -1;
+    virNetMessagePtr msg;
+
+    /* Make sure we have a message waiting in the queue. */
+
+    if (!messageReadyRead(ctl)) {
+        ssize_t nread;
+        /* Okay, the incoming message is not fully read. Try to
+         * finish its receiving and recheck. */
+        if ((nread = messageRecv(ctl)) < 0)
+            return -1;
+
+        if (!nread && errno != EAGAIN) {
+            /* EOF */
+            *inData = *length = 0;
+            return 0;
+        }
+
+        if (!messageReadyRead(ctl)) {
+            errno = EAGAIN;
+            return -2;
+        }
+    }
+
+    if (ctl->skipLength) {
+        *inData = 0;
+        *length = ctl->skipLength;
+        ctl->skipLength = 0;
+    } else {
+        msg = ctl->msg;
+        *inData = 1;
+        *length = msg->bufferLength - msg->bufferOffset;
+    }
+
+    return 0;
 }
diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c
index 2632d55..13cbbe5 100644
--- a/src/libvirt-stream.c
+++ b/src/libvirt-stream.c
@@ -491,6 +491,7 @@ virStreamHoleSize(virStreamPtr stream,
  * and return 0.
  *
  * Returns 0 on success,
+ *        -2 if unable to determine yet,
  *        -1 otherwise
  */
 int
diff --git a/tests/Makefile.am b/tests/Makefile.am
index a87de5f..aa35a6f 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -1303,7 +1303,7 @@ iohelpermessagetest_SOURCES = \
 iohelpermessagetest_CFLAGS = \
 	$(AM_CFLAGS) -I$(top_srcdir)/src/iohelper
 iohelpermessagetest_LDADD = \
-	$(LDADDS) ../src/libvirt-iohelper.la
+	$(LDADDS) ../src/libvirt-iohelper.la ../src/libvirt-net-rpc.la
 endif WITH_LIBVIRTD
 
 libshunload_la_SOURCES = shunloadhelper.c
-- 
2.8.4




More information about the libvir-list mailing list