[libvirt] [PATCH 01/11] Add public API definition for data stream handling

Daniel P. Berrange berrange at redhat.com
Mon Aug 24 20:51:04 UTC 2009


* include/libvirt/libvirt.h, include/libvirt/libvirt.h.in: Public
  API contract for virStreamPtr object
* src/libvirt_public.syms: Export data stream APIs
* src/libvirt_private.syms: Export internal helper APIs
* src/libvirt.c: Data stream API driver dispatch
* src/datatypes.h, src/datatypes.c: Internal helpers for virStreamPtr
  object
* src/driver.h: Define internal driver API for streams
* .x-sc_avoid_write: Ignore src/libvirt.c because it trips
  up on comments including write()
---
 .x-sc_avoid_write            |    1 +
 include/libvirt/libvirt.h    |   93 ++++++
 include/libvirt/libvirt.h.in |   93 ++++++
 src/datatypes.c              |   59 ++++
 src/datatypes.h              |   33 ++
 src/driver.h                 |   34 ++
 src/libvirt.c                |  683 ++++++++++++++++++++++++++++++++++++++++++
 src/libvirt_private.syms     |    2 +
 src/libvirt_public.syms      |   15 +
 9 files changed, 1013 insertions(+), 0 deletions(-)

diff --git a/.x-sc_avoid_write b/.x-sc_avoid_write
index 8ed87c5..c5a7535 100644
--- a/.x-sc_avoid_write
+++ b/.x-sc_avoid_write
@@ -1,5 +1,6 @@
 ^src/util\.c$
 ^src/xend_internal\.c$
 ^src/util-lib\.c$
+^src/libvirt\.c$
 ^qemud/qemud.c$
 ^gnulib/
diff --git a/include/libvirt/libvirt.h b/include/libvirt/libvirt.h
index 855f755..5dcecfd 100644
--- a/include/libvirt/libvirt.h
+++ b/include/libvirt/libvirt.h
@@ -110,6 +110,24 @@ typedef enum {
      VIR_DOMAIN_NONE = 0
 } virDomainCreateFlags;
 
+
+
+/**
+ * virStream:
+ *
+ * a virStream is a private structure representing a data stream.
+ */
+typedef struct _virStream virStream;
+
+/**
+ * virStreamPtr:
+ *
+ * a virStreamPtr is pointer to a virStream private structure, this is the
+ * type used to reference a data stream in the API.
+ */
+typedef virStream *virStreamPtr;
+
+
 /**
  * VIR_SECURITY_LABEL_BUFLEN:
  *
@@ -1448,6 +1466,81 @@ void virEventRegisterImpl(virEventAddHandleFunc addHandle,
                           virEventAddTimeoutFunc addTimeout,
                           virEventUpdateTimeoutFunc updateTimeout,
                           virEventRemoveTimeoutFunc removeTimeout);
+
+enum {
+    VIR_STREAM_NONBLOCK = (1 << 0),
+};
+
+virStreamPtr virStreamNew(virConnectPtr conn,
+                          unsigned int flags);
+int virStreamRef(virStreamPtr st);
+
+int virStreamSend(virStreamPtr st,
+                  const char *data,
+                  size_t nbytes);
+
+int virStreamRecv(virStreamPtr st,
+                  char *data,
+                  size_t nbytes);
+
+
+typedef int (*virStreamSourceFunc)(virStreamPtr st,
+                                   char *data,
+                                   size_t nbytes,
+                                   void *opaque);
+
+int virStreamSendAll(virStreamPtr st,
+                     virStreamSourceFunc handler,
+                     void *opaque);
+
+typedef int (*virStreamSinkFunc)(virStreamPtr st,
+                                 const char *data,
+                                 size_t nbytes,
+                                 void *opaque);
+
+int virStreamRecvAll(virStreamPtr st,
+                     virStreamSinkFunc handler,
+                     void *opaque);
+
+typedef enum {
+    VIR_STREAM_EVENT_READABLE  = (1 << 0),
+    VIR_STREAM_EVENT_WRITABLE  = (1 << 1),
+    VIR_STREAM_EVENT_ERROR     = (1 << 2),
+    VIR_STREAM_EVENT_HANGUP    = (1 << 3),
+} virStreamEventType;
+
+
+/**
+ * virStreamEventCallback:
+ *
+ * @stream: stream on which the event occurred
+ * @events: bitset of events from virEventHandleType constants
+ * @opaque: user data registered with handle
+ *
+ * Callback for receiving stream events. The callback will
+ * be invoked once for each event which is pending.
+ */
+typedef void (*virStreamEventCallback)(virStreamPtr stream, int events, void *opaque);
+
+int virStreamEventAddCallback(virStreamPtr stream,
+                              int events,
+                              virStreamEventCallback cb,
+                              void *opaque,
+                              virFreeCallback ff);
+
+int virStreamEventUpdateCallback(virStreamPtr stream,
+                                 int events);
+
+int virStreamEventRemoveCallback(virStreamPtr stream);
+
+
+int virStreamFinish(virStreamPtr st);
+int virStreamAbort(virStreamPtr st);
+
+int virStreamFree(virStreamPtr st);
+
+
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/include/libvirt/libvirt.h.in b/include/libvirt/libvirt.h.in
index e6536c7..db091dc 100644
--- a/include/libvirt/libvirt.h.in
+++ b/include/libvirt/libvirt.h.in
@@ -110,6 +110,24 @@ typedef enum {
      VIR_DOMAIN_NONE = 0
 } virDomainCreateFlags;
 
+
+
+/**
+ * virStream:
+ *
+ * a virStream is a private structure representing a data stream.
+ */
+typedef struct _virStream virStream;
+
+/**
+ * virStreamPtr:
+ *
+ * a virStreamPtr is pointer to a virStream private structure, this is the
+ * type used to reference a data stream in the API.
+ */
+typedef virStream *virStreamPtr;
+
+
 /**
  * VIR_SECURITY_LABEL_BUFLEN:
  *
@@ -1448,6 +1466,81 @@ void virEventRegisterImpl(virEventAddHandleFunc addHandle,
                           virEventAddTimeoutFunc addTimeout,
                           virEventUpdateTimeoutFunc updateTimeout,
                           virEventRemoveTimeoutFunc removeTimeout);
+
+enum {
+    VIR_STREAM_NONBLOCK = (1 << 0),
+};
+
+virStreamPtr virStreamNew(virConnectPtr conn,
+                          unsigned int flags);
+int virStreamRef(virStreamPtr st);
+
+int virStreamSend(virStreamPtr st,
+                  const char *data,
+                  size_t nbytes);
+
+int virStreamRecv(virStreamPtr st,
+                  char *data,
+                  size_t nbytes);
+
+
+typedef int (*virStreamSourceFunc)(virStreamPtr st,
+                                   char *data,
+                                   size_t nbytes,
+                                   void *opaque);
+
+int virStreamSendAll(virStreamPtr st,
+                     virStreamSourceFunc handler,
+                     void *opaque);
+
+typedef int (*virStreamSinkFunc)(virStreamPtr st,
+                                 const char *data,
+                                 size_t nbytes,
+                                 void *opaque);
+
+int virStreamRecvAll(virStreamPtr st,
+                     virStreamSinkFunc handler,
+                     void *opaque);
+
+typedef enum {
+    VIR_STREAM_EVENT_READABLE  = (1 << 0),
+    VIR_STREAM_EVENT_WRITABLE  = (1 << 1),
+    VIR_STREAM_EVENT_ERROR     = (1 << 2),
+    VIR_STREAM_EVENT_HANGUP    = (1 << 3),
+} virStreamEventType;
+
+
+/**
+ * virStreamEventCallback:
+ *
+ * @stream: stream on which the event occurred
+ * @events: bitset of events from virEventHandleType constants
+ * @opaque: user data registered with handle
+ *
+ * Callback for receiving stream events. The callback will
+ * be invoked once for each event which is pending.
+ */
+typedef void (*virStreamEventCallback)(virStreamPtr stream, int events, void *opaque);
+
+int virStreamEventAddCallback(virStreamPtr stream,
+                              int events,
+                              virStreamEventCallback cb,
+                              void *opaque,
+                              virFreeCallback ff);
+
+int virStreamEventUpdateCallback(virStreamPtr stream,
+                                 int events);
+
+int virStreamEventRemoveCallback(virStreamPtr stream);
+
+
+int virStreamFinish(virStreamPtr st);
+int virStreamAbort(virStreamPtr st);
+
+int virStreamFree(virStreamPtr st);
+
+
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/src/datatypes.c b/src/datatypes.c
index d03a679..3611b62 100644
--- a/src/datatypes.c
+++ b/src/datatypes.c
@@ -1129,3 +1129,62 @@ virUnrefNodeDevice(virNodeDevicePtr dev) {
     virMutexUnlock(&dev->conn->lock);
     return (refs);
 }
+
+
+virStreamPtr virGetStream(virConnectPtr conn) {
+    virStreamPtr ret = NULL;
+
+    virMutexLock(&conn->lock);
+
+    if (VIR_ALLOC(ret) < 0) {
+        virReportOOMError(conn);
+        goto error;
+    }
+    ret->magic = VIR_STREAM_MAGIC;
+    ret->conn = conn;
+    conn->refs++;
+    ret->refs++;
+    virMutexUnlock(&conn->lock);
+    return(ret);
+
+error:
+    virMutexUnlock(&conn->lock);
+    VIR_FREE(ret);
+    return(NULL);
+}
+
+static void
+virReleaseStream(virStreamPtr st) {
+    virConnectPtr conn = st->conn;
+    DEBUG("release dev %p", st);
+
+    st->magic = -1;
+    VIR_FREE(st);
+
+    DEBUG("unref connection %p %d", conn, conn->refs);
+    conn->refs--;
+    if (conn->refs == 0) {
+        virReleaseConnect(conn);
+        /* Already unlocked mutex */
+        return;
+    }
+
+    virMutexUnlock(&conn->lock);
+}
+
+int virUnrefStream(virStreamPtr st) {
+    int refs;
+
+    virMutexLock(&st->conn->lock);
+    DEBUG("unref stream %p %d", st, st->refs);
+    st->refs--;
+    refs = st->refs;
+    if (refs == 0) {
+        virReleaseStream(st);
+        /* Already unlocked mutex */
+        return (0);
+    }
+
+    virMutexUnlock(&st->conn->lock);
+    return (refs);
+}
diff --git a/src/datatypes.h b/src/datatypes.h
index da83e02..0fed07f 100644
--- a/src/datatypes.h
+++ b/src/datatypes.h
@@ -100,6 +100,17 @@
 
 
 /**
+ * VIR_STREAM_MAGIC:
+ *
+ * magic value used to protect the API when pointers to stream structures
+ * are passed down by the users.
+ */
+#define VIR_STREAM_MAGIC                   0x1DEAD666
+#define VIR_IS_STREAM(obj)                 ((obj) && (obj)->magic==VIR_STREAM_MAGIC)
+#define VIR_IS_CONNECTED_STREAM(obj)       (VIR_IS_STREAM(obj) && VIR_IS_CONNECT((obj)->conn))
+
+
+/**
  * _virConnect:
  *
  * Internal structure associated to a connection
@@ -234,6 +245,25 @@ struct _virNodeDevice {
 };
 
 
+typedef int (*virStreamAbortFunc)(virStreamPtr, void *opaque);
+typedef int (*virStreamFinishFunc)(virStreamPtr, void *opaque);
+
+/**
+ * _virStream:
+ *
+ * Internal structure associated with an input stream
+ */
+struct _virStream {
+    unsigned int magic;
+    virConnectPtr conn;
+    int refs;
+    int flags;
+
+    virStreamDriverPtr driver;
+    void *privateData;
+};
+
+
 /************************************************************************
  *									*
  *	API for domain/connections (de)allocations and lookups		*
@@ -270,4 +300,7 @@ virNodeDevicePtr virGetNodeDevice(virConnectPtr conn,
                                   const char *name);
 int virUnrefNodeDevice(virNodeDevicePtr dev);
 
+virStreamPtr virGetStream(virConnectPtr conn);
+int virUnrefStream(virStreamPtr st);
+
 #endif
diff --git a/src/driver.h b/src/driver.h
index 79d46ff..25d34b6 100644
--- a/src/driver.h
+++ b/src/driver.h
@@ -799,6 +799,40 @@ struct _virDeviceMonitor {
     virDrvNodeDeviceDestroy deviceDestroy;
 };
 
+typedef struct _virStreamDriver virStreamDriver;
+typedef virStreamDriver *virStreamDriverPtr;
+
+typedef int (*virDrvStreamSend)(virStreamPtr st,
+                                const char *data,
+                                size_t nbytes);
+typedef int (*virDrvStreamRecv)(virStreamPtr st,
+                                char *data,
+                                size_t nbytes);
+
+typedef int (*virDrvStreamEventAddCallback)(virStreamPtr stream,
+                                            int events,
+                                            virStreamEventCallback cb,
+                                            void *opaque,
+                                            virFreeCallback ff);
+
+typedef int (*virDrvStreamEventUpdateCallback)(virStreamPtr stream,
+                                               int events);
+typedef int (*virDrvStreamEventRemoveCallback)(virStreamPtr stream);
+typedef int (*virDrvStreamFinish)(virStreamPtr st);
+typedef int (*virDrvStreamAbort)(virStreamPtr st);
+
+
+struct _virStreamDriver {
+    virDrvStreamSend streamSend;
+    virDrvStreamRecv streamRecv;
+    virDrvStreamEventAddCallback streamAddCallback;
+    virDrvStreamEventUpdateCallback streamUpdateCallback;
+    virDrvStreamEventRemoveCallback streamRemoveCallback;
+    virDrvStreamFinish streamFinish;
+    virDrvStreamAbort streamAbort;
+};
+
+
 /*
  * Registration
  * TODO: also need ways to (des)activate a given driver
diff --git a/src/libvirt.c b/src/libvirt.c
index ca8e003..d6536f4 100644
--- a/src/libvirt.c
+++ b/src/libvirt.c
@@ -559,6 +559,10 @@ virLibNodeDeviceError(virNodeDevicePtr dev, virErrorNumber error,
                     errmsg, info, NULL, 0, 0, errmsg, info);
 }
 
+#define virLibStreamError(conn, code, fmt...)                   \
+    virReportErrorHelper(conn, VIR_FROM_NONE, code, __FILE__,   \
+                         __FUNCTION__, __LINE__, fmt)
+
 /**
  * virRegisterNetworkDriver:
  * @driver: pointer to a network driver block
@@ -8626,3 +8630,682 @@ error:
     virSetConnError(conn);
     return -1;
 }
+
+
+/**
+ * virStreamNew:
+ * @conn: pointer to the connection
+ * @flags: control features of the stream
+ *
+ * Creates a new stream object which can be used to perform
+ * streamed I/O with other public API function.
+ *
+ * When no longer needed, a stream object must be released
+ * with virStreamFree. If a data stream has been used,
+ * then the application must call virStreamFinish or
+ * virStreamAbort before free'ing to, in order to notify
+ * the driver of termination.
+ *
+ * If a non-blocking data stream is required passed
+ * VIR_STREAM_NONBLOCK for flags, otherwise pass 0.
+ *
+ * Returns the new stream, or NULL upon error
+ */
+virStreamPtr
+virStreamNew(virConnectPtr conn,
+             unsigned int flags)
+{
+    virStreamPtr st;
+
+    DEBUG("conn=%p, flags=%u", conn, flags);
+
+    virResetLastError();
+
+    if (!VIR_IS_CONNECT(conn)) {
+        virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__);
+        return (NULL);
+    }
+
+    st = virGetStream(conn);
+    if (st)
+        st->flags = flags;
+
+    return st;
+}
+
+
+/**
+ * virStreamRef:
+ * @stream: pointer to the stream
+ *
+ * Increment the reference count on the stream. For each
+ * additional call to this method, there shall be a corresponding
+ * call to virStreamFree to release the reference count, once
+ * the caller no longer needs the reference to this object.
+ *
+ * Returns 0 in case of success, -1 in case of failure
+ */
+int
+virStreamRef(virStreamPtr stream)
+{
+    if ((!VIR_IS_CONNECTED_STREAM(stream))) {
+        virLibConnError(NULL, VIR_ERR_INVALID_ARG, __FUNCTION__);
+        return(-1);
+    }
+    virMutexLock(&stream->conn->lock);
+    DEBUG("stream=%p refs=%d", stream, stream->refs);
+    stream->refs++;
+    virMutexUnlock(&stream->conn->lock);
+    return 0;
+}
+
+
+/**
+ * virStreamSend:
+ * @stream: pointer to the stream object
+ * @data: buffer to write to stream
+ * @nbytes: size of @data buffer
+ *
+ * Write a series of bytes to the stream. This method may
+ * block the calling application for an arbitrary amount
+ * of time. Once an application has finished sending data
+ * it should call virStreamFinish to wait for succesful
+ * confirmation from the driver, or detect any error
+ *
+ * This method may not be used if a stream source has been
+ * registered
+ *
+ * Errors are not guaranteed to be reported synchronously
+ * with the call, but may instead be delayed until a
+ * subsequent call.
+ *
+ * A example using this with a hypothetical file upload
+ * API looks like
+ *
+ *   virStreamPtr st = virStreamNew(conn, 0);
+ *   int fd = open("demo.iso", O_RDONLY)
+ *
+ *   virConnectUploadFile(conn, "demo.iso", st);
+ *
+ *   while (1) {
+ *       char buf[1024];
+ *       int got = read(fd, buf, 1024);
+ *       if (got < 0) {
+ *          virStreamAbort(st);
+ *          break;
+ *       }
+ *       if (got == 0) {
+ *          virStreamFinish(st);
+ *          break;
+ *       }
+ *       int offset = 0;
+ *       while (offset < got) {
+ *          int sent = virStreamSend(st, buf+offset, got-offset)
+ *          if (sent < 0) {
+ *             virStreamAbort(st);
+ *             goto done;
+ *          }
+ *          offset += sent;
+ *       }
+ *   }
+ * done:
+ *   virStreamFree(st);
+ *   close(fd);
+ *
+ * Returns the number of bytes written, which may be less
+ * than requested.
+ *
+ * Returns -1 upon error, at which time the stream will
+ * be marked as aborted, and the caller should now release
+ * the stream with virStreamFree.
+ *
+ * Returns -2 if the outgoing transmit buffers are full &
+ * the stream is marked as non-blocking.
+ */
+int virStreamSend(virStreamPtr stream,
+                  const char *data,
+                  size_t nbytes)
+{
+    DEBUG("stream=%p, data=%p, nbytes=%zi", stream, data, nbytes);
+
+    virResetLastError();
+
+    if (!VIR_IS_CONNECTED_STREAM(stream)) {
+        virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__);
+        return (-1);
+    }
+
+    if (stream->driver &&
+        stream->driver->streamSend) {
+        int ret;
+        ret = (stream->driver->streamSend)(stream, data, nbytes);
+        if (ret == -2)
+            return -2;
+
+        if (ret < 0)
+            goto error;
+        return ret;
+    }
+
+    virLibConnError(stream->conn, VIR_ERR_NO_SUPPORT, __FUNCTION__);
+error:
+    /* Copy to connection error object for back compatability */
+    virSetConnError(stream->conn);
+    return -1;
+}
+
+/**
+ * virStreamRecv:
+ * @stream: pointer to the stream object
+ * @data: buffer to write to stream
+ * @nbytes: size of @data buffer
+ *
+ * Write a series of bytes to the stream. This method may
+ * block the calling application for an arbitrary amount
+ * of time.
+ *
+ * Errors are not guaranteed to be reported synchronously
+ * with the call, but may instead be delayed until a
+ * subsequent call.
+ *
+ * A example using this with a hypothetical file download
+ * API looks like
+ *
+ *   virStreamPtr st = virStreamNew(conn, 0);
+ *   int fd = open("demo.iso", O_WRONLY, 0600)
+ *
+ *   virConnectDownloadFile(conn, "demo.iso", st);
+ *
+ *   while (1) {
+ *       char buf[1024];
+ *       int got = virStreamRecv(st, buf, 1024);
+ *       if (got < 0)
+ *          break;
+ *       if (got == 0) {
+ *          virStreamFinish(st);
+ *          break;
+ *       }
+ *       int offset = 0;
+ *       while (offset < got) {
+ *          int sent = write(fd, buf+offset, got-offset)
+ *          if (sent < 0) {
+ *             virStreamAbort(st);
+ *             goto done;
+ *          }
+ *          offset += sent;
+ *       }
+ *   }
+ * done:
+ *   virStreamFree(st);
+ *   close(fd);
+ *
+ *
+ * Returns the number of bytes read, which may be less
+ * than requested.
+ *
+ * Returns 0 when the end of the stream is reached, at
+ * which time the caller should invoke virStreamFinish()
+ * to get confirmation of stream completion.
+ *
+ * Returns -1 upon error, at which time the stream will
+ * be marked as aborted, and the caller should now release
+ * the stream with virStreamFree.
+ *
+ * Returns -2 if there is no data pending to be read & the
+ * stream is marked as non-blocking.
+ */
+int virStreamRecv(virStreamPtr stream,
+                  char *data,
+                  size_t nbytes)
+{
+    DEBUG("stream=%p, data=%p, nbytes=%zi", stream, data, nbytes);
+
+    virResetLastError();
+
+    if (!VIR_IS_CONNECTED_STREAM(stream)) {
+        virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__);
+        return (-1);
+    }
+
+    if (stream->driver &&
+        stream->driver->streamRecv) {
+        int ret;
+        ret = (stream->driver->streamRecv)(stream, data, nbytes);
+        if (ret == -2)
+            return -2;
+
+        if (ret < 0)
+            goto error;
+        return ret;
+    }
+
+    virLibConnError(stream->conn, VIR_ERR_NO_SUPPORT, __FUNCTION__);
+error:
+    /* Copy to connection error object for back compatability */
+    virSetConnError(stream->conn);
+    return -1;
+}
+
+
+/**
+ * virStreamSendAll:
+ * @stream: pointer to the stream object
+ * @handler: source callback for reading data from application
+ * @opaque: application defined data
+ *
+ * Send the entire data stream, reading the data from the
+ * requested data source. This is simply a convenient alternative
+ * to virStreamSend, for apps that do blocking-I/o.
+ *
+ * A example using this with a hypothetical file upload
+ * API looks like
+ *
+ *   int mysource(virStreamPtr st, char *buf, int nbytes, void *opaque) {
+ *       int *fd = opaque;
+ *
+ *       return read(*fd, buf, nbytes);
+ *   }
+ *
+ *   virStreamPtr st = virStreamNew(conn, 0);
+ *   int fd = open("demo.iso", O_RDONLY)
+ *
+ *   virConnectUploadFile(conn, st);
+ *   virStreamSendAll(st, mysource, &fd);
+ *   virStreamFree(st);
+ *   close(fd);
+ *
+ * Returns 0 if all the data was succesfully sent. The stream
+ * will be marked as finished on success, so the caller need
+ * only call virStreamFree().
+ *
+ * Returns -1 upon any error, with the stream being marked as
+ * aborted, so the caller need only call virStreamFree()
+ */
+int virStreamSendAll(virStreamPtr stream,
+                     virStreamSourceFunc handler,
+                     void *opaque)
+{
+    char *bytes = NULL;
+    int want = 1024*64;
+    int ret = -1;
+    DEBUG("stream=%p, handler=%p, opaque=%p", stream, handler, opaque);
+
+    virResetLastError();
+
+    if (!VIR_IS_CONNECTED_STREAM(stream)) {
+        virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__);
+        return (-1);
+    }
+
+    if (stream->flags & VIR_STREAM_NONBLOCK) {
+        virLibConnError(NULL, VIR_ERR_OPERATION_INVALID,
+                        _("data sources cannot be used for non-blocking streams"));
+        goto cleanup;
+    }
+
+    if (VIR_ALLOC_N(bytes, want) < 0) {
+        virReportOOMError(stream->conn);
+        goto cleanup;
+    }
+
+    for (;;) {
+        int got, offset = 0;
+        got = (handler)(stream, bytes, want, opaque);
+        if (got < 0) {
+            virStreamAbort(stream);
+            goto cleanup;
+        }
+        if (got == 0)
+            break;
+        while (offset < got) {
+            int done;
+            done = virStreamSend(stream, bytes + offset, got - offset);
+            if (done < 0)
+                goto cleanup;
+            offset += done;
+        }
+    }
+    ret = 0;
+
+cleanup:
+    VIR_FREE(bytes);
+
+    /* Copy to connection error object for back compatability */
+    if (ret != 0)
+        virSetConnError(stream->conn);
+
+    return ret;
+}
+
+
+/**
+ * virStreamRecvAll:
+ * @stream: pointer to the stream object
+ * @handler: sink callback for writing data to application
+ * @opaque: application defined data
+ *
+ * Receive the entire data stream, sending the data to the
+ * requested data sink. This is simply a convenient alternative
+ * to virStreamRecv, for apps that do blocking-I/o.
+ *
+ * A example using this with a hypothetical file download
+ * API looks like
+ *
+ *   int mysink(virStreamPtr st, const char *buf, int nbytes, void *opaque) {
+ *       int *fd = opaque;
+ *
+ *       return write(*fd, buf, nbytes);
+ *   }
+ *
+ *   virStreamPtr st = virStreamNew(conn, 0);
+ *   int fd = open("demo.iso", O_WRONLY)
+ *
+ *   virConnectUploadFile(conn, st);
+ *   virStreamRecvAll(st, mysink, &fd);
+ *   virStreamFree(st);
+ *   close(fd);
+ *
+ * Returns 0 if all the data was succesfully received. The stream
+ * will be marked as finished on success, so the caller need
+ * only call virStreamFree().
+ *
+ * Returns -1 upon any error, with the stream being marked as
+ * aborted, so the caller need only call virStreamFree()
+ */
+int virStreamRecvAll(virStreamPtr stream,
+                     virStreamSinkFunc handler,
+                     void *opaque)
+{
+    char *bytes = NULL;
+    int want = 1024*64;
+    int ret = -1;
+    DEBUG("stream=%p, handler=%p, opaque=%p", stream, handler, opaque);
+
+    virResetLastError();
+
+    if (!VIR_IS_CONNECTED_STREAM(stream)) {
+        virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__);
+        return (-1);
+    }
+
+    if (stream->flags & VIR_STREAM_NONBLOCK) {
+        virLibConnError(NULL, VIR_ERR_OPERATION_INVALID,
+                        _("data sinks cannot be used for non-blocking streams"));
+        goto cleanup;
+    }
+
+
+    if (VIR_ALLOC_N(bytes, want) < 0) {
+        virReportOOMError(stream->conn);
+        goto cleanup;
+    }
+
+    for (;;) {
+        int got, offset = 0;
+        got = virStreamRecv(stream, bytes, want);
+        if (got < 0)
+            goto cleanup;
+        if (got == 0)
+            break;
+        while (offset < got) {
+            int done;
+            done = (handler)(stream, bytes + offset, got - offset, opaque);
+            if (done < 0) {
+                virStreamAbort(stream);
+                goto cleanup;
+            }
+            offset += done;
+        }
+    }
+    ret = 0;
+
+cleanup:
+    VIR_FREE(bytes);
+
+    /* Copy to connection error object for back compatability */
+    if (ret != 0)
+        virSetConnError(stream->conn);
+
+    return ret;
+}
+
+
+/**
+ * virStreamEventAddCallback
+ * @stream: pointer to the stream object
+ * @events: set of events to monitor
+ * @cb: callback to invoke when an event occurs
+ * @opaque: application defined data
+ * @ff: callback to free @opaque data
+ *
+ * Register a callback to be notified when a stream
+ * becomes writable, or readable. This is most commonly
+ * used in conjunction with non-blocking data streams
+ * to integrate into an event loop
+ *
+ * Return 0 on success, -1 upon error
+ */
+int virStreamEventAddCallback(virStreamPtr stream,
+                              int events,
+                              virStreamEventCallback cb,
+                              void *opaque,
+                              virFreeCallback ff)
+{
+    DEBUG("stream=%p, events=%d, cb=%p, opaque=%p, ff=%p", stream, events, cb, opaque, ff);
+
+    virResetLastError();
+
+    if (!VIR_IS_CONNECTED_STREAM(stream)) {
+        virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__);
+        return (-1);
+    }
+
+    if (stream->driver &&
+        stream->driver->streamAddCallback) {
+        int ret;
+        ret = (stream->driver->streamAddCallback)(stream, events, cb, opaque, ff);
+        if (ret < 0)
+            goto error;
+        return ret;
+    }
+
+    virLibConnError(stream->conn, VIR_ERR_NO_SUPPORT, __FUNCTION__);
+error:
+    /* Copy to connection error object for back compatability */
+    virSetConnError(stream->conn);
+    return -1;
+}
+
+
+/**
+ * virStreamEventUpdateCallback
+ * @stream: pointer to the stream object
+ * @events: set of events to monitor
+ *
+ * Changes the set of events to monitor for a stream. This allows
+ * for event notification to be changed without having to
+ * unregister & register the callback completely. This method
+ * is guarenteed to succeed if a callback is already registered
+ *
+ * Returns 0 on success, -1 if no callback is registered
+ */
+int virStreamEventUpdateCallback(virStreamPtr stream,
+                                 int events)
+{
+    DEBUG("stream=%p, events=%d", stream, events);
+
+    virResetLastError();
+
+    if (!VIR_IS_CONNECTED_STREAM(stream)) {
+        virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__);
+        return (-1);
+    }
+
+    if (stream->driver &&
+        stream->driver->streamUpdateCallback) {
+        int ret;
+        ret = (stream->driver->streamUpdateCallback)(stream, events);
+        if (ret < 0)
+            goto error;
+        return ret;
+    }
+
+    virLibConnError(stream->conn, VIR_ERR_NO_SUPPORT, __FUNCTION__);
+error:
+    /* Copy to connection error object for back compatability */
+    virSetConnError(stream->conn);
+    return -1;
+}
+
+
+/**
+ * virStreamEventRemoveCallback
+ * @stream: pointer to the stream object
+ *
+ * Remove a event callback from the stream
+ *
+ * Return 0 on success, -1 on error
+ */
+int virStreamEventRemoveCallback(virStreamPtr stream)
+{
+    DEBUG("stream=%p", stream);
+
+    virResetLastError();
+
+    if (!VIR_IS_CONNECTED_STREAM(stream)) {
+        virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__);
+        return (-1);
+    }
+
+    if (stream->driver &&
+        stream->driver->streamRemoveCallback) {
+        int ret;
+        ret = (stream->driver->streamRemoveCallback)(stream);
+        if (ret < 0)
+            goto error;
+        return ret;
+    }
+
+    virLibConnError(stream->conn, VIR_ERR_NO_SUPPORT, __FUNCTION__);
+error:
+    /* Copy to connection error object for back compatability */
+    virSetConnError(stream->conn);
+    return -1;
+}
+
+
+/**
+ * virStreamFinish:
+ * @stream: pointer to the stream object
+ *
+ * Indicate that there is no further data is to be transmitted
+ * on the stream. For output streams this should be called once
+ * all data has been written. For input streams this should be
+ * called once virStreamRecv returns end-of-file.
+ *
+ * This method is a synchronization point for all asynchronous
+ * errors, so if this returns a success code the application can
+ * be sure that all data has been successfully processed.
+ *
+ * Returns 0 on success, -1 upon error
+ */
+int virStreamFinish(virStreamPtr stream)
+{
+    DEBUG("stream=%p", stream);
+
+    virResetLastError();
+
+    if (!VIR_IS_CONNECTED_STREAM(stream)) {
+        virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__);
+        return (-1);
+    }
+
+    if (stream->driver &&
+        stream->driver->streamFinish) {
+        int ret;
+        ret = (stream->driver->streamFinish)(stream);
+        if (ret < 0)
+            goto error;
+        return ret;
+    }
+
+    virLibConnError(stream->conn, VIR_ERR_NO_SUPPORT, __FUNCTION__);
+error:
+    /* Copy to connection error object for back compatability */
+    virSetConnError(stream->conn);
+    return -1;
+}
+
+
+/**
+ * virStreamAbort:
+ * @stream: pointer to the stream object
+ *
+ * Request that the in progress data transfer be cancelled
+ * abnormally before the end of the stream has been reached.
+ * For output streams this can be used to inform the driver
+ * that the stream is being terminated early. For input
+ * streams this can be used to inform the driver that it
+ * should stop sending data.
+ *
+ * Returns 0 on success, -1 upon error
+ */
+int virStreamAbort(virStreamPtr stream)
+{
+    DEBUG("stream=%p", stream);
+
+    virResetLastError();
+
+    if (!VIR_IS_CONNECTED_STREAM(stream)) {
+        virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__);
+        return (-1);
+    }
+
+    if (stream->driver &&
+        stream->driver->streamAbort) {
+        int ret;
+        ret = (stream->driver->streamAbort)(stream);
+        if (ret < 0)
+            goto error;
+        return ret;
+    }
+
+    virLibConnError(stream->conn, VIR_ERR_NO_SUPPORT, __FUNCTION__);
+error:
+    /* Copy to connection error object for back compatability */
+    virSetConnError(stream->conn);
+    return -1;
+}
+
+
+/**
+ * virStreamFree:
+ * @stream: pointer to the stream object
+ *
+ * Decrement the reference count on a stream, releasing
+ * the stream object if the reference count has hit zero.
+ *
+ * There must not be a active data transfer in progress
+ * when releasing the stream. If a stream needs to be
+ * disposed of prior to end of stream being reached, then
+ * the virStreamAbort function should be called first.
+ *
+ * Returns 0 upon success, or -1 on error
+ */
+int virStreamFree(virStreamPtr stream)
+{
+    DEBUG("stream=%p", stream);
+
+    virResetLastError();
+
+    if (!VIR_IS_CONNECTED_STREAM(stream)) {
+        virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__);
+        return (-1);
+    }
+
+    /* XXX Enforce shutdown before free'ing resources ? */
+
+    if (virUnrefStream(stream) < 0)
+        return (-1);
+    return (0);
+}
diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms
index 2bf4e15..12d552e 100644
--- a/src/libvirt_private.syms
+++ b/src/libvirt_private.syms
@@ -56,6 +56,8 @@ virUnrefStorageVol;
 virGetNodeDevice;
 virUnrefDomain;
 virUnrefConnect;
+virGetStream;
+virUnrefStream;
 
 
 # domain_conf.h
diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms
index c06f51e..f48b8c5 100644
--- a/src/libvirt_public.syms
+++ b/src/libvirt_public.syms
@@ -291,4 +291,19 @@ LIBVIRT_0.7.0 {
 	virConnectListDefinedInterfaces;
 } LIBVIRT_0.6.4;
 
+LIBVIRT_0.7.1 {
+	virStreamNew;
+	virStreamRef;
+	virStreamSend;
+	virStreamRecv;
+	virStreamSendAll;
+	virStreamRecvAll;
+	virStreamEventAddCallback;
+	virStreamEventUpdateCallback;
+	virStreamEventRemoveCallback;
+	virStreamFinish;
+	virStreamAbort;
+	virStreamFree;
+} LIBVIRT_0.7.0;
+
 # .... define new API here using predicted next version number ....
-- 
1.6.2.5




More information about the libvir-list mailing list