[libvirt] [PATCH v3 08/31] Introduce virStreamSparseRecvAll

Michal Privoznik mprivozn at redhat.com
Tue May 16 14:03:48 UTC 2017


This is just a wrapper over new functions that have been just
introduced: virStreamRecvFlags(), virStreamRecvHole(). It's very
similar to virStreamRecvAll() except it handles sparse streams
well.

Signed-off-by: Michal Privoznik <mprivozn at redhat.com>
---
 include/libvirt/libvirt-stream.h |  33 ++++++++++-
 src/libvirt-stream.c             | 123 +++++++++++++++++++++++++++++++++++++++
 src/libvirt_public.syms          |   1 +
 3 files changed, 154 insertions(+), 3 deletions(-)

diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h
index c4baaf7a3..a5e69a1c1 100644
--- a/include/libvirt/libvirt-stream.h
+++ b/include/libvirt/libvirt-stream.h
@@ -104,9 +104,9 @@ int virStreamSendAll(virStreamPtr st,
  * @nbytes: size of the data array
  * @opaque: optional application provided data
  *
- * The virStreamSinkFunc callback is used together
- * with the virStreamRecvAll function for libvirt to
- * provide the data that has been received.
+ * The virStreamSinkFunc callback is used together with the
+ * virStreamRecvAll or virStreamSparseRecvAll functions for
+ * libvirt to provide the data that has been received.
  *
  * The callback will be invoked multiple times,
  * providing data in small chunks. The application
@@ -129,6 +129,33 @@ int virStreamRecvAll(virStreamPtr st,
                      virStreamSinkFunc handler,
                      void *opaque);
 
+/**
+ * virStreamSinkHoleFunc:
+ * @st: the stream object
+ * @length: stream hole size
+ * @opaque: optional application provided data
+ *
+ * This callback is used together with the virStreamSparseRecvAll
+ * function for libvirt to provide the size of a hole that
+ * occurred in the stream.
+ *
+ * The callback may be invoked multiple times as holes are found
+ * during processing a stream. The application should create the
+ * hole in the stream target and then return. A return value of
+ * -1 at any time will abort the receive operation.
+ *
+ * Returns 0 on success,
+ *        -1 upon error
+ */
+typedef int (*virStreamSinkHoleFunc)(virStreamPtr st,
+                                     long long length,
+                                     void *opaque);
+
+int virStreamSparseRecvAll(virStreamPtr stream,
+                           virStreamSinkFunc handler,
+                           virStreamSinkHoleFunc holeHandler,
+                           void *opaque);
+
 typedef enum {
     VIR_STREAM_EVENT_READABLE  = (1 << 0),
     VIR_STREAM_EVENT_WRITABLE  = (1 << 1),
diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c
index bedb6159a..6bf4c4f29 100644
--- a/src/libvirt-stream.c
+++ b/src/libvirt-stream.c
@@ -668,6 +668,129 @@ virStreamRecvAll(virStreamPtr stream,
 }
 
 
+/**
+ * virStreamSparseRecvAll:
+ * @stream: pointer to the stream object
+ * @handler: sink callback for writing data to application
+ * @holeHandler: stream hole callback for skipping holes
+ * @opaque: application defined data
+ *
+ * Receive the entire data stream, sending the data to the
+ * requested data sink @handler and calling the skip @holeHandler
+ * to generate holes for sparse stream targets. This is simply a
+ * convenient alternative to virStreamRecvFlags, for apps that do
+ * blocking-I/O.
+ *
+ * An example using this with a hypothetical file download
+ * API looks like:
+ *
+ *   int mysink(virStreamPtr st, const char *buf, int nbytes, void *opaque) {
+ *       int *fd = opaque;
+ *
+ *       return write(*fd, buf, nbytes);
+ *   }
+ *
+ *   int myskip(virStreamPtr st, long long offset, void *opaque) {
+ *       int *fd = opaque;
+ *
+ *       return lseek(*fd, offset, SEEK_CUR) == (off_t) -1 ? -1 : 0;
+ *   }
+ *
+ *   virStreamPtr st = virStreamNew(conn, 0);
+ *   int fd = open("demo.iso", O_WRONLY);
+ *
+ *   virConnectDownloadSparseFile(conn, st);
+ *   if (virStreamSparseRecvAll(st, mysink, myskip, &fd) < 0) {
+ *      ...report an error ...
+ *      goto done;
+ *   }
+ *   if (virStreamFinish(st) < 0)
+ *      ...report an error...
+ *   virStreamFree(st);
+ *   close(fd);
+ *
+ * Note that @opaque data is shared between both @handler and
+ * @holeHandler callbacks.
+ *
+ * Returns 0 if all the data was successfully received. The caller
+ * should invoke virStreamFinish(st) to flush the stream upon
+ * success and then virStreamFree(st).
+ *
+ * Returns -1 upon any error, with virStreamAbort() already
+ * having been called, so the caller need only call virStreamFree().
+ */
+int
+virStreamSparseRecvAll(virStreamPtr stream,
+                       virStreamSinkFunc handler,
+                       virStreamSinkHoleFunc holeHandler,
+                       void *opaque)
+{
+    char *bytes = NULL;
+    size_t want = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX;
+    const unsigned int flags = VIR_STREAM_RECV_STOP_AT_HOLE;
+    int ret = -1;
+
+    VIR_DEBUG("stream=%p handler=%p holeHandler=%p opaque=%p",
+              stream, handler, holeHandler, opaque);
+
+    virResetLastError();
+
+    virCheckStreamReturn(stream, -1);
+    virCheckNonNullArgGoto(handler, cleanup);
+    virCheckNonNullArgGoto(holeHandler, cleanup);
+
+    if (stream->flags & VIR_STREAM_NONBLOCK) {
+        virReportError(VIR_ERR_OPERATION_INVALID, "%s",
+                       _("data sinks cannot be used for non-blocking streams"));
+        goto cleanup;
+    }
+
+    if (VIR_ALLOC_N(bytes, want) < 0)
+        goto cleanup;
+
+    for (;;) {
+        int got, offset = 0;
+        long long holeLen;
+        const unsigned int holeFlags = 0;
+
+        got = virStreamRecvFlags(stream, bytes, want, flags);
+        if (got == -3) {
+            if (virStreamRecvHole(stream, &holeLen, holeFlags) < 0) {
+                virStreamAbort(stream);
+                goto cleanup;
+            }
+
+            if (holeHandler(stream, holeLen, opaque) < 0) {
+                virStreamAbort(stream);
+                goto cleanup;
+            }
+            continue;
+        } else if (got < 0) {
+            goto cleanup;
+        } else if (got == 0) {
+            break;
+        }
+        while (offset < got) {
+            int done;
+            done = (handler)(stream, bytes + offset, got - offset, opaque);
+            if (done < 0) {
+                virStreamAbort(stream);
+                goto cleanup;
+            }
+            offset += done;
+        }
+    }
+    ret = 0;
+
+ cleanup:
+    VIR_FREE(bytes);
+
+    if (ret != 0)
+        virDispatchError(stream->conn);
+
+    return ret;
+}
+
 /**
  * virStreamEventAddCallback:
  * @stream: pointer to the stream object
diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms
index b73cc8af1..37fc4e224 100644
--- a/src/libvirt_public.syms
+++ b/src/libvirt_public.syms
@@ -764,6 +764,7 @@ LIBVIRT_3.4.0 {
         virStreamRecvFlags;
         virStreamRecvHole;
         virStreamSendHole;
+        virStreamSparseRecvAll;
 } LIBVIRT_3.1.0;
 
 # .... define new API here using predicted next version number ....
-- 
2.13.0




More information about the libvir-list mailing list