[libvirt] [PATCH 06/11] Handle data streams in remote client

Daniel P. Berrange berrange at redhat.com
Mon Aug 24 20:51:09 UTC 2009


* src/remote_internal.c: Add helper APIs for processing data streams
---
 src/remote_internal.c |  530 ++++++++++++++++++++++++++++++++++++++++++++++++-
 1 files changed, 524 insertions(+), 6 deletions(-)

diff --git a/src/remote_internal.c b/src/remote_internal.c
index de3c288..12e3bb9 100644
--- a/src/remote_internal.c
+++ b/src/remote_internal.c
@@ -1,4 +1,3 @@
-
 /*
  * remote_internal.c: driver to provide access to libvirtd running
  *   on a remote machine
@@ -111,7 +110,8 @@ enum {
 struct remote_thread_call {
     int mode;
 
-    /* 4 byte length, followed by RPC message header+body */
+    /* Buffer for outgoing data packet
+     * 4 byte length, followed by RPC message header+body */
     char buffer[4 + REMOTE_MESSAGE_MAX];
     unsigned int bufferLength;
     unsigned int bufferOffset;
@@ -121,6 +121,7 @@ struct remote_thread_call {
 
     virCond cond;
 
+    int want_reply;
     xdrproc_t ret_filter;
     char *ret;
 
@@ -129,6 +130,26 @@ struct remote_thread_call {
     struct remote_thread_call *next;
 };
 
+struct private_stream_data {
+    unsigned int has_error : 1;    /* set once an error has been queued in 'err' */
+    remote_error err;              /* error received while no call was waiting for it */
+
+    unsigned int serial;           /* serial number carried by this stream's packets */
+    unsigned int proc_nr;          /* procedure number carried by this stream's packets */
+
+    /* XXX this is potentially unbounded if the client
+     * app has domain events registered, since packets
+     * may be read off the wire while the app isn't ready
+     * to recv them. Figure out how to address this some
+     * time....
+     */
+    char *incoming;                /* stream payload received but not yet read by the app */
+    unsigned int incomingOffset;   /* number of valid bytes currently held in 'incoming' */
+    unsigned int incomingLength;   /* allocated size of 'incoming' */
+
+    struct private_stream_data *next; /* next entry in private_data.streams */
+};
+
 struct private_data {
     virMutex lock;
 
@@ -155,7 +176,8 @@ struct private_data {
     unsigned int saslEncodedOffset;
 #endif
 
-    /* 4 byte length, followed by RPC message header+body */
+    /* Buffer for incoming data packets
+     * 4 byte length, followed by RPC message header+body */
     char buffer[4 + REMOTE_MESSAGE_MAX];
     unsigned int bufferLength;
     unsigned int bufferOffset;
@@ -176,6 +198,8 @@ struct private_data {
 
     /* List of threads currently waiting for dispatch */
     struct remote_thread_call *waitDispatch;
+
+    struct private_stream_data *streams;
 };
 
 enum {
@@ -194,6 +218,10 @@ static void remoteDriverUnlock(struct private_data *driver)
     virMutexUnlock(&driver->lock);
 }
 
+static int remoteIO(virConnectPtr conn,
+                    struct private_data *priv,
+                    int flags,
+                    struct remote_thread_call *thiscall);
 static int call (virConnectPtr conn, struct private_data *priv,
                  int flags, int proc_nr,
                  xdrproc_t args_filter, char *args,
@@ -6319,6 +6347,361 @@ done:
     return rv;
 }
 
+
+#if 0
+/* Register stream state for (proc_nr, serial) on the connection; NULL on OOM (reported). */
+static struct private_stream_data *
+remoteStreamOpen(virStreamPtr st,
+                 int output ATTRIBUTE_UNUSED,
+                 unsigned int proc_nr,
+                 unsigned int serial)
+{
+    struct private_data *priv = st->conn->privateData;
+    struct private_stream_data *stpriv;
+
+    if (VIR_ALLOC(stpriv) < 0) {
+        virReportOOMError(st->conn);
+        return NULL;
+    }
+    /* Identify the stream so incoming packets can be matched to it */
+    stpriv->proc_nr = proc_nr;
+    stpriv->serial = serial;
+    stpriv->next = priv->streams;
+    priv->streams = stpriv;
+    return stpriv;
+}
+
+
+/* Send one REMOTE_STREAM packet for 'st' with the given status. The payload
+ * 'data'/'nbytes' is attached only for REMOTE_CONTINUE; a reply is awaited
+ * only for REMOTE_OK and REMOTE_ERROR. Returns nbytes, or -1 on failure. */
+static int
+remoteStreamPacket(virStreamPtr st,
+                   int status,
+                   const char *data,
+                   size_t nbytes)
+{
+    /* size_t does not match %d: cast explicitly for the varargs call */
+    DEBUG("st=%p status=%d data=%p nbytes=%d", st, status, data, (int)nbytes);
+    struct private_data *priv = st->conn->privateData;
+    struct private_stream_data *privst = st->privateData;
+    XDR xdr;
+    struct remote_thread_call *thiscall;
+    remote_message_header hdr;
+
+    memset(&hdr, 0, sizeof hdr);
+    if (VIR_ALLOC(thiscall) < 0) {
+        virReportOOMError(st->conn);
+        return -1;
+    }
+
+    thiscall->mode = REMOTE_MODE_WAIT_TX;
+    thiscall->serial = privst->serial;
+    thiscall->proc_nr = privst->proc_nr;
+    if (status == REMOTE_OK ||
+        status == REMOTE_ERROR)
+        thiscall->want_reply = 1;
+
+    if (virCondInit(&thiscall->cond) < 0) {
+        VIR_FREE(thiscall);
+        error (st->conn, VIR_ERR_INTERNAL_ERROR,
+               _("cannot initialize mutex"));
+        return -1;
+    }
+    /* ret_filter/ret stay unset: no return structure is decoded
+     * for a stream packet */
+    hdr.prog = REMOTE_PROGRAM;
+    hdr.vers = REMOTE_PROTOCOL_VERSION;
+    hdr.proc = privst->proc_nr;
+    hdr.type = REMOTE_STREAM;
+    hdr.serial = privst->serial;
+    hdr.status = status;
+
+    /* Length must include the length word itself (always encoded in
+     * 4 bytes as per RFC 4506), so offset start length. We write this
+     * later.
+     */
+    thiscall->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
+
+    /* Serialise the header just after the length word. */
+    xdrmem_create (&xdr, thiscall->buffer + thiscall->bufferLength,
+                   REMOTE_MESSAGE_MAX, XDR_ENCODE);
+    if (!xdr_remote_message_header (&xdr, &hdr)) {
+        error (st->conn,
+               VIR_ERR_RPC, _("xdr_remote_message_header failed"));
+        xdr_destroy (&xdr);
+        goto error;
+    }
+    thiscall->bufferLength += xdr_getpos (&xdr);
+    xdr_destroy (&xdr);
+
+    if (status == REMOTE_CONTINUE) {
+        if (((4 + REMOTE_MESSAGE_MAX) - thiscall->bufferLength) < nbytes) {
+            errorf(st->conn,
+                   VIR_ERR_RPC, _("data size %d too large for payload %d"),
+                   (int)nbytes,
+                   (int)((4 + REMOTE_MESSAGE_MAX) - thiscall->bufferLength));
+            goto error;
+        }
+        memcpy(thiscall->buffer + thiscall->bufferLength, data, nbytes);
+        thiscall->bufferLength += nbytes;
+    }
+
+    /* Go back to packet start and encode the length word. */
+    xdrmem_create (&xdr, thiscall->buffer, REMOTE_MESSAGE_HEADER_XDR_LEN, XDR_ENCODE);
+    if (!xdr_u_int (&xdr, &thiscall->bufferLength)) {
+        error(st->conn, VIR_ERR_RPC,
+               _("xdr_u_int (length word)"));
+        xdr_destroy (&xdr);
+        goto error;
+    }
+    xdr_destroy (&xdr);
+
+    /* remoteIO frees 'thiscall' for us (XXX that's dubious semantics) */
+    if (remoteIO(st->conn, priv, 0, thiscall) < 0)
+        return -1;
+    return nbytes;
+
+error:
+    /* every path reaching here has already destroyed its XDR handle */
+    VIR_FREE(thiscall);
+    return -1;
+}
+
+/* If an error has been queued on the stream, raise it on the connection
+ * and return 1; otherwise return 0. The queued error is not cleared here. */
+static int
+remoteStreamHasError(virStreamPtr st) {
+    struct private_stream_data *privst = st->privateData;
+    if (!privst->has_error) {
+        return 0;
+    }
+    VIR_WARN0("Raising async error");
+    virRaiseErrorFull(st->conn,
+                      __FILE__, __FUNCTION__, __LINE__,
+                      privst->err.domain,
+                      privst->err.code,
+                      privst->err.level,
+                      privst->err.str1 ? *privst->err.str1 : NULL, /* strings are optional: pass NULL when absent */
+                      privst->err.str2 ? *privst->err.str2 : NULL,
+                      privst->err.str3 ? *privst->err.str3 : NULL,
+                      privst->err.int1,
+                      privst->err.int2,
+                      "%s", privst->err.message ? *privst->err.message : NULL);
+    return 1;
+}
+
+/* Unlink the stream's private data from priv->streams and free it, including
+ * any queued error and buffered incoming data, then detach it from 'st'. */
+static void
+remoteStreamRelease(virStreamPtr st)
+{
+    struct private_data *priv = st->conn->privateData;
+    struct private_stream_data *privst = st->privateData;
+    struct private_stream_data **link = &priv->streams;
+
+    /* Walk the 'next' links rather than the nodes so that removing the head
+     * and removing an interior entry are the same operation */
+    while (*link && *link != privst)
+        link = &(*link)->next;
+    if (*link)
+        *link = privst->next;
+
+    if (privst->has_error)
+        xdr_free((xdrproc_t)xdr_remote_error,  (char *)&privst->err);
+
+    /* Data received but never read by the app is owned by the stream */
+    VIR_FREE(privst->incoming);
+    VIR_FREE(privst);
+
+    st->driver = NULL;
+    st->privateData = NULL;
+}
+
+
+/* Send 'nbytes' of 'data' as one stream data packet. Returns the value from
+ * remoteStreamPacket(), or -1 if an error was already queued or sending failed;
+ * on -1 the stream's private data has been released. */
+static int
+remoteStreamSend(virStreamPtr st,
+                 const char *data,
+                 size_t nbytes)
+{
+    /* size_t does not match %d: cast explicitly for the varargs call */
+    DEBUG("st=%p data=%p nbytes=%d", st, data, (int)nbytes);
+    struct private_data *priv = st->conn->privateData;
+    int rv = -1;
+
+    remoteDriverLock(priv);
+
+    if (remoteStreamHasError(st))
+        goto cleanup;
+
+    rv = remoteStreamPacket(st, REMOTE_CONTINUE, data, nbytes);
+
+cleanup:
+    /* Any failure releases the stream's private data */
+    if (rv == -1)
+        remoteStreamRelease(st);
+    remoteDriverUnlock(priv);
+    return rv;
+}
+
+
+/* Read up to 'nbytes' of stream data into 'data'. If nothing is buffered, one
+ * receive-only call is pushed through remoteIO() first. Returns the number of
+ * bytes copied, 0 if still nothing is buffered, or -1 on error (the stream's
+ * private data is then released). */
+static int
+remoteStreamRecv(virStreamPtr st,
+                 char *data,
+                 size_t nbytes)
+{
+    /* size_t does not match %d: cast explicitly for the varargs call */
+    DEBUG("st=%p data=%p nbytes=%d", st, data, (int)nbytes);
+    struct private_data *priv = st->conn->privateData;
+    struct private_stream_data *privst = st->privateData;
+    int rv = -1;
+
+    remoteDriverLock(priv);
+    if (remoteStreamHasError(st))
+        goto cleanup;
+
+    if (!privst->incomingOffset) {
+        struct remote_thread_call *thiscall;
+        if (VIR_ALLOC(thiscall) < 0) {
+            virReportOOMError(st->conn);
+            goto cleanup;
+        }
+        /* We're not really doing an RPC call, so we're
+         * skipping straight to the RX part */
+        thiscall->mode = REMOTE_MODE_WAIT_RX;
+        thiscall->serial = privst->serial;
+        thiscall->proc_nr = privst->proc_nr;
+        thiscall->want_reply = 1;
+        if (virCondInit(&thiscall->cond) < 0) {
+            VIR_FREE(thiscall);
+            error (st->conn, VIR_ERR_INTERNAL_ERROR,
+                   _("cannot initialize mutex"));
+            goto cleanup;
+        }
+        /* remoteIO frees 'thiscall' for us (XXX that's dubious semantics) */
+        if (remoteIO(st->conn, priv, 0, thiscall) < 0)
+            goto cleanup;
+    }
+
+    DEBUG("After IO %d", privst->incomingOffset);
+    if (privst->incomingOffset) {
+        /* Hand over what fits; any remainder is shifted to the buffer start */
+        int want = privst->incomingOffset;
+        if (want > nbytes)
+            want = nbytes;
+        memcpy(data, privst->incoming, want);
+        if (want < privst->incomingOffset) {
+            memmove(privst->incoming, privst->incoming + want, privst->incomingOffset - want);
+            privst->incomingOffset -= want;
+        } else {
+            VIR_FREE(privst->incoming);
+            privst->incomingOffset = privst->incomingLength = 0;
+        }
+        rv = want;
+    } else {
+        rv = 0;
+    }
+
+    DEBUG("Done %d", rv);
+
+cleanup:
+    if (rv == -1)
+        remoteStreamRelease(st);
+    remoteDriverUnlock(priv);
+    return rv;
+}
+
+static int
+remoteStreamEventAddCallback(virStreamPtr stream ATTRIBUTE_UNUSED,
+                             int events ATTRIBUTE_UNUSED,
+                             virStreamEventCallback cb ATTRIBUTE_UNUSED,
+                             void *opaque ATTRIBUTE_UNUSED,
+                             virFreeCallback ff ATTRIBUTE_UNUSED)
+{
+    return -1; /* stub: every argument is ignored and the call always fails */
+}
+
+static int
+remoteStreamEventUpdateCallback(virStreamPtr stream ATTRIBUTE_UNUSED,
+                                int events ATTRIBUTE_UNUSED)
+{
+    return -1; /* stub: every argument is ignored and the call always fails */
+}
+
+
+static int
+remoteStreamEventRemoveCallback(virStreamPtr stream ATTRIBUTE_UNUSED)
+{
+    return -1; /* stub: the argument is ignored and the call always fails */
+}
+
+/* Send a payload-less REMOTE_OK stream packet unless an error is already queued.
+ * Returns 0 on success, else -1; the stream's private data is always released. */
+static int
+remoteStreamFinish(virStreamPtr st)
+{
+    struct private_data *priv = st->conn->privateData;
+    int ret = -1;
+
+    remoteDriverLock(priv);
+    if (remoteStreamHasError(st))
+        goto cleanup;
+
+    ret = remoteStreamPacket(st,
+                             REMOTE_OK,
+                             NULL,
+                             0);
+
+cleanup:
+    remoteStreamRelease(st);
+    remoteDriverUnlock(priv);
+    return ret;
+}
+
+/* Send a payload-less REMOTE_ERROR stream packet unless an error is already queued.
+ * Returns 0 on success, else -1; the stream's private data is always released. */
+static int
+remoteStreamAbort(virStreamPtr st)
+{
+    struct private_data *priv = st->conn->privateData;
+    int ret = -1;
+
+    remoteDriverLock(priv);
+    if (remoteStreamHasError(st))
+        goto cleanup;
+
+    ret = remoteStreamPacket(st,
+                             REMOTE_ERROR,
+                             NULL,
+                             0);
+
+cleanup:
+    remoteStreamRelease(st);
+    remoteDriverUnlock(priv);
+    return ret;
+}
+
+
+
+static virStreamDriver remoteStreamDrv = { /* virStreamDriver table filled with the remoteStream* functions */
+    .streamRecv = remoteStreamRecv,
+    .streamSend = remoteStreamSend,
+    .streamFinish = remoteStreamFinish,
+    .streamAbort = remoteStreamAbort,
+    .streamAddCallback = remoteStreamEventAddCallback,
+    .streamUpdateCallback = remoteStreamEventUpdateCallback,
+    .streamRemoveCallback = remoteStreamEventRemoveCallback,
+};
+#endif
+
+
 /*----------------------------------------------------------------------*/
 
 
@@ -6350,6 +6733,7 @@ prepareCall(virConnectPtr conn,
     rv->proc_nr = proc_nr;
     rv->ret_filter = ret_filter;
     rv->ret = ret;
+    rv->want_reply = 1;
 
     hdr.prog = REMOTE_PROGRAM;
     hdr.vers = REMOTE_PROTOCOL_VERSION;
@@ -6535,7 +6919,10 @@ remoteIOWriteMessage(virConnectPtr conn,
         if (priv->saslEncodedOffset == priv->saslEncodedLength) {
             priv->saslEncoded = NULL;
             priv->saslEncodedOffset = priv->saslEncodedLength = 0;
-            thecall->mode = REMOTE_MODE_WAIT_RX;
+            if (thecall->want_reply)
+                thecall->mode = REMOTE_MODE_WAIT_RX;
+            else
+                thecall->mode = REMOTE_MODE_COMPLETE;
         }
     } else {
 #endif
@@ -6549,7 +6936,10 @@ remoteIOWriteMessage(virConnectPtr conn,
 
         if (thecall->bufferOffset == thecall->bufferLength) {
             thecall->bufferOffset = thecall->bufferLength = 0;
-            thecall->mode = REMOTE_MODE_WAIT_RX;
+            if (thecall->want_reply)
+                thecall->mode = REMOTE_MODE_WAIT_RX;
+            else
+                thecall->mode = REMOTE_MODE_COMPLETE;
         }
 #if HAVE_SASL
     }
@@ -6703,6 +7093,12 @@ processCallDispatchMessage(virConnectPtr conn, struct private_data *priv,
                            remote_message_header *hdr,
                            XDR *xdr);
 
+static int
+processCallDispatchStream(virConnectPtr conn, struct private_data *priv,
+                          int in_open,
+                          remote_message_header *hdr,
+                          XDR *xdr);
+
 
 static int
 processCallDispatch(virConnectPtr conn, struct private_data *priv,
@@ -6712,14 +7108,19 @@ processCallDispatch(virConnectPtr conn, struct private_data *priv,
     int len = priv->bufferLength - 4;
     int rv = -1;
 
+    /* Length word has already been read */
+    priv->bufferOffset = 4;
+
     /* Deserialise reply header. */
-    xdrmem_create (&xdr, priv->buffer + 4, len, XDR_DECODE);
+    xdrmem_create (&xdr, priv->buffer + priv->bufferOffset, len, XDR_DECODE);
     if (!xdr_remote_message_header (&xdr, &hdr)) {
         error (in_open ? NULL : conn,
                VIR_ERR_RPC, _("invalid header in reply"));
         return -1;
     }
 
+    priv->bufferOffset += xdr_getpos(&xdr);
+
     /* Check program, version, etc. are what we expect. */
     if (hdr.prog != REMOTE_PROGRAM) {
         virRaiseError (in_open ? NULL : conn,
@@ -6738,6 +7139,7 @@ processCallDispatch(virConnectPtr conn, struct private_data *priv,
         return -1;
     }
 
+
     switch (hdr.type) {
     case REMOTE_REPLY: /* Normal RPC replies */
         rv = processCallDispatchReply(conn, priv, in_open,
@@ -6749,6 +7151,11 @@ processCallDispatch(virConnectPtr conn, struct private_data *priv,
                                         &hdr, &xdr);
         break;
 
+    case REMOTE_STREAM: /* Stream protocol */
+        rv = processCallDispatchStream(conn, priv, in_open,
+                                       &hdr, &xdr);
+        break;
+
     default:
          virRaiseError (in_open ? NULL : conn,
                         NULL, NULL, VIR_FROM_REMOTE,
@@ -6811,6 +7218,7 @@ processCallDispatchReply(virConnectPtr conn, struct private_data *priv,
         return 0;
 
     case REMOTE_ERROR:
+        VIR_WARN0("Method call error");
         memset (&thecall->err, 0, sizeof thecall->err);
         if (!xdr_remote_error (xdr, &thecall->err)) {
             error (in_open ? NULL : conn,
@@ -6854,6 +7262,113 @@ processCallDispatchMessage(virConnectPtr conn, struct private_data *priv,
     return 0;
 }
 
+/* Dispatch an incoming REMOTE_STREAM packet to its stream. Data packets are
+ * appended to the stream's incoming buffer; OK and error packets complete or
+ * fail the call waiting on the same serial, and an error with no waiting call
+ * is queued on the stream. Returns 0 if handled, -1 otherwise. */
+static int
+processCallDispatchStream(virConnectPtr conn,
+                          struct private_data *priv,
+                          int in_open,
+                          remote_message_header *hdr,
+                          XDR *xdr) {
+    struct private_stream_data *privst;
+    struct remote_thread_call *thecall;
+
+    /* Find the stream whose serial AND procedure both match the packet */
+    privst = priv->streams;
+    while (privst &&
+           (privst->serial != hdr->serial ||
+            privst->proc_nr != hdr->proc))
+        privst = privst->next;
+
+    if (!privst) {
+        VIR_WARN("No registered stream matching serial=%d, proc=%d",
+                 hdr->serial, hdr->proc);
+        return -1;
+    }
+
+    /* See if there's also a (optional) call waiting for this reply */
+    thecall = priv->waitDispatch;
+    while (thecall &&
+           thecall->serial != hdr->serial)
+        thecall = thecall->next;
+
+    /* Status is REMOTE_CONTINUE (raw stream data follows the header),
+     * REMOTE_OK (confirmation) or REMOTE_ERROR (a remote_error follows) */
+    switch (hdr->status) {
+    case REMOTE_CONTINUE: {
+        int avail = privst->incomingLength - privst->incomingOffset;
+        int need = priv->bufferLength - priv->bufferOffset;
+        VIR_WARN0("Got a stream data packet");
+
+        /* XXX flag stream as complete somewhere if need==0 */
+        if (need > avail) {
+            int extra = need - avail;
+            if (VIR_REALLOC_N(privst->incoming,
+                              privst->incomingLength + extra) < 0) {
+                VIR_WARN0("Out of memory");
+                return -1;
+            }
+            privst->incomingLength += extra;
+        }
+
+        memcpy(privst->incoming + privst->incomingOffset,
+               priv->buffer + priv->bufferOffset,
+               priv->bufferLength - priv->bufferOffset);
+        privst->incomingOffset += (priv->bufferLength - priv->bufferOffset);
+
+        if (thecall && thecall->want_reply) {
+            VIR_WARN("Got sync data packet offset=%d", privst->incomingOffset);
+            thecall->mode = REMOTE_MODE_COMPLETE;
+        } else {
+            VIR_WARN("Got aysnc data packet offset=%d", privst->incomingOffset);
+        }
+        return 0;
+    }
+
+    case REMOTE_OK:
+        VIR_WARN0("Got a synchronous confirm");
+        if (!thecall) {
+            VIR_WARN0("Got unexpected stream finish confirmation");
+            return -1;
+        }
+        thecall->mode = REMOTE_MODE_COMPLETE;
+        return 0;
+
+    case REMOTE_ERROR:
+        if (thecall && thecall->want_reply) {
+            VIR_WARN0("Got a synchronous error");
+            /* Give the error straight to this call */
+            memset (&thecall->err, 0, sizeof thecall->err);
+            if (!xdr_remote_error (xdr, &thecall->err)) {
+                error (in_open ? NULL : conn,
+                       VIR_ERR_RPC, _("unmarshalling remote_error"));
+                return -1;
+            }
+            thecall->mode = REMOTE_MODE_ERROR;
+        } else {
+            VIR_WARN0("Got a asynchronous error");
+            /* No call, so queue the error against the stream */
+            if (privst->has_error) {
+                VIR_WARN0("Got unexpected duplicate stream error");
+                return -1;
+            }
+            privst->has_error = 1;
+            memset (&privst->err, 0, sizeof privst->err);
+            if (!xdr_remote_error (xdr, &privst->err)) {
+                VIR_WARN0("Failed to unmarshall error");
+                return -1;
+            }
+        }
+        return 0;
+
+    default:
+        VIR_WARN("Stream with unexpected serial=%d, proc=%d, status=%d",
+                 hdr->serial, hdr->proc, hdr->status);
+        return -1;
+    }
+}
 
 static int
 remoteIOHandleInput(virConnectPtr conn, struct private_data *priv,
@@ -6934,6 +7449,9 @@ remoteIOEventLoop(virConnectPtr conn,
             tmp = tmp->next;
         }
 
+        if (priv->streams)
+            fds[0].events |= POLLIN;
+
         /* Release lock while poll'ing so other threads
          * can stuff themselves on the queue */
         remoteDriverUnlock(priv);
-- 
1.6.2.5




More information about the libvir-list mailing list