[libvirt] PATCH 6/5: Remote driver concurrent RPC calls

Daniel P. Berrange berrange at redhat.com
Tue Dec 9 22:36:49 UTC 2008


This patch is the final one for multi-threading in the remote driver.

With the first 5 patches applied the remote driver is thread safe, but
everything is still serialized, since the call() method which does the
network I/O blocks until the reply is fully received, and holds the
mutex while blocking. This is sub-optimal :-)

We fundamentally only have a single network socket open, so we have to
make sure only one thread is ever doing network I/O in each direction.
The GNUTLS and SASL encryption state is also only safe to use from a
single thread.

This means we can't simply let the call() methods run in parallel. However,
while one thread is waiting to receive its RPC reply message, there's no
reason why the RPC requests from other threads can't be sent out on the wire,
i.e. so-called pipelining of requests. In fact, since we have a unique serial
number for every request/reply, there's no reason why replies need to come
back in the same order as they're sent. This provides optimal concurrency.
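
To make the serial number matching concrete, here is a minimal sketch of
looking up the waiting call for an incoming reply. The names pending_call,
find_call_by_serial and deliver_reply are made up for illustration and are
not in the patch; the real lookup walks priv->waitDispatch inside
processCallRecvMsg().

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, cut-down stand-in for a queued call: just the fields
 * needed to match a reply to the request that caused it. */
struct pending_call {
    unsigned int serial;          /* serial number sent with the request */
    int done;                     /* set once the reply has arrived */
    struct pending_call *next;    /* next queued call, if any */
};

/* Walk the list looking for the call with the given serial.
 * Returns NULL if nobody is waiting for that serial. */
static struct pending_call *
find_call_by_serial(struct pending_call *head, unsigned int serial)
{
    struct pending_call *c = head;

    while (c && c->serial != serial)
        c = c->next;
    return c;
}

/* Mark the matching call as complete. Returns 0 on success, or -1 if
 * the reply does not belong to any outstanding request. */
static int
deliver_reply(struct pending_call *head, unsigned int serial)
{
    struct pending_call *c = find_call_by_serial(head, serial);

    if (!c)
        return -1;
    c->done = 1;
    return 0;
}
```

Since the lookup is keyed purely on the serial, it doesn't matter whether
the reply for the third queued call turns up before the reply for the first.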


All the complexity starts off in the call() method. From the caller's
point of view, this method takes an RPC request, sends it, waits for
the reply message, and returns the data, i.e. it is blocking.

The caller of call() is required to be holding the driver mutex.

Upon entering the call() there are two possible scenarios

 - No thread is currently doing network I/O.
 - Another thread is doing network I/O

In the first scenario, this thread takes immediate responsibility for
doing all network I/O until such time as it has finished its own RPC
call. This thread is said to be "holding the buck".

In the second scenario, this thread delegates responsibility for doing
I/O to the existing thread, and puts itself to sleep. Someone else
is "holding the buck".
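
The decision on entry can be sketched roughly as follows. This is only an
illustration under the assumption that the thread at the head of the queue
is the one holding the buck; queued_call, dispatch_queue and enqueue_call
are invented names, not the ones used in the patch, and the caller is
assumed to already hold the driver mutex.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical queue entry, one per thread inside call() */
struct queued_call {
    int id;
    struct queued_call *next;
};

/* Hypothetical queue; the entry at the head holds the buck */
struct dispatch_queue {
    struct queued_call *head;
};

/* Append 'call' to the tail of the queue, preserving FIFO order.
 * Returns 1 if the queue was empty, meaning the caller now holds the
 * buck and must do the network I/O itself. Returns 0 if another thread
 * already holds the buck, meaning the caller should go to sleep. */
static int
enqueue_call(struct dispatch_queue *q, struct queued_call *call)
{
    struct queued_call *tmp = q->head;

    call->next = NULL;
    if (!tmp) {
        q->head = call;
        return 1;
    }
    while (tmp->next)
        tmp = tmp->next;
    tmp->next = call;
    return 0;
}
```

In the real driver the "go to sleep" branch would be a wait on the call's
condition variable, which drops the driver mutex while sleeping.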


While the thread holding the buck is waiting for its reply to arrive,
it releases the driver mutex. This allows other threads to enter the
call method and queue up their requests. Once the first thread has
finished sending its initial request data, it will start to send
requests from other queued threads. It may even start to process other
threads' replies, and/or asynchronous events. Eventually its own reply
will arrive.

At this point the first thread is all done. There are again two possible
scenarios for it to consider:

 - No other threads were sleeping waiting for it to finish
 - One or more threads were sleeping waiting for it to finish

If no threads were waiting, it just returns, giving up the buck.

If one or more threads were sleeping, then it picks the first sleeping
thread and wakes it up. This is 'passing the buck'.
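
A rough sketch of the hand-off when the buck holder is done, assuming the
finishing thread sits at the head of the queue. The names are hypothetical
and the 'woken' flag merely stands in for signalling the sleeping thread's
condition variable.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical queue entry for a thread inside call() */
struct waiting_call {
    int id;
    int woken;                    /* stand-in for pthread_cond_signal() */
    struct waiting_call *next;
};

/* 'self' has received its reply and must be at the head of the queue.
 * Remove it, and if another thread is still queued, wake that thread
 * so it takes over the network I/O ("passing the buck").
 * Returns the new buck holder, or NULL if nobody was waiting. */
static struct waiting_call *
finish_and_pass_buck(struct waiting_call **head, struct waiting_call *self)
{
    assert(*head == self);

    *head = self->next;
    self->next = NULL;
    if (*head)
        (*head)->woken = 1;
    return *head;
}
```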

From the point of view of a sleeping thread, there are also two possible
scenarios when it gets woken up:

 - It is being woken because the other thread has got its reply
   ready.
 - It is being woken because the other thread is passing it the buck

In the first case, the thread being woken is done with its call and
can just return from call() to its caller.

In the second case, the thread being woken up is now "holding the buck"
and has to take over responsibility for all network I/O, until its desired
reply arrives.
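
The woken thread can tell the two cases apart from the state of its own
call. A small sketch, assuming a mode field along the lines of the
REMOTE_MODE_* values in the patch; the enum and function names here are
invented for illustration.

```c
#include <assert.h>

/* Hypothetical call states, modelled on the modes used in the patch */
enum call_mode {
    MODE_WAIT_TX,     /* request not yet fully sent */
    MODE_WAIT_RX,     /* request sent, reply not yet received */
    MODE_COMPLETE,    /* reply received successfully */
    MODE_ERROR        /* reply received, and it was an error */
};

/* What a thread should do after waking from its sleep */
enum wake_action {
    WAKE_RETURN_OK,     /* reply is ready, return it to the caller */
    WAKE_RETURN_ERROR,  /* error reply is ready, report it to the caller */
    WAKE_TAKE_BUCK      /* still waiting, so we now do the network I/O */
};

static enum wake_action
on_wakeup(enum call_mode mode)
{
    switch (mode) {
    case MODE_COMPLETE:
        return WAKE_RETURN_OK;
    case MODE_ERROR:
        return WAKE_RETURN_ERROR;
    default:
        /* Our reply hasn't arrived, so the only reason to wake us
         * is that the previous holder passed us the buck. */
        return WAKE_TAKE_BUCK;
    }
}
```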


To deal with all this 'buck passing', we need to keep various bits of
state around about each thread's RPC call. This is done in the struct
remote_thread_call.

  struct remote_thread_call {
    /* This indicates whether this call is being sent, being
       received, completed successfully, or failed with error */
    int mode;

    /* The data being sent on the wire, the length, and progress */
    /* 4 byte length, followed by RPC message header+body */
    char buffer[4 + REMOTE_MESSAGE_MAX];
    unsigned int bufferLength;
    unsigned int bufferOffset;

    /* The serial number & procedure number, so we can match up
       its reply when it arrives */
    unsigned int serial;
    unsigned int proc_nr;

    /* The condition the thread sleeps on while another thread is
       holding the buck */
    pthread_cond_t cond;

    /* Used to de-serialize the XDR object into the return values */
    xdrproc_t ret_filter;
    /* Struct to hold the return values. Yes, evil XDR casts */
    char *ret;

    /* If the 'mode' indicates an error, this stores it */
    remote_error err;

    /* The next thread waiting to dispatch, if any */
    struct remote_thread_call *next;
  };
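
For the buffer layout, here is a sketch of the "4 byte length, followed by
header+body" framing done by hand. The patch itself encodes the length
word with xdr_int(); this version assumes the usual XDR big-endian
encoding and a length that counts the length word itself.
FRAME_PAYLOAD_MAX and both function names are made up for the example.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Illustrative limit only; the driver uses REMOTE_MESSAGE_MAX */
#define FRAME_PAYLOAD_MAX 64

/* Write a big-endian length word followed by the payload into 'out',
 * which must have room for 4 + FRAME_PAYLOAD_MAX bytes. The length
 * word counts itself. Returns the total frame size, or 0 if the
 * payload is too large. */
static size_t
frame_message(unsigned char *out, const unsigned char *payload, size_t len)
{
    size_t total;

    if (len > FRAME_PAYLOAD_MAX)
        return 0;

    total = len + 4;
    out[0] = (unsigned char)((total >> 24) & 0xff);
    out[1] = (unsigned char)((total >> 16) & 0xff);
    out[2] = (unsigned char)((total >> 8) & 0xff);
    out[3] = (unsigned char)(total & 0xff);
    memcpy(out + 4, payload, len);
    return total;
}

/* Decode a received length word. Returns the number of payload bytes
 * still to be read, or -1 if the length is bogus or too large. */
static long
frame_payload_length(const unsigned char *lenword)
{
    unsigned long total = ((unsigned long)lenword[0] << 24) |
                          ((unsigned long)lenword[1] << 16) |
                          ((unsigned long)lenword[2] << 8) |
                          (unsigned long)lenword[3];

    if (total < 4 || total - 4 > FRAME_PAYLOAD_MAX)
        return -1;
    return (long)(total - 4);
}
```

The receive side reads exactly 4 bytes first, decodes them, and only then
knows how much more data it needs before the message can be dispatched.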


As more and more threads arrive in the call() method, they're all queued
up to have their requests processed in FIFO order for the sake of fairness.
Replies are processed in whatever order they arrive, though the
libvirtd daemon currently does strict FIFO ordering for replies.


Since we need to release the mutex while waiting for I/O, we can no longer
use blocking I/O on the socket. So it's now put into non-blocking mode and
then we use poll() to watch for readability / writability. We need the
remote driver to work on Windows, but I'm hoping GNULIB's poll() function
will work well enough for this not to be a problem.
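
As a standalone illustration of the non-blocking + poll() combination,
here is a sketch for POSIX systems. set_nonblock and wait_readable are
hypothetical helpers, not the virSetNonBlock() used by the patch, and the
sketch leaves out the mutex release around poll() that the driver does.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>

/* Put a file descriptor into non-blocking mode.
 * Returns 0 on success, -1 on failure. */
static int
set_nonblock(int fd)
{
    int flags = fcntl(fd, F_GETFL);

    if (flags < 0)
        return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0 ? -1 : 0;
}

/* Wait for up to timeout_ms for 'fd' to become readable, retrying if
 * poll() is interrupted by a signal. Returns 1 if readable, 0 on
 * timeout, and -1 on error or if only an error/hangup was reported. */
static int
wait_readable(int fd, int timeout_ms)
{
    struct pollfd pfd;
    int ret;

    pfd.fd = fd;
    pfd.events = POLLIN;
    pfd.revents = 0;

    do {
        ret = poll(&pfd, 1, timeout_ms);
    } while (ret < 0 && errno == EINTR);

    if (ret < 0)
        return -1;
    if (ret == 0)
        return 0;
    return (pfd.revents & POLLIN) ? 1 : -1;
}
```

With the descriptor in non-blocking mode a read that has nothing to return
fails with EAGAIN instead of stalling, so the thread only ever sleeps in
poll(), which is the one place where it is safe to drop the mutex.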


Since this is a work in progress patch, there's quite a bit of debugging
in it. I've also done a stupid modification to the 'virNodeGetInfo'
impl for the QEMU driver to sleep for 5 seconds to demonstrate overlapping
operations, and another stupid modification to virsh to spawn a thread 
that calls virNodeGetInfo in a loop. This just demonstrates I've got the
hand-off between threads working in the RPC call() method. Obviously
not to be committed.

Running   virsh -c qemu:///system shows the two threads working in
lock-step. In this case the background thread has started its virNodeGetInfo
call and then I typed 'list' in the foreground thread


DEBUG: libvirt.c: virConnectClone (conn=0x72e110)
DEBUG: libvirt.c: virNodeGetInfo (conn=0x72fb40, info=0x7f5ee6025030)
DEBUG: remote_internal.c: call (Doing call 6 (nil))
Welcome to lt-virsh, the virtualization interactive terminal.

Type:  'help' for help with commands
       'quit' to quit

DEBUG: remote_internal.c: call (We have the buck 6 0x7380f0 0x7380f0)

     ^ at this point the BG thread is waiting for its reply...

virsh # list
DEBUG: libvirt.c: virConnectNumOfDomains (conn=0x72e110)
DEBUG: remote_internal.c: call (Doing call 51 0x7380f0)
DEBUG: remote_internal.c: call (Going to sleep 51 0x7380f0 0x7910b0)

    ^ And thanks to us typing 'list', a new call is triggered.
      We can see it putting itself to sleep since the bg thread
      has the buck

DEBUG: remote_internal.c: processCallRecv (Do 4 0)
DEBUG: remote_internal.c: processCallRecvLen (Got length, now need 188 total (184 more))
DEBUG: remote_internal.c: processCallRecv (Do 188 4)
DEBUG: remote_internal.c: processCallRecv (Do 0 0)

    ^ Here we see the first thread has got the reply it wanted

DEBUG: remote_internal.c: processCalls (Giving up the buck 6 0x7380f0 0x7910b0)

    ^ So it's decided to give up the buck

DEBUG: remote_internal.c: processCalls (Passing the buck to 51 0x7910b0)

    ^ And noticed another thread was waiting, so passed it the buck

DEBUG: remote_internal.c: call (All done with our call 6 0x7910b0 0x7380f0)
2 1 1 8119572
DEBUG: remote_internal.c: call (Wokeup from sleep 51 0x7910b0 0x7910b0)
DEBUG: remote_internal.c: call (We have the buck 51 0x7910b0 0x7910b0)

   ^ The second thread has now woken up and got the buck. Its request
     has already been sent onto the wire by the first thread, so now
     it merely waits for its reply

DEBUG: remote_internal.c: processCallRecv (Do 4 0)
DEBUG: remote_internal.c: processCallRecvLen (Got length, now need 32 total (28 more))
DEBUG: remote_internal.c: processCallRecv (Do 32 4)
DEBUG: remote_internal.c: processCallRecv (Do 0 0)

   ^ Which it has now got.

DEBUG: remote_internal.c: processCalls (Giving up the buck 51 0x7910b0 (nil))
DEBUG: remote_internal.c: call (All done with our call 51 (nil) 0x7910b0)

   ^ The second thread now gives up the buck, and there's no one
     waiting who wants it

 Id Name                 State
----------------------------------

virsh # 


Daniel

diff --git a/src/libvirt_sym.version.in b/src/libvirt_sym.version.in
--- a/src/libvirt_sym.version.in
+++ b/src/libvirt_sym.version.in
@@ -586,6 +586,7 @@ LIBVIRT_PRIVATE_@VERSION@ {
 	virEventAddHandle;
 	virEventRemoveHandle;
 	virExec;
+	virSetNonBlock;
 	virFormatMacAddr;
 	virParseMacAddr;
 	virFileDeletePid;
diff --git a/src/qemu_driver.c b/src/qemu_driver.c
--- a/src/qemu_driver.c
+++ b/src/qemu_driver.c
@@ -1338,6 +1338,7 @@ static int qemudGetMaxVCPUs(virConnectPt
 
 static int qemudGetNodeInfo(virConnectPtr conn,
                             virNodeInfoPtr nodeinfo) {
+    sleep(5);
     return virNodeInfoPopulate(conn, nodeinfo);
 }
 
diff --git a/src/remote_internal.c b/src/remote_internal.c
--- a/src/remote_internal.c
+++ b/src/remote_internal.c
@@ -67,6 +67,8 @@
 #include <libxml/uri.h>
 
 #include <netdb.h>
+
+#include <poll.h>
 
 /* AI_ADDRCONFIG is missing on some systems. */
 #ifndef AI_ADDRCONFIG
@@ -88,6 +90,37 @@
 
 static int inside_daemon = 0;
 
+struct remote_thread_call;
+
+
+enum {
+    REMOTE_MODE_WAIT_TX,
+    REMOTE_MODE_WAIT_RX,
+    REMOTE_MODE_COMPLETE,
+    REMOTE_MODE_ERROR,
+};
+
+struct remote_thread_call {
+    int mode;
+
+    /* 4 byte length, followed by RPC message header+body */
+    char buffer[4 + REMOTE_MESSAGE_MAX];
+    unsigned int bufferLength;
+    unsigned int bufferOffset;
+
+    unsigned int serial;
+    unsigned int proc_nr;
+
+    pthread_cond_t cond;
+
+    xdrproc_t ret_filter;
+    char *ret;
+
+    remote_error err;
+
+    struct remote_thread_call *next;
+};
+
 struct private_data {
     PTHREAD_MUTEX_T(lock);
 
@@ -101,12 +134,24 @@ struct private_data {
     int localUses;              /* Ref count for private data */
     char *hostname;             /* Original hostname */
     FILE *debugLog;             /* Debug remote protocol */
+
 #if HAVE_SASL
     sasl_conn_t *saslconn;      /* SASL context */
+
     const char *saslDecoded;
     unsigned int saslDecodedLength;
     unsigned int saslDecodedOffset;
-#endif
+
+    const char *saslEncoded;
+    unsigned int saslEncodedLength;
+    unsigned int saslEncodedOffset;
+#endif
+
+    /* 4 byte length, followed by RPC message header+body */
+    char buffer[4 + REMOTE_MESSAGE_MAX];
+    unsigned int bufferLength;
+    unsigned int bufferOffset;
+
     /* The list of domain event callbacks */
     virDomainEventCallbackListPtr callbackList;
     /* The queue of domain events generated
@@ -114,6 +159,9 @@ struct private_data {
     virDomainEventQueuePtr domainEvents;
     /* Timer for flushing domainEvents queue */
     int eventFlushTimer;
+
+    /* List of threads currently doing dispatch */
+    struct remote_thread_call *waitDispatch;
 };
 
 enum {
@@ -160,7 +208,6 @@ static void make_nonnull_storage_pool (r
 static void make_nonnull_storage_pool (remote_nonnull_storage_pool *pool_dst, virStoragePoolPtr vol_src);
 static void make_nonnull_storage_vol (remote_nonnull_storage_vol *vol_dst, virStorageVolPtr vol_src);
 void remoteDomainEventFired(int watch, int fd, int event, void *data);
-static void remoteDomainProcessEvent(virConnectPtr conn, XDR *xdr);
 static void remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr);
 void remoteDomainEventQueueFlush(int timer, void *opaque);
 /*----------------------------------------------------------------------*/
@@ -695,6 +742,13 @@ doRemoteOpen (virConnectPtr conn,
 
     } /* switch (transport) */
 
+    if (virSetNonBlock(priv->sock) < 0) {
+        errorf (conn, VIR_ERR_SYSTEM_ERROR,
+                _("unable to make socket non-blocking %s"),
+                strerror(errno));
+        goto failed;
+    }
+
 
     /* Try and authenticate with server */
     if (remoteAuthenticate(conn, priv, 1, auth, authtype) == -1)
@@ -767,6 +821,7 @@ doRemoteOpen (virConnectPtr conn,
             DEBUG0("virEventAddTimeout failed: No addTimeoutImpl defined. "
                     "continuing without events.");
             virEventRemoveHandle(priv->watch);
+            priv->watch = -1;
         }
     }
     /* Successful. */
@@ -842,6 +897,7 @@ remoteOpen (virConnectPtr conn,
     pthread_mutex_init(&priv->lock, NULL);
     remoteDriverLock(priv);
     priv->localUses = 1;
+    priv->watch = -1;
 
     if (flags & VIR_CONNECT_RO)
         rflags |= VIR_DRV_OPEN_REMOTE_RO;
@@ -1226,6 +1282,7 @@ doRemoteClose (virConnectPtr conn, struc
         virEventRemoveTimeout(priv->eventFlushTimer);
         /* Remove handle for remote events */
         virEventRemoveHandle(priv->watch);
+        priv->watch = -1;
     }
 
     /* Close socket. */
@@ -5570,12 +5627,616 @@ done:
 
 /*----------------------------------------------------------------------*/
 
-static int really_write (virConnectPtr conn, struct private_data *priv,
-                         int in_open, char *bytes, int len);
-static int really_read (virConnectPtr conn, struct private_data *priv,
-                        int in_open, char *bytes, int len);
-
-/* This function performs a remote procedure call to procedure PROC_NR.
+
+static struct remote_thread_call *
+prepareCall(virConnectPtr conn,
+            struct private_data *priv,
+            int flags,
+            int proc_nr,
+            xdrproc_t args_filter, char *args,
+            xdrproc_t ret_filter, char *ret)
+{
+    XDR xdr;
+    struct remote_message_header hdr;
+    struct remote_thread_call *rv;
+
+    if (VIR_ALLOC(rv) < 0)
+        return NULL;
+
+    /* Get a unique serial number for this message. */
+    rv->serial = priv->counter++;
+    rv->proc_nr = proc_nr;
+    rv->ret_filter = ret_filter;
+    rv->ret = ret;
+
+    pthread_cond_init(&rv->cond, NULL);
+
+    hdr.prog = REMOTE_PROGRAM;
+    hdr.vers = REMOTE_PROTOCOL_VERSION;
+    hdr.proc = proc_nr;
+    hdr.direction = REMOTE_CALL;
+    hdr.serial = rv->serial;
+    hdr.status = REMOTE_OK;
+
+    /* Serialise header followed by args. */
+    xdrmem_create (&xdr, rv->buffer+4, REMOTE_MESSAGE_MAX, XDR_ENCODE);
+    if (!xdr_remote_message_header (&xdr, &hdr)) {
+        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
+               VIR_ERR_RPC, _("xdr_remote_message_header failed"));
+        goto error;
+    }
+
+    if (!(*args_filter) (&xdr, args)) {
+        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC,
+               _("marshalling args"));
+        goto error;
+    }
+
+    /* Get the length stored in buffer. */
+    rv->bufferLength = xdr_getpos (&xdr);
+    xdr_destroy (&xdr);
+
+    /* Length must include the length word itself (always encoded in
+     * 4 bytes as per RFC 4506).
+     */
+    rv->bufferLength += 4;
+
+    /* Encode the length word. */
+    xdrmem_create (&xdr, rv->buffer, 4, XDR_ENCODE);
+    if (!xdr_int (&xdr, (int *)&rv->bufferLength)) {
+        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC,
+               _("xdr_int (length word)"));
+        goto error;
+    }
+    xdr_destroy (&xdr);
+
+    return rv;
+
+error:
+    VIR_FREE(rv);
+    return NULL;
+}
+
+
+
+static int
+processCallWrite(virConnectPtr conn,
+                 struct private_data *priv,
+                 int in_open /* if we are in virConnectOpen */,
+                 const char *bytes, int len)
+{
+    int ret;
+
+    if (priv->uses_tls) {
+    tls_resend:
+        ret = gnutls_record_send (priv->session, bytes, len);
+        if (ret < 0) {
+            if (ret == GNUTLS_E_INTERRUPTED)
+                goto tls_resend;
+            if (ret == GNUTLS_E_AGAIN)
+                return 0;
+
+            error (in_open ? NULL : conn,
+                   VIR_ERR_GNUTLS_ERROR, gnutls_strerror (ret));
+            return -1;
+        }
+    } else {
+    resend:
+        ret = send (priv->sock, bytes, len, 0);
+        if (ret == -1) {
+            if (errno == EINTR)
+                goto resend;
+            if (errno == EAGAIN)
+                return 0;
+
+            error (in_open ? NULL : conn,
+                   VIR_ERR_SYSTEM_ERROR, strerror (errno));
+            return -1;
+
+        }
+    }
+
+    return ret;
+}
+
+
+static int
+processCallRead(virConnectPtr conn,
+                struct private_data *priv,
+                int in_open /* if we are in virConnectOpen */,
+                char *bytes, int len)
+{
+    int ret;
+
+    if (priv->uses_tls) {
+    tls_resend:
+        ret = gnutls_record_recv (priv->session, bytes, len);
+        if (ret < 0) {
+            if (ret == GNUTLS_E_INTERRUPTED)
+                goto tls_resend;
+            if (ret == GNUTLS_E_AGAIN)
+                return 0;
+
+            error (in_open ? NULL : conn,
+                   VIR_ERR_GNUTLS_ERROR, gnutls_strerror (ret));
+            return -1;
+        }
+    } else {
+    resend:
+        ret = recv (priv->sock, bytes, len, 0);
+        if (ret == -1) {
+            if (errno == EINTR)
+                goto resend;
+            if (errno == EAGAIN)
+                return 0;
+
+            error (in_open ? NULL : conn,
+                   VIR_ERR_SYSTEM_ERROR, strerror (errno));
+            return -1;
+
+        }
+    }
+
+    return ret;
+}
+
+
+static int
+processCallSendOne(virConnectPtr conn,
+                   struct private_data *priv,
+                   int in_open,
+                   struct remote_thread_call *thecall)
+{
+#if HAVE_SASL
+    if (priv->saslconn) {
+        const char *output;
+        unsigned int outputlen;
+        int err, ret;
+
+        if (!priv->saslEncoded) {
+            err = sasl_encode(priv->saslconn,
+                              thecall->buffer + thecall->bufferOffset,
+                              thecall->bufferLength - thecall->bufferOffset,
+                              &output, &outputlen);
+            if (err != SASL_OK) {
+                return -1;
+            }
+            priv->saslEncoded = output;
+            priv->saslEncodedLength = outputlen;
+            priv->saslEncodedOffset = 0;
+
+            thecall->bufferOffset = thecall->bufferLength;
+        }
+
+        ret = processCallWrite(conn, priv, in_open,
+                               priv->saslEncoded + priv->saslEncodedOffset,
+                               priv->saslEncodedLength - priv->saslEncodedOffset);
+        if (ret < 0)
+            return ret;
+        priv->saslEncodedOffset += ret;
+
+        if (priv->saslEncodedOffset == priv->saslEncodedLength) {
+            priv->saslEncoded = NULL;
+            priv->saslEncodedOffset = priv->saslEncodedLength = 0;
+            thecall->mode = REMOTE_MODE_WAIT_RX;
+        }
+    } else {
+#endif
+        int ret;
+        ret = processCallWrite(conn, priv, in_open,
+                               thecall->buffer + thecall->bufferOffset,
+                               thecall->bufferLength - thecall->bufferOffset);
+        if (ret < 0)
+            return ret;
+        thecall->bufferOffset += ret;
+
+        if (thecall->bufferOffset == thecall->bufferLength) {
+            thecall->bufferOffset = thecall->bufferLength = 0;
+            thecall->mode = REMOTE_MODE_WAIT_RX;
+        }
+#if HAVE_SASL
+    }
+#endif
+    return 0;
+}
+
+
+static int
+processCallSend(virConnectPtr conn, struct private_data *priv,
+                int in_open) {
+    struct remote_thread_call *thecall = priv->waitDispatch;
+
+    while (thecall &&
+           thecall->mode != REMOTE_MODE_WAIT_TX)
+        thecall = thecall->next;
+
+    if (!thecall)
+        return -1; /* Shouldn't happen, but you never know... */
+
+    while (thecall) {
+        int ret = processCallSendOne(conn, priv, in_open, thecall);
+        if (ret < 0)
+            return ret;
+
+        if (thecall->mode == REMOTE_MODE_WAIT_TX)
+            return 0; /* Blocking write, go back to event loop */
+
+        thecall = thecall->next;
+    }
+
+    return 0; /* No more calls to send, all done */
+}
+
+static int
+processCallRecvSome(virConnectPtr conn, struct private_data *priv,
+                    int in_open) {
+    unsigned int wantData;
+
+    /* Start by reading length word */
+    if (priv->bufferLength == 0)
+        priv->bufferLength = 4;
+
+    wantData = priv->bufferLength - priv->bufferOffset;
+
+#if HAVE_SASL
+    if (priv->saslconn) {
+        if (priv->saslDecoded == NULL) {
+            char encoded[8192];
+            unsigned int encodedLen = sizeof(encoded);
+            int ret, err;
+            ret = processCallRead(conn, priv, in_open,
+                                  encoded, encodedLen);
+            if (ret < 0)
+                return -1;
+            if (ret == 0)
+                return 0;
+
+            err = sasl_decode(priv->saslconn, encoded, ret,
+                              &priv->saslDecoded, &priv->saslDecodedLength);
+            if (err != SASL_OK)
+                return -1;
+            priv->saslDecodedOffset = 0;
+        }
+
+        if ((priv->saslDecodedLength - priv->saslDecodedOffset) < wantData)
+            wantData = (priv->saslDecodedLength - priv->saslDecodedOffset);
+
+        memcpy(priv->buffer + priv->bufferOffset,
+               priv->saslDecoded + priv->saslDecodedOffset,
+               wantData);
+        priv->saslDecodedOffset += wantData;
+        priv->bufferOffset += wantData;
+        if (priv->saslDecodedOffset == priv->saslDecodedLength) {
+            priv->saslDecodedOffset = priv->saslDecodedLength = 0;
+            priv->saslDecoded = NULL;
+        }
+
+        return wantData;
+    } else {
+#endif
+        int ret;
+
+        ret = processCallRead(conn, priv, in_open,
+                              priv->buffer + priv->bufferOffset,
+                              wantData);
+        if (ret < 0)
+            return -1;
+        if (ret == 0)
+            return 0;
+
+        priv->bufferOffset += ret;
+
+        return ret;
+#if HAVE_SASL
+    }
+#endif
+}
+
+
+static void
+processCallAsyncEvent(virConnectPtr conn, struct private_data *priv,
+                      int in_open,
+                      remote_message_header *hdr,
+                      XDR *xdr) {
+    /* An async message has come in while we were waiting for the
+     * response. Process it to pull it off the wire, and try again
+     */
+    DEBUG0("Encountered an event while waiting for a response");
+
+    if (in_open) {
+        DEBUG("Ignoring bogus event %d received while in open", hdr->proc);
+        return;
+    }
+
+    if (hdr->proc == REMOTE_PROC_DOMAIN_EVENT) {
+        remoteDomainQueueEvent(conn, xdr);
+        virEventUpdateTimeout(priv->eventFlushTimer, 0);
+    } else {
+        DEBUG("Unexpected event proc %d", hdr->proc);
+    }
+}
+
+static int
+processCallRecvLen(virConnectPtr conn, struct private_data *priv,
+                   int in_open) {
+    XDR xdr;
+    int len;
+
+    xdrmem_create (&xdr, priv->buffer, priv->bufferLength, XDR_DECODE);
+    if (!xdr_int (&xdr, &len)) {
+        error (in_open ? NULL : conn,
+               VIR_ERR_RPC, _("xdr_int (length word, reply)"));
+        return -1;
+    }
+    xdr_destroy (&xdr);
+
+    /* Length includes length word - adjust to real length to read. */
+    len -= 4;
+
+    if (len < 0 || len > REMOTE_MESSAGE_MAX) {
+        error (in_open ? NULL : conn,
+               VIR_ERR_RPC, _("packet received from server too large"));
+        return -1;
+    }
+
+    /* Extend our declared buffer length and carry
+       on reading the header + payload */
+    priv->bufferLength += len;
+    DEBUG("Got length, now need %d total (%d more)", priv->bufferLength, len);
+    return 0;
+}
+
+
+static int
+processCallRecvMsg(virConnectPtr conn, struct private_data *priv,
+                   int in_open) {
+    XDR xdr;
+    struct remote_message_header hdr;
+    int len = priv->bufferLength - 4;
+    struct remote_thread_call *thecall;
+
+    /* Deserialise reply header. */
+    xdrmem_create (&xdr, priv->buffer + 4, len, XDR_DECODE);
+    if (!xdr_remote_message_header (&xdr, &hdr)) {
+        error (in_open ? NULL : conn,
+               VIR_ERR_RPC, _("invalid header in reply"));
+        return -1;
+    }
+
+    /* Check program, version, etc. are what we expect. */
+    if (hdr.prog != REMOTE_PROGRAM) {
+        virRaiseError (in_open ? NULL : conn,
+                       NULL, NULL, VIR_FROM_REMOTE,
+                       VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
+                       _("unknown program (received %x, expected %x)"),
+                       hdr.prog, REMOTE_PROGRAM);
+        return -1;
+    }
+    if (hdr.vers != REMOTE_PROTOCOL_VERSION) {
+        virRaiseError (in_open ? NULL : conn,
+                       NULL, NULL, VIR_FROM_REMOTE,
+                       VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
+                       _("unknown protocol version (received %x, expected %x)"),
+                       hdr.vers, REMOTE_PROTOCOL_VERSION);
+        return -1;
+    }
+
+    /* Async events from server need special handling */
+    if (hdr.direction == REMOTE_MESSAGE) {
+        processCallAsyncEvent(conn, priv, in_open,
+                              &hdr, &xdr);
+        xdr_destroy(&xdr);
+        return 0;
+    }
+
+    if (hdr.direction != REMOTE_REPLY) {
+        virRaiseError (in_open ? NULL : conn,
+                       NULL, NULL, VIR_FROM_REMOTE,
+                       VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
+                       _("got unexpected RPC call %d from server"),
+                       hdr.proc);
+        xdr_destroy(&xdr);
+        return -1;
+    }
+
+    /* Ok, definitely got an RPC reply now find
+       out who's been waiting for it */
+
+    thecall = priv->waitDispatch;
+    while (thecall &&
+           thecall->serial != hdr.serial)
+        thecall = thecall->next;
+
+    if (!thecall) {
+        virRaiseError (in_open ? NULL : conn,
+                       NULL, NULL, VIR_FROM_REMOTE,
+                       VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
+                       _("no call waiting for reply with serial %d"),
+                       hdr.serial);
+        xdr_destroy(&xdr);
+        return -1;
+    }
+
+    if (hdr.proc != thecall->proc_nr) {
+        virRaiseError (in_open ? NULL : conn,
+                       NULL, NULL, VIR_FROM_REMOTE,
+                       VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
+                       _("unknown procedure (received %x, expected %x)"),
+                       hdr.proc, thecall->proc_nr);
+        xdr_destroy (&xdr);
+        return -1;
+    }
+
+    /* Status is either REMOTE_OK (meaning that what follows is a ret
+     * structure), or REMOTE_ERROR (and what follows is a remote_error
+     * structure).
+     */
+    switch (hdr.status) {
+    case REMOTE_OK:
+        if (!(*thecall->ret_filter) (&xdr, thecall->ret)) {
+            error (in_open ? NULL : conn, VIR_ERR_RPC,
+                   _("unmarshalling ret"));
+            return -1;
+        }
+        thecall->mode = REMOTE_MODE_COMPLETE;
+        xdr_destroy (&xdr);
+        return 0;
+
+    case REMOTE_ERROR:
+        memset (&thecall->err, 0, sizeof thecall->err);
+        if (!xdr_remote_error (&xdr, &thecall->err)) {
+            error (in_open ? NULL : conn,
+                   VIR_ERR_RPC, _("unmarshalling remote_error"));
+            return -1;
+        }
+        xdr_destroy (&xdr);
+        thecall->mode = REMOTE_MODE_ERROR;
+        return 0;
+
+    default:
+        virRaiseError (in_open ? NULL : conn, NULL, NULL, VIR_FROM_REMOTE,
+                       VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
+                       _("unknown status (received %x)"),
+                       hdr.status);
+        xdr_destroy (&xdr);
+        return -1;
+    }
+}
+
+
+static int
+processCallRecv(virConnectPtr conn, struct private_data *priv,
+                int in_open) {
+    int ret;
+
+    /* Read as much data as is available, until we get
+     * EAGAIN
+     */
+    for (;;) {
+        DEBUG("Do %d %d", priv->bufferLength, priv->bufferOffset);
+        ret = processCallRecvSome(conn, priv, in_open);
+
+        if (ret < 0)
+            return -1;
+        if (ret == 0)
+            return 0;  /* Blocking on read */
+
+        /* Check for completion of our goal */
+        if (priv->bufferOffset == priv->bufferLength) {
+            if (priv->bufferOffset == 4) {
+                ret = processCallRecvLen(conn, priv, in_open);
+            } else {
+                ret = processCallRecvMsg(conn, priv, in_open);
+                priv->bufferOffset = priv->bufferLength = 0;
+            }
+            if (ret < 0)
+                return -1;
+        }
+    }
+}
+
+/*
+ * Process all calls pending dispatch/receive until we
+ * get a reply to our own call. Then quit and pass the buck
+ * to someone else.
+ */
+static int
+processCalls(virConnectPtr conn,
+             struct private_data *priv,
+             int in_open,
+             struct remote_thread_call *thiscall)
+{
+    struct pollfd fds[1];
+    int ret;
+
+    /* XXX add wakeup pipe */
+
+    /* XXX weeeeeeeendows hate, perhaps gnulib poll() will work ? */
+
+    fds[0].fd = priv->sock;
+
+    for (;;) {
+        struct remote_thread_call *tmp = priv->waitDispatch;
+        struct remote_thread_call *prev;
+
+        fds[0].events = fds[0].revents = 0;
+        while (tmp) {
+            if (tmp->mode == REMOTE_MODE_WAIT_RX)
+                fds[0].events |= POLLIN;
+            if (tmp->mode == REMOTE_MODE_WAIT_TX)
+                fds[0].events |= POLLOUT;
+
+            tmp = tmp->next;
+        }
+
+        /* Release lock while poll'ing so other threads
+         * can stuff themselves on the queue */
+        remoteDriverUnlock(priv);
+        ret = poll(fds, ARRAY_CARDINALITY(fds), -1);
+        remoteDriverLock(priv);
+
+        if (ret < 0) {
+            if (errno == EINTR || errno == EAGAIN)
+                continue;
+            ;/* XXX damn */
+        }
+
+        if (fds[0].revents & POLLOUT)
+            processCallSend(conn, priv, in_open);
+
+        if (fds[0].revents & POLLIN)
+            processCallRecv(conn, priv, in_open);
+
+        /* XXX poll hup/err */
+
+        /* Iterate through waiting threads and if
+         * any are complete then tell 'em to wakeup
+         */
+        tmp = priv->waitDispatch;
+        prev = NULL;
+        while (tmp) {
+            if (tmp != thiscall &&
+                (tmp->mode == REMOTE_MODE_COMPLETE ||
+                 tmp->mode == REMOTE_MODE_ERROR)) {
+                /* Take them out of the list */
+                if (prev)
+                    prev->next = tmp->next;
+                else
+                    priv->waitDispatch = tmp->next;
+
+                /* And wake them up....
+                 * ...they won't actually wakeup until
+                 * we release our mutex a short while
+                 * later...
+                 */
+                DEBUG("Waking up sleep %d %p %p", tmp->proc_nr, tmp, priv->waitDispatch);
+                pthread_cond_signal(&tmp->cond);
+            }
+            prev = tmp;
+            tmp = tmp->next;
+        }
+
+        /* Now see if *we* are done */
+        if (thiscall->mode == REMOTE_MODE_COMPLETE ||
+            thiscall->mode == REMOTE_MODE_ERROR) {
+            /* We're at head of the list already, so
+             * remove us
+             */
+            priv->waitDispatch = thiscall->next;
+            DEBUG("Giving up the buck %d %p %p", thiscall->proc_nr, thiscall, priv->waitDispatch);
+            /* See if someone else is still waiting
+             * and if so, then pass the buck ! */
+            if (priv->waitDispatch) {
+                DEBUG("Passing the buck to %d %p", priv->waitDispatch->proc_nr, priv->waitDispatch);
+                pthread_cond_signal(&priv->waitDispatch->cond);
+            }
+            return 0;
+        }
+    }
+}
+
+/*
+ * This function performs a remote procedure call to procedure PROC_NR.
  *
  * NB. This does not free the args structure (not desirable, since you
  * often want this allocated on the stack or else it contains strings
@@ -5584,204 +6245,29 @@ static int really_read (virConnectPtr co
  *
  * NB(2). Make sure to memset (&ret, 0, sizeof ret) before calling,
  * else Bad Things will happen in the XDR code.
- */
-static int
-doCall (virConnectPtr conn, struct private_data *priv,
-        int flags /* if we are in virConnectOpen */,
-        int proc_nr,
-        xdrproc_t args_filter, char *args,
-        xdrproc_t ret_filter, char *ret)
-{
-    char buffer[REMOTE_MESSAGE_MAX];
-    char buffer2[4];
-    struct remote_message_header hdr;
-    XDR xdr;
-    int len;
-    struct remote_error rerror;
-
-    /* Get a unique serial number for this message. */
-    int serial = priv->counter++;
-
-    hdr.prog = REMOTE_PROGRAM;
-    hdr.vers = REMOTE_PROTOCOL_VERSION;
-    hdr.proc = proc_nr;
-    hdr.direction = REMOTE_CALL;
-    hdr.serial = serial;
-    hdr.status = REMOTE_OK;
-
-    /* Serialise header followed by args. */
-    xdrmem_create (&xdr, buffer, sizeof buffer, XDR_ENCODE);
-    if (!xdr_remote_message_header (&xdr, &hdr)) {
-        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
-               VIR_ERR_RPC, _("xdr_remote_message_header failed"));
-        return -1;
-    }
-
-    if (!(*args_filter) (&xdr, args)) {
-        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC,
-               _("marshalling args"));
-        return -1;
-    }
-
-    /* Get the length stored in buffer. */
-    len = xdr_getpos (&xdr);
-    xdr_destroy (&xdr);
-
-    /* Length must include the length word itself (always encoded in
-     * 4 bytes as per RFC 4506).
-     */
-    len += 4;
-
-    /* Encode the length word. */
-    xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_ENCODE);
-    if (!xdr_int (&xdr, &len)) {
-        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC,
-               _("xdr_int (length word)"));
-        return -1;
-    }
-    xdr_destroy (&xdr);
-
-    /* Send length word followed by header+args. */
-    if (really_write (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer2, sizeof buffer2) == -1 ||
-        really_write (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer, len-4) == -1)
-        return -1;
-
-retry_read:
-    /* Read and deserialise length word. */
-    if (really_read (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer2, sizeof buffer2) == -1)
-        return -1;
-
-    xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_DECODE);
-    if (!xdr_int (&xdr, &len)) {
-        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
-               VIR_ERR_RPC, _("xdr_int (length word, reply)"));
-        return -1;
-    }
-    xdr_destroy (&xdr);
-
-    /* Length includes length word - adjust to real length to read. */
-    len -= 4;
-
-    if (len < 0 || len > REMOTE_MESSAGE_MAX) {
-        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
-               VIR_ERR_RPC, _("packet received from server too large"));
-        return -1;
-    }
-
-    /* Read reply header and what follows (either a ret or an error). */
-    if (really_read (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer, len) == -1)
-        return -1;
-
-    /* Deserialise reply header. */
-    xdrmem_create (&xdr, buffer, len, XDR_DECODE);
-    if (!xdr_remote_message_header (&xdr, &hdr)) {
-        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
-               VIR_ERR_RPC, _("invalid header in reply"));
-        return -1;
-    }
-
-    /* Check program, version, etc. are what we expect. */
-    if (hdr.prog != REMOTE_PROGRAM) {
-        virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
-                         NULL, NULL, VIR_FROM_REMOTE,
-                         VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
-                         _("unknown program (received %x, expected %x)"),
-                         hdr.prog, REMOTE_PROGRAM);
-        return -1;
-    }
-    if (hdr.vers != REMOTE_PROTOCOL_VERSION) {
-        virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
-                         NULL, NULL, VIR_FROM_REMOTE,
-                         VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
-                         _("unknown protocol version (received %x, expected %x)"),
-                         hdr.vers, REMOTE_PROTOCOL_VERSION);
-        return -1;
-    }
-
-    if (hdr.proc == REMOTE_PROC_DOMAIN_EVENT &&
-        hdr.direction == REMOTE_MESSAGE) {
-        /* An async message has come in while we were waiting for the
-         * response. Process it to pull it off the wire, and try again
-         */
-        DEBUG0("Encountered an event while waiting for a response");
-
-        remoteDomainQueueEvent(conn, &xdr);
-        virEventUpdateTimeout(priv->eventFlushTimer, 0);
-
-        DEBUG0("Retrying read");
-        xdr_destroy (&xdr);
-        goto retry_read;
-    }
-    if (hdr.proc != proc_nr) {
-        virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
-                         NULL, NULL, VIR_FROM_REMOTE,
-                         VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
-                         _("unknown procedure (received %x, expected %x)"),
-                         hdr.proc, proc_nr);
-        return -1;
-    }
-    if (hdr.direction != REMOTE_REPLY) {
-        virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
-                         NULL, NULL, VIR_FROM_REMOTE,
-                         VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
-                         _("unknown direction (received %x, expected %x)"),
-                         hdr.direction, REMOTE_REPLY);
-        return -1;
-    }
-    if (hdr.serial != serial) {
-        virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, NULL, NULL, VIR_FROM_REMOTE,
-                         VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
-                         _("unknown serial (received %x, expected %x)"),
-                         hdr.serial, serial);
-        return -1;
-    }
-
-    /* Status is either REMOTE_OK (meaning that what follows is a ret
-     * structure), or REMOTE_ERROR (and what follows is a remote_error
-     * structure).
-     */
-    switch (hdr.status) {
-    case REMOTE_OK:
-        if (!(*ret_filter) (&xdr, ret)) {
-            error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC,
-                   _("unmarshalling ret"));
-            return -1;
-        }
-        xdr_destroy (&xdr);
-        return 0;
-
-    case REMOTE_ERROR:
-        memset (&rerror, 0, sizeof rerror);
-        if (!xdr_remote_error (&xdr, &rerror)) {
-            error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
-                   VIR_ERR_RPC, _("unmarshalling remote_error"));
-            return -1;
-        }
-        xdr_destroy (&xdr);
-        /* See if caller asked us to keep quiet about missing RPCs
-         * eg for interop with older servers */
-        if (flags & REMOTE_CALL_QUIET_MISSING_RPC &&
-            rerror.domain == VIR_FROM_REMOTE &&
-            rerror.code == VIR_ERR_RPC &&
-            rerror.level == VIR_ERR_ERROR &&
-            STRPREFIX(*rerror.message, "unknown procedure")) {
-            return -2;
-        }
-        server_error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, &rerror);
-        xdr_free ((xdrproc_t) xdr_remote_error, (char *) &rerror);
-        return -1;
-
-    default:
-        virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, NULL, NULL, VIR_FROM_REMOTE,
-                         VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0,
-                         _("unknown status (received %x)"),
-                         hdr.status);
-        xdr_destroy (&xdr);
-        return -1;
-    }
-}
-
-
+ *
+ * NB(3) You must have the private_data lock before calling this
+ *
+ * NB(4) This is very complicated. Due to connection cloning, multiple
+ * threads can want to use the socket at once. Obviously only one of
+ * them can. So if someone's using the socket, other threads are put
+ * to sleep on condition variables. The existing thread may completely
+ * send & receive a sleeper's RPC call/reply while it is asleep. Or it
+ * may only get around to dealing with sending the call. Or it may
+ * get around to neither. So upon waking up from slumber, the other
+ * thread may or may not have more work to do.
+ *
+ * We call this dance 'passing the buck'
+ *
+ *      http://en.wikipedia.org/wiki/Passing_the_buck
+ *
+ *   "Buck passing or passing the buck is the action of transferring
+ *    responsibility or blame unto another person. It is also used as
+ *    a strategy in power politics when the actions of one country/
+ *    nation are blamed on another, providing an opportunity for war."
+ *
+ * NB(5) Don't Panic!
+ */
 static int
 call (virConnectPtr conn, struct private_data *priv,
       int flags /* if we are in virConnectOpen */,
@@ -5790,6 +6276,66 @@ call (virConnectPtr conn, struct private
       xdrproc_t ret_filter, char *ret)
 {
     int rv;
+    struct remote_thread_call *thiscall;
+
+    DEBUG("Doing call %d %p", proc_nr, priv->waitDispatch);
+    thiscall = prepareCall(conn, priv, flags, proc_nr,
+                           args_filter, args,
+                           ret_filter, ret);
+
+    if (!thiscall) {
+        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
+               VIR_ERR_NO_MEMORY, NULL);
+        return -1;
+    }
+
+    /* Check to see if another thread is dispatching */
+    if (priv->waitDispatch) {
+        /* Stick ourselves on the end of the wait queue */
+        struct remote_thread_call *tmp = priv->waitDispatch;
+        while (tmp && tmp->next)
+            tmp = tmp->next;
+        if (tmp)
+            tmp->next = thiscall;
+        else
+            priv->waitDispatch = thiscall;
+
+        DEBUG("Going to sleep %d %p %p", proc_nr, priv->waitDispatch, thiscall);
+        /* Go to sleep while other thread is working... */
+        pthread_cond_wait(&thiscall->cond, &priv->lock);
+
+        DEBUG("Wokeup from sleep %d %p %p", proc_nr, priv->waitDispatch, thiscall);
+        /* Two reasons we can be woken up
+         *  1. Other thread has got our reply ready for us
+         *  2. Other thread is all done, and it's our turn to
+         *     be the dispatcher to finish waiting for
+         *     our reply
+         */
+        if (thiscall->mode == REMOTE_MODE_COMPLETE ||
+            thiscall->mode == REMOTE_MODE_ERROR) {
+            /*
+             * We avoided catching the buck and our reply is ready !
+             * We've already had 'thiscall' removed from the list
+             * so just need to (maybe) handle errors & free it
+             */
+            goto cleanup;
+        }
+
+        /* Grr, someone passed the buck onto us ... */
+
+    } else {
+        /* We're first to catch the buck */
+        priv->waitDispatch = thiscall;
+    }
+
+    DEBUG("We have the buck %d %p %p", proc_nr, priv->waitDispatch, thiscall);
+    /*
+     * The buck stops here!
+     *
+     * At this point we're about to own the dispatch
+     * process...
+     */
+
     /*
      * Avoid needless wake-ups of the event loop in the
      * case where this call is being made from a different
@@ -5800,209 +6346,147 @@ call (virConnectPtr conn, struct private
     if (priv->watch >= 0)
         virEventUpdateHandle(priv->watch, 0);
 
-    rv = doCall(conn, priv,flags, proc_nr,
-                args_filter, args,
-                ret_filter, ret);
+    rv = processCalls(conn, priv,
+                      flags & REMOTE_CALL_IN_OPEN ? 1 : 0,
+                      thiscall);
 
     if (priv->watch >= 0)
         virEventUpdateHandle(priv->watch, VIR_EVENT_HANDLE_READABLE);
-    return rv;
-}
-
-static int
-really_write_buf (virConnectPtr conn, struct private_data *priv,
-                  int in_open /* if we are in virConnectOpen */,
-                  const char *bytes, int len)
-{
-    const char *p;
-    int err;
-
-    p = bytes;
-    if (priv->uses_tls) {
-        do {
-            err = gnutls_record_send (priv->session, p, len);
-            if (err < 0) {
-                if (err == GNUTLS_E_INTERRUPTED || err == GNUTLS_E_AGAIN)
-                    continue;
-                error (in_open ? NULL : conn,
-                       VIR_ERR_GNUTLS_ERROR, gnutls_strerror (err));
-                return -1;
-            }
-            len -= err;
-            p += err;
-        }
-        while (len > 0);
-    } else {
-        do {
-            err = send (priv->sock, p, len, 0);
-            if (err == -1) {
-                if (errno == EINTR || errno == EAGAIN)
-                    continue;
-                error (in_open ? NULL : conn,
-                       VIR_ERR_SYSTEM_ERROR, strerror (errno));
-                return -1;
-            }
-            len -= err;
-            p += err;
-        }
-        while (len > 0);
-    }
-
-    return 0;
-}
-
-static int
-really_write_plain (virConnectPtr conn, struct private_data *priv,
-                    int in_open /* if we are in virConnectOpen */,
-                    char *bytes, int len)
-{
-    return really_write_buf(conn, priv, in_open, bytes, len);
-}
-
-#if HAVE_SASL
-static int
-really_write_sasl (virConnectPtr conn, struct private_data *priv,
-              int in_open /* if we are in virConnectOpen */,
-              char *bytes, int len)
-{
-    const char *output;
-    unsigned int outputlen;
-    int err;
-
-    err = sasl_encode(priv->saslconn, bytes, len, &output, &outputlen);
-    if (err != SASL_OK) {
-        return -1;
-    }
-
-    return really_write_buf(conn, priv, in_open, output, outputlen);
-}
-#endif
-
-static int
-really_write (virConnectPtr conn, struct private_data *priv,
-              int in_open /* if we are in virConnectOpen */,
-              char *bytes, int len)
-{
-#if HAVE_SASL
-    if (priv->saslconn)
-        return really_write_sasl(conn, priv, in_open, bytes, len);
-    else
-#endif
-        return really_write_plain(conn, priv, in_open, bytes, len);
-}
-
-static int
-really_read_buf (virConnectPtr conn, struct private_data *priv,
-                 int in_open /* if we are in virConnectOpen */,
-                 char *bytes, int len)
-{
-    int err;
-
-    if (priv->uses_tls) {
-    tlsreread:
-        err = gnutls_record_recv (priv->session, bytes, len);
-        if (err < 0) {
-            if (err == GNUTLS_E_INTERRUPTED)
-                goto tlsreread;
-            error (in_open ? NULL : conn,
-                   VIR_ERR_GNUTLS_ERROR, gnutls_strerror (err));
-            return -1;
-        }
-        if (err == 0) {
-            error (in_open ? NULL : conn,
-                   VIR_ERR_RPC, _("socket closed unexpectedly"));
-            return -1;
-        }
-        return err;
-    } else {
-    reread:
-        err = recv (priv->sock, bytes, len, 0);
-        if (err == -1) {
-            if (errno == EINTR)
-                goto reread;
-            error (in_open ? NULL : conn,
-                   VIR_ERR_SYSTEM_ERROR, strerror (errno));
-            return -1;
-        }
-        if (err == 0) {
-            error (in_open ? NULL : conn,
-                   VIR_ERR_RPC, _("socket closed unexpectedly"));
-            return -1;
-        }
-        return err;
-    }
-
-    return 0;
-}
-
-static int
-really_read_plain (virConnectPtr conn, struct private_data *priv,
-                   int in_open /* if we are in virConnectOpen */,
-                   char *bytes, int len)
-{
-    do {
-        int ret = really_read_buf (conn, priv, in_open, bytes, len);
-        if (ret < 0)
-            return -1;
-
-        len -= ret;
-        bytes += ret;
-    } while (len > 0);
-
-    return 0;
-}
-
-#if HAVE_SASL
-static int
-really_read_sasl (virConnectPtr conn, struct private_data *priv,
-                  int in_open /* if we are in virConnectOpen */,
-                  char *bytes, int len)
-{
-    do {
-        int want, got;
-        if (priv->saslDecoded == NULL) {
-            char encoded[8192];
-            int encodedLen = sizeof(encoded);
-            int err, ret;
-            ret = really_read_buf (conn, priv, in_open, encoded, encodedLen);
-            if (ret < 0)
-                return -1;
-
-            err = sasl_decode(priv->saslconn, encoded, ret,
-                              &priv->saslDecoded, &priv->saslDecodedLength);
-        }
-
-        got = priv->saslDecodedLength - priv->saslDecodedOffset;
-        want = len;
-        if (want > got)
-            want = got;
-
-        memcpy(bytes, priv->saslDecoded + priv->saslDecodedOffset, want);
-        priv->saslDecodedOffset += want;
-        if (priv->saslDecodedOffset == priv->saslDecodedLength) {
-            priv->saslDecoded = NULL;
-            priv->saslDecodedOffset = priv->saslDecodedLength = 0;
-        }
-        bytes += want;
-        len -= want;
-    } while (len > 0);
-
-    return 0;
-}
-#endif
-
-static int
-really_read (virConnectPtr conn, struct private_data *priv,
-             int in_open /* if we are in virConnectOpen */,
-             char *bytes, int len)
-{
-#if HAVE_SASL
-    if (priv->saslconn)
-        return really_read_sasl (conn, priv, in_open, bytes, len);
-    else
-#endif
-        return really_read_plain (conn, priv, in_open, bytes, len);
-}
+
+    if (rv < 0) {
+        VIR_FREE(thiscall);
+        return -1;
+    }
+
+cleanup:
+    DEBUG("All done with our call %d %p %p", proc_nr, priv->waitDispatch, thiscall);
+    if (thiscall->mode == REMOTE_MODE_ERROR) {
+        /* See if caller asked us to keep quiet about missing RPCs
+         * eg for interop with older servers */
+        if (flags & REMOTE_CALL_QUIET_MISSING_RPC &&
+            thiscall->err.domain == VIR_FROM_REMOTE &&
+            thiscall->err.code == VIR_ERR_RPC &&
+            thiscall->err.level == VIR_ERR_ERROR &&
+            STRPREFIX(*thiscall->err.message, "unknown procedure")) {
+            rv = -2;
+        } else {
+            server_error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
+                          &thiscall->err);
+            rv = -1;
+        }
+    } else {
+        rv = 0;
+    }
+    VIR_FREE(thiscall);
+    return rv;
+}
+
+
+/**
+ * remoteDomainReadEvent
+ *
+ * Read the event data off the wire
+ */
+static virDomainEventPtr
+remoteDomainReadEvent(virConnectPtr conn, XDR *xdr)
+{
+    remote_domain_event_ret ret;
+    virDomainPtr dom;
+    virDomainEventPtr event = NULL;
+    memset (&ret, 0, sizeof ret);
+
+    /* unmarshall parameters, and process it*/
+    if (! xdr_remote_domain_event_ret(xdr, &ret) ) {
+        error (conn, VIR_ERR_RPC,
+               _("remoteDomainProcessEvent: unmarshalling ret"));
+        return NULL;
+    }
+
+    dom = get_nonnull_domain(conn,ret.dom);
+    if (!dom)
+        return NULL;
+
+    event = virDomainEventNewFromDom(dom, ret.event, ret.detail);
+
+    virDomainFree(dom);
+    return event;
+}
+
+static void
+remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr)
+{
+    struct private_data *priv = conn->privateData;
+    virDomainEventPtr event;
+
+    event = remoteDomainReadEvent(conn, xdr);
+    if (!event)
+        return;
+
+    if (virDomainEventQueuePush(priv->domainEvents,
+                                event) < 0)
+        DEBUG0("Error adding event to queue");
+
+    virDomainEventFree(event);
+}
+
+/** remoteDomainEventFired:
+ *
+ * The callback for monitoring the remote socket
+ * for event data
+ */
+void
+remoteDomainEventFired(int watch,
+                       int fd,
+                       int event,
+                       void *opaque)
+{
+    virConnectPtr        conn = opaque;
+    struct private_data *priv = conn->privateData;
+
+    remoteDriverLock(priv);
+
+    /* This should be impossible, but it doesn't hurt to check */
+    if (priv->waitDispatch)
+        goto done;
+
+    DEBUG("Event fired %d %d %d %X", watch, fd, event, event);
+
+    if (event & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR)) {
+         DEBUG("%s : VIR_EVENT_HANDLE_HANGUP or "
+               "VIR_EVENT_HANDLE_ERROR encountered", __FUNCTION__);
+         virEventRemoveHandle(watch);
+         priv->watch = -1;
+         goto done;
+    }
+
+    if (fd != priv->sock) {
+        virEventRemoveHandle(watch);
+        priv->watch = -1;
+        goto done;
+    }
+
+    if (processCallRecv(conn, priv, 0) < 0)
+        DEBUG0("Something went wrong during async message processing");
+
+done:
+    remoteDriverUnlock(priv);
+}
+
+void
+remoteDomainEventQueueFlush(int timer ATTRIBUTE_UNUSED, void *opaque)
+{
+    virConnectPtr conn = opaque;
+    struct private_data *priv = conn->privateData;
+
+    remoteDriverLock(priv);
+
+    virDomainEventQueueDispatch(priv->domainEvents, priv->callbackList,
+                                virDomainEventDispatchDefaultFunc, NULL);
+    virEventUpdateTimeout(priv->eventFlushTimer, -1);
+
+    remoteDriverUnlock(priv);
+}
+
 
 /* For errors internal to this library. */
 static void
@@ -6306,161 +6790,3 @@ remoteRegister (void)
     return 0;
 }
 
-/**
- * remoteDomainReadEvent
- *
- * Read the event data off the wire
- */
-static virDomainEventPtr
-remoteDomainReadEvent(virConnectPtr conn, XDR *xdr)
-{
-    remote_domain_event_ret ret;
-    virDomainPtr dom;
-    virDomainEventPtr event = NULL;
-    memset (&ret, 0, sizeof ret);
-
-    /* unmarshall parameters, and process it*/
-    if (! xdr_remote_domain_event_ret(xdr, &ret) ) {
-        error (conn, VIR_ERR_RPC,
-               _("remoteDomainProcessEvent: unmarshalling ret"));
-        return NULL;
-    }
-
-    dom = get_nonnull_domain(conn,ret.dom);
-    if (!dom)
-        return NULL;
-
-    event = virDomainEventNewFromDom(dom, ret.event, ret.detail);
-
-    virDomainFree(dom);
-    return event;
-}
-
-static void
-remoteDomainProcessEvent(virConnectPtr conn, XDR *xdr)
-{
-    struct private_data *priv = conn->privateData;
-    virDomainEventPtr event;
-
-    event = remoteDomainReadEvent(conn, xdr);
-    if (!event)
-        return;
-
-    DEBUG0("Calling domain event callbacks (no queue)");
-    virDomainEventDispatch(event, priv->callbackList,
-                           virDomainEventDispatchDefaultFunc, NULL);
-    virDomainEventFree(event);
-}
-
-static void
-remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr)
-{
-    struct private_data *priv = conn->privateData;
-    virDomainEventPtr event;
-
-    event = remoteDomainReadEvent(conn, xdr);
-    if (!event)
-        return;
-
-    if (virDomainEventQueuePush(priv->domainEvents,
-                                event) < 0)
-        DEBUG0("Error adding event to queue");
-
-    virDomainEventFree(event);
-}
-
-/** remoteDomainEventFired:
- *
- * The callback for monitoring the remote socket
- * for event data
- */
-void
-remoteDomainEventFired(int watch,
-                       int fd,
-                       int event,
-                       void *opaque)
-{
-    char buffer[REMOTE_MESSAGE_MAX];
-    char buffer2[4];
-    struct remote_message_header hdr;
-    XDR xdr;
-    int len;
-
-    virConnectPtr        conn = opaque;
-    struct private_data *priv = conn->privateData;
-
-    remoteDriverLock(priv);
-
-    DEBUG("Event fired %d %d %d %X", watch, fd, event, event);
-
-    if (event & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR)) {
-         DEBUG("%s : VIR_EVENT_HANDLE_HANGUP or "
-               "VIR_EVENT_HANDLE_ERROR encountered", __FUNCTION__);
-         virEventRemoveHandle(watch);
-         goto done;
-    }
-
-    if (fd != priv->sock) {
-        virEventRemoveHandle(watch);
-        goto done;
-    }
-
-    /* Read and deserialise length word. */
-    if (really_read (conn, priv, 0, buffer2, sizeof buffer2) == -1)
-        goto done;
-
-    xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_DECODE);
-    if (!xdr_int (&xdr, &len)) {
-        error (conn, VIR_ERR_RPC, _("xdr_int (length word, reply)"));
-        goto done;
-    }
-    xdr_destroy (&xdr);
-
-    /* Length includes length word - adjust to real length to read. */
-    len -= 4;
-
-    if (len < 0 || len > REMOTE_MESSAGE_MAX) {
-        error (conn, VIR_ERR_RPC, _("packet received from server too large"));
-        goto done;
-    }
-
-    /* Read reply header and what follows (either a ret or an error). */
-    if (really_read (conn, priv, 0, buffer, len) == -1) {
-        error (conn, VIR_ERR_RPC, _("error reading buffer from memory"));
-        goto done;
-    }
-
-    /* Deserialise reply header. */
-    xdrmem_create (&xdr, buffer, len, XDR_DECODE);
-    if (!xdr_remote_message_header (&xdr, &hdr)) {
-        error (conn, VIR_ERR_RPC, _("invalid header in event firing"));
-        goto done;
-    }
-
-    if (hdr.proc == REMOTE_PROC_DOMAIN_EVENT &&
-        hdr.direction == REMOTE_MESSAGE) {
-        DEBUG0("Encountered an async event");
-        remoteDomainProcessEvent(conn, &xdr);
-    } else {
-        DEBUG0("invalid proc in event firing");
-        error (conn, VIR_ERR_RPC, _("invalid proc in event firing"));
-    }
-
-done:
-    remoteDriverUnlock(priv);
-}
-
-void
-remoteDomainEventQueueFlush(int timer ATTRIBUTE_UNUSED, void *opaque)
-{
-    virConnectPtr conn = opaque;
-    struct private_data *priv = conn->privateData;
-
-    remoteDriverLock(priv);
-
-    virDomainEventQueueDispatch(priv->domainEvents, priv->callbackList,
-                                virDomainEventDispatchDefaultFunc, NULL);
-    virEventUpdateTimeout(priv->eventFlushTimer, -1);
-
-    remoteDriverUnlock(priv);
-}
diff --git a/src/util.c b/src/util.c
--- a/src/util.c
+++ b/src/util.c
@@ -167,7 +167,7 @@ static int virSetCloseExec(int fd) {
     return 0;
 }
 
-static int virSetNonBlock(int fd) {
+int virSetNonBlock(int fd) {
     int flags;
     if ((flags = fcntl(fd, F_GETFL)) < 0)
         return -1;
diff --git a/src/util.h b/src/util.h
--- a/src/util.h
+++ b/src/util.h
@@ -36,6 +36,8 @@ enum {
     VIR_EXEC_NONBLOCK = (1 << 0),
     VIR_EXEC_DAEMON = (1 << 1),
 };
+
+int virSetNonBlock(int fd);
 
 int virExec(virConnectPtr conn,
             const char *const*argv,
diff --git a/src/virsh.c b/src/virsh.c
--- a/src/virsh.c
+++ b/src/virsh.c
@@ -6572,12 +6572,27 @@ _vshStrdup(vshControl *ctl, const char *
     return NULL;
 }
 
+static void *
+evil(void *data)
+{
+    virConnectPtr conn = data;
+
+    while (1) {
+        virNodeInfo ni;
+        if (virNodeGetInfo(conn, &ni) == 0)
+            fprintf(stderr, "BG %u %u %u %lu\n", ni.cores, ni.threads, ni.nodes, ni.memory);
+        sleep(2);
+    }
+}
+
 /*
  * Initialize connection.
  */
 static int
 vshInit(vshControl *ctl)
 {
+    pthread_t t;
+    virConnectPtr cloned;
     if (ctl->conn)
         return FALSE;
 
@@ -6590,6 +6605,9 @@ vshInit(vshControl *ctl)
                                    virConnectAuthPtrDefault,
                                    ctl->readonly ? VIR_CONNECT_RO : 0);
 
+    cloned = ctl->conn ? virConnectClone(ctl->conn) : NULL;
+    if (cloned)
+        pthread_create(&t, NULL, evil, cloned);
 
     /* This is not necessarily fatal.  All the individual commands check
      * vshConnectionUsability, except ones which don't need a connection
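

For anyone who wants to play with the hand-off outside libvirt, here is a
minimal standalone sketch of the 'passing the buck' scheme described in the
NB(4) comment above. It is not the code from the patch: struct call,
struct conn, fake_recv(), do_call() and run_demo() are hypothetical names,
the network I/O is replaced by a stub that pretends replies arrived for the
first two queued calls, and the sleeper re-checks its wake-up condition in a
loop, which is an addition of this sketch to tolerate spurious wakeups.

```c
#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <stddef.h>

enum call_mode { MODE_WAIT, MODE_COMPLETE };

struct call {                  /* stand-in for struct remote_thread_call */
    enum call_mode mode;
    pthread_cond_t cond;
    struct call *next;
};

struct conn {                  /* stand-in for struct private_data */
    pthread_mutex_t lock;
    struct call *waitDispatch; /* head of this list holds the buck */
    int completed;             /* number of calls finished so far */
};

/* Stub for socket I/O: pretend replies arrived for the first two
 * queued calls. Must be called with c->lock held. */
static void fake_recv(struct conn *c)
{
    struct call *t = c->waitDispatch;
    for (int i = 0; t && i < 2; i++, t = t->next)
        t->mode = MODE_COMPLETE;
}

static void do_call(struct conn *c)
{
    struct call me = { .mode = MODE_WAIT, .next = NULL };
    struct call *t, *prev;

    pthread_cond_init(&me.cond, NULL);
    pthread_mutex_lock(&c->lock);

    if (c->waitDispatch) {
        /* Someone else holds the buck: queue at the tail and sleep. */
        for (t = c->waitDispatch; t->next; t = t->next)
            ;
        t->next = &me;
        /* Loop on the predicate so a spurious wakeup is harmless. */
        while (me.mode != MODE_COMPLETE && c->waitDispatch != &me)
            pthread_cond_wait(&me.cond, &c->lock);
        if (me.mode == MODE_COMPLETE)
            goto done;         /* the dispatcher already unlinked us */
    } else {
        c->waitDispatch = &me; /* first to catch the buck */
    }

    /* We hold the buck, so we must be at the head of the list. */
    assert(c->waitDispatch == &me);
    while (me.mode != MODE_COMPLETE) {
        pthread_mutex_unlock(&c->lock); /* let other callers queue up */
        sched_yield();                  /* stands in for poll() */
        pthread_mutex_lock(&c->lock);
        fake_recv(c);
    }

    /* Unlink and wake finished sleepers; prev only moves past calls
     * that stay in the list. */
    for (prev = &me, t = me.next; t; t = t->next) {
        if (t->mode == MODE_COMPLETE) {
            prev->next = t->next;
            pthread_cond_signal(&t->cond);
        } else {
            prev = t;
        }
    }

    /* Remove ourselves and pass the buck to the next waiter, if any. */
    c->waitDispatch = me.next;
    if (c->waitDispatch)
        pthread_cond_signal(&c->waitDispatch->cond);

done:
    c->completed++;
    pthread_mutex_unlock(&c->lock);
    pthread_cond_destroy(&me.cond);
}

static void *worker(void *opaque)
{
    do_call(opaque);
    return NULL;
}

/* Run up to 16 concurrent calls; returns how many completed. */
int run_demo(int nthreads)
{
    struct conn c = { .waitDispatch = NULL, .completed = 0 };
    pthread_t tid[16];
    int started = 0;

    if (nthreads > 16)
        nthreads = 16;
    pthread_mutex_init(&c.lock, NULL);
    while (started < nthreads &&
           pthread_create(&tid[started], NULL, worker, &c) == 0)
        started++;
    for (int i = 0; i < started; i++)
        pthread_join(tid[i], NULL);
    pthread_mutex_destroy(&c.lock);
    return c.completed;
}
```

Calling run_demo(4) from a small main() and building with -pthread should
report all four calls as completed. Because the stub only "answers" two
calls per round, runs with more than two queued threads also exercise the
path where the buck is handed to a sleeper whose reply has not arrived yet.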


-- 
|: Red Hat, Engineering, London   -o-   http://people.redhat.com/berrange/ :|
|: http://libvirt.org  -o-  http://virt-manager.org  -o-  http://ovirt.org :|
|: http://autobuild.org       -o-         http://search.cpan.org/~danberr/ :|
|: GnuPG: 7D3B9505  -o-  F3C9 553F A1DA 4AC2 5648 23C1 B3DF F742 7D3B 9505 :|



