[libvirt] [PATCH 02/10] Support callbacks on virStream APIs in remote driver client

Daniel P. Berrange berrange at redhat.com
Mon Nov 1 16:11:56 UTC 2010


The current remote driver code for streams only supports
blocking I/O mode. This is fine for the usage with migration
but is a problem for more general use cases, in particular
bi-directional streams.

This adds support for the stream callbacks and non-blocking
I/O, with the minor caveat that it doesn't actually do
non-blocking I/O for sending stream data, only receiving it.
A future patch will try to do non-blocking sends, but this is
quite tricky to get right.

* src/remote/remote_driver.c: Allow non-blocking I/O for
  streams and support callbacks
---
 src/remote/remote_driver.c |  188 ++++++++++++++++++++++++++++++++++++++++----
 1 files changed, 172 insertions(+), 16 deletions(-)

diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c
index 1c874b2..61da8ff 100644
--- a/src/remote/remote_driver.c
+++ b/src/remote/remote_driver.c
@@ -132,6 +132,13 @@ struct private_stream_data {
     unsigned int serial;
     unsigned int proc_nr;
 
+    virStreamEventCallback cb;
+    void *cbOpaque;
+    virFreeCallback cbFree;
+    int cbEvents;
+    int cbTimer;
+    int cbDispatch;
+
     /* XXX this is potentially unbounded if the client
      * app has domain events registered, since packets
      * may be read off wire, while app isn't ready to
@@ -200,9 +207,10 @@ struct private_data {
 };
 
 enum {
-    REMOTE_CALL_IN_OPEN = (1 << 0),
+    REMOTE_CALL_IN_OPEN           = (1 << 0),
     REMOTE_CALL_QUIET_MISSING_RPC = (1 << 1),
-    REMOTE_QEMU_CALL = (1 << 2),
+    REMOTE_CALL_QEMU              = (1 << 2),
+    REMOTE_CALL_NONBLOCK          = (1 << 3),
 };
 
 
@@ -8144,6 +8152,20 @@ remoteStreamOpen(virStreamPtr st,
 }
 
 
+static void
+remoteStreamEventTimerUpdate(struct private_stream_data *privst)
+{
+    if (!privst->cb)
+        return;
+
+    if (!privst->cbEvents)
+        virEventUpdateTimeout(privst->cbTimer, -1);
+    else if (privst->incoming &&
+             (privst->cbEvents & VIR_STREAM_EVENT_READABLE))
+        virEventUpdateTimeout(privst->cbTimer, 0);
+}
+
+
 static int
 remoteStreamPacket(virStreamPtr st,
                    int status,
@@ -8338,6 +8360,12 @@ remoteStreamRecv(virStreamPtr st,
         struct remote_thread_call *thiscall;
         int ret;
 
+        if (st->flags & VIR_STREAM_NONBLOCK) {
+            DEBUG0("Non-blocking mode and no data available");
+            rv = -2;
+            goto cleanup;
+        }
+
         if (VIR_ALLOC(thiscall) < 0) {
             virReportOOMError();
             goto cleanup;
@@ -8381,6 +8409,8 @@ remoteStreamRecv(virStreamPtr st,
         rv = 0;
     }
 
+    remoteStreamEventTimerUpdate(privst);
+
     DEBUG("Done %d", rv);
 
 cleanup:
@@ -8391,28 +8421,153 @@ cleanup:
     return rv;
 }
 
+
+static void
+remoteStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
+{
+    virStreamPtr st = opaque;
+    struct private_data *priv = st->conn->privateData;
+    struct private_stream_data *privst = st->privateData;
+
+    remoteDriverLock(priv);
+    if (privst->cb &&
+        (privst->cbEvents & VIR_STREAM_EVENT_READABLE) &&
+        privst->incomingOffset) {
+        virStreamEventCallback cb = privst->cb;
+        void *cbOpaque = privst->cbOpaque;
+        virFreeCallback cbFree = privst->cbFree;
+
+        privst->cbDispatch = 1;
+        remoteDriverUnlock(priv);
+        (cb)(st, VIR_STREAM_EVENT_READABLE, cbOpaque);
+        remoteDriverLock(priv);
+        privst->cbDispatch = 0;
+
+        if (!privst->cb && cbFree)
+            (cbFree)(cbOpaque);
+    }
+    remoteDriverUnlock(priv);
+}
+
+
+static void
+remoteStreamEventTimerFree(void *opaque)
+{
+    virStreamPtr st = opaque;
+    virUnrefStream(st);
+}
+
+
 static int
-remoteStreamEventAddCallback(virStreamPtr stream ATTRIBUTE_UNUSED,
-                             int events ATTRIBUTE_UNUSED,
-                             virStreamEventCallback cb ATTRIBUTE_UNUSED,
-                             void *opaque ATTRIBUTE_UNUSED,
-                             virFreeCallback ff ATTRIBUTE_UNUSED)
+remoteStreamEventAddCallback(virStreamPtr st,
+                             int events,
+                             virStreamEventCallback cb,
+                             void *opaque,
+                             virFreeCallback ff)
 {
-    return -1;
+    struct private_data *priv = st->conn->privateData;
+    struct private_stream_data *privst = st->privateData;
+    int ret = -1;
+
+    remoteDriverLock(priv);
+
+    if (events & ~VIR_STREAM_EVENT_READABLE) {
+        remoteError(VIR_ERR_INTERNAL_ERROR,
+                    _("unsupported stream events %d"), events);
+        goto cleanup;
+    }
+
+    if (privst->cb) {
+        remoteError(VIR_ERR_INTERNAL_ERROR,
+                    _("multiple stream callbacks not supported"));
+        goto cleanup;
+    }
+
+    virStreamRef(st);
+    if ((privst->cbTimer =
+         virEventAddTimeout(-1,
+                            remoteStreamEventTimer,
+                            st,
+                            remoteStreamEventTimerFree)) < 0) {
+        virUnrefStream(st);
+        goto cleanup;
+    }
+
+    privst->cb = cb;
+    privst->cbOpaque = opaque;
+    privst->cbFree = ff;
+    privst->cbEvents = events;
+
+    ret = 0;
+
+cleanup:
+    remoteDriverUnlock(priv);
+    return ret;
 }
 
 static int
-remoteStreamEventUpdateCallback(virStreamPtr stream ATTRIBUTE_UNUSED,
-                                int events ATTRIBUTE_UNUSED)
+remoteStreamEventUpdateCallback(virStreamPtr st,
+                                int events)
 {
-    return -1;
+    struct private_data *priv = st->conn->privateData;
+    struct private_stream_data *privst = st->privateData;
+    int ret = -1;
+
+    remoteDriverLock(priv);
+
+    if (events & ~VIR_STREAM_EVENT_READABLE) {
+        remoteError(VIR_ERR_INTERNAL_ERROR,
+                    _("unsupported stream events %d"), events);
+        goto cleanup;
+    }
+
+    if (!privst->cb) {
+        remoteError(VIR_ERR_INTERNAL_ERROR,
+                    _("no stream callback registered"));
+        goto cleanup;
+    }
+
+    privst->cbEvents = events;
+
+    remoteStreamEventTimerUpdate(privst);
+
+    ret = 0;
+
+cleanup:
+    remoteDriverUnlock(priv);
+    return ret;
 }
 
 
 static int
-remoteStreamEventRemoveCallback(virStreamPtr stream ATTRIBUTE_UNUSED)
+remoteStreamEventRemoveCallback(virStreamPtr st)
 {
-    return -1;
+    struct private_data *priv = st->conn->privateData;
+    struct private_stream_data *privst = st->privateData;
+    int ret = -1;
+
+    remoteDriverLock(priv);
+
+    if (!privst->cb) {
+        remoteError(VIR_ERR_INTERNAL_ERROR,
+                    _("no stream callback registered"));
+        goto cleanup;
+    }
+
+    if (!privst->cbDispatch &&
+        privst->cbFree)
+        (privst->cbFree)(privst->cbOpaque);
+    privst->cb = NULL;
+    privst->cbOpaque = NULL;
+    privst->cbFree = NULL;
+    privst->cbEvents = 0;
+    virEventRemoveTimeout(privst->cbTimer);
+
+    ret = 0;
+
+cleanup:
+    remoteDriverUnlock(priv);
+    return ret;
 }
 
 static int
@@ -9065,7 +9220,7 @@ remoteQemuDomainMonitorCommand (virDomainPtr domain, const char *cmd,
     args.flags = flags;
 
     memset (&ret, 0, sizeof ret);
-    if (call (domain->conn, priv, REMOTE_QEMU_CALL, QEMU_PROC_MONITOR_COMMAND,
+    if (call (domain->conn, priv, REMOTE_CALL_QEMU, QEMU_PROC_MONITOR_COMMAND,
               (xdrproc_t) xdr_qemu_monitor_command_args, (char *) &args,
               (xdrproc_t) xdr_qemu_monitor_command_ret, (char *) &ret) == -1)
         goto done;
@@ -9119,7 +9274,7 @@ prepareCall(struct private_data *priv,
     rv->ret = ret;
     rv->want_reply = 1;
 
-    if (flags & REMOTE_QEMU_CALL) {
+    if (flags & REMOTE_CALL_QEMU) {
         hdr.prog = QEMU_PROGRAM;
         hdr.vers = QEMU_PROTOCOL_VERSION;
     }
@@ -9512,7 +9667,7 @@ processCallDispatch(virConnectPtr conn, struct private_data *priv,
 
     expectedprog = REMOTE_PROGRAM;
     expectedvers = REMOTE_PROTOCOL_VERSION;
-    if (flags & REMOTE_QEMU_CALL) {
+    if (flags & REMOTE_CALL_QEMU) {
         expectedprog = QEMU_PROGRAM;
         expectedvers = QEMU_PROTOCOL_VERSION;
     }
@@ -9738,6 +9893,7 @@ processCallDispatchStream(virConnectPtr conn ATTRIBUTE_UNUSED,
             thecall->mode = REMOTE_MODE_COMPLETE;
         } else {
             VIR_WARN("Got aysnc data packet offset=%d", privst->incomingOffset);
+            remoteStreamEventTimerUpdate(privst);
         }
         return 0;
     }
-- 
1.7.2.3




More information about the libvir-list mailing list