[libvirt] [PATCH] Allow non-blocking message sending on virNetClient

Daniel P. Berrange berrange at redhat.com
Tue Nov 8 18:20:02 UTC 2011


From: "Daniel P. Berrange" <berrange at redhat.com>

Split the existing virNetClientSend into two parts
virNetClientSend and virNetClientSendNoReply, instead
of having a 'bool expectReply' parameter.

Add a new virNetClientSendNonBlock which returns 2 on
full send, 1 on partial send, 0 on no send, -1 on error

If a partial send occurs, then a subsequent call to any
of the virNetClientSend* APIs will finish any outstanding
I/O.

TODO: the virNetClientEvent event handler could be used
to speed up completion of partial sends if an event loop
is present.

* src/rpc/virnetclientprogram.c, src/rpc/virnetclientstream.c:
  Update for changed API
---
 src/rpc/virnetclient.c        |  249 +++++++++++++++++++++++++++++++++-------
 src/rpc/virnetclient.h        |   12 ++-
 src/rpc/virnetclientprogram.c |    2 +-
 src/rpc/virnetclientstream.c  |   11 ++-
 4 files changed, 224 insertions(+), 50 deletions(-)

diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
index 4b7d4a9..b0ed507 100644
--- a/src/rpc/virnetclient.c
+++ b/src/rpc/virnetclient.c
@@ -55,6 +55,8 @@ struct _virNetClientCall {
 
     virNetMessagePtr msg;
     bool expectReply;
+    bool nonBlock;
+    bool haveThread;
 
     virCond cond;
 
@@ -86,8 +88,15 @@ struct _virNetClient {
     int wakeupSendFD;
     int wakeupReadFD;
 
-    /* List of threads currently waiting for dispatch */
+    /*
+     * List of calls currently waiting for dispatch
+     * The calls should all have threads waiting for
+     * them, except possibly the first call in the list
+     * which might be a partially sent non-blocking call.
+     */
     virNetClientCallPtr waitDispatch;
+    /* Whether a thread is dispatching */
+    bool haveTheBuck;
 
     size_t nstreams;
     virNetClientStreamPtr *streams;
@@ -555,7 +564,7 @@ virNetClientCallDispatchReply(virNetClientPtr client)
     virNetClientCallPtr thecall;
 
     /* Ok, definitely got an RPC reply now find
-       out who's been waiting for it */
+       out which waiting call is associated with it */
     thecall = client->waitDispatch;
     while (thecall &&
            !(thecall->msg->header.prog == client->msg.header.prog &&
@@ -896,10 +905,31 @@ virNetClientIOHandleInput(virNetClientPtr client)
 }
 
 
+static void virNetClientPassTheBuck(virNetClientPtr client)
+{
+    virNetClientCallPtr tmp = client->waitDispatch;
+
+    /* See if someone else is still waiting
+     * and if so, then pass the buck ! */
+    while (tmp) {
+        if (tmp->haveThread) {
+            VIR_DEBUG("Passing the buck to %p", tmp);
+            virCondSignal(&tmp->cond);
+            return;
+        }
+        tmp = tmp->next;
+    }
+    VIR_DEBUG("No thread to pass the buck to");
+}
+
+
 /*
  * Process all calls pending dispatch/receive until we
  * get a reply to our own call. Then quit and pass the buck
  * to someone else.
+ *
+ * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true),
+ * 0 if nothing sent (only for nonBlock==true) and -1 on error
  */
 static int virNetClientIOEventLoop(virNetClientPtr client,
                                    virNetClientCallPtr thiscall)
@@ -924,6 +954,12 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
         if (virNetSocketHasCachedData(client->sock))
             timeout = 0;
 
+        /* If we're a non-blocking call, then we don't
+         * want to wait for I/O readyness
+         */
+        if (thiscall->nonBlock)
+            timeout = 0;
+
         fds[0].events = fds[0].revents = 0;
         fds[1].events = fds[1].revents = 0;
 
@@ -975,8 +1011,34 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
         /* If we have existing SASL decoded data, pretend
          * the socket became readable so we consume it
          */
-        if (virNetSocketHasCachedData(client->sock))
+        if (virNetSocketHasCachedData(client->sock)) {
             fds[0].revents |= POLLIN;
+        } else if (ret == 0 && thiscall->nonBlock) {
+            if (thiscall->msg->bufferOffset == 0) {
+                /* No data sent at all, remove ourselves from the list */
+                tmp = client->waitDispatch;
+                prev = NULL;
+                while (tmp) {
+                    if (tmp == thiscall) {
+                        if (prev) {
+                            prev->next = thiscall->next;
+                        } else {
+                            client->waitDispatch = thiscall->next;
+                        }
+                        break;
+                    }
+                    prev = tmp;
+                    tmp = tmp->next;
+                }
+                VIR_DEBUG("Giving up the buck %p %p", thiscall, client->waitDispatch);
+                virNetClientPassTheBuck(client);
+                return 0;
+            } else {
+                VIR_DEBUG("Giving up the buck %p %p", thiscall, client->waitDispatch);
+                virNetClientPassTheBuck(client);
+                return 1; /* partial send */
+            }
+        }
 
         if (fds[1].revents) {
             VIR_DEBUG("Woken up from poll by other thread");
@@ -988,6 +1050,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
         }
 
         if (ret < 0) {
+            /* XXX what's this dubious errno check doing ? */
             if (errno == EWOULDBLOCK)
                 continue;
             virReportSystemError(errno,
@@ -1005,8 +1068,8 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
                 goto error;
         }
 
-        /* Iterate through waiting threads and if
-         * any are complete then tell 'em to wakeup
+        /* Iterate through waiting calls and if any are
+         * complete, remove them from the dispatch list..
          */
         tmp = client->waitDispatch;
         prev = NULL;
@@ -1019,13 +1082,25 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
                 else
                     client->waitDispatch = tmp->next;
 
-                /* And wake them up....
-                 * ...they won't actually wakeup until
+                /*
+                 * ...if the call being removed from the list
+                 * still has a thread, then wake that thread up,
+                 * otherwise free the call. The latter should
+                 * only happen for calls without replies.
+                 *
+                 * ...the threads won't actually wakeup until
                  * we release our mutex a short while
                  * later...
                  */
-                VIR_DEBUG("Waking up sleep %p %p", tmp, client->waitDispatch);
-                virCondSignal(&tmp->cond);
+                if (tmp->haveThread) {
+                    VIR_DEBUG("Waking up sleep %p %p", tmp, client->waitDispatch);
+                    virCondSignal(&tmp->cond);
+                } else {
+                    if (tmp->expectReply)
+                        VIR_WARN("Got a call expecting a reply but without a waiting thread");
+                    ignore_value(virCondDestroy(&tmp->cond));
+                    VIR_FREE(tmp);
+                }
             } else {
                 prev = tmp;
             }
@@ -1039,13 +1114,8 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
              */
             client->waitDispatch = thiscall->next;
             VIR_DEBUG("Giving up the buck %p %p", thiscall, client->waitDispatch);
-            /* See if someone else is still waiting
-             * and if so, then pass the buck ! */
-            if (client->waitDispatch) {
-                VIR_DEBUG("Passing the buck to %p", client->waitDispatch);
-                virCondSignal(&client->waitDispatch->cond);
-            }
-            return 0;
+            virNetClientPassTheBuck(client);
+            return 2;
         }
 
 
@@ -1060,16 +1130,21 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
 error:
     client->waitDispatch = thiscall->next;
     VIR_DEBUG("Giving up the buck due to I/O error %p %p", thiscall, client->waitDispatch);
-    /* See if someone else is still waiting
-     * and if so, then pass the buck ! */
-    if (client->waitDispatch) {
-        VIR_DEBUG("Passing the buck to %p", client->waitDispatch);
-        virCondSignal(&client->waitDispatch->cond);
-    }
+    virNetClientPassTheBuck(client);
     return -1;
 }
 
 
+static void
+virNetClientUpdateIOCallback(virNetClientPtr client, bool enabled)
+{
+    int events = 0;
+    if (enabled) {
+        events |= VIR_EVENT_HANDLE_READABLE;
+    }
+    virNetSocketUpdateIOCallback(client->sock, events);
+}
+
 /*
  * This function sends a message to remote server and awaits a reply
  *
@@ -1102,11 +1177,15 @@ error:
  *    nation are blamed on another, providing an opportunity for war."
  *
  * NB(5) Don't Panic!
+ *
+ * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true),
+ * 0 if nothing sent (only for nonBlock==true) and -1 on error
  */
 static int virNetClientIO(virNetClientPtr client,
                           virNetClientCallPtr thiscall)
 {
     int rv = -1;
+    virNetClientCallPtr tmp;
 
     VIR_DEBUG("Outgoing message prog=%u version=%u serial=%u proc=%d type=%d length=%zu dispatch=%p",
               thiscall->msg->header.prog,
@@ -1117,20 +1196,27 @@ static int virNetClientIO(virNetClientPtr client,
               thiscall->msg->bufferLength,
               client->waitDispatch);
 
+    /* Trivially detect blocking if someone else has the buck already */
+    if (client->haveTheBuck &&
+        thiscall->nonBlock)
+        return 0;
+
+    /* Stick ourselves on the end of the wait queue */
+    tmp = client->waitDispatch;
+    while (tmp && tmp->next)
+        tmp = tmp->next;
+    if (tmp)
+        tmp->next = thiscall;
+    else
+        client->waitDispatch = thiscall;
+
     /* Check to see if another thread is dispatching */
-    if (client->waitDispatch) {
-        /* Stick ourselves on the end of the wait queue */
-        virNetClientCallPtr tmp = client->waitDispatch;
+    if (client->haveTheBuck) {
         char ignore = 1;
-        while (tmp && tmp->next)
-            tmp = tmp->next;
-        if (tmp)
-            tmp->next = thiscall;
-        else
-            client->waitDispatch = thiscall;
 
         /* Force other thread to wakeup from poll */
         if (safewrite(client->wakeupSendFD, &ignore, sizeof(ignore)) != sizeof(ignore)) {
+            /* Something went wrong, so we need to remove that call we just added */
             if (tmp)
                 tmp->next = NULL;
             else
@@ -1143,6 +1229,7 @@ static int virNetClientIO(virNetClientPtr client,
         VIR_DEBUG("Going to sleep %p %p", client->waitDispatch, thiscall);
         /* Go to sleep while other thread is working... */
         if (virCondWait(&thiscall->cond, &client->lock) < 0) {
+            /* Something went wrong, so we need to remove that call we previously added */
             if (client->waitDispatch == thiscall) {
                 client->waitDispatch = thiscall->next;
             } else {
@@ -1167,7 +1254,7 @@ static int virNetClientIO(virNetClientPtr client,
          *     our reply
          */
         if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
-            rv = 0;
+            rv = 2;
             /*
              * We avoided catching the buck and our reply is ready !
              * We've already had 'thiscall' removed from the list
@@ -1177,12 +1264,10 @@ static int virNetClientIO(virNetClientPtr client,
         }
 
         /* Grr, someone passed the buck onto us ... */
-
-    } else {
-        /* We're first to catch the buck */
-        client->waitDispatch = thiscall;
     }
 
+    client->haveTheBuck = true;
+
     VIR_DEBUG("We have the buck %p %p", client->waitDispatch, thiscall);
     /*
      * The buck stops here!
@@ -1198,17 +1283,19 @@ static int virNetClientIO(virNetClientPtr client,
      * cause the event loop thread to be blocked on the
      * mutex for the duration of the call
      */
-    virNetSocketUpdateIOCallback(client->sock, 0);
+    virNetClientUpdateIOCallback(client, false);
 
     virResetLastError();
     rv = virNetClientIOEventLoop(client, thiscall);
 
-    virNetSocketUpdateIOCallback(client->sock, VIR_EVENT_HANDLE_READABLE);
+    virNetClientUpdateIOCallback(client, true);
 
     if (rv == 0 &&
         virGetLastError())
         rv = -1;
 
+    client->haveTheBuck = false;
+
 cleanup:
     VIR_DEBUG("All done with our call %p %p %d", client->waitDispatch, thiscall, rv);
     return rv;
@@ -1227,7 +1314,7 @@ void virNetClientIncomingEvent(virNetSocketPtr sock,
         goto done;
 
     /* This should be impossible, but it doesn't hurt to check */
-    if (client->waitDispatch)
+    if (client->haveTheBuck)
         goto done;
 
     VIR_DEBUG("Event fired %p %d", sock, events);
@@ -1249,9 +1336,14 @@ done:
 }
 
 
-int virNetClientSend(virNetClientPtr client,
-                     virNetMessagePtr msg,
-                     bool expectReply)
+/*
+ * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true),
+ * 0 if nothing sent (only for nonBlock==true) and -1 on error
+ */
+static int virNetClientSendInternal(virNetClientPtr client,
+                                    virNetMessagePtr msg,
+                                    bool expectReply,
+                                    bool nonBlock)
 {
     virNetClientCallPtr call;
     int ret = -1;
@@ -1270,6 +1362,12 @@ int virNetClientSend(virNetClientPtr client,
         return -1;
     }
 
+    if (expectReply && nonBlock) {
+        virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
+                    _("Attempt to send an non-blocking message with a synchronous reply"));
+        return -1;
+    }
+
     if (VIR_ALLOC(call) < 0) {
         virReportOOMError();
         return -1;
@@ -1290,12 +1388,75 @@ int virNetClientSend(virNetClientPtr client,
         call->mode = VIR_NET_CLIENT_MODE_WAIT_RX;
     call->msg = msg;
     call->expectReply = expectReply;
+    call->nonBlock = nonBlock;
+    call->haveThread = true;
 
     ret = virNetClientIO(client, call);
 
 cleanup:
-    ignore_value(virCondDestroy(&call->cond));
-    VIR_FREE(call);
+    /* If partially sent, then the call is still on the dispatch queue */
+    if (ret == 1) {
+        call->haveThread = false;
+    } else {
+        ignore_value(virCondDestroy(&call->cond));
+        VIR_FREE(call);
+    }
     virNetClientUnlock(client);
     return ret;
 }
+
+
+/*
+ * @msg: a message allocated on heap or stack
+ *
+ * Send a message synchronously, and wait for the reply synchronously
+ *
+ * The caller is responsible for free'ing @msg if it was allocated
+ * on the heap
+ *
+ * Returns 0 on success, -1 on failure
+ */
+int virNetClientSend(virNetClientPtr client,
+                     virNetMessagePtr msg)
+{
+    int ret = virNetClientSendInternal(client, msg, true, false);
+    if (ret < 0)
+        return -1;
+    return 0;
+}
+
+
+/*
+ * @msg: a message allocated on heap or stack
+ *
+ * Send a message synchronously, without any reply
+ *
+ * The caller is responsible for free'ing @msg if it was allocated
+ * on the heap
+ *
+ * Returns 0 on success, -1 on failure
+ */
+int virNetClientSendNoReply(virNetClientPtr client,
+                            virNetMessagePtr msg)
+{
+    int ret = virNetClientSendInternal(client, msg, false, false);
+    if (ret < 0)
+        return -1;
+    return 0;
+}
+
+/*
+ * @msg: a message allocated on the heap.
+ *
+ * Send a message asynchronously, without any reply
+ *
+ * The caller is responsible for free'ing @msg, *except* if
+ * this method returns -1.
+ *
+ * Returns 2 on full send, 1 on partial send, 0 on no send, -1 on error
+ */
+int virNetClientSendNonBlock(virNetClientPtr client,
+                             virNetMessagePtr msg)
+{
+    return virNetClientSendInternal(client, msg, false, true);
+}
diff --git a/src/rpc/virnetclient.h b/src/rpc/virnetclient.h
index fb679e8..d3c112a 100644
--- a/src/rpc/virnetclient.h
+++ b/src/rpc/virnetclient.h
@@ -67,9 +67,17 @@ int virNetClientAddStream(virNetClientPtr client,
 void virNetClientRemoveStream(virNetClientPtr client,
                               virNetClientStreamPtr st);
 
+/* Send a message and wait for reply */
 int virNetClientSend(virNetClientPtr client,
-                     virNetMessagePtr msg,
-                     bool expectReply);
+                     virNetMessagePtr msg);
+
+/* Send a message without needing a reply */
+int virNetClientSendNoReply(virNetClientPtr client,
+                            virNetMessagePtr msg);
+
+/* Send a message without needing a reply, and don't block on I/O */
+int virNetClientSendNonBlock(virNetClientPtr client,
+                             virNetMessagePtr msg);
 
 # ifdef HAVE_SASL
 void virNetClientSetSASLSession(virNetClientPtr client,
diff --git a/src/rpc/virnetclientprogram.c b/src/rpc/virnetclientprogram.c
index 36e2384..cb02a25 100644
--- a/src/rpc/virnetclientprogram.c
+++ b/src/rpc/virnetclientprogram.c
@@ -327,7 +327,7 @@ int virNetClientProgramCall(virNetClientProgramPtr prog,
     if (virNetMessageEncodePayload(msg, args_filter, args) < 0)
         goto error;
 
-    if (virNetClientSend(client, msg, true) < 0)
+    if (virNetClientSend(client, msg) < 0)
         goto error;
 
     /* None of these 3 should ever happen here, because
diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c
index 7e2d9ae..309d48d 100644
--- a/src/rpc/virnetclientstream.c
+++ b/src/rpc/virnetclientstream.c
@@ -361,8 +361,13 @@ int virNetClientStreamSendPacket(virNetClientStreamPtr st,
         wantReply = true;
     }
 
-    if (virNetClientSend(client, msg, wantReply) < 0)
-        goto error;
+    if (wantReply) {
+        if (virNetClientSend(client, msg) < 0)
+            goto error;
+    } else {
+        if (virNetClientSendNoReply(client, msg) < 0)
+            goto error;
+    }
 
     virNetMessageFree(msg);
 
@@ -407,7 +412,7 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
 
         VIR_DEBUG("Dummy packet to wait for stream data");
         virMutexUnlock(&st->lock);
-        ret = virNetClientSend(client, msg, true);
+        ret = virNetClientSend(client, msg);
         virMutexLock(&st->lock);
         virNetMessageFree(msg);
 
-- 
1.7.6.4




More information about the libvir-list mailing list