[libvirt] [PATCH v4 08/13] Implement keepalive protocol in remote driver

Jiri Denemark jdenemar at redhat.com
Thu Oct 27 16:05:44 UTC 2011


---
Notes:
    ACKed

    Version 4:
    - no changes
    
    Version 3:
    - remoteStartKeepAlive renamed to remoteSetKeepAlive
    - clients that implement an event loop are required to run it, thus
      keepalive is enabled if an event loop implementation is found, without
      the need to call remoteAllowKeepAlive (which was dropped)
    - keepalive support is advertised to a server implicitly by asking for
      keepalive support between authentication and virConnectOpen
    
    Version 2:
    - no changes

 src/remote/remote_driver.c |   52 +++++++++++++++++++++++++++
 src/rpc/virnetclient.c     |   83 +++++++++++++++++++++++++++++++++++++++++--
 src/rpc/virnetclient.h     |    5 +++
 3 files changed, 136 insertions(+), 4 deletions(-)

diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c
index e98ebd7..f99c32d 100644
--- a/src/remote/remote_driver.c
+++ b/src/remote/remote_driver.c
@@ -68,6 +68,7 @@
 #endif
 
 static int inside_daemon = 0;
+static virDriverPtr remoteDriver = NULL;
 
 struct private_data {
     virMutex lock;
@@ -84,6 +85,7 @@ struct private_data {
     char *type;                 /* Cached return from remoteType. */
     int localUses;              /* Ref count for private data */
     char *hostname;             /* Original hostname */
+    bool serverKeepAlive;       /* Does server support keepalive protocol? */
 
     virDomainEventStatePtr domainEventState;
 };
@@ -663,6 +665,26 @@ doRemoteOpen (virConnectPtr conn,
     if (remoteAuthenticate(conn, priv, auth, authtype) == -1)
         goto failed;
 
+    if (virNetClientKeepAliveIsSupported(priv->client)) {
+        remote_supports_feature_args args =
+            { VIR_DRV_FEATURE_PROGRAM_KEEPALIVE };
+        remote_supports_feature_ret ret = { 0 };
+        int rc;
+
+        rc = call(conn, priv, 0, REMOTE_PROC_SUPPORTS_FEATURE,
+                  (xdrproc_t)xdr_remote_supports_feature_args, (char *) &args,
+                  (xdrproc_t)xdr_remote_supports_feature_ret, (char *) &ret);
+        if (rc == -1)
+            goto failed;
+
+        if (ret.supported) {
+            priv->serverKeepAlive = true;
+        } else {
+            VIR_WARN("Disabling keepalive protocol since it is not supported"
+                     " by the server");
+        }
+    }
+
     /* Finally we can call the remote side's open function. */
     {
         remote_open_args args = { &name, flags };
@@ -4122,6 +4144,33 @@ done:
 }
 
 
+static int
+remoteSetKeepAlive(virConnectPtr conn, int interval, unsigned int count)
+{
+    struct private_data *priv = conn->privateData;
+    int ret = -1;               /* returned as-is on the error path below */
+    /* Start keepalive for conn; every step runs under the driver lock. */
+    remoteDriverLock(priv);
+    if (!virNetClientKeepAliveIsSupported(priv->client)) {
+        remoteError(VIR_ERR_INTERNAL_ERROR, "%s",
+                    _("the caller doesn't support keepalive protocol;"
+                      " perhaps it's missing event loop implementation"));
+        goto cleanup;
+    }
+    /* Server keepalive flag is unset: return 1 without starting anything. */
+    if (!priv->serverKeepAlive) {
+        ret = 1;
+        goto cleanup;
+    }
+    /* Delegate to the RPC client; its return value is passed through. */
+    ret = virNetClientKeepAliveStart(priv->client, interval, count);
+
+cleanup:
+    remoteDriverUnlock(priv);
+    return ret;
+}
+
+
 #include "remote_client_bodies.h"
 #include "qemu_client_bodies.h"
 
@@ -4474,6 +4523,7 @@ static virDriver remote_driver = {
     .domainGetBlockJobInfo = remoteDomainGetBlockJobInfo, /* 0.9.4 */
     .domainBlockJobSetSpeed = remoteDomainBlockJobSetSpeed, /* 0.9.4 */
     .domainBlockPull = remoteDomainBlockPull, /* 0.9.4 */
+    .setKeepAlive = remoteSetKeepAlive, /* 0.9.7 */
 };
 
 static virNetworkDriver network_driver = {
@@ -4624,6 +4674,8 @@ static virStateDriver state_driver = {
 int
 remoteRegister (void)
 {
+    remoteDriver = &remote_driver;
+
     if (virRegisterDriver (&remote_driver) == -1) return -1;
     if (virRegisterNetworkDriver (&network_driver) == -1) return -1;
     if (virRegisterInterfaceDriver (&interface_driver) == -1) return -1;
diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
index aaf072a..37e9cc8 100644
--- a/src/rpc/virnetclient.c
+++ b/src/rpc/virnetclient.c
@@ -29,6 +29,7 @@
 
 #include "virnetclient.h"
 #include "virnetsocket.h"
+#include "virkeepalive.h"
 #include "memory.h"
 #include "threads.h"
 #include "virfile.h"
@@ -96,11 +97,12 @@ struct _virNetClient {
     size_t nstreams;
     virNetClientStreamPtr *streams;
 
+    virKeepAlivePtr keepalive;
     bool wantClose;
 };
 
 
-void virNetClientRequestClose(virNetClientPtr client);
+static void virNetClientRequestClose(virNetClientPtr client);
 
 static int virNetClientSendInternal(virNetClientPtr client,
                                     virNetMessagePtr msg,
@@ -130,11 +132,51 @@ static void virNetClientEventFree(void *opaque)
     virNetClientFree(client);
 }
 
+bool
+virNetClientKeepAliveIsSupported(virNetClientPtr client)
+{
+    bool supported;
+    /* Supported means a keepalive object is attached to the client. */
+    virNetClientLock(client);
+    supported = !!client->keepalive;    /* non-NULL pointer -> true */
+    virNetClientUnlock(client);
+
+    return supported;
+}
+
+int
+virNetClientKeepAliveStart(virNetClientPtr client,
+                           int interval,
+                           unsigned int count)
+{
+    int ret;
+    /* Forward interval/count to the keepalive object under the client lock. */
+    virNetClientLock(client);
+    ret = virKeepAliveStart(client->keepalive, interval, count);
+    virNetClientUnlock(client);
+    /* NOTE(review): client->keepalive may be NULL; confirm callee handles it. */
+    return ret;
+}
+
+static void
+virNetClientKeepAliveDeadCB(void *opaque)
+{   /* callback handed to virKeepAliveNew; requests closing of the client */
+    virNetClientRequestClose(opaque);   /* opaque is the virNetClientPtr */
+}
+
+static int
+virNetClientKeepAliveSendCB(void *opaque,
+                            virNetMessagePtr msg)
+{   /* callback handed to virKeepAliveNew; sends msg through the client */
+    return virNetClientSendInternal(opaque, msg, false, true);
+}
+
 static virNetClientPtr virNetClientNew(virNetSocketPtr sock,
                                        const char *hostname)
 {
     virNetClientPtr client = NULL;
     int wakeupFD[2] = { -1, -1 };
+    virKeepAlivePtr ka = NULL;
 
     if (pipe2(wakeupFD, O_CLOEXEC) < 0) {
         virReportSystemError(errno, "%s",
@@ -167,13 +209,24 @@ static virNetClientPtr virNetClientNew(virNetSocketPtr sock,
                                   client,
                                   virNetClientEventFree) < 0) {
         client->refs--;
-        VIR_DEBUG("Failed to add event watch, disabling events");
+        VIR_DEBUG("Failed to add event watch, disabling events and support for"
+                  " keepalive messages");
+    } else {
+        /* Keepalive protocol consists of async messages so it can only be used
+         * if the client supports them */
+        if (!(ka = virKeepAliveNew(-1, 0, client,
+                                   virNetClientKeepAliveSendCB,
+                                   virNetClientKeepAliveDeadCB,
+                                   virNetClientEventFree)))
+            goto error;
+        /* keepalive object has a reference to client */
+        client->refs++;
     }
 
+    client->keepalive = ka;
     PROBE(RPC_CLIENT_NEW,
           "client=%p refs=%d sock=%p",
           client, client->refs, client->sock);
-
     return client;
 
 no_memory:
@@ -181,6 +234,10 @@ no_memory:
 error:
     VIR_FORCE_CLOSE(wakeupFD[0]);
     VIR_FORCE_CLOSE(wakeupFD[1]);
+    if (ka) {
+        virKeepAliveStop(ka);
+        virKeepAliveFree(ka);
+    }
     virNetClientFree(client);
     return NULL;
 }
@@ -314,6 +371,8 @@ void virNetClientFree(virNetClientPtr client)
 static void
 virNetClientCloseLocked(virNetClientPtr client)
 {
+    virKeepAlivePtr ka;
+
     VIR_DEBUG("client=%p, sock=%p", client, client->sock);
 
     if (!client->sock)
@@ -328,7 +387,20 @@ virNetClientCloseLocked(virNetClientPtr client)
     virNetSASLSessionFree(client->sasl);
     client->sasl = NULL;
 #endif
+    ka = client->keepalive;
+    client->keepalive = NULL;
     client->wantClose = false;
+
+    if (ka) {
+        client->refs++;
+        virNetClientUnlock(client);
+
+        virKeepAliveStop(ka);
+        virKeepAliveFree(ka);
+
+        virNetClientLock(client);
+        client->refs--;
+    }
 }
 
 void virNetClientClose(virNetClientPtr client)
@@ -341,7 +413,7 @@ void virNetClientClose(virNetClientPtr client)
     virNetClientUnlock(client);
 }
 
-void
+static void
 virNetClientRequestClose(virNetClientPtr client)
 {
     VIR_DEBUG("client=%p", client);
@@ -743,6 +815,9 @@ virNetClientCallDispatch(virNetClientPtr client)
           client->msg.header.prog, client->msg.header.vers, client->msg.header.proc,
           client->msg.header.type, client->msg.header.status, client->msg.header.serial);
 
+    if (virKeepAliveCheckMessage(client->keepalive, &client->msg))
+        return 0;
+
     switch (client->msg.header.type) {
     case VIR_NET_REPLY: /* Normal RPC replies */
         return virNetClientCallDispatchReply(client);
diff --git a/src/rpc/virnetclient.h b/src/rpc/virnetclient.h
index 1fabcfd..3227a4e 100644
--- a/src/rpc/virnetclient.h
+++ b/src/rpc/virnetclient.h
@@ -87,4 +87,9 @@ int virNetClientGetTLSKeySize(virNetClientPtr client);
 void virNetClientFree(virNetClientPtr client);
 void virNetClientClose(virNetClientPtr client);
 
+bool virNetClientKeepAliveIsSupported(virNetClientPtr client);
+int virNetClientKeepAliveStart(virNetClientPtr client,
+                               int interval,
+                               unsigned int count);
+
 #endif /* __VIR_NET_CLIENT_H__ */
-- 
1.7.7.1




More information about the libvir-list mailing list