[libvirt] [PATCH 04/11] Handle incoming data streams in libvirtd

Daniel P. Berrange berrange at redhat.com
Mon Aug 24 20:51:07 UTC 2009


* qemud/stream.c: Handle incoming stream data packets, queuing until
  stream becomes writable. Handle stream completion handshake
* po/POTFILES.in: Add qemud/stream.c
---
 po/POTFILES.in |    1 +
 qemud/stream.c |  305 +++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 303 insertions(+), 3 deletions(-)

diff --git a/po/POTFILES.in b/po/POTFILES.in
index 66d3ebd..d144689 100644
--- a/po/POTFILES.in
+++ b/po/POTFILES.in
@@ -1,6 +1,7 @@
 qemud/dispatch.c
 qemud/qemud.c
 qemud/remote.c
+qemud/stream.c
 src/bridge.c
 src/conf.c
 src/console.c
diff --git a/qemud/stream.c b/qemud/stream.c
index 1644a1b..1fe0e58 100644
--- a/qemud/stream.c
+++ b/qemud/stream.c
@@ -28,6 +28,93 @@
 #include "dispatch.h"
 #include "logging.h"
 
+static int
+remoteStreamHandleWrite(struct qemud_client *client,
+                        struct qemud_client_stream *stream);
+static int
+remoteStreamHandleFinish(struct qemud_client *client,
+                         struct qemud_client_stream *stream,
+                         struct qemud_client_message *msg);
+static int
+remoteStreamHandleAbort(struct qemud_client *client,
+                        struct qemud_client_stream *stream,
+                        struct qemud_client_message *msg);
+
+
+
+static void
+remoteStreamUpdateEvents(struct qemud_client_stream *stream)
+{
+    int newEvents = 0; /* event mask passed to the update call below */
+    if (stream->rx) /* messages are queued, so ask for writable events */
+        newEvents |= VIR_STREAM_EVENT_WRITABLE;
+
+    virStreamEventUpdateCallback(stream->st, newEvents);
+}
+
+
+/*
+ * Callback that gets invoked when a stream becomes writable/readable
+ */
+static void
+remoteStreamEvent(virStreamPtr st, int events, void *opaque)
+{
+    struct qemud_client *client = opaque; /* registered with the client as opaque */
+    struct qemud_client_stream *stream;
+
+    /* XXX sub-optimal - we really should be taking the server lock
+     * first, but we have no handle to the server object
+     * We're lucky to get away with it for now, due to this callback
+     * executing in the main thread, but this should really be fixed
+     */
+    virMutexLock(&client->lock);
+
+    stream = remoteFindClientStream(client, st);
+
+    if (!stream) {
+        VIR_WARN("event for client=%p stream st=%p, but missing stream state", client, st);
+        virStreamEventRemoveCallback(st); /* no state found for st: drop its callback */
+        goto cleanup;
+    }
+
+    DEBUG("st=%p events=%d", st, events);
+
+    if (events & VIR_STREAM_EVENT_WRITABLE) {
+        if (remoteStreamHandleWrite(client, stream) < 0) { /* -1 means fatal error */
+            remoteRemoveClientStream(client, stream);
+            qemudDispatchClientFailure(client);
+            goto cleanup;
+        }
+    }
+
+    if (!stream->closed && /* error/hangup only matters while still open */
+        (events & (VIR_STREAM_EVENT_ERROR | VIR_STREAM_EVENT_HANGUP))) {
+        int ret;
+        remote_error rerr;
+        memset(&rerr, 0, sizeof rerr);
+        stream->closed = 1;
+        virStreamAbort(stream->st);
+        if (events & VIR_STREAM_EVENT_HANGUP)
+            remoteDispatchFormatError(&rerr, "%s", _("stream had unexpected termination"));
+        else
+            remoteDispatchFormatError(&rerr, "%s", _("stream had I/O failure"));
+        ret = remoteSerializeStreamError(client, &rerr, stream->procedure, stream->serial);
+        remoteRemoveClientStream(client, stream); /* removed whether or not the error was serialized */
+        if (ret < 0)
+            qemudDispatchClientFailure(client);
+        goto cleanup;
+    }
+
+    if (stream->closed) { /* the write handler above may have closed the stream */
+        remoteRemoveClientStream(client, stream);
+    } else {
+        remoteStreamUpdateEvents(stream); /* recompute the event mask from the rx queue */
+    }
+
+cleanup:
+    virMutexUnlock(&client->lock);
+}
+
 
 /*
  * @client: a locked client object
@@ -38,10 +125,54 @@
  * -1 on fatal client error
  */
 static int
-remoteStreamFilter(struct qemud_client *client ATTRIBUTE_UNUSED,
-                   struct qemud_client_message *msg ATTRIBUTE_UNUSED,
-                   void *opaque ATTRIBUTE_UNUSED)
+remoteStreamFilter(struct qemud_client *client,
+                   struct qemud_client_message *msg, void *opaque)
 {
+    struct qemud_client_stream *stream = opaque; /* opaque carries the stream state */
+
+    if (msg->hdr.serial == stream->serial &&
+        msg->hdr.proc == stream->procedure &&
+        msg->hdr.type == REMOTE_STREAM) {
+        DEBUG("Incoming rx=%p serial=%d proc=%d status=%d",
+              stream->rx, msg->hdr.serial, msg->hdr.proc, msg->hdr.status);
+
+        /* If there are queued packets, we need to queue all further
+         * messages, since they must be processed strictly in order.
+         * If there are no queued packets, then OK/ERROR messages
+         * should be processed immediately. Data packets are still
+         * queued to only be processed when the stream is marked as
+         * writable.
+         */
+        if (stream->rx) {
+            qemudClientMessageQueuePush(&stream->rx, msg);
+            remoteStreamUpdateEvents(stream);
+        } else {
+            int ret = 0;
+            switch (msg->hdr.status) {
+            case REMOTE_OK:
+                ret = remoteStreamHandleFinish(client, stream, msg);
+                if (ret == 0) /* handled right away, so msg is not queued */
+                    qemudClientMessageRelease(client, msg);
+                break;
+
+            case REMOTE_CONTINUE:
+                qemudClientMessageQueuePush(&stream->rx, msg);
+                remoteStreamUpdateEvents(stream);
+                break;
+
+            case REMOTE_ERROR:
+            default:
+                ret = remoteStreamHandleAbort(client, stream, msg);
+                if (ret == 0) /* handled right away, so msg is not queued */
+                    qemudClientMessageRelease(client, msg);
+                break;
+            }
+
+            if (ret < 0)
+                return -1;
+        }
+        return 1; /* serial, proc and type all matched this stream */
+    }
     return 0;
 }
 
@@ -119,6 +250,10 @@ int remoteAddClientStream(struct qemud_client *client,
 
     DEBUG("client=%p proc=%d serial=%d", client, stream->procedure, stream->serial);
 
+    if (virStreamEventAddCallback(stream->st, 0,
+                                  remoteStreamEvent, client, NULL) < 0)
+        return -1;
+
     if (tmp) {
         while (tmp->next)
             tmp = tmp->next;
@@ -132,6 +267,8 @@ int remoteAddClientStream(struct qemud_client *client,
 
     stream->tx = 1;
 
+    remoteStreamUpdateEvents(stream);
+
     return 0;
 }
 
@@ -208,3 +345,165 @@ remoteRemoveClientStream(struct qemud_client *client,
     }
     return -1;
 }
+
+
+/*
+ * Send the unsent part of a data message to the stream. Returns:
+ *   -1  if fatal error occurred
+ *    0  if message was fully processed
+ *    1  if message is still being processed
+ */
+static int
+remoteStreamHandleWriteData(struct qemud_client *client,
+                            struct qemud_client_stream *stream,
+                            struct qemud_client_message *msg)
+{
+    remote_error rerr;
+    int ret;
+
+    DEBUG("stream=%p proc=%d serial=%d len=%d offset=%d",
+          stream, msg->hdr.proc, msg->hdr.serial, msg->bufferLength, msg->bufferOffset);
+
+    memset(&rerr, 0, sizeof rerr);
+
+    ret = virStreamSend(stream->st,
+                        msg->buffer + msg->bufferOffset, /* resume after bytes already sent */
+                        msg->bufferLength - msg->bufferOffset);
+
+    if (ret > 0) {
+        msg->bufferOffset += ret;
+
+        /* Partial write, so indicate we have more todo later */
+        if (msg->bufferOffset < msg->bufferLength)
+            return 1;
+    } else if (ret == -2) {
+        /* Blocking, so indicate we have more todo later */
+        return 1;
+    } else { /* ret == 0 and every other negative value end up here */
+        VIR_INFO0("Stream send failed");
+        stream->closed = 1;
+        remoteDispatchConnError(&rerr, client->conn);
+        return remoteSerializeReplyError(client, &rerr, &msg->hdr);
+    }
+
+    return 0; /* bufferOffset reached bufferLength */
+}
+
+
+/*
+ * Process a finish handshake from the client.
+ *
+ * Returns a REMOTE_OK confirmation if successful, or a REMOTE_ERROR
+ * if there was a stream error
+ *
+ * Returns 0 if successfully sent RPC reply, -1 upon fatal error
+ */
+static int
+remoteStreamHandleFinish(struct qemud_client *client,
+                         struct qemud_client_stream *stream,
+                         struct qemud_client_message *msg)
+{
+    remote_error rerr;
+    int ret;
+
+    DEBUG("stream=%p proc=%d serial=%d",
+          stream, msg->hdr.proc, msg->hdr.serial);
+
+    memset(&rerr, 0, sizeof rerr);
+
+    stream->closed = 1; /* set before, and regardless of, the finish result */
+    ret = virStreamFinish(stream->st);
+
+    if (ret < 0) {
+        remoteDispatchConnError(&rerr, client->conn);
+        return remoteSerializeReplyError(client, &rerr, &msg->hdr);
+    } else {
+        /* Send zero-length confirm */
+        if (remoteSendStreamData(client, stream, NULL, 0) < 0)
+            return -1;
+    }
+
+    return 0;
+}
+
+
+/*
+ * Process an abort request from the client.
+ *
+ * Returns 0 if successfully aborted, -1 upon error
+ */
+static int
+remoteStreamHandleAbort(struct qemud_client *client,
+                        struct qemud_client_stream *stream,
+                        struct qemud_client_message *msg)
+{
+    remote_error rerr;
+
+    DEBUG("stream=%p proc=%d serial=%d",
+          stream, msg->hdr.proc, msg->hdr.serial);
+
+    memset(&rerr, 0, sizeof rerr);
+
+    stream->closed = 1;
+    virStreamAbort(stream->st);
+
+    if (msg->hdr.status == REMOTE_ERROR) /* the client asked for the abort */
+        remoteDispatchFormatError(&rerr, "%s", _("stream aborted at client request"));
+    else { /* callers also route unrecognised status values here */
+        VIR_WARN("unexpected stream status %d", msg->hdr.status);
+        remoteDispatchFormatError(&rerr, _("stream aborted with unexpected status %d"),
+                                  msg->hdr.status);
+    }
+
+    return remoteSerializeReplyError(client, &rerr, &msg->hdr); /* result is the return value */
+}
+
+
+
+/*
+ * Called when the stream is signalled as being able to accept
+ * data writes. Will process all pending incoming messages
+ * until they're all gone, or I/O blocks
+ *
+ * Returns 0 on success, or -1 upon fatal error
+ */
+static int
+remoteStreamHandleWrite(struct qemud_client *client,
+                        struct qemud_client_stream *stream)
+{
+    struct qemud_client_message *msg, *tmp;
+
+    DEBUG("stream=%p", stream);
+
+    msg = stream->rx; /* start at the head of the queued messages */
+    while (msg && !stream->closed) { /* the handlers below may set closed */
+        int ret;
+        switch (msg->hdr.status) {
+        case REMOTE_OK:
+            ret = remoteStreamHandleFinish(client, stream, msg);
+            break;
+
+        case REMOTE_CONTINUE:
+            ret = remoteStreamHandleWriteData(client, stream, msg);
+            break;
+
+        case REMOTE_ERROR:
+        default:
+            ret = remoteStreamHandleAbort(client, stream, msg);
+            break;
+        }
+
+        if (ret == 0) /* message fully handled: take it off the queue */
+            qemudClientMessageQueueServe(&stream->rx);
+        else if (ret < 0)
+            return -1;
+        else
+            break; /* still processing data */
+        /* NOTE(review): msg->next is read after QueueServe() ran - confirm it is preserved */
+        tmp = msg->next;
+        qemudClientMessageRelease(client, msg);
+        msg = tmp;
+    }
+
+    return 0;
+}
-- 
1.6.2.5




More information about the libvir-list mailing list