[libvirt] [PATCH v3 09/31] Introduce virStreamSparseSendAll

Michal Privoznik mprivozn at redhat.com
Tue May 16 14:03:49 UTC 2017


This is just a wrapper over new function that have been just
introduced: virStreamSendHole() . It's very similar to
virStreamSendAll() except it handles sparse streams well.

Signed-off-by: Michal Privoznik <mprivozn at redhat.com>
---
 include/libvirt/libvirt-stream.h |  65 +++++++++++++++-
 src/libvirt-stream.c             | 159 +++++++++++++++++++++++++++++++++++++++
 src/libvirt_public.syms          |   1 +
 3 files changed, 222 insertions(+), 3 deletions(-)

diff --git a/include/libvirt/libvirt-stream.h b/include/libvirt/libvirt-stream.h
index a5e69a1c1..d18d43140 100644
--- a/include/libvirt/libvirt-stream.h
+++ b/include/libvirt/libvirt-stream.h
@@ -71,9 +71,9 @@ int virStreamRecvHole(virStreamPtr,
  * @nbytes: size of the data array
  * @opaque: optional application provided data
  *
- * The virStreamSourceFunc callback is used together
- * with the virStreamSendAll function for libvirt to
- * obtain the data that is to be sent.
+ * The virStreamSourceFunc callback is used together with
+ * the virStreamSendAll and virStreamSparseSendAll functions
+ * for libvirt to obtain the data that is to be sent.
  *
  * The callback will be invoked multiple times,
  * fetching data in small chunks. The application
@@ -96,6 +96,65 @@ int virStreamSendAll(virStreamPtr st,
                      virStreamSourceFunc handler,
                      void *opaque);
 
+/**
+ * virStreamSourceHoleFunc:
+ * @st: the stream object
+ * @inData: are we in data section
+ * @length: how long is the section we are currently in
+ * @opaque: optional application provided data
+ *
+ * The virStreamSourceHoleFunc callback is used together with the
+ * virStreamSparseSendAll function for libvirt to obtain the
+ * length of section stream is currently in.
+ *
+ * Moreover, upon successful return, @length should be updated
+ * with how many bytes are left until the current section ends
+ * (either data section or hole section). Also the stream is
+ * currently in data section, @inData should be set to a non-zero
+ * value and vice versa.
+ *
+ * NB: there's an implicit hole at the end of each file. If
+ * that's the case, @inData and @length should be both set to 0.
+ *
+ * This function should not adjust the current position within
+ * the file.
+ *
+ * Returns 0 on success,
+ *        -1 upon error
+ */
+typedef int (*virStreamSourceHoleFunc)(virStreamPtr st,
+                                       int *inData,
+                                       long long *length,
+                                       void *opaque);
+
+/**
+ * virStreamSourceSkipFunc:
+ * @st: the stream object
+ * @length: stream hole size
+ * @opaque: optional application provided data
+ *
+ * This callback is used together with the virStreamSparseSendAll
+ * to skip holes in the underlying file as reported by
+ * virStreamSourceHoleFunc.
+ *
+ * The callback may be invoked multiple times as holes are found
+ * during processing a stream. The application should skip
+ * processing the hole in the stream source and then return.
+ * A return value of -1 at any time will abort the send operation.
+ *
+ * Returns 0 on success,
+ *        -1 upon error.
+ */
+typedef int (*virStreamSourceSkipFunc)(virStreamPtr st,
+                                       long long length,
+                                       void *opaque);
+
+int virStreamSparseSendAll(virStreamPtr st,
+                           virStreamSourceFunc handler,
+                           virStreamSourceHoleFunc holeHandler,
+                           virStreamSourceSkipFunc skipHandler,
+                           void *opaque);
+
 /**
  * virStreamSinkFunc:
  *
diff --git a/src/libvirt-stream.c b/src/libvirt-stream.c
index 6bf4c4f29..4cbe5eee1 100644
--- a/src/libvirt-stream.c
+++ b/src/libvirt-stream.c
@@ -574,6 +574,165 @@ virStreamSendAll(virStreamPtr stream,
 }
 
 
+/**
+ * virStreamSparseSendAll:
+ * @stream: pointer to the stream object
+ * @handler: source callback for reading data from application
+ * @holeHandler: source callback for determining holes
+ * @skipHandler: skip holes as reported by @holeHandler
+ * @opaque: application defined data
+ *
+ * Send the entire data stream, reading the data from the
+ * requested data source. This is simply a convenient alternative
+ * to virStreamSend, for apps that do blocking-I/O.
+ *
+ * An example using this with a hypothetical file upload
+ * API looks like
+ *
+ *   int mysource(virStreamPtr st, char *buf, int nbytes, void *opaque) {
+ *       int *fd = opaque;
+ *
+ *       return read(*fd, buf, nbytes);
+ *   }
+ *
+ *   int myskip(virStreamPtr st, long long offset, void *opaque) {
+ *       int *fd = opaque;
+ *
+ *       return lseek(*fd, offset, SEEK_CUR) == (off_t) -1 ? -1 : 0;
+ *   }
+ *
+ *   int myindata(virStreamPtr st, int *inData,
+ *                long long *offset, void *opaque) {
+ *       int *fd = opaque;
+ *
+ *       if (@fd in hole) {
+ *           *inData = 0;
+ *           *offset = holeSize;
+ *       } else {
+ *           *inData = 1;
+ *           *offset = dataSize;
+ *       }
+ *
+ *       return 0;
+ *   }
+ *
+ *   virStreamPtr st = virStreamNew(conn, 0);
+ *   int fd = open("demo.iso", O_RDONLY);
+ *
+ *   virConnectUploadFile(conn, st);
+ *   if (virStreamSparseSendAll(st,
+ *                              mysource,
+ *                              myindata,
+ *                              myskip,
+ *                              &fd) < 0) {
+ *      ...report an error ...
+ *      goto done;
+ *   }
+ *   if (virStreamFinish(st) < 0)
+ *      ...report an error...
+ *   virStreamFree(st);
+ *   close(fd);
+ *
+ * Note that @opaque data are shared between @handler, @holeHandler and @skipHandler.
+ *
+ * Returns 0 if all the data was successfully sent. The caller
+ * should invoke virStreamFinish(st) to flush the stream upon
+ * success and then virStreamFree.
+ *
+ * Returns -1 upon any error, with virStreamAbort() already
+ * having been called,  so the caller need only call
+ * virStreamFree().
+ */
+int virStreamSparseSendAll(virStreamPtr stream,
+                           virStreamSourceFunc handler,
+                           virStreamSourceHoleFunc holeHandler,
+                           virStreamSourceSkipFunc skipHandler,
+                           void *opaque)
+{
+    char *bytes = NULL;
+    size_t want = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX;
+    int ret = -1;
+    unsigned long long dataLen = 0;
+
+    VIR_DEBUG("stream=%p handler=%p holeHandler=%p opaque=%p",
+              stream, handler, holeHandler, opaque);
+
+    virResetLastError();
+
+    virCheckStreamReturn(stream, -1);
+    virCheckNonNullArgGoto(handler, cleanup);
+    virCheckNonNullArgGoto(holeHandler, cleanup);
+    virCheckNonNullArgGoto(skipHandler, cleanup);
+
+    if (stream->flags & VIR_STREAM_NONBLOCK) {
+        virReportError(VIR_ERR_OPERATION_INVALID, "%s",
+                       _("data sources cannot be used for non-blocking streams"));
+        goto cleanup;
+    }
+
+    if (VIR_ALLOC_N(bytes, want) < 0)
+        goto cleanup;
+
+    for (;;) {
+        int inData, got, offset = 0;
+        long long sectionLen;
+        const unsigned int skipFlags = 0;
+
+        if (!dataLen) {
+            if (holeHandler(stream, &inData, &sectionLen, opaque) < 0) {
+                virStreamAbort(stream);
+                goto cleanup;
+            }
+
+            if (!inData && sectionLen) {
+                if (virStreamSendHole(stream, sectionLen, skipFlags) < 0) {
+                    virStreamAbort(stream);
+                    goto cleanup;
+                }
+
+                if (skipHandler(stream, sectionLen, opaque) < 0) {
+                    virReportSystemError(errno, "%s",
+                                         _("unable to skip hole"));
+                    virStreamAbort(stream);
+                    goto cleanup;
+                }
+                continue;
+            } else {
+                dataLen = sectionLen;
+            }
+        }
+
+        if (want > dataLen)
+            want = dataLen;
+
+        got = (handler)(stream, bytes, want, opaque);
+        if (got < 0) {
+            virStreamAbort(stream);
+            goto cleanup;
+        }
+        if (got == 0)
+            break;
+        while (offset < got) {
+            int done;
+            done = virStreamSend(stream, bytes + offset, got - offset);
+            if (done < 0)
+                goto cleanup;
+            offset += done;
+            dataLen -= done;
+        }
+    }
+    ret = 0;
+
+ cleanup:
+    VIR_FREE(bytes);
+
+    if (ret != 0)
+        virDispatchError(stream->conn);
+
+    return ret;
+}
+
+
 /**
  * virStreamRecvAll:
  * @stream: pointer to the stream object
diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms
index 37fc4e224..fac77fbea 100644
--- a/src/libvirt_public.syms
+++ b/src/libvirt_public.syms
@@ -765,6 +765,7 @@ LIBVIRT_3.4.0 {
         virStreamRecvHole;
         virStreamSendHole;
         virStreamSparseRecvAll;
+        virStreamSparseSendAll;
 } LIBVIRT_3.1.0;
 
 # .... define new API here using predicted next version number ....
-- 
2.13.0




More information about the libvir-list mailing list