[libvirt] [PATCH v3 05/13] Implement keepalive protocol in libvirt daemon

Jiri Denemark jdenemar at redhat.com
Wed Oct 12 05:16:23 UTC 2011


---
Notes:
    Version 3:
    - keepalive_required configuration option can be used to refuse
      connections from clients that do not support keepalive protocol
    - explain what keepalive_interval = -1 means
    - start up the keepalive protocol when a client asks if we support
      it (clients without keepalive support do not ask for it)
    - add filters to the end of the list so that they are processed
      in the same order they were added (and not in reverse order); as
      a result of that the keepalive filter will always be the first one
      and libvirtd will not send keepalive requests while client is
      sending stream packets
    
    Version 2:
    - no change

 daemon/libvirtd.aug          |    5 ++
 daemon/libvirtd.c            |   15 +++++
 daemon/libvirtd.conf         |   22 +++++++
 daemon/libvirtd.h            |    1 +
 daemon/remote.c              |   48 ++++++++++++++-
 src/libvirt_private.syms     |    2 +
 src/remote/remote_protocol.x |    2 +-
 src/rpc/virnetserver.c       |   22 +++++++
 src/rpc/virnetserver.h       |    5 ++
 src/rpc/virnetserverclient.c |  143 ++++++++++++++++++++++++++++++++++++++---
 src/rpc/virnetserverclient.h |    7 ++
 11 files changed, 259 insertions(+), 13 deletions(-)

diff --git a/daemon/libvirtd.aug b/daemon/libvirtd.aug
index ce00db5..9d78bd7 100644
--- a/daemon/libvirtd.aug
+++ b/daemon/libvirtd.aug
@@ -66,6 +66,10 @@ module Libvirtd =
    let auditing_entry = int_entry "audit_level"
                       | bool_entry "audit_logging"
 
+   let keepalive_entry = int_entry "keepalive_interval"
+                       | int_entry "keepalive_count"
+                       | bool_entry "keepalive_required"
+
    (* Each enty in the config is one of the following three ... *)
    let entry = network_entry
              | sock_acl_entry
@@ -75,6 +79,7 @@ module Libvirtd =
              | processing_entry
              | logging_entry
              | auditing_entry
+             | keepalive_entry
    let comment = [ label "#comment" . del /#[ \t]*/ "# " .  store /([^ \t\n][^\n]*)?/ . del /\n/ "\n" ]
    let empty = [ label "#empty" . eol ]
 
diff --git a/daemon/libvirtd.c b/daemon/libvirtd.c
index d1bc3dd..13fba64 100644
--- a/daemon/libvirtd.c
+++ b/daemon/libvirtd.c
@@ -146,6 +146,10 @@ struct daemonConfig {
 
     int audit_level;
     int audit_logging;
+
+    int keepalive_interval;
+    unsigned int keepalive_count;
+    int keepalive_required;
 };
 
 enum {
@@ -899,6 +903,10 @@ daemonConfigNew(bool privileged ATTRIBUTE_UNUSED)
     data->audit_level = 1;
     data->audit_logging = 0;
 
+    data->keepalive_interval = 5;
+    data->keepalive_count = 5;
+    data->keepalive_required = 0;
+
     localhost = virGetHostname(NULL);
     if (localhost == NULL) {
         /* we couldn't resolve the hostname; assume that we are
@@ -1062,6 +1070,10 @@ daemonConfigLoad(struct daemonConfig *data,
     GET_CONF_STR (conf, filename, log_outputs);
     GET_CONF_INT (conf, filename, log_buffer_size);
 
+    GET_CONF_INT (conf, filename, keepalive_interval);
+    GET_CONF_INT (conf, filename, keepalive_count);
+    GET_CONF_INT (conf, filename, keepalive_required);
+
     virConfFree (conf);
     return 0;
 
@@ -1452,6 +1464,9 @@ int main(int argc, char **argv) {
                                 config->max_workers,
                                 config->prio_workers,
                                 config->max_clients,
+                                config->keepalive_interval,
+                                config->keepalive_count,
+                                !!config->keepalive_required,
                                 config->mdns_adv ? config->mdns_name : NULL,
                                 use_polkit_dbus,
                                 remoteClientInitHook))) {
diff --git a/daemon/libvirtd.conf b/daemon/libvirtd.conf
index da3983e..5d1e011 100644
--- a/daemon/libvirtd.conf
+++ b/daemon/libvirtd.conf
@@ -366,3 +366,25 @@
 # it with the output of the 'uuidgen' command and then
 # uncomment this entry
 #host_uuid = "00000000-0000-0000-0000-000000000000"
+
+###################################################################
+# Keepalive protocol:
+# This allows libvirtd to detect broken client connections or even
+# dead clients.  A keepalive message is sent to a client after
+# keepalive_interval seconds of inactivity to check if the client is
+# still responding; keepalive_count is a maximum number of keepalive
+# messages that are allowed to be sent to the client without getting
+# any response before the connection is considered broken.  In other
+# words, the connection is automatically closed approximately after
+# keepalive_interval * (keepalive_count + 1) seconds since the last
+# message received from the client.  If keepalive_interval is set to
+# -1, libvirtd will never send keepalive requests; however clients
+# can still send them and the daemon will send responses.
+#
+#keepalive_interval = 5
+#keepalive_count = 5
+#
+# If set to 1, libvirtd will refuse to talk to clients that do not
+# support keepalive protocol.  Defaults to 0.
+#
+#keepalive_required = 1
diff --git a/daemon/libvirtd.h b/daemon/libvirtd.h
index ecb7374..e02d7b8 100644
--- a/daemon/libvirtd.h
+++ b/daemon/libvirtd.h
@@ -62,6 +62,7 @@ struct daemonClientPrivate {
     virConnectPtr conn;
 
     daemonClientStreamPtr streams;
+    bool keepalive_supported;
 };
 
 # if HAVE_SASL
diff --git a/daemon/remote.c b/daemon/remote.c
index 550bed4..b33c619 100644
--- a/daemon/remote.c
+++ b/daemon/remote.c
@@ -533,7 +533,7 @@ int remoteClientInitHook(virNetServerPtr srv ATTRIBUTE_UNUSED,
 /*----- Functions. -----*/
 
 static int
-remoteDispatchOpen(virNetServerPtr server ATTRIBUTE_UNUSED,
+remoteDispatchOpen(virNetServerPtr server,
                    virNetServerClientPtr client,
                    virNetMessageHeaderPtr hdr ATTRIBUTE_UNUSED,
                    virNetMessageErrorPtr rerr,
@@ -552,6 +552,12 @@ remoteDispatchOpen(virNetServerPtr server ATTRIBUTE_UNUSED,
         goto cleanup;
     }
 
+    if (virNetServerKeepAliveRequired(server) && !priv->keepalive_supported) {
+        virNetError(VIR_ERR_OPERATION_FAILED, "%s",
+                    _("keepalive support is required to connect"));
+        goto cleanup;
+    }
+
     name = args->name ? *args->name : NULL;
 
     /* If this connection arrived on a readonly socket, force
@@ -3124,6 +3130,46 @@ cleanup:
 }
 
 
+/*
+ * Dispatcher for the "supports feature" RPC.
+ *
+ * VIR_DRV_FEATURE_PROGRAM_KEEPALIVE is answered by the daemon itself and
+ * is handled before the open-connection check: the keepalive protocol is
+ * started for the client, the client is recorded as keepalive capable and
+ * the feature is reported as supported.  Any other feature is passed to
+ * virDrvSupportsFeature() and therefore needs priv->conn to be set.
+ *
+ * Returns 0 with ret->supported filled in, or -1 after saving the error
+ * into rerr.
+ */
+static int
+remoteDispatchSupportsFeature(virNetServerPtr server ATTRIBUTE_UNUSED,
+                              virNetServerClientPtr client,
+                              virNetMessageHeaderPtr hdr ATTRIBUTE_UNUSED,
+                              virNetMessageErrorPtr rerr,
+                              remote_supports_feature_args *args,
+                              remote_supports_feature_ret *ret)
+{
+    int rv = -1;
+    int supported;
+    struct daemonClientPrivate *priv =
+        virNetServerClientGetPrivateData(client);
+
+    if (args->feature == VIR_DRV_FEATURE_PROGRAM_KEEPALIVE) {
+        if (virNetServerClientStartKeepAlive(client) < 0)
+            goto cleanup;
+        /* remoteDispatchOpen refuses clients that do not have this flag
+         * set when keepalive support is required by the server */
+        priv->keepalive_supported = true;
+        supported = 1;
+        goto done;
+    }
+
+    if (!priv->conn) {
+        virNetError(VIR_ERR_INTERNAL_ERROR, "%s", _("connection not open"));
+        goto cleanup;
+    }
+
+    if ((supported = virDrvSupportsFeature(priv->conn, args->feature)) < 0)
+        goto cleanup;
+
+done:
+    ret->supported = supported;
+    rv = 0;
+
+cleanup:
+    if (rv < 0)
+        virNetMessageSaveError(rerr);
+    return rv;
+}
+
+
+
 /*----- Helpers. -----*/
 
 /* get_nonnull_domain and get_nonnull_network turn an on-wire
diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms
index 11ff705..fcad1f4 100644
--- a/src/libvirt_private.syms
+++ b/src/libvirt_private.syms
@@ -1187,6 +1187,7 @@ virNetServerAutoShutdown;
 virNetServerClose;
 virNetServerFree;
 virNetServerIsPrivileged;
+virNetServerKeepAliveRequired;
 virNetServerNew;
 virNetServerQuit;
 virNetServerRef;
@@ -1219,6 +1220,7 @@ virNetServerClientSendMessage;
 virNetServerClientSetCloseHook;
 virNetServerClientSetIdentity;
 virNetServerClientSetPrivateData;
+virNetServerClientStartKeepAlive;
 
 
 # virnetserverprogram.h
diff --git a/src/remote/remote_protocol.x b/src/remote/remote_protocol.x
index f95253e..8a29b18 100644
--- a/src/remote/remote_protocol.x
+++ b/src/remote/remote_protocol.x
@@ -2340,7 +2340,7 @@ enum remote_procedure {
     REMOTE_PROC_DOMAIN_GET_SCHEDULER_PARAMETERS = 57, /* skipgen autogen */
     REMOTE_PROC_DOMAIN_SET_SCHEDULER_PARAMETERS = 58, /* autogen autogen */
     REMOTE_PROC_GET_HOSTNAME = 59, /* autogen autogen priority:high */
-    REMOTE_PROC_SUPPORTS_FEATURE = 60, /* autogen autogen priority:high */
+    REMOTE_PROC_SUPPORTS_FEATURE = 60, /* skipgen autogen priority:high */
 
     REMOTE_PROC_DOMAIN_MIGRATE_PREPARE = 61, /* skipgen skipgen */
     REMOTE_PROC_DOMAIN_MIGRATE_PERFORM = 62, /* autogen autogen */
diff --git a/src/rpc/virnetserver.c b/src/rpc/virnetserver.c
index f739743..4c8273f8 100644
--- a/src/rpc/virnetserver.c
+++ b/src/rpc/virnetserver.c
@@ -102,6 +102,10 @@ struct _virNetServer {
     size_t nclients_max;
     virNetServerClientPtr *clients;
 
+    int keepaliveInterval;
+    unsigned int keepaliveCount;
+    bool keepaliveRequired;
+
     unsigned int quit :1;
 
     virNetTLSContextPtr tls;
@@ -260,6 +264,9 @@ static int virNetServerDispatchNewClient(virNetServerServicePtr svc ATTRIBUTE_UN
                                     virNetServerDispatchNewMessage,
                                     srv);
 
+    virNetServerClientInitKeepAlive(client, srv->keepaliveInterval,
+                                    srv->keepaliveCount);
+
     virNetServerUnlock(srv);
     return 0;
 
@@ -299,6 +306,9 @@ virNetServerPtr virNetServerNew(size_t min_workers,
                                 size_t max_workers,
                                 size_t priority_workers,
                                 size_t max_clients,
+                                int keepaliveInterval,
+                                unsigned int keepaliveCount,
+                                bool keepaliveRequired,
                                 const char *mdnsGroupName,
                                 bool connectDBus ATTRIBUTE_UNUSED,
                                 virNetServerClientInitHook clientInitHook)
@@ -320,6 +330,9 @@ virNetServerPtr virNetServerNew(size_t min_workers,
         goto error;
 
     srv->nclients_max = max_clients;
+    srv->keepaliveInterval = keepaliveInterval;
+    srv->keepaliveCount = keepaliveCount;
+    srv->keepaliveRequired = keepaliveRequired;
     srv->sigwrite = srv->sigread = -1;
     srv->clientInitHook = clientInitHook;
     srv->privileged = geteuid() == 0 ? true : false;
@@ -839,3 +852,12 @@ void virNetServerClose(virNetServerPtr srv)
 
     virNetServerUnlock(srv);
 }
+
+/*
+ * Return the keepaliveRequired flag stored in @srv.  The flag is read
+ * while the server lock is held.
+ */
+bool virNetServerKeepAliveRequired(virNetServerPtr srv)
+{
+    bool keepaliveRequired;
+
+    virNetServerLock(srv);
+    keepaliveRequired = srv->keepaliveRequired;
+    virNetServerUnlock(srv);
+
+    return keepaliveRequired;
+}
diff --git a/src/rpc/virnetserver.h b/src/rpc/virnetserver.h
index cc9d039..a04ffdd 100644
--- a/src/rpc/virnetserver.h
+++ b/src/rpc/virnetserver.h
@@ -41,6 +41,9 @@ virNetServerPtr virNetServerNew(size_t min_workers,
                                 size_t max_workers,
                                 size_t priority_workers,
                                 size_t max_clients,
+                                int keepaliveInterval,
+                                unsigned int keepaliveCount,
+                                bool keepaliveRequired,
                                 const char *mdnsGroupName,
                                 bool connectDBus,
                                 virNetServerClientInitHook clientInitHook);
@@ -88,4 +91,6 @@ void virNetServerFree(virNetServerPtr srv);
 
 void virNetServerClose(virNetServerPtr srv);
 
+bool virNetServerKeepAliveRequired(virNetServerPtr srv);
+
 #endif
diff --git a/src/rpc/virnetserverclient.c b/src/rpc/virnetserverclient.c
index 05077d6..c5641ef 100644
--- a/src/rpc/virnetserverclient.c
+++ b/src/rpc/virnetserverclient.c
@@ -33,6 +33,7 @@
 #include "virterror_internal.h"
 #include "memory.h"
 #include "threads.h"
+#include "virkeepalive.h"
 
 #define VIR_FROM_THIS VIR_FROM_RPC
 #define virNetError(code, ...)                                    \
@@ -98,6 +99,9 @@ struct _virNetServerClient
     void *privateData;
     virNetServerClientFreeFunc privateDataFreeFunc;
     virNetServerClientCloseFunc privateDataCloseFunc;
+
+    virKeepAlivePtr keepalive;
+    int keepaliveFilter;
 };
 
 
@@ -207,15 +211,15 @@ static void virNetServerClientUpdateEvent(virNetServerClientPtr client)
 }
 
 
-int virNetServerClientAddFilter(virNetServerClientPtr client,
-                                virNetServerClientFilterFunc func,
-                                void *opaque)
+static int
+virNetServerClientAddFilterLocked(virNetServerClientPtr client,
+                                  virNetServerClientFilterFunc func,
+                                  void *opaque)
 {
     virNetServerClientFilterPtr filter;
+    virNetServerClientFilterPtr *place;
     int ret = -1;
 
-    virNetServerClientLock(client);
-
     if (VIR_ALLOC(filter) < 0) {
         virReportOOMError();
         goto cleanup;
@@ -225,22 +229,34 @@ int virNetServerClientAddFilter(virNetServerClientPtr client,
     filter->func = func;
     filter->opaque = opaque;
 
-    filter->next = client->filters;
-    client->filters = filter;
+    place = &client->filters;
+    while (*place)
+        place = &(*place)->next;
+    *place = filter;
 
     ret = filter->id;
 
 cleanup:
-    virNetServerClientUnlock(client);
     return ret;
 }
 
+int virNetServerClientAddFilter(virNetServerClientPtr client,
+                                virNetServerClientFilterFunc func,
+                                void *opaque)
+{
+    int ret;
 
-void virNetServerClientRemoveFilter(virNetServerClientPtr client,
-                                    int filterID)
+    virNetServerClientLock(client);
+    ret = virNetServerClientAddFilterLocked(client, func, opaque);
+    virNetServerClientUnlock(client);
+    return ret;
+}
+
+static void
+virNetServerClientRemoveFilterLocked(virNetServerClientPtr client,
+                                     int filterID)
 {
     virNetServerClientFilterPtr tmp, prev;
-    virNetServerClientLock(client);
 
     prev = NULL;
     tmp = client->filters;
@@ -257,7 +273,13 @@ void virNetServerClientRemoveFilter(virNetServerClientPtr client,
         prev = tmp;
         tmp = tmp->next;
     }
+}
 
+void virNetServerClientRemoveFilter(virNetServerClientPtr client,
+                                    int filterID)
+{
+    virNetServerClientLock(client);
+    virNetServerClientRemoveFilterLocked(client, filterID);
     virNetServerClientUnlock(client);
 }
 
@@ -318,6 +340,7 @@ virNetServerClientPtr virNetServerClientNew(virNetSocketPtr sock,
     client->readonly = readonly;
     client->tlsCtxt = tls;
     client->nrequests_max = nrequests_max;
+    client->keepaliveFilter = -1;
 
     if (tls)
         virNetTLSContextRef(tls);
@@ -577,6 +600,7 @@ void virNetServerClientFree(virNetServerClientPtr client)
 void virNetServerClientClose(virNetServerClientPtr client)
 {
     virNetServerClientCloseFunc cf;
+    virKeepAlivePtr ka;
 
     virNetServerClientLock(client);
     VIR_DEBUG("client=%p refs=%d", client, client->refs);
@@ -585,6 +609,20 @@ void virNetServerClientClose(virNetServerClientPtr client)
         return;
     }
 
+    if (client->keepaliveFilter >= 0)
+        virNetServerClientRemoveFilterLocked(client, client->keepaliveFilter);
+
+    if (client->keepalive) {
+        virKeepAliveStop(client->keepalive);
+        ka = client->keepalive;
+        client->keepalive = NULL;
+        client->refs++;
+        virNetServerClientUnlock(client);
+        virKeepAliveFree(ka);
+        virNetServerClientLock(client);
+        client->refs--;
+    }
+
     if (client->privateDataCloseFunc) {
         cf = client->privateDataCloseFunc;
         client->refs++;
@@ -988,6 +1026,7 @@ int virNetServerClientSendMessage(virNetServerClientPtr client,
     VIR_DEBUG("msg=%p proc=%d len=%zu offset=%zu",
               msg, msg->header.proc,
               msg->bufferLength, msg->bufferOffset);
+
     virNetServerClientLock(client);
 
     if (client->sock && !client->wantClose) {
@@ -1003,6 +1042,7 @@ int virNetServerClientSendMessage(virNetServerClientPtr client,
     }
 
     virNetServerClientUnlock(client);
+
     return ret;
 }
 
@@ -1016,3 +1056,84 @@ bool virNetServerClientNeedAuth(virNetServerClientPtr client)
     virNetServerClientUnlock(client);
     return need;
 }
+
+
+/*
+ * "Dead" callback handed to virKeepAliveNew(): @opaque is the client the
+ * keepalive object was created for; it is closed immediately.
+ */
+static void
+virNetServerClientKeepAliveDeadCB(void *opaque)
+{
+    virNetServerClientPtr client = opaque;
+
+    virNetServerClientImmediateClose(client);
+}
+
+/*
+ * "Send" callback handed to virKeepAliveNew(): forwards @msg to the client
+ * given in @opaque and returns whatever virNetServerClientSendMessage()
+ * returns.
+ */
+static int
+virNetServerClientKeepAliveSendCB(void *opaque,
+                                  virNetMessagePtr msg)
+{
+    virNetServerClientPtr client = opaque;
+
+    return virNetServerClientSendMessage(client, msg);
+}
+
+/*
+ * "Free" callback handed to virKeepAliveNew(): calls
+ * virNetServerClientFree() on the client given in @opaque.
+ */
+static void
+virNetServerClientFreeCB(void *opaque)
+{
+    virNetServerClientPtr client = opaque;
+
+    virNetServerClientFree(client);
+}
+
+/*
+ * Message filter installed by virNetServerClientInitKeepAlive().
+ *
+ * Returns 0 when virKeepAliveCheckMessage() does not claim @msg.
+ * Otherwise @msg is freed, the client's request counter is decremented
+ * and 1 is returned.
+ */
+static int
+virNetServerClientKeepAliveFilter(virNetServerClientPtr client,
+                                  virNetMessagePtr msg,
+                                  void *opaque ATTRIBUTE_UNUSED)
+{
+    if (!virKeepAliveCheckMessage(client->keepalive, msg))
+        return 0;
+
+    virNetMessageFree(msg);
+    client->nrequests--;
+
+    return 1;
+}
+
+/*
+ * Create a keepalive object for @client using @interval and @count and
+ * register virNetServerClientKeepAliveFilter as a message filter.
+ *
+ * On success the keepalive object is stored in client->keepalive and the
+ * filter id in client->keepaliveFilter.  On failure a keepalive object
+ * that was already created is stopped and freed again.
+ *
+ * Returns 0 on success, -1 on failure.
+ */
+int
+virNetServerClientInitKeepAlive(virNetServerClientPtr client,
+                                int interval,
+                                unsigned int count)
+{
+    virKeepAlivePtr ka;
+    int ret = -1;
+
+    virNetServerClientLock(client);
+
+    if (!(ka = virKeepAliveNew(interval, count, client,
+                               virNetServerClientKeepAliveSendCB,
+                               virNetServerClientKeepAliveDeadCB,
+                               virNetServerClientFreeCB)))
+        goto cleanup;
+    /* keepalive object has a reference to client */
+    client->refs++;
+
+    client->keepaliveFilter =
+        virNetServerClientAddFilterLocked(client,
+                                          virNetServerClientKeepAliveFilter,
+                                          NULL);
+    if (client->keepaliveFilter < 0)
+        goto cleanup;
+
+    /* ownership moves to the client; clearing ka keeps the cleanup code
+     * below from stopping the object we have just installed */
+    client->keepalive = ka;
+    ka = NULL;
+    ret = 0;
+
+cleanup:
+    virNetServerClientUnlock(client);
+    if (ka)
+        virKeepAliveStop(ka);
+    virKeepAliveFree(ka);
+
+    return ret;
+}
+
+/*
+ * Start the keepalive protocol for @client using the object stored in
+ * client->keepalive.
+ *
+ * Returns the result of virKeepAliveStart(), or -1 with an error reported
+ * when the client has no keepalive object.
+ */
+int
+virNetServerClientStartKeepAlive(virNetServerClientPtr client)
+{
+    int ret = -1;
+
+    virNetServerClientLock(client);
+
+    /* client->keepalive stays NULL when virNetServerClientInitKeepAlive
+     * fails and is reset to NULL by virNetServerClientClose */
+    if (!client->keepalive) {
+        virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
+                    _("keepalive is not available for this client"));
+        goto cleanup;
+    }
+
+    ret = virKeepAliveStart(client->keepalive, 0, 0);
+
+cleanup:
+    virNetServerClientUnlock(client);
+    return ret;
+}
diff --git a/src/rpc/virnetserverclient.h b/src/rpc/virnetserverclient.h
index bedb179..a201dca 100644
--- a/src/rpc/virnetserverclient.h
+++ b/src/rpc/virnetserverclient.h
@@ -99,6 +99,13 @@ bool virNetServerClientWantClose(virNetServerClientPtr client);
 
 int virNetServerClientInit(virNetServerClientPtr client);
 
+int virNetServerClientInitKeepAlive(virNetServerClientPtr client,
+                                    int interval,
+                                    unsigned int count);
+bool virNetServerClientCheckKeepAlive(virNetServerClientPtr client,
+                                      virNetMessagePtr msg);
+int virNetServerClientStartKeepAlive(virNetServerClientPtr client);
+
 const char *virNetServerClientLocalAddrString(virNetServerClientPtr client);
 const char *virNetServerClientRemoteAddrString(virNetServerClientPtr client);
 
-- 
1.7.7




More information about the libvir-list mailing list