[libvirt] [PATCH 07/10] Introduce generic RPC server objects

Daniel P. Berrange berrange at redhat.com
Tue May 24 16:31:35 UTC 2011


To facilitate creation of new daemons providing XDR RPC services,
pull a lot of the libvirtd daemon code into a set of reusable
objects.

 * virNetServer: A server contains one or more services which
   accept incoming clients. It maintains the list of active
   clients. It has a list of RPC programs which can be used
   by clients. When clients produce a complete RPC message,
   the server passes this onto the corresponding program for
   handling, and queues any response back with the client.

 * virNetServerClient: Encapsulates a single client connection.
   All I/O for the client is handled, reading & writing RPC
   messages.

 * virNetServerProgram: Handles processing and dispatch of
   RPC method calls for a single RPC (program,version).
   Multiple programs can be registered with the server.

 * virNetServerService: Encapsulates socket(s) listening for
   new connections. Each service listens on a single host/port,
   but may have multiple sockets if on a dual IPv4/6 host.

Each new daemon now merely has to define the list of RPC procedures
& their handlers. It does not need to deal with any network related
functionality at all.
---
 cfg.mk                        |    4 +
 po/POTFILES.in                |    3 +
 src/Makefile.am               |   17 +-
 src/rpc/virnetserver.c        |  714 +++++++++++++++++++++++++++++++
 src/rpc/virnetserver.h        |   80 ++++
 src/rpc/virnetserverclient.c  |  937 +++++++++++++++++++++++++++++++++++++++++
 src/rpc/virnetserverclient.h  |  106 +++++
 src/rpc/virnetserverprogram.c |  455 ++++++++++++++++++++
 src/rpc/virnetserverprogram.h |  107 +++++
 src/rpc/virnetserverservice.c |  247 +++++++++++
 src/rpc/virnetserverservice.h |   65 +++
 11 files changed, 2734 insertions(+), 1 deletions(-)
 create mode 100644 src/rpc/virnetserver.c
 create mode 100644 src/rpc/virnetserver.h
 create mode 100644 src/rpc/virnetserverclient.c
 create mode 100644 src/rpc/virnetserverclient.h
 create mode 100644 src/rpc/virnetserverprogram.c
 create mode 100644 src/rpc/virnetserverprogram.h
 create mode 100644 src/rpc/virnetserverservice.c
 create mode 100644 src/rpc/virnetserverservice.h

diff --git a/cfg.mk b/cfg.mk
index d4a7387..1cd2a0f 100644
--- a/cfg.mk
+++ b/cfg.mk
@@ -126,6 +126,10 @@ useless_free_options =				\
   --name=virJSONValueFree			\
   --name=virLastErrFreeData			\
   --name=virNetMessageFree                      \
+  --name=virNetServerFree                       \
+  --name=virNetServerClientFree                 \
+  --name=virNetServerProgramFree                \
+  --name=virNetServerServiceFree                \
   --name=virNetSocketFree                       \
   --name=virNetSASLContextFree                  \
   --name=virNetSASLSessionFree                  \
diff --git a/po/POTFILES.in b/po/POTFILES.in
index 59316f1..8a0e89f 100644
--- a/po/POTFILES.in
+++ b/po/POTFILES.in
@@ -70,6 +70,9 @@ src/remote/remote_driver.c
 src/rpc/virnetmessage.c
 src/rpc/virnetsaslcontext.c
 src/rpc/virnetsocket.c
+src/rpc/virnetserver.c
+src/rpc/virnetserverclient.c
+src/rpc/virnetserverprogram.c
 src/rpc/virnettlscontext.c
 src/secret/secret_driver.c
 src/security/security_apparmor.c
diff --git a/src/Makefile.am b/src/Makefile.am
index 4907806..2b4a6e4 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1154,7 +1154,7 @@ libvirt_qemu_la_LIBADD = libvirt.la $(CYGWIN_EXTRA_LIBADD)
 EXTRA_DIST += $(LIBVIRT_QEMU_SYMBOL_FILE)
 
 
-noinst_LTLIBRARIES += libvirt-net-rpc.la
+noinst_LTLIBRARIES += libvirt-net-rpc.la libvirt-net-rpc-server.la
 
 libvirt_net_rpc_la_SOURCES = \
 	rpc/virnetmessage.h rpc/virnetmessage.c \
@@ -1181,6 +1181,21 @@ libvirt_net_rpc_la_LDFLAGS = \
 libvirt_net_rpc_la_LIBADD = \
 			$(CYGWIN_EXTRA_LIBADD)
 
+libvirt_net_rpc_server_la_SOURCES = \
+	rpc/virnetserverprogram.h rpc/virnetserverprogram.c \
+	rpc/virnetserverservice.h rpc/virnetserverservice.c \
+	rpc/virnetserverclient.h rpc/virnetserverclient.c \
+	rpc/virnetserver.h rpc/virnetserver.c
+libvirt_net_rpc_server_la_CFLAGS = \
+			$(AM_CFLAGS)
+libvirt_net_rpc_server_la_LDFLAGS = \
+			$(AM_LDFLAGS) \
+			$(CYGWIN_EXTRA_LDFLAGS) \
+			$(MINGW_EXTRA_LDFLAGS)
+libvirt_net_rpc_server_la_LIBADD = \
+			$(CYGWIN_EXTRA_LIBADD)
+
+
 libexec_PROGRAMS =
 
 if WITH_LIBVIRTD
diff --git a/src/rpc/virnetserver.c b/src/rpc/virnetserver.c
new file mode 100644
index 0000000..b71f34e
--- /dev/null
+++ b/src/rpc/virnetserver.c
@@ -0,0 +1,714 @@
+/*
+ * virnetserver.c: generic network RPC server
+ *
+ * Copyright (C) 2006-2011 Red Hat, Inc.
+ * Copyright (C) 2006 Daniel P. Berrange
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
+ *
+ * Author: Daniel P. Berrange <berrange at redhat.com>
+ */
+
+#include <config.h>
+
+#include <unistd.h>
+#include <string.h>
+
+#include "virnetserver.h"
+#include "logging.h"
+#include "memory.h"
+#include "virterror_internal.h"
+#include "threads.h"
+#include "threadpool.h"
+#include "util.h"
+#include "files.h"
+#include "event.h"
+
+#define VIR_FROM_THIS VIR_FROM_RPC /* error domain passed to virReportErrorHelper() by virNetError() */
+#define virNetError(code, ...)                                    \
+    virReportErrorHelper(VIR_FROM_THIS, code, __FILE__,           \
+                         __FUNCTION__, __LINE__, __VA_ARGS__)
+
+typedef struct _virNetServerSignal virNetServerSignal;
+typedef virNetServerSignal *virNetServerSignalPtr;
+
+struct _virNetServerSignal { /* one handler registered via virNetServerAddSignalHandler() */
+    struct sigaction oldaction; /* previous disposition, restored by virNetServerFree() */
+    int signum;                 /* signal number this entry handles */
+    virNetServerSignalFunc func; /* callback run from the event loop, not from signal context */
+    void *opaque;               /* caller data handed back to func */
+};
+
+typedef struct _virNetServerJob virNetServerJob;
+typedef virNetServerJob *virNetServerJobPtr;
+
+struct _virNetServerJob { /* unit of work queued on the worker pool by virNetServerDispatchNewMessage() */
+    virNetServerClientPtr client; /* client the message arrived from */
+    virNetMessagePtr msg;         /* message to hand to the matching program */
+};
+
+struct _virNetServer {
+    int refs; /* reference count; virNetServerFree() tears down at zero */
+
+    virMutex lock; /* taken via virNetServerLock()/virNetServerUnlock() around field access */
+
+    virThreadPoolPtr workers; /* pool whose threads run virNetServerHandleJob() */
+
+    bool privileged; /* true when geteuid() == 0 at creation time */
+
+    size_t nsignals; /* number of entries in signals */
+    virNetServerSignalPtr *signals; /* handlers added by virNetServerAddSignalHandler() */
+    int sigread;  /* read end of the signal pipe, -1 until set up */
+    int sigwrite; /* write end of the signal pipe, -1 until set up */
+    int sigwatch; /* event-loop watch on sigread */
+
+    size_t nservices; /* number of entries in services */
+    virNetServerServicePtr *services; /* each entry holds a reference taken in virNetServerAddService() */
+
+    size_t nprograms; /* number of entries in programs */
+    virNetServerProgramPtr *programs; /* each entry holds a reference taken in virNetServerAddProgram() */
+
+    size_t nclients;     /* number of entries in clients */
+    size_t nclients_max; /* new connections are refused once nclients reaches this */
+    virNetServerClientPtr *clients; /* freed (set NULL) when the last client goes away */
+
+    unsigned int quit :1; /* set to make virNetServerRun() leave its loop */
+
+    virNetTLSContextPtr tls; /* set by virNetServerSetTLSContext(), which takes a reference */
+
+    unsigned int autoShutdownTimeout; /* seconds; 0 disables the auto-shutdown timer */
+    virNetServerAutoShutdownFunc autoShutdownFunc; /* non-zero return from this sets quit */
+    void *autoShutdownOpaque; /* caller data for autoShutdownFunc */
+
+    virNetServerClientInitHook clientInitHook; /* optional; run for each new client before it is tracked */
+};
+
+
+/* Acquire the server mutex. */
+static void
+virNetServerLock(virNetServerPtr srv)
+{
+    virMutexLock(&srv->lock);
+}
+
+/* Release the server mutex taken by virNetServerLock(). */
+static void
+virNetServerUnlock(virNetServerPtr srv)
+{
+    virMutexUnlock(&srv->lock);
+}
+
+
+/*
+ * Worker-pool callback: dispatch one queued message.
+ *
+ * @jobOpaque: the virNetServerJobPtr queued by virNetServerDispatchNewMessage();
+ *             always freed here
+ * @opaque:    the owning virNetServerPtr
+ *
+ * Looks up the registered program matching the message and hands the
+ * message to it.  If no program matches, or dispatch fails, the message
+ * is freed and the client connection is closed.
+ *
+ * Locking: the server lock is held only while scanning/referencing the
+ * program list and while dropping the program reference on success; it
+ * is never held across virNetServerProgramDispatch().  The "error" label
+ * must always be reached with the lock released.
+ */
+static void virNetServerHandleJob(void *jobOpaque, void *opaque)
+{
+    virNetServerPtr srv = opaque;
+    virNetServerJobPtr job = jobOpaque;
+    virNetServerProgramPtr prog = NULL;
+    size_t i;
+
+    /* Keep the client alive for the duration of the job */
+    virNetServerClientRef(job->client);
+
+    virNetServerLock(srv);
+    VIR_DEBUG("server=%p client=%p message=%p",
+              srv, job->client, job->msg);
+
+    for (i = 0 ; i < srv->nprograms ; i++) {
+        if (virNetServerProgramMatches(srv->programs[i], job->msg)) {
+            prog = srv->programs[i];
+            break;
+        }
+    }
+
+    if (!prog) {
+        VIR_DEBUG("Cannot find program %d version %d",
+                  job->msg->header.prog,
+                  job->msg->header.vers);
+        virNetServerUnlock(srv);
+        goto error;
+    }
+
+    virNetServerProgramRef(prog);
+    virNetServerUnlock(srv);
+
+    /* Lock is released here, so a failure must not unlock again */
+    if (virNetServerProgramDispatch(prog,
+                                    srv,
+                                    job->client,
+                                    job->msg) < 0)
+        goto error;
+
+    /* NOTE(review): job->msg is not freed on success; presumably the
+     * dispatch call takes ownership of it - confirm against
+     * virNetServerProgramDispatch() */
+    virNetServerLock(srv);
+    virNetServerProgramFree(prog);
+    virNetServerUnlock(srv);
+    virNetServerClientFree(job->client);
+
+    VIR_FREE(job);
+    return;
+
+error:
+    virNetServerProgramFree(prog);
+    virNetMessageFree(job->msg);
+    virNetServerClientClose(job->client);
+    virNetServerClientFree(job->client);
+    VIR_FREE(job);
+}
+
+
+/*
+ * Per-client dispatcher callback: wrap (client, msg) in a job record
+ * and queue it on the server's worker pool.
+ *
+ * Returns the result of virThreadPoolSendJob(), or -1 if the job record
+ * could not be allocated.
+ */
+static int
+virNetServerDispatchNewMessage(virNetServerClientPtr client,
+                               virNetMessagePtr msg,
+                               void *opaque)
+{
+    virNetServerPtr server = opaque;
+    virNetServerJobPtr pending;
+    int rc;
+
+    VIR_DEBUG("server=%p client=%p message=%p",
+              server, client, msg);
+
+    if (VIR_ALLOC(pending) < 0) {
+        virReportOOMError();
+        return -1;
+    }
+
+    pending->client = client;
+    pending->msg = msg;
+
+    virNetServerLock(server);
+    rc = virThreadPoolSendJob(server->workers, pending);
+    if (rc < 0)
+        VIR_FREE(pending);      /* never queued, so still ours to release */
+    virNetServerUnlock(server);
+
+    return rc;
+}
+
+
+/*
+ * Service dispatcher callback invoked for each newly accepted client.
+ *
+ * Enforces the client limit, initialises the client, runs the optional
+ * init hook, then records the client (taking a reference) and points its
+ * message dispatcher at this server.
+ *
+ * Returns 0 when the client is now tracked, -1 otherwise.
+ */
+static int
+virNetServerDispatchNewClient(virNetServerServicePtr svc ATTRIBUTE_UNUSED,
+                              virNetServerClientPtr client,
+                              void *opaque)
+{
+    virNetServerPtr server = opaque;
+    int rc = -1;
+
+    virNetServerLock(server);
+
+    if (server->nclients >= server->nclients_max) {
+        virNetError(VIR_ERR_RPC,
+                    _("Too many active clients (%zu), dropping connection from %s"),
+                    server->nclients_max, virNetServerClientRemoteAddrString(client));
+        goto unlock;
+    }
+
+    if (virNetServerClientInit(client) < 0)
+        goto unlock;
+
+    if (server->clientInitHook != NULL &&
+        server->clientInitHook(server, client) < 0)
+        goto unlock;
+
+    if (VIR_EXPAND_N(server->clients, server->nclients, 1) < 0) {
+        virReportOOMError();
+        goto unlock;
+    }
+
+    /* The new slot is the last one in the freshly grown array */
+    server->clients[server->nclients - 1] = client;
+    virNetServerClientRef(client);
+
+    virNetServerClientSetDispatcher(client,
+                                    virNetServerDispatchNewMessage,
+                                    server);
+    rc = 0;
+
+unlock:
+    virNetServerUnlock(server);
+    return rc;
+}
+
+
+/*
+ * Handler for fatal signals (and SIGUSR2 where available): dumps the
+ * in-memory log via virLogEmergencyDumpAll() and, for anything other
+ * than SIGUSR2, switches the signal to SIG_IGN so the handler is not
+ * re-entered.  errno is preserved across the call.
+ */
+static void virNetServerFatalSignal(int sig, siginfo_t * siginfo ATTRIBUTE_UNUSED,
+                                    void* context ATTRIBUTE_UNUSED)
+{
+    struct sigaction sig_action;
+    int origerrno;
+
+    origerrno = errno;
+    virLogEmergencyDumpAll(sig);
+
+    /*
+     * If the signal is fatal, avoid looping over this handler
+     * by deactivating it
+     */
+#ifdef SIGUSR2
+    if (sig != SIGUSR2) {
+#endif
+        /* Zero the whole struct: sa_flags and sa_mask must not be
+         * stack garbage when handed to sigaction() */
+        memset(&sig_action, 0, sizeof(sig_action));
+        sig_action.sa_handler = SIG_IGN;
+        sigaction(sig, &sig_action, NULL);
+#ifdef SIGUSR2
+    }
+#endif
+    errno = origerrno;
+}
+
+
+/*
+ * Create a new server object with a single reference.
+ *
+ * @min_workers, @max_workers: bounds passed to the worker thread pool
+ * @max_clients: connections are refused once this many clients are tracked
+ * @clientInitHook: optional callback run for each new client (may be NULL)
+ *
+ * Also registers the default event loop implementation, ignores SIGPIPE
+ * and installs virNetServerFatalSignal() for the fatal signals (plus
+ * SIGUSR2 where defined).
+ *
+ * Returns the new server, or NULL on failure.
+ */
+virNetServerPtr virNetServerNew(size_t min_workers,
+                                size_t max_workers,
+                                size_t max_clients,
+                                virNetServerClientInitHook clientInitHook)
+{
+    virNetServerPtr srv;
+    struct sigaction sig_action;
+
+    if (VIR_ALLOC(srv) < 0) {
+        virReportOOMError();
+        return NULL;
+    }
+
+    /* Fill in every field that virNetServerFree() inspects before any
+     * step that can fail; otherwise the zero-filled sigread/sigwrite
+     * would make the error path close fd 0 */
+    srv->refs = 1;
+    srv->nclients_max = max_clients;
+    srv->sigwrite = srv->sigread = -1;
+    srv->sigwatch = -1;
+    srv->clientInitHook = clientInitHook;
+    srv->privileged = geteuid() == 0 ? true : false;
+
+    /* The mutex must exist before virNetServerFree() may be used for
+     * cleanup, since that function locks it */
+    if (virMutexInit(&srv->lock) < 0) {
+        virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
+                    _("cannot initialize mutex"));
+        VIR_FREE(srv);
+        return NULL;
+    }
+
+    if (!(srv->workers = virThreadPoolNew(min_workers, max_workers,
+                                          virNetServerHandleJob,
+                                          srv)))
+        goto error;
+
+    if (virEventRegisterDefaultImpl() < 0)
+        goto error;
+
+    memset(&sig_action, 0, sizeof(sig_action));
+    sig_action.sa_handler = SIG_IGN;
+    sigaction(SIGPIPE, &sig_action, NULL);
+
+    /*
+     * catch fatal errors to dump a log, also hook to USR2 for dynamic
+     * debugging purposes or testing
+     */
+    memset(&sig_action, 0, sizeof(sig_action));
+    sig_action.sa_sigaction = virNetServerFatalSignal;
+#ifdef SA_SIGINFO
+    /* sa_sigaction is only consulted when SA_SIGINFO is set */
+    sig_action.sa_flags = SA_SIGINFO;
+#endif
+    sigaction(SIGFPE, &sig_action, NULL);
+    sigaction(SIGSEGV, &sig_action, NULL);
+    sigaction(SIGILL, &sig_action, NULL);
+    sigaction(SIGABRT, &sig_action, NULL);
+#ifdef SIGBUS
+    sigaction(SIGBUS, &sig_action, NULL);
+#endif
+#ifdef SIGUSR2
+    sigaction(SIGUSR2, &sig_action, NULL);
+#endif
+
+    VIR_DEBUG("srv=%p refs=%d", srv, srv->refs);
+    return srv;
+
+error:
+    virNetServerFree(srv);
+    return NULL;
+}
+
+
+/* Take an additional reference on the server. */
+void
+virNetServerRef(virNetServerPtr srv)
+{
+    virNetServerLock(srv);
+    srv->refs += 1;
+    VIR_DEBUG("srv=%p refs=%d", srv, srv->refs);
+    virNetServerUnlock(srv);
+}
+
+
+/* Report the privileged flag recorded when the server was created. */
+bool
+virNetServerIsPrivileged(virNetServerPtr srv)
+{
+    bool isPrivileged;
+
+    virNetServerLock(srv);
+    isPrivileged = srv->privileged;
+    virNetServerUnlock(srv);
+
+    return isPrivileged;
+}
+
+
+/*
+ * Record the auto-shutdown settings used by virNetServerRun():
+ * the idle timeout, the decision callback and its opaque data.
+ */
+void
+virNetServerAutoShutdown(virNetServerPtr srv,
+                         unsigned int timeout,
+                         virNetServerAutoShutdownFunc func,
+                         void *opaque)
+{
+    virNetServerLock(srv);
+    srv->autoShutdownFunc = func;
+    srv->autoShutdownOpaque = opaque;
+    srv->autoShutdownTimeout = timeout;
+    virNetServerUnlock(srv);
+}
+
+
+static sig_atomic_t sigErrors = 0; /* count of failed pipe writes in virNetServerSignalHandler() */
+static int sigLastErrno = 0;       /* errno from the most recent failed pipe write */
+static int sigWrite = -1;          /* write end of the signal pipe, set by virNetServerSignalSetup() */
+
+/*
+ * Signal handler installed by virNetServerAddSignalHandler().
+ * Forwards the siginfo_t down the signal pipe so the real work happens
+ * in virNetServerSignalEvent() on the event loop; write failures are
+ * only counted.  errno is restored before returning.
+ */
+static void
+virNetServerSignalHandler(int sig,
+                          siginfo_t *siginfo,
+                          void *context ATTRIBUTE_UNUSED)
+{
+    int savedErrno;
+
+    /* set the sig num in the struct */
+    siginfo->si_signo = sig;
+
+    savedErrno = errno;
+    if (safewrite(sigWrite, siginfo, sizeof(*siginfo)) == -1) {
+        sigErrors++;
+        sigLastErrno = errno;
+    }
+    errno = savedErrno;
+}
+
+/*
+ * Event-loop callback for the read end of the signal pipe.
+ * Reads one siginfo_t and invokes the callback registered for that
+ * signal number with the server lock released.  A short or failed read
+ * removes the watch; an unregistered signal number is reported as an
+ * internal error.
+ */
+static void
+virNetServerSignalEvent(int watch,
+                        int fd ATTRIBUTE_UNUSED,
+                        int events ATTRIBUTE_UNUSED,
+                        void *opaque)
+{
+    virNetServerPtr server = opaque;
+    siginfo_t info;
+    size_t idx;
+
+    virNetServerLock(server);
+
+    if (saferead(server->sigread, &info, sizeof(info)) != sizeof(info)) {
+        virReportSystemError(errno, "%s",
+                             _("Failed to read from signal pipe"));
+        virEventRemoveHandle(watch);
+        server->sigwatch = -1;
+        virNetServerUnlock(server);
+        return;
+    }
+
+    for (idx = 0; idx < server->nsignals; idx++) {
+        virNetServerSignalPtr entry = server->signals[idx];
+
+        if (entry->signum != info.si_signo)
+            continue;
+
+        /* Copy out what we need, then drop the lock before calling out */
+        virNetServerSignalFunc callback = entry->func;
+        void *callbackData = entry->opaque;
+
+        virNetServerUnlock(server);
+        callback(server, &info, callbackData);
+        return;
+    }
+
+    virNetError(VIR_ERR_INTERNAL_ERROR,
+                _("Unexpected signal received: %d"), info.si_signo);
+    virNetServerUnlock(server);
+}
+
+/*
+ * Create the signal pipe and its event-loop watch, once per server.
+ * Does nothing if the pipe already exists.
+ *
+ * Returns 0 on success (or if already set up), -1 on failure with both
+ * pipe ends closed.
+ */
+static int
+virNetServerSignalSetup(virNetServerPtr srv)
+{
+    int pipefd[2];
+
+    if (srv->sigwrite != -1)
+        return 0;
+
+    if (pipe(pipefd) < 0) {
+        virReportSystemError(errno, "%s",
+                             _("Unable to create signal pipe"));
+        return -1;
+    }
+
+    if (virSetNonBlock(pipefd[0]) < 0 ||
+        virSetNonBlock(pipefd[1]) < 0 ||
+        virSetCloseExec(pipefd[0]) < 0 ||
+        virSetCloseExec(pipefd[1]) < 0) {
+        virReportSystemError(errno, "%s",
+                             _("Failed to setup pipe flags"));
+        goto error;
+    }
+
+    srv->sigwatch = virEventAddHandle(pipefd[0],
+                                      VIR_EVENT_HANDLE_READABLE,
+                                      virNetServerSignalEvent,
+                                      srv, NULL);
+    if (srv->sigwatch < 0) {
+        virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
+                    _("Failed to add signal handle watch"));
+        goto error;
+    }
+
+    /* Publish the pipe ends only once everything has succeeded */
+    srv->sigread = pipefd[0];
+    srv->sigwrite = pipefd[1];
+    sigWrite = pipefd[1];
+
+    return 0;
+
+error:
+    VIR_FORCE_CLOSE(pipefd[0]);
+    VIR_FORCE_CLOSE(pipefd[1]);
+    return -1;
+}
+
+/*
+ * Register @func to be called from the event loop when @signum arrives.
+ *
+ * @srv:    server owning the signal pipe
+ * @signum: signal number to intercept
+ * @func:   callback invoked by virNetServerSignalEvent()
+ * @opaque: caller data passed through to @func
+ *
+ * The previous disposition of @signum is saved so that
+ * virNetServerFree() can restore it.
+ *
+ * Returns 0 on success, -1 on failure; on failure the server's handler
+ * list is left unchanged.
+ */
+int virNetServerAddSignalHandler(virNetServerPtr srv,
+                                 int signum,
+                                 virNetServerSignalFunc func,
+                                 void *opaque)
+{
+    /* Must start NULL: the error path frees it unconditionally */
+    virNetServerSignalPtr sigdata = NULL;
+    struct sigaction sig_action;
+
+    virNetServerLock(srv);
+
+    if (virNetServerSignalSetup(srv) < 0)
+        goto error;
+
+    /* Allocate the record before growing the array, so a failed
+     * allocation cannot leave a NULL slot counted in nsignals */
+    if (VIR_ALLOC(sigdata) < 0)
+        goto no_memory;
+
+    if (VIR_EXPAND_N(srv->signals, srv->nsignals, 1) < 0)
+        goto no_memory;
+
+    sigdata->signum = signum;
+    sigdata->func = func;
+    sigdata->opaque = opaque;
+
+    memset(&sig_action, 0, sizeof(sig_action));
+    sig_action.sa_sigaction = virNetServerSignalHandler;
+#ifdef SA_SIGINFO
+    sig_action.sa_flags = SA_SIGINFO;
+#endif
+    sigemptyset(&sig_action.sa_mask);
+
+    if (sigaction(signum, &sig_action, &sigdata->oldaction) < 0) {
+        virReportSystemError(errno,
+                             _("Unable to install handler for signal %d"),
+                             signum);
+        /* Give back the slot reserved above */
+        VIR_SHRINK_N(srv->signals, srv->nsignals, 1);
+        goto error;
+    }
+
+    srv->signals[srv->nsignals-1] = sigdata;
+
+    virNetServerUnlock(srv);
+    return 0;
+
+no_memory:
+    virReportOOMError();
+error:
+    VIR_FREE(sigdata);
+    virNetServerUnlock(srv);
+    return -1;
+}
+
+
+
+/*
+ * Attach @svc to the server: the server takes its own reference and
+ * routes the service's new-client events to
+ * virNetServerDispatchNewClient().
+ *
+ * Returns 0 on success, -1 if the service list could not be grown.
+ */
+int
+virNetServerAddService(virNetServerPtr srv,
+                       virNetServerServicePtr svc)
+{
+    int rc = -1;
+
+    virNetServerLock(srv);
+
+    if (VIR_EXPAND_N(srv->services, srv->nservices, 1) < 0) {
+        virReportOOMError();
+        goto unlock;
+    }
+
+    srv->services[srv->nservices - 1] = svc;
+    virNetServerServiceRef(svc);
+
+    virNetServerServiceSetDispatcher(svc,
+                                     virNetServerDispatchNewClient,
+                                     srv);
+    rc = 0;
+
+unlock:
+    virNetServerUnlock(srv);
+    return rc;
+}
+
+/*
+ * Register @prog with the server so incoming messages can be matched
+ * against it; the server takes its own reference.
+ *
+ * Returns 0 on success, -1 if the program list could not be grown.
+ */
+int
+virNetServerAddProgram(virNetServerPtr srv,
+                       virNetServerProgramPtr prog)
+{
+    int rc = -1;
+
+    virNetServerLock(srv);
+
+    if (VIR_EXPAND_N(srv->programs, srv->nprograms, 1) < 0) {
+        virReportOOMError();
+        goto unlock;
+    }
+
+    srv->programs[srv->nprograms - 1] = prog;
+    virNetServerProgramRef(prog);
+    rc = 0;
+
+unlock:
+    virNetServerUnlock(srv);
+    return rc;
+}
+
+/*
+ * Associate TLS context @tls with the server, taking a reference on it.
+ * The update is made under the server lock, like every other mutator
+ * in this file.
+ *
+ * NOTE(review): a previously set context is overwritten without being
+ * released, and nothing in this file releases srv->tls on teardown -
+ * confirm whether callers are expected to set it only once.
+ *
+ * Always returns 0.
+ */
+int virNetServerSetTLSContext(virNetServerPtr srv,
+                              virNetTLSContextPtr tls)
+{
+    virNetServerLock(srv);
+    srv->tls = tls;
+    virNetTLSContextRef(tls);
+    virNetServerUnlock(srv);
+    return 0;
+}
+
+
+/*
+ * Timer callback for the auto-shutdown timeout: asks the registered
+ * autoShutdownFunc whether to stop, and if so sets the quit flag that
+ * virNetServerRun() polls.
+ */
+static void
+virNetServerAutoShutdownTimer(int timerid ATTRIBUTE_UNUSED,
+                              void *opaque)
+{
+    virNetServerPtr server = opaque;
+    int shouldQuit;
+
+    virNetServerLock(server);
+
+    shouldQuit = server->autoShutdownFunc(server, server->autoShutdownOpaque);
+    if (shouldQuit) {
+        VIR_DEBUG("Automatic shutdown triggered");
+        server->quit = 1;
+    }
+
+    virNetServerUnlock(server);
+}
+
+
+/* Toggle every registered service to @enabled. */
+void
+virNetServerUpdateServices(virNetServerPtr srv,
+                           bool enabled)
+{
+    size_t idx;
+
+    virNetServerLock(srv);
+
+    for (idx = 0; idx < srv->nservices; idx++) {
+        virNetServerServiceToggle(srv->services[idx], enabled);
+    }
+
+    virNetServerUnlock(srv);
+}
+
+
+void virNetServerRun(virNetServerPtr srv)
+{ /* Runs event-loop iterations until srv->quit is set or an iteration fails */
+    int timerid = -1;
+    int timerActive = 0; /* whether the shutdown timer is currently armed */
+    int i;
+
+    virNetServerLock(srv);
+
+    if (srv->autoShutdownTimeout &&
+        (timerid = virEventAddTimeout(-1, /* registered disarmed; armed below once idle */
+                                      virNetServerAutoShutdownTimer,
+                                      srv, NULL)) < 0) {
+        virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
+                    _("Failed to register shutdown timeout"));
+        goto cleanup;
+    }
+
+    while (!srv->quit) {
+        /* A shutdown timeout is specified, so check
+         * if any drivers have active state, if not
+         * shutdown after timeout seconds
+         */
+        if (srv->autoShutdownTimeout) {
+            if (timerActive) {
+                if (srv->clients) { /* non-NULL only while at least one client is tracked */
+                    VIR_DEBUG("Deactivating shutdown timer %d", timerid);
+                    virEventUpdateTimeout(timerid, -1);
+                    timerActive = 0;
+                }
+            } else {
+                if (!srv->clients) {
+                    VIR_DEBUG("Activating shutdown timer %d", timerid);
+                    virEventUpdateTimeout(timerid,
+                                          srv->autoShutdownTimeout * 1000); /* seconds -> milliseconds */
+                    timerActive = 1;
+                }
+            }
+        }
+
+        virNetServerUnlock(srv); /* callbacks run by the event loop take the lock themselves */
+        if (virEventRunDefaultImpl() < 0) {
+            virNetServerLock(srv); /* re-take so the unlock at cleanup stays balanced */
+            VIR_DEBUG("Loop iteration error, exiting");
+            break;
+        }
+        virNetServerLock(srv);
+
+    reprocess: /* restart the scan after each removal, since the array has shifted */
+        for (i = 0 ; i < srv->nclients ; i++) {
+            if (virNetServerClientWantClose(srv->clients[i]))
+                virNetServerClientClose(srv->clients[i]);
+            if (virNetServerClientIsClosed(srv->clients[i])) {
+                virNetServerClientFree(srv->clients[i]); /* drop the reference taken when the client was added */
+                if (srv->nclients > 1) {
+                    memmove(srv->clients + i,
+                            srv->clients + i + 1,
+                            sizeof(*srv->clients) * (srv->nclients - (i + 1)));
+                    VIR_SHRINK_N(srv->clients, srv->nclients, 1);
+                } else {
+                    VIR_FREE(srv->clients); /* last client gone: clients becomes NULL, which the idle check above tests */
+                    srv->nclients = 0;
+                }
+
+                goto reprocess;
+            }
+        }
+    }
+
+cleanup:
+    virNetServerUnlock(srv);
+}
+
+
+/* Ask virNetServerRun() to leave its loop by setting the quit flag. */
+void
+virNetServerQuit(virNetServerPtr srv)
+{
+    virNetServerLock(srv);
+    srv->quit = 1;
+    virNetServerUnlock(srv);
+}
+
+/*
+ * Drop one reference on @srv (NULL is accepted and ignored).
+ *
+ * When the last reference goes away: services are disabled, the worker
+ * pool is freed, original signal dispositions are restored, the signal
+ * pipe and its watch are torn down, the references held on services,
+ * programs and clients are released (clients are closed first), and the
+ * server itself is freed.
+ *
+ * NOTE(review): virThreadPoolFree() is called with the server lock held
+ * while worker jobs also take that lock - confirm this cannot block.
+ */
+void virNetServerFree(virNetServerPtr srv)
+{
+    size_t i;
+
+    if (!srv)
+        return;
+
+    virNetServerLock(srv);
+    VIR_DEBUG("srv=%p refs=%d", srv, srv->refs);
+    srv->refs--;
+    if (srv->refs > 0) {
+        virNetServerUnlock(srv);
+        return;
+    }
+
+    for (i = 0 ; i < srv->nservices ; i++)
+        virNetServerServiceToggle(srv->services[i], false);
+
+    virThreadPoolFree(srv->workers);
+
+    for (i = 0 ; i < srv->nsignals ; i++) {
+        /* Tolerate a slot that was reserved but never filled in */
+        if (!srv->signals[i])
+            continue;
+        sigaction(srv->signals[i]->signum, &srv->signals[i]->oldaction, NULL);
+        VIR_FREE(srv->signals[i]);
+    }
+    VIR_FREE(srv->signals);
+
+    /* Remove the watch before closing the fd it refers to, so the event
+     * loop is never left watching a closed (or reused) descriptor */
+    if (srv->sigwatch > 0)
+        virEventRemoveHandle(srv->sigwatch);
+    /* Do not leave the file-level copy pointing at a closed fd */
+    if (sigWrite == srv->sigwrite)
+        sigWrite = -1;
+    VIR_FORCE_CLOSE(srv->sigread);
+    VIR_FORCE_CLOSE(srv->sigwrite);
+
+    for (i = 0 ; i < srv->nservices ; i++)
+        virNetServerServiceFree(srv->services[i]);
+    VIR_FREE(srv->services);
+
+    for (i = 0 ; i < srv->nprograms ; i++)
+        virNetServerProgramFree(srv->programs[i]);
+    VIR_FREE(srv->programs);
+
+    for (i = 0 ; i < srv->nclients ; i++) {
+        virNetServerClientClose(srv->clients[i]);
+        virNetServerClientFree(srv->clients[i]);
+    }
+    VIR_FREE(srv->clients);
+
+    virNetServerUnlock(srv);
+    virMutexDestroy(&srv->lock);
+    VIR_FREE(srv);
+}
diff --git a/src/rpc/virnetserver.h b/src/rpc/virnetserver.h
new file mode 100644
index 0000000..d8d7c8e
--- /dev/null
+++ b/src/rpc/virnetserver.h
@@ -0,0 +1,80 @@
+/*
+ * virnetserver.h: generic network RPC server
+ *
+ * Copyright (C) 2006-2011 Red Hat, Inc.
+ * Copyright (C) 2006 Daniel P. Berrange
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
+ *
+ * Author: Daniel P. Berrange <berrange at redhat.com>
+ */
+
+#ifndef __VIR_NET_SERVER_H__
+# define __VIR_NET_SERVER_H__
+
+# include <stdbool.h>
+# include <signal.h>
+
+# include "virnettlscontext.h"
+# include "virnetserverprogram.h"
+# include "virnetserverclient.h"
+# include "virnetserverservice.h"
+
+typedef int (*virNetServerClientInitHook)(virNetServerPtr srv,
+                                          virNetServerClientPtr client); /* run per new client; a negative return stops it being added */
+
+virNetServerPtr virNetServerNew(size_t min_workers,
+                                size_t max_workers,
+                                size_t max_clients,
+                                virNetServerClientInitHook clientInitHook); /* returns NULL on failure */
+
+typedef int (*virNetServerAutoShutdownFunc)(virNetServerPtr srv, void *opaque); /* non-zero return makes the server quit */
+
+void virNetServerRef(virNetServerPtr srv); /* take an extra reference */
+
+bool virNetServerIsPrivileged(virNetServerPtr srv); /* true if effective uid was 0 at creation */
+
+void virNetServerAutoShutdown(virNetServerPtr srv,
+                              unsigned int timeout,
+                              virNetServerAutoShutdownFunc func,
+                              void *opaque); /* timeout is in seconds */
+
+typedef void (*virNetServerSignalFunc)(virNetServerPtr srv, siginfo_t *info, void *opaque); /* called from the event loop */
+
+int virNetServerAddSignalHandler(virNetServerPtr srv,
+                                 int signum,
+                                 virNetServerSignalFunc func,
+                                 void *opaque); /* 0 on success, -1 on failure */
+
+int virNetServerAddService(virNetServerPtr srv,
+                           virNetServerServicePtr svc); /* server takes its own reference on svc */
+
+int virNetServerAddProgram(virNetServerPtr srv,
+                           virNetServerProgramPtr prog); /* server takes its own reference on prog */
+
+int virNetServerSetTLSContext(virNetServerPtr srv,
+                              virNetTLSContextPtr tls); /* takes a reference on tls; always returns 0 */
+
+void virNetServerUpdateServices(virNetServerPtr srv,
+                                bool enabled); /* toggles every registered service */
+
+void virNetServerRun(virNetServerPtr srv); /* runs the event loop until quit is requested or an iteration fails */
+
+void virNetServerQuit(virNetServerPtr srv); /* request that virNetServerRun() return */
+
+void virNetServerFree(virNetServerPtr srv); /* drop a reference; NULL is ignored */
+
+
+#endif
diff --git a/src/rpc/virnetserverclient.c b/src/rpc/virnetserverclient.c
new file mode 100644
index 0000000..327b121
--- /dev/null
+++ b/src/rpc/virnetserverclient.c
@@ -0,0 +1,937 @@
+/*
+ * virnetserverclient.c: generic network RPC server client
+ *
+ * Copyright (C) 2006-2011 Red Hat, Inc.
+ * Copyright (C) 2006 Daniel P. Berrange
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
+ *
+ * Author: Daniel P. Berrange <berrange at redhat.com>
+ */
+
+#include <config.h>
+
+#if HAVE_SASL
+# include <sasl/sasl.h>
+#endif
+
+#include "virnetserverclient.h"
+
+#include "logging.h"
+#include "virterror_internal.h"
+#include "memory.h"
+#include "threads.h"
+
+#define VIR_FROM_THIS VIR_FROM_RPC
+#define virNetError(code, ...)                                    \
+    virReportErrorHelper(VIR_FROM_THIS, code, __FILE__,           \
+                         __FUNCTION__, __LINE__, __VA_ARGS__)
+
+/* Allow for filtering of incoming messages to a custom
+ * dispatch processing queue, instead of the workers.
+ * This allows for certain types of messages to be handled
+ * strictly "in order"
+ */
+
+typedef struct _virNetServerClientFilter virNetServerClientFilter;
+typedef virNetServerClientFilter *virNetServerClientFilterPtr;
+
+/* One node in a client's singly linked list of message filters.
+ * Nodes are created by virNetServerClientAddFilter and released
+ * by virNetServerClientRemoveFilter. */
+struct _virNetServerClientFilter {
+    int id; /* handle handed back to the caller of AddFilter */
+    virNetServerClientFilterFunc func; /* offered each decoded incoming message */
+    void *opaque; /* passed through to func unchanged */
+
+    virNetServerClientFilterPtr next; /* next filter in the list, NULL at the tail */
+};
+
+
+/* State for a single client connection. Fields are accessed with
+ * 'lock' held by the functions in this file. */
+struct _virNetServerClient
+{
+    int refs; /* reference count; the object is freed when it drops to zero */
+    bool wantClose; /* set once the connection should be torn down */
+    virMutex lock;
+    virNetSocketPtr sock; /* NULL once virNetServerClientClose has run */
+    int auth;
+    bool readonly;
+    char *identity; /* heap copy made by virNetServerClientSetIdentity */
+    virNetTLSContextPtr tlsCtxt;
+    virNetTLSSessionPtr tls;
+#if HAVE_SASL
+    /* Pending SASL session; handed to the socket after the next
+     * complete 'tx' message, then reset to NULL */
+    virNetSASLSessionPtr sasl;
+#endif
+
+    /* Count of messages in the 'tx' queue,
+     * and the server worker pool queue
+     * ie RPC calls in progress. Does not count
+     * async events which are not used for
+     * throttling calculations */
+    size_t nrequests;
+    size_t nrequests_max;
+    /* Zero or one messages being received. Zero if
+     * nrequests >= nrequests_max and throttling */
+    virNetMessagePtr rx;
+    /* Zero or many messages waiting for transmit
+     * back to client, including async events */
+    virNetMessagePtr tx;
+
+    /* Filters to capture messages that would otherwise
+     * be handed to 'dispatchFunc' */
+    virNetServerClientFilterPtr filters;
+    int nextFilterID;
+
+    /* Receives every complete message no filter claimed */
+    virNetServerClientDispatchFunc dispatchFunc;
+    void *dispatchOpaque;
+
+    /* Caller supplied data, released through privateDataFreeFunc */
+    void *privateData;
+    virNetServerClientFreeFunc privateDataFreeFunc;
+};
+
+
+static void virNetServerClientDispatchEvent(virNetSocketPtr sock, int events, void *opaque);
+static void virNetServerClientUpdateEvent(virNetServerClientPtr client);
+
+/* Acquire the per-client mutex guarding all fields of @client */
+static void virNetServerClientLock(virNetServerClientPtr client)
+{
+    virMutexLock(&client->lock);
+}
+
+/* Release the per-client mutex taken by virNetServerClientLock */
+static void virNetServerClientUnlock(virNetServerClientPtr client)
+{
+    virMutexUnlock(&client->lock);
+}
+
+
+/*
+ * Work out which socket events the client currently cares about.
+ *
+ * While a TLS handshake is in flight only the direction the handshake
+ * is waiting on is watched. Otherwise the socket is watched for reading
+ * when a receive buffer exists and for writing when replies are queued.
+ *
+ * @client: a locked client object
+ *
+ * Returns a bitmask of VIR_EVENT_HANDLE_* flags, or 0 if the client
+ * has no socket or is marked for close
+ */
+static int
+virNetServerClientCalculateHandleMode(virNetServerClientPtr client) {
+    int events = 0;
+    bool handshaking = false;
+
+    VIR_DEBUG("tls=%p hs=%d, rx=%p tx=%p",
+              client->tls,
+              client->tls ? virNetTLSSessionGetHandshakeStatus(client->tls) : -1,
+              client->rx,
+              client->tx);
+
+    if (client->wantClose || !client->sock)
+        return 0;
+
+    if (client->tls) {
+        int status = virNetTLSSessionGetHandshakeStatus(client->tls);
+
+        if (status == VIR_NET_TLS_HANDSHAKE_RECVING) {
+            events = VIR_EVENT_HANDLE_READABLE;
+            handshaking = true;
+        } else if (status == VIR_NET_TLS_HANDSHAKE_SENDING) {
+            events = VIR_EVENT_HANDLE_WRITABLE;
+            handshaking = true;
+        }
+    }
+
+    if (!handshaking) {
+        /* A pending receive buffer means more input is wanted */
+        if (client->rx)
+            events |= VIR_EVENT_HANDLE_READABLE;
+
+        /* Queued replies mean we need to know when we can write */
+        if (client->tx)
+            events |= VIR_EVENT_HANDLE_WRITABLE;
+    }
+
+    VIR_DEBUG("mode=%d", events);
+    return events;
+}
+
+/*
+ * Install virNetServerClientDispatchEvent as the I/O callback on the
+ * client's socket, watching for the events currently wanted.
+ *
+ * @client: a locked client object
+ *
+ * Returns 0 on success, -1 if the callback could not be added
+ */
+static int virNetServerClientRegisterEvent(virNetServerClientPtr client)
+{
+    int events = virNetServerClientCalculateHandleMode(client);
+    int rc;
+
+    VIR_DEBUG("Registering client event callback %d", events);
+    rc = virNetSocketAddIOCallback(client->sock,
+                                   events,
+                                   virNetServerClientDispatchEvent,
+                                   client);
+
+    return rc < 0 ? -1 : 0;
+}
+
+/*
+ * Refresh the set of events watched on the client's socket so that
+ * it matches the current rx/tx/handshake state. Does nothing once
+ * the socket has been closed.
+ *
+ * @client: a locked client object
+ */
+static void virNetServerClientUpdateEvent(virNetServerClientPtr client)
+{
+    if (client->sock) {
+        int events = virNetServerClientCalculateHandleMode(client);
+
+        virNetSocketUpdateIOCallback(client->sock, events);
+    }
+}
+
+
+/*
+ * Register @func as a message filter on @client. The new filter is
+ * placed at the head of the filter list.
+ *
+ * Returns the ID of the new filter, for use with
+ * virNetServerClientRemoveFilter, or -1 on allocation failure
+ */
+int virNetServerClientAddFilter(virNetServerClientPtr client,
+                                virNetServerClientFilterFunc func,
+                                void *opaque)
+{
+    virNetServerClientFilterPtr entry;
+    int id;
+
+    virNetServerClientLock(client);
+
+    if (VIR_ALLOC(entry) < 0) {
+        virReportOOMError();
+        id = -1;
+    } else {
+        id = client->nextFilterID++;
+
+        entry->id = id;
+        entry->func = func;
+        entry->opaque = opaque;
+
+        /* Push onto the front of the list */
+        entry->next = client->filters;
+        client->filters = entry;
+    }
+
+    virNetServerClientUnlock(client);
+    return id;
+}
+
+
+/*
+ * Unregister and free the filter previously returned by
+ * virNetServerClientAddFilter with ID @filterID. An unknown ID
+ * is silently ignored.
+ */
+void virNetServerClientRemoveFilter(virNetServerClientPtr client,
+                                    int filterID)
+{
+    virNetServerClientFilterPtr tmp, prev = NULL;
+
+    virNetServerClientLock(client);
+
+    /* 'prev' must trail 'tmp' on every step, otherwise unlinking a
+     * non-head entry would cut off all the entries in front of it */
+    for (tmp = client->filters; tmp; prev = tmp, tmp = tmp->next) {
+        if (tmp->id != filterID)
+            continue;
+
+        if (prev)
+            prev->next = tmp->next;
+        else
+            client->filters = tmp->next;
+
+        VIR_FREE(tmp);
+        break;
+    }
+
+    virNetServerClientUnlock(client);
+}
+
+
+/*
+ * Validate the client's TLS certificate and, if it is acceptable,
+ * queue the single byte confirmation for transmission.
+ *
+ * Returns 0 on success, -1 if the certificate was rejected, data was
+ * already queued for transmit, or no message could be allocated
+ */
+static int
+virNetServerClientCheckAccess(virNetServerClientPtr client)
+{
+    virNetMessagePtr ack;
+
+    /* Certificate verification comes first */
+    if (virNetTLSContextCheckCertificate(client->tlsCtxt, client->tls) < 0)
+        return -1;
+
+    /* Nothing may be queued ahead of the confirmation byte */
+    if (client->tx) {
+        VIR_DEBUG("client had unexpected data pending tx after access check");
+        return -1;
+    }
+
+    ack = virNetMessageNew();
+    if (!ack)
+        return -1;
+
+    /* All checks passed. Reply with a lone '\1' byte so the client
+     * knows it was accepted (otherwise the socket is abruptly closed).
+     * (NB. The '\1' byte is sent in an encrypted record).
+     */
+    ack->buffer[0] = '\1';
+    ack->bufferOffset = 0;
+    ack->bufferLength = 1;
+
+    client->tx = ack;
+
+    return 0;
+}
+
+
+/*
+ * Create a client object wrapping the connected socket @sock.
+ *
+ * @sock: socket for the connection; owned by the client on success
+ * @auth: authentication mode stored for later queries
+ * @readonly: whether the connection is flagged read-only
+ * @tls: optional TLS context; an extra reference is taken if non-NULL
+ *
+ * The returned object holds one reference and has a receive buffer
+ * prepared for the first message length word.
+ *
+ * Returns the new client, or NULL on failure, in which case the
+ * caller still owns @sock
+ */
+virNetServerClientPtr virNetServerClientNew(virNetSocketPtr sock,
+                                            int auth,
+                                            bool readonly,
+                                            virNetTLSContextPtr tls)
+{
+    virNetServerClientPtr client;
+
+    VIR_DEBUG("sock=%p auth=%d tls=%p", sock, auth, tls);
+
+    if (VIR_ALLOC(client) < 0) {
+        virReportOOMError();
+        return NULL;
+    }
+
+    if (virMutexInit(&client->lock) < 0) {
+        /* virNetServerClientFree would lock the mutex that just failed
+         * to initialize, so release the bare allocation directly */
+        virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
+                    _("cannot initialize mutex"));
+        VIR_FREE(client);
+        return NULL;
+    }
+
+    client->refs = 1;
+    client->sock = sock;
+    client->auth = auth;
+    client->readonly = readonly;
+    client->tlsCtxt = tls;
+    client->nrequests_max = 10; /* XXX */
+
+    if (tls)
+        virNetTLSContextRef(tls);
+
+    /* Prepare one for packet receive */
+    if (!(client->rx = virNetMessageNew()))
+        goto error;
+    client->rx->bufferLength = VIR_NET_MESSAGE_LEN_MAX;
+    client->nrequests = 1;
+
+    VIR_DEBUG("client=%p refs=%d", client, client->refs);
+
+    return client;
+
+error:
+    /* XXX ref counting is better than this */
+    client->sock = NULL; /* Caller owns 'sock' upon failure */
+    virNetServerClientFree(client);
+    return NULL;
+}
+
+/* Take an additional reference on @client. Each reference is
+ * dropped again with virNetServerClientFree. */
+void virNetServerClientRef(virNetServerClientPtr client)
+{
+    virNetServerClientLock(client);
+    client->refs++;
+    VIR_DEBUG("client=%p refs=%d", client, client->refs);
+    virNetServerClientUnlock(client);
+}
+
+
+/* Return the authentication mode given to virNetServerClientNew */
+int virNetServerClientGetAuth(virNetServerClientPtr client)
+{
+    int mode;
+
+    virNetServerClientLock(client);
+    mode = client->auth;
+    virNetServerClientUnlock(client);
+
+    return mode;
+}
+
+/* Return the read-only flag given to virNetServerClientNew */
+bool virNetServerClientGetReadonly(virNetServerClientPtr client)
+{
+    bool ro;
+
+    virNetServerClientLock(client);
+    ro = client->readonly;
+    virNetServerClientUnlock(client);
+
+    return ro;
+}
+
+
+/* Report whether a TLS session is currently attached to @client */
+bool virNetServerClientHasTLSSession(virNetServerClientPtr client)
+{
+    bool present;
+
+    virNetServerClientLock(client);
+    present = client->tls != NULL;
+    virNetServerClientUnlock(client);
+
+    return present;
+}
+
+/* Return the key size reported by the client's TLS session,
+ * or 0 when the client has no TLS session */
+int virNetServerClientGetTLSKeySize(virNetServerClientPtr client)
+{
+    int keySize;
+
+    virNetServerClientLock(client);
+    keySize = client->tls ? virNetTLSSessionGetKeySize(client->tls) : 0;
+    virNetServerClientUnlock(client);
+
+    return keySize;
+}
+
+/* Return the file descriptor reported by the client's socket.
+ * NOTE(review): client->sock is NULL after virNetServerClientClose;
+ * confirm virNetSocketGetFD tolerates that */
+int virNetServerClientGetFD(virNetServerClientPtr client)
+{
+    int sockfd;
+
+    virNetServerClientLock(client);
+    sockfd = virNetSocketGetFD(client->sock);
+    virNetServerClientUnlock(client);
+
+    return sockfd;
+}
+
+/* Query the client's socket for the peer identity, filling @uid
+ * and @pid. Returns whatever virNetSocketGetLocalIdentity returns. */
+int virNetServerClientGetLocalIdentity(virNetServerClientPtr client,
+                                       uid_t *uid, pid_t *pid)
+{
+    int rc;
+
+    virNetServerClientLock(client);
+    rc = virNetSocketGetLocalIdentity(client->sock, uid, pid);
+    virNetServerClientUnlock(client);
+
+    return rc;
+}
+
+/* A client counts as secure when it has a TLS session, has a
+ * pending SASL session, or its socket is a local one */
+bool virNetServerClientIsSecure(virNetServerClientPtr client)
+{
+    bool layered;
+    bool local;
+
+    virNetServerClientLock(client);
+
+    layered = client->tls != NULL;
+#if HAVE_SASL
+    layered = layered || client->sasl != NULL;
+#endif
+    local = virNetSocketIsLocal(client->sock);
+
+    virNetServerClientUnlock(client);
+
+    return layered || local;
+}
+
+
+#if HAVE_SASL
+/*
+ * Remember @sasl as the session to activate on the client, taking
+ * a reference on it.
+ */
+void virNetServerClientSetSASLSession(virNetServerClientPtr client,
+                                      virNetSASLSessionPtr sasl)
+{
+    virNetServerClientLock(client);
+
+    /* The session is deliberately not installed on the socket yet:
+     * the auth confirmation still has to go out in the clear. The
+     * switch to SASL mode happens once the next 'tx' operation
+     * has completed.
+     */
+    virNetSASLSessionRef(sasl);
+    client->sasl = sasl;
+
+    virNetServerClientUnlock(client);
+}
+#endif
+
+
+/*
+ * Record @identity as the authenticated identity of @client. The
+ * string is copied; any identity set previously is released.
+ *
+ * Returns 0 on success, -1 on allocation failure, in which case the
+ * previously stored identity is left untouched
+ */
+int virNetServerClientSetIdentity(virNetServerClientPtr client,
+                                  const char *identity)
+{
+    char *copy;
+    int ret = -1;
+
+    virNetServerClientLock(client);
+
+    /* Duplicate first so a failure cannot clobber the old value */
+    if (!(copy = strdup(identity))) {
+        virReportOOMError();
+        goto cleanup;
+    }
+
+    /* Drop the old string rather than leaking it on overwrite */
+    VIR_FREE(client->identity);
+    client->identity = copy;
+    ret = 0;
+
+cleanup:
+    virNetServerClientUnlock(client);
+    return ret;
+}
+
+/*
+ * Return the identity stored by virNetServerClientSetIdentity, or
+ * NULL if none has been set. The string remains owned by @client
+ * and must not be freed by the caller.
+ */
+const char *virNetServerClientGetIdentity(virNetServerClientPtr client)
+{
+    const char *identity;
+    virNetServerClientLock(client);
+    identity = client->identity;
+    /* Must release here: taking the lock a second time would leave
+     * the client mutex held forever */
+    virNetServerClientUnlock(client);
+    return identity;
+}
+
+/*
+ * Attach caller data @opaque to @client along with @ff, the function
+ * used to release it. Data attached earlier is released through its
+ * own free function first, provided both were non-NULL.
+ */
+void virNetServerClientSetPrivateData(virNetServerClientPtr client,
+                                      void *opaque,
+                                      virNetServerClientFreeFunc ff)
+{
+    virNetServerClientLock(client);
+
+    if (client->privateData) {
+        if (client->privateDataFreeFunc)
+            client->privateDataFreeFunc(client->privateData);
+    }
+
+    client->privateDataFreeFunc = ff;
+    client->privateData = opaque;
+
+    virNetServerClientUnlock(client);
+}
+
+
+/* Return the pointer stored by virNetServerClientSetPrivateData */
+void *virNetServerClientGetPrivateData(virNetServerClientPtr client)
+{
+    void *priv;
+
+    virNetServerClientLock(client);
+    priv = client->privateData;
+    virNetServerClientUnlock(client);
+
+    return priv;
+}
+
+
+/* Install @func, with its @opaque argument, as the callback that
+ * receives each complete incoming message not claimed by a filter */
+void virNetServerClientSetDispatcher(virNetServerClientPtr client,
+                                     virNetServerClientDispatchFunc func,
+                                     void *opaque)
+{
+    virNetServerClientLock(client);
+    client->dispatchFunc = func;
+    client->dispatchOpaque = opaque;
+    virNetServerClientUnlock(client);
+}
+
+
+/* Return the local address string of the client's socket.
+ * Reads client->sock without taking the client lock.
+ * NOTE(review): client->sock is NULL after virNetServerClientClose;
+ * confirm callers never reach this on a closed client */
+const char *virNetServerClientLocalAddrString(virNetServerClientPtr client)
+{
+    return virNetSocketLocalAddrString(client->sock);
+}
+
+
+/* Return the remote address string of the client's socket.
+ * Reads client->sock without taking the client lock.
+ * NOTE(review): client->sock is NULL after virNetServerClientClose;
+ * confirm callers never reach this on a closed client */
+const char *virNetServerClientRemoteAddrString(virNetServerClientPtr client)
+{
+    return virNetSocketRemoteAddrString(client->sock);
+}
+
+
+/*
+ * Drop one reference on @client. When the last reference goes away
+ * the private data, identity, filters, queued messages, SASL/TLS
+ * state, socket and the object itself are all released.
+ *
+ * A NULL @client is ignored.
+ */
+void virNetServerClientFree(virNetServerClientPtr client)
+{
+    if (!client)
+        return;
+
+    virNetServerClientLock(client);
+    VIR_DEBUG("client=%p refs=%d", client, client->refs);
+
+    client->refs--;
+    if (client->refs > 0) {
+        virNetServerClientUnlock(client);
+        return;
+    }
+
+    if (client->privateData &&
+        client->privateDataFreeFunc)
+        client->privateDataFreeFunc(client->privateData);
+
+    /* Filters that were never removed would otherwise be leaked */
+    while (client->filters) {
+        virNetServerClientFilterPtr filter = client->filters;
+        client->filters = filter->next;
+        VIR_FREE(filter);
+    }
+
+    /* Messages are still queued if virNetServerClientClose did not
+     * run before the last reference was dropped */
+    while (client->rx) {
+        virNetMessagePtr msg
+            = virNetMessageQueueServe(&client->rx);
+        virNetMessageFree(msg);
+    }
+    while (client->tx) {
+        virNetMessagePtr msg
+            = virNetMessageQueueServe(&client->tx);
+        virNetMessageFree(msg);
+    }
+
+    VIR_FREE(client->identity);
+#if HAVE_SASL
+    virNetSASLSessionFree(client->sasl);
+#endif
+    virNetTLSSessionFree(client->tls);
+    virNetTLSContextFree(client->tlsCtxt);
+    virNetSocketFree(client->sock);
+    virNetServerClientUnlock(client);
+    virMutexDestroy(&client->lock);
+    VIR_FREE(client);
+}
+
+
+/*
+ * Disconnect the client: stop watching its socket, release the TLS
+ * session and the socket, and discard any queued rx/tx messages.
+ * Calling this on an already closed client is a no-op.
+ *
+ * The client object itself is not freed here; that happens later
+ * at a safe point where it is guaranteed to no longer be in use.
+ */
+void virNetServerClientClose(virNetServerClientPtr client)
+{
+    virNetMessagePtr pending;
+
+    virNetServerClientLock(client);
+    VIR_DEBUG("client=%p refs=%d", client, client->refs);
+
+    if (client->sock) {
+        /* Unhook the callback up front, although the socket is only
+         * released further down, so that TLS shutdown cannot cause
+         * us to be invoked again */
+        virNetSocketRemoveIOCallback(client->sock);
+
+        if (client->tls) {
+            virNetTLSSessionFree(client->tls);
+            client->tls = NULL;
+        }
+
+        virNetSocketFree(client->sock);
+        client->sock = NULL;
+
+        while (client->rx) {
+            pending = virNetMessageQueueServe(&client->rx);
+            virNetMessageFree(pending);
+        }
+        while (client->tx) {
+            pending = virNetMessageQueueServe(&client->tx);
+            virNetMessageFree(pending);
+        }
+    }
+
+    virNetServerClientUnlock(client);
+}
+
+
+/* Report whether the client's socket has already been released */
+bool virNetServerClientIsClosed(virNetServerClientPtr client)
+{
+    bool gone;
+
+    virNetServerClientLock(client);
+    gone = !client->sock;
+    virNetServerClientUnlock(client);
+
+    return gone;
+}
+
+/* Flag @client for closing. Only the flag is set here; the socket
+ * itself is released by virNetServerClientClose. */
+void virNetServerClientMarkClose(virNetServerClientPtr client)
+{
+    virNetServerClientLock(client);
+    client->wantClose = true;
+    virNetServerClientUnlock(client);
+}
+
+/* Report whether @client has been flagged for closing */
+bool virNetServerClientWantClose(virNetServerClientPtr client)
+{
+    bool flagged;
+
+    virNetServerClientLock(client);
+    flagged = client->wantClose;
+    virNetServerClientUnlock(client);
+
+    return flagged;
+}
+
+
+/*
+ * Start servicing @client. For a plain connection this registers the
+ * socket event callback. For a TLS connection it first creates the
+ * session, attaches it to the socket and begins the handshake,
+ * checking the certificate straight away if the handshake completes
+ * immediately.
+ *
+ * Returns 0 on success, -1 on failure, in which case the client is
+ * also flagged for closing
+ */
+int virNetServerClientInit(virNetServerClientPtr client)
+{
+    virNetServerClientLock(client);
+
+    if (client->tlsCtxt) {
+        int hs;
+
+        client->tls = virNetTLSSessionNew(client->tlsCtxt, NULL);
+        if (!client->tls)
+            goto error;
+
+        virNetSocketSetTLSSession(client->sock, client->tls);
+
+        /* Begin the TLS handshake. */
+        hs = virNetTLSSessionHandshake(client->tls);
+        if (hs < 0)
+            goto error;
+
+        /* Unlikely to be complete already, but if so the certificate
+         * gets checked next. A positive value means more handshake
+         * data still has to flow. */
+        if (hs == 0 &&
+            virNetServerClientCheckAccess(client) < 0)
+            goto error;
+    }
+
+    /* Plain socket, finished handshake or handshake in progress:
+     * each of them needs the socket events to be watched now */
+    if (virNetServerClientRegisterEvent(client) < 0)
+        goto error;
+
+    virNetServerClientUnlock(client);
+    return 0;
+
+error:
+    client->wantClose = true;
+    virNetServerClientUnlock(client);
+    return -1;
+}
+
+
+
+/*
+ * Fill the remaining space of the current receive buffer from the
+ * socket, using whatever wire decoding (plain or TLS) is active,
+ * and advance the buffer offset by the amount read.
+ *
+ * Returns:
+ *   -1 on error or EOF
+ *    0 on EAGAIN
+ *    n number of bytes
+ */
+static ssize_t virNetServerClientRead(virNetServerClientPtr client)
+{
+    virNetMessagePtr rx = client->rx;
+    ssize_t got;
+
+    /* There has to be room left in the buffer */
+    if (rx->bufferOffset >= rx->bufferLength) {
+        virNetError(VIR_ERR_RPC,
+                    _("unexpected zero/negative length request %lld"),
+                    (long long int)(client->rx->bufferLength - client->rx->bufferOffset));
+        client->wantClose = true;
+        return -1;
+    }
+
+    got = virNetSocketRead(client->sock,
+                           rx->buffer + rx->bufferOffset,
+                           rx->bufferLength - rx->bufferOffset);
+
+    if (got > 0)
+        rx->bufferOffset += got;
+
+    return got;
+}
+
+
+/*
+ * Read data until we get a complete message to process.
+ *
+ * The length word is read first, then the payload. A complete
+ * message has its header decoded and is offered to each filter in
+ * turn; if none claims it, it goes to the dispatch function.
+ * Afterwards a fresh receive buffer is set up unless the client has
+ * reached its request limit. Any failure flags the client for close.
+ *
+ * @client: a locked client object
+ */
+static void virNetServerClientDispatchRead(virNetServerClientPtr client)
+{
+readmore:
+    if (virNetServerClientRead(client) < 0) {
+        client->wantClose = true;
+        return; /* Error */
+    }
+
+    if (client->rx->bufferOffset < client->rx->bufferLength)
+        return; /* Still not read enough */
+
+    /* Either done with length word header */
+    if (client->rx->bufferLength == VIR_NET_MESSAGE_LEN_MAX) {
+        if (virNetMessageDecodeLength(client->rx) < 0) {
+            /* An undecodable length leaves the stream unusable */
+            client->wantClose = true;
+            return;
+        }
+
+        virNetServerClientUpdateEvent(client);
+
+        /* Try and read payload immediately instead of going back
+           into poll() because chances are the data is already
+           waiting for us */
+        goto readmore;
+    } else {
+        /* Grab the completed message */
+        virNetMessagePtr msg = virNetMessageQueueServe(&client->rx);
+        virNetServerClientFilterPtr filter;
+
+        /* Decode the header so we can use it for routing decisions */
+        if (virNetMessageDecodeHeader(msg) < 0) {
+            virNetMessageFree(msg);
+            client->wantClose = true;
+            return;
+        }
+
+        /* Maybe send off for queue against a filter. A non-zero
+         * result stops the search and the message is released here;
+         * a negative one additionally closes the client */
+        filter = client->filters;
+        while (filter) {
+            int ret = filter->func(client, msg, filter->opaque);
+            if (ret < 0 || ret > 0) {
+                virNetMessageFree(msg);
+                msg = NULL;
+                if (ret < 0)
+                    client->wantClose = true;
+                break;
+            }
+
+            filter = filter->next;
+        }
+
+        /* Send off to for normal dispatch to workers */
+        if (msg) {
+            if (!client->dispatchFunc ||
+                client->dispatchFunc(client, msg, client->dispatchOpaque) < 0) {
+                virNetMessageFree(msg);
+                client->wantClose = true;
+                return;
+            }
+        }
+
+        /* Possibly need to create another receive buffer */
+        if (client->nrequests < client->nrequests_max) {
+            if (!(client->rx = virNetMessageNew())) {
+                client->wantClose = true;
+            } else {
+                /* Only touch the buffer if allocation succeeded */
+                client->rx->bufferLength = VIR_NET_MESSAGE_LEN_MAX;
+                client->nrequests++;
+            }
+        }
+        virNetServerClientUpdateEvent(client);
+    }
+}
+
+
+/*
+ * Push the unsent part of the message at the head of client->tx
+ * to the socket and advance its offset by the amount written.
+ *
+ * Returns:
+ *   -1 on error or EOF
+ *    0 on EAGAIN
+ *    n number of bytes (1 if nothing was left to send)
+ */
+static ssize_t virNetServerClientWrite(virNetServerClientPtr client)
+{
+    virNetMessagePtr tx = client->tx;
+    ssize_t sent;
+
+    if (tx->bufferOffset > tx->bufferLength) {
+        virNetError(VIR_ERR_RPC,
+                    _("unexpected zero/negative length request %lld"),
+                    (long long int)(client->tx->bufferLength - client->tx->bufferOffset));
+        client->wantClose = true;
+        return -1;
+    }
+
+    /* Already fully sent; report progress so the caller moves on */
+    if (tx->bufferOffset == tx->bufferLength)
+        return 1;
+
+    sent = virNetSocketWrite(client->sock,
+                             tx->buffer + tx->bufferOffset,
+                             tx->bufferLength - tx->bufferOffset);
+
+    /* -1 error, 0 = EAGAIN: hand both straight back */
+    if (sent > 0)
+        tx->bufferOffset += sent;
+
+    return sent;
+}
+
+
+/*
+ * Process all queued client->tx messages until
+ * we would block on I/O.
+ *
+ * Once a reply has been sent in full the in-flight request count is
+ * decremented. If receiving was throttled, the finished message is
+ * recycled as the new receive buffer. A write error flags the client
+ * for close.
+ *
+ * @client: a locked client object
+ */
+static void
+virNetServerClientDispatchWrite(virNetServerClientPtr client)
+{
+    while (client->tx) {
+        ssize_t ret;
+
+        ret = virNetServerClientWrite(client);
+        if (ret < 0) {
+            client->wantClose = true;
+            return;
+        }
+        if (ret == 0)
+            return; /* Would block on write EAGAIN */
+
+        if (client->tx->bufferOffset == client->tx->bufferLength) {
+            virNetMessagePtr msg;
+#if HAVE_SASL
+            /* Completed this 'tx' operation, so now read for all
+             * future rx/tx to be under a SASL SSF layer
+             */
+            if (client->sasl) {
+                virNetSocketSetSASLSession(client->sock, client->sasl);
+                virNetSASLSessionFree(client->sasl);
+                client->sasl = NULL;
+            }
+#endif
+
+            /* Get finished msg from head of tx queue */
+            msg = virNetMessageQueueServe(&client->tx);
+
+            if (msg->header.type == VIR_NET_REPLY) {
+                client->nrequests--;
+                /* See if the recv queue is currently throttled */
+                if (!client->rx &&
+                    client->nrequests < client->nrequests_max) {
+                    /* Ready to recv more messages. The recycled
+                     * message still carries the offset of the reply
+                     * just sent; rewind it, otherwise the next read
+                     * sees offset >= length and drops the client */
+                    client->rx = msg;
+                    client->rx->bufferOffset = 0;
+                    client->rx->bufferLength = VIR_NET_MESSAGE_LEN_MAX;
+                    msg = NULL;
+                    client->nrequests++;
+                }
+            }
+
+            virNetMessageFree(msg);
+
+            virNetServerClientUpdateEvent(client);
+         }
+    }
+}
+
+/*
+ * Advance the TLS handshake by one step. When it completes the
+ * certificate is checked; a handshake error or a failed check
+ * flags the client for close.
+ *
+ * @client: a locked client object
+ */
+static void
+virNetServerClientDispatchHandshake(virNetServerClientPtr client)
+{
+    int hs = virNetTLSSessionHandshake(client->tls);
+
+    if (hs < 0) {
+        /* Fatal error in handshake */
+        client->wantClose = true;
+        return;
+    }
+
+    /* Finished: the certificate has to be acceptable as well */
+    if (hs == 0 &&
+        virNetServerClientCheckAccess(client) < 0) {
+        client->wantClose = true;
+        return;
+    }
+
+    /* Either the handshake is done, or more data is needed and the
+       direction of the handshake data flow may have changed; both
+       cases call for the watched events to be refreshed */
+    virNetServerClientUpdateEvent(client);
+}
+
+/*
+ * Socket I/O callback. Routes readable/writable events either to the
+ * TLS handshake code or to the write and read dispatchers, and flags
+ * the client for close on error or hangup.
+ *
+ * @sock: socket the event fired on
+ * @events: bitmask of VIR_EVENT_HANDLE_* flags
+ * @opaque: the virNetServerClientPtr registered with the callback
+ */
+static void
+virNetServerClientDispatchEvent(virNetSocketPtr sock, int events, void *opaque)
+{
+    virNetServerClientPtr client = opaque;
+    const int ioEvents = VIR_EVENT_HANDLE_WRITABLE | VIR_EVENT_HANDLE_READABLE;
+    const int failEvents = VIR_EVENT_HANDLE_ERROR | VIR_EVENT_HANDLE_HANGUP;
+
+    virNetServerClientLock(client);
+
+    /* Event for a socket the client no longer owns: just unhook it */
+    if (sock != client->sock) {
+        virNetSocketRemoveIOCallback(sock);
+        virNetServerClientUnlock(client);
+        return;
+    }
+
+    if (events & ioEvents) {
+        bool handshaking = client->tls &&
+            virNetTLSSessionGetHandshakeStatus(client->tls) !=
+            VIR_NET_TLS_HANDSHAKE_COMPLETE;
+
+        if (handshaking) {
+            virNetServerClientDispatchHandshake(client);
+        } else {
+            if (events & VIR_EVENT_HANDLE_WRITABLE)
+                virNetServerClientDispatchWrite(client);
+            if (events & VIR_EVENT_HANDLE_READABLE)
+                virNetServerClientDispatchRead(client);
+        }
+    }
+
+    /* NB, HANGUP + READABLE arrive together upon disconnect */
+    if (events & failEvents)
+        client->wantClose = true;
+
+    virNetServerClientUnlock(client);
+}
+
+
+/*
+ * Queue @msg for transmission to the client and make sure the socket
+ * is watched for writability.
+ *
+ * Returns 0 if the message was queued, -1 if the client has no
+ * socket or is flagged for close, in which case @msg is not queued
+ */
+int virNetServerClientSendMessage(virNetServerClientPtr client,
+                                  virNetMessagePtr msg)
+{
+    int rc;
+
+    VIR_DEBUG("msg=%p proc=%d len=%zu offset=%zu",
+              msg, msg->header.proc,
+              msg->bufferLength, msg->bufferOffset);
+
+    virNetServerClientLock(client);
+
+    if (!client->sock || client->wantClose) {
+        rc = -1;
+    } else {
+        virNetMessageQueuePush(&client->tx, msg);
+        virNetServerClientUpdateEvent(client);
+        rc = 0;
+    }
+
+    virNetServerClientUnlock(client);
+    return rc;
+}
+
+
+/* A client still needs to authenticate when an auth mode is
+ * configured and no identity has been recorded yet */
+bool virNetServerClientNeedAuth(virNetServerClientPtr client)
+{
+    bool pending;
+
+    virNetServerClientLock(client);
+    pending = client->auth && !client->identity;
+    virNetServerClientUnlock(client);
+
+    return pending;
+}
diff --git a/src/rpc/virnetserverclient.h b/src/rpc/virnetserverclient.h
new file mode 100644
index 0000000..8554590
--- /dev/null
+++ b/src/rpc/virnetserverclient.h
@@ -0,0 +1,106 @@
+/*
+ * virnetserverclient.h: generic network RPC server client
+ *
+ * Copyright (C) 2006-2011 Red Hat, Inc.
+ * Copyright (C) 2006 Daniel P. Berrange
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
+ *
+ * Author: Daniel P. Berrange <berrange at redhat.com>
+ */
+
+#ifndef __VIR_NET_SERVER_CLIENT_H__
+# define __VIR_NET_SERVER_CLIENT_H__
+
+# include "virnetsocket.h"
+# include "virnetmessage.h"
+
+typedef struct _virNetServerClient virNetServerClient;
+typedef virNetServerClient *virNetServerClientPtr;
+
+/* Callback receiving each complete incoming message that no filter
+ * claimed. A negative return makes the client free @msg and flag
+ * itself for close. */
+typedef int (*virNetServerClientDispatchFunc)(virNetServerClientPtr client,
+                                              virNetMessagePtr msg,
+                                              void *opaque);
+
+/* Callback offered each incoming message before normal dispatch.
+ * Return 0 to pass the message on to the next filter / dispatcher.
+ * On any non-zero return the client frees @msg and skips dispatch;
+ * a negative return additionally flags the client for close. */
+typedef int (*virNetServerClientFilterFunc)(virNetServerClientPtr client,
+                                            virNetMessagePtr msg,
+                                            void *opaque);
+
+virNetServerClientPtr virNetServerClientNew(virNetSocketPtr sock,
+                                            int auth,
+                                            bool readonly,
+                                            virNetTLSContextPtr tls);
+
+int virNetServerClientAddFilter(virNetServerClientPtr client,
+                                virNetServerClientFilterFunc func,
+                                void *opaque);
+
+void virNetServerClientRemoveFilter(virNetServerClientPtr client,
+                                    int filterID);
+
+int virNetServerClientGetAuth(virNetServerClientPtr client);
+bool virNetServerClientGetReadonly(virNetServerClientPtr client);
+
+bool virNetServerClientHasTLSSession(virNetServerClientPtr client);
+int virNetServerClientGetTLSKeySize(virNetServerClientPtr client);
+
+/* Only available in SASL-enabled builds. The guard uses the same
+ * '#if HAVE_SASL' test as the definition in virnetserverclient.c */
+# if HAVE_SASL
+void virNetServerClientSetSASLSession(virNetServerClientPtr client,
+                                      virNetSASLSessionPtr sasl);
+# endif
+
+int virNetServerClientGetFD(virNetServerClientPtr client);
+
+bool virNetServerClientIsSecure(virNetServerClientPtr client);
+
+int virNetServerClientSetIdentity(virNetServerClientPtr client,
+                                  const char *identity);
+const char *virNetServerClientGetIdentity(virNetServerClientPtr client);
+
+int virNetServerClientGetLocalIdentity(virNetServerClientPtr client,
+                                       uid_t *uid, pid_t *pid);
+
+void virNetServerClientRef(virNetServerClientPtr client);
+
+typedef void (*virNetServerClientFreeFunc)(void *data);
+
+void virNetServerClientSetPrivateData(virNetServerClientPtr client,
+                                      void *opaque,
+                                      virNetServerClientFreeFunc ff);
+void *virNetServerClientGetPrivateData(virNetServerClientPtr client);
+
+void virNetServerClientSetDispatcher(virNetServerClientPtr client,
+                                     virNetServerClientDispatchFunc func,
+                                     void *opaque);
+void virNetServerClientClose(virNetServerClientPtr client);
+
+bool virNetServerClientIsClosed(virNetServerClientPtr client);
+void virNetServerClientMarkClose(virNetServerClientPtr client);
+bool virNetServerClientWantClose(virNetServerClientPtr client);
+
+int virNetServerClientInit(virNetServerClientPtr client);
+
+const char *virNetServerClientLocalAddrString(virNetServerClientPtr client);
+const char *virNetServerClientRemoteAddrString(virNetServerClientPtr client);
+
+int virNetServerClientSendMessage(virNetServerClientPtr client,
+                                  virNetMessagePtr msg);
+
+bool virNetServerClientNeedAuth(virNetServerClientPtr client);
+
+void virNetServerClientFree(virNetServerClientPtr client);
+
+
+#endif /* __VIR_NET_SERVER_CLIENT_H__ */
diff --git a/src/rpc/virnetserverprogram.c b/src/rpc/virnetserverprogram.c
new file mode 100644
index 0000000..0d1577a
--- /dev/null
+++ b/src/rpc/virnetserverprogram.c
@@ -0,0 +1,455 @@
+/*
+ * virnetserverprogram.c: generic network RPC server program
+ *
+ * Copyright (C) 2006-2011 Red Hat, Inc.
+ * Copyright (C) 2006 Daniel P. Berrange
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
+ *
+ * Author: Daniel P. Berrange <berrange at redhat.com>
+ */
+
+#include <config.h>
+
+#include "virnetserverprogram.h"
+#include "virnetserverclient.h"
+
+#include "memory.h"
+#include "virterror_internal.h"
+#include "logging.h"
+
+#define VIR_FROM_THIS VIR_FROM_RPC
+#define virNetError(code, ...)                                    \
+    virReportErrorHelper(VIR_FROM_THIS, code, __FILE__,           \
+                         __FUNCTION__, __LINE__, __VA_ARGS__)
+
+/*
+ * One RPC program: a (program, version) pair together with the
+ * table of procedures that incoming calls are dispatched to.
+ */
+struct _virNetServerProgram {
+    int refs; /* reference count; the object is freed when it drops to 0 */
+
+    unsigned program; /* program number, compared against msg->header.prog */
+    unsigned version; /* program version, compared against msg->header.vers */
+    virNetServerProgramProcPtr procs; /* procedure table, indexed by procedure number */
+    size_t nprocs; /* number of entries in procs */
+};
+
+/*
+ * @program: RPC program number this object answers for
+ * @version: RPC program version this object answers for
+ * @procs: table of procedures, indexed by procedure number
+ * @nprocs: number of entries in @procs
+ *
+ * Create a program object holding a single reference. The @procs
+ * pointer is stored as given; the table itself is not copied.
+ *
+ * Returns the new object, or NULL (after reporting an OOM error)
+ * if the allocation failed
+ */
+virNetServerProgramPtr virNetServerProgramNew(unsigned program,
+                                              unsigned version,
+                                              virNetServerProgramProcPtr procs,
+                                              size_t nprocs)
+{
+    virNetServerProgramPtr newprog;
+
+    if (VIR_ALLOC(newprog) < 0) {
+        virReportOOMError();
+        return NULL;
+    }
+
+    newprog->program = program;
+    newprog->version = version;
+    newprog->nprocs = nprocs;
+    newprog->procs = procs;
+    newprog->refs = 1;
+
+    VIR_DEBUG("prog=%p refs=%d", newprog, newprog->refs);
+
+    return newprog;
+}
+
+
+/* Returns the program number @prog was created with (converted to int). */
+int virNetServerProgramGetID(virNetServerProgramPtr prog)
+{
+    return prog->program;
+}
+
+
+/* Returns the program version @prog was created with (converted to int). */
+int virNetServerProgramGetVersion(virNetServerProgramPtr prog)
+{
+    return prog->version;
+}
+
+
+/*
+ * Take an additional reference on @prog. Each reference is dropped
+ * again with virNetServerProgramFree(). The counter is a plain int
+ * incremented without any locking in this function.
+ */
+void virNetServerProgramRef(virNetServerProgramPtr prog)
+{
+    prog->refs++;
+    VIR_DEBUG("prog=%p refs=%d", prog, prog->refs);
+}
+
+
+/*
+ * Returns 1 if the program and version numbers in the header of
+ * @msg are the ones @prog was created with, 0 otherwise
+ */
+int virNetServerProgramMatches(virNetServerProgramPtr prog,
+                               virNetMessagePtr msg)
+{
+    if (prog->program != msg->header.prog)
+        return 0;
+    if (prog->version != msg->header.vers)
+        return 0;
+
+    return 1;
+}
+
+
+/*
+ * Look up the procedure table entry for @procedure.
+ *
+ * Returns a pointer into the table of @prog, or NULL if @procedure
+ * is negative or not below the number of registered procedures
+ */
+static virNetServerProgramProcPtr virNetServerProgramGetProc(virNetServerProgramPtr prog,
+                                                             int procedure)
+{
+    /* The sign test must come first: the second comparison is
+     * carried out against a size_t */
+    if (procedure < 0 || procedure >= prog->nprocs)
+        return NULL;
+
+    return prog->procs + procedure;
+}
+
+
+/*
+ * @prog: program whose number and version go into the header
+ * @client: the client the error is queued on
+ * @msg: message object that is overwritten with the error packet
+ * @rerr: the error to serialize; its contents are released with
+ *        xdr_free() before returning, on success and on failure
+ * @procedure: procedure number to put in the header
+ * @type: message type to put in the header
+ * @serial: serial number to put in the header
+ *
+ * Common helper for the error senders: rewrites the header of
+ * @msg with status VIR_NET_ERROR, encodes @rerr as the payload
+ * and passes the message to the client for transmission.
+ *
+ * Returns 0 on success, -1 if encoding or queueing failed
+ */
+static int
+virNetServerProgramSendError(virNetServerProgramPtr prog,
+                             virNetServerClientPtr client,
+                             virNetMessagePtr msg,
+                             virNetMessageErrorPtr rerr,
+                             int procedure,
+                             int type,
+                             int serial)
+{
+    VIR_DEBUG("prog=%d ver=%d proc=%d type=%d serial=%d msg=%p rerr=%p",
+              prog->program, prog->version, procedure, type, serial, msg, rerr);
+
+    virNetMessageSaveError(rerr);
+
+    /* Build the header of the error packet */
+    msg->header.status = VIR_NET_ERROR;
+    msg->header.serial = serial;
+    msg->header.type = type;
+    msg->header.proc = procedure;
+    msg->header.vers = prog->version;
+    msg->header.prog = prog->program;
+
+    if (virNetMessageEncodeHeader(msg) < 0 ||
+        virNetMessageEncodePayload(msg, (xdrproc_t)xdr_virNetMessageError, rerr) < 0) {
+        VIR_WARN("Failed to serialize remote error '%p'", rerr);
+        xdr_free((xdrproc_t)xdr_virNetMessageError, (void*)rerr);
+        return -1;
+    }
+
+    /* The error has been copied into the message buffer */
+    xdr_free((xdrproc_t)xdr_virNetMessageError, (void*)rerr);
+
+    /* Put reply on end of tx queue to send out  */
+    return virNetServerClientSendMessage(client, msg) < 0 ? -1 : 0;
+}
+
+
+/*
+ * @prog: program whose number and version go into the reply header
+ * @client: the client to send the error to
+ * @msg: message object reused to carry the error
+ * @rerr: the error to send
+ * @req: header of the message this error is in reply to
+ *
+ * Send an error message to the client, reusing the procedure and
+ * serial number of @req.
+ *
+ * Returns 0 if the error was passed to the client for sending,
+ * -1 upon fatal error
+ */
+int
+virNetServerProgramSendReplyError(virNetServerProgramPtr prog,
+                                  virNetServerClientPtr client,
+                                  virNetMessagePtr msg,
+                                  virNetMessageErrorPtr rerr,
+                                  virNetMessageHeaderPtr req)
+{
+    /*
+     * For data streams, errors are sent back as data streams
+     * For method calls, errors are sent back as method replies
+     */
+    int type = VIR_NET_REPLY;
+
+    if (req->type == VIR_NET_STREAM)
+        type = VIR_NET_STREAM;
+
+    return virNetServerProgramSendError(prog, client, msg, rerr,
+                                        req->proc, type, req->serial);
+}
+
+
+/*
+ * @prog: program whose number and version go into the header
+ * @client: the client to send the error to
+ * @msg: message object reused to carry the error
+ * @rerr: the error to send
+ * @procedure: procedure number to put in the header
+ * @serial: serial number to put in the header
+ *
+ * Send an error to the client as a VIR_NET_STREAM message.
+ *
+ * Returns the result of virNetServerProgramSendError():
+ * 0 on success, -1 on failure
+ */
+int virNetServerProgramSendStreamError(virNetServerProgramPtr prog,
+                                       virNetServerClientPtr client,
+                                       virNetMessagePtr msg,
+                                       virNetMessageErrorPtr rerr,
+                                       int procedure,
+                                       int serial)
+{
+    return virNetServerProgramSendError(prog,
+                                        client,
+                                        msg,
+                                        rerr,
+                                        procedure,
+                                        VIR_NET_STREAM,
+                                        serial);
+}
+
+
+static int
+virNetServerProgramDispatchCall(virNetServerProgramPtr prog,
+                                virNetServerPtr server,
+                                virNetServerClientPtr client,
+                                virNetMessagePtr msg);
+
+/*
+ * @prog: the program the message is expected to belong to
+ * @server: the unlocked server object
+ * @client: the unlocked client object
+ * @msg: the complete incoming message packet, with header already decoded
+ *
+ * This function is intended to be called from worker threads
+ * when an incoming message is ready to be dispatched for
+ * execution.
+ *
+ * Method calls are handed to virNetServerProgramDispatchCall().
+ * Stream packets are logged and dropped. A program or version
+ * mismatch, or any other message type, results in an error reply
+ * being sent to the client.
+ *
+ * Upon successful return the '@msg' instance will be released
+ * by this function (or more often, reused to send a reply).
+ * Upon failure, the '@msg' must be freed by the caller.
+ *
+ * Returns 0 if the message was dispatched, -1 upon fatal error
+ */
+int virNetServerProgramDispatch(virNetServerProgramPtr prog,
+                                virNetServerPtr server,
+                                virNetServerClientPtr client,
+                                virNetMessagePtr msg)
+{
+    virNetMessageError rerr;
+
+    memset(&rerr, 0, sizeof(rerr));
+
+    VIR_DEBUG("prog=%d ver=%d type=%d status=%d serial=%d proc=%d",
+              msg->header.prog, msg->header.vers, msg->header.type,
+              msg->header.status, msg->header.serial, msg->header.proc);
+
+    /* Each branch either returns, or reports an error and falls
+     * through to the error reply at the end of the function */
+    if (msg->header.prog != prog->program) {
+        virNetError(VIR_ERR_RPC,
+                    _("program mismatch (actual %x, expected %x)"),
+                    msg->header.prog, prog->program);
+    } else if (msg->header.vers != prog->version) {
+        virNetError(VIR_ERR_RPC,
+                    _("version mismatch (actual %x, expected %x)"),
+                    msg->header.vers, prog->version);
+    } else if (msg->header.type == VIR_NET_CALL) {
+        return virNetServerProgramDispatchCall(prog, server, client, msg);
+    } else if (msg->header.type == VIR_NET_STREAM) {
+        /* Since stream data is non-acked, async, we may continue to receive
+         * stream packets after we closed down a stream. Just drop & ignore
+         * these.
+         */
+        VIR_INFO("Ignoring unexpected stream data serial=%d proc=%d status=%d",
+                 msg->header.serial, msg->header.proc, msg->header.status);
+        virNetMessageFree(msg);
+        return 0;
+    } else {
+        virNetError(VIR_ERR_RPC,
+                    _("Unexpected message type %u"),
+                    msg->header.type);
+    }
+
+    return virNetServerProgramSendReplyError(prog, client, msg, &rerr, &msg->header);
+}
+
+
+/*
+ * @prog: the program whose procedure table is consulted
+ * @server: the unlocked server object
+ * @client: the unlocked client object
+ * @msg: the complete incoming method call, with header already decoded
+ *
+ * This method is used to dispatch a message representing an
+ * incoming method call from a client. It decodes the payload
+ * to obtain method call arguments, invokes the method and
+ * then sends a reply packet with the return values.
+ *
+ * If the call is rejected, or if decoding, the handler itself or
+ * encoding of the reply fails, an error reply is sent instead and
+ * the result of sending that error reply is returned.
+ *
+ * Returns 0 if the reply was sent, or -1 upon fatal error
+ */
+static int
+virNetServerProgramDispatchCall(virNetServerProgramPtr prog,
+                                virNetServerPtr server,
+                                virNetServerClientPtr client,
+                                virNetMessagePtr msg)
+{
+    char *arg = NULL;
+    char *ret = NULL;
+    int rv = -1;
+    virNetServerProgramProcPtr dispatcher;
+    virNetMessageError rerr;
+
+    memset(&rerr, 0, sizeof(rerr));
+
+    /* An incoming call must carry status VIR_NET_OK */
+    if (msg->header.status != VIR_NET_OK) {
+        virNetError(VIR_ERR_RPC,
+                    _("Unexpected message status %u"),
+                    msg->header.status);
+        goto error;
+    }
+
+    dispatcher = virNetServerProgramGetProc(prog, msg->header.proc);
+
+    if (!dispatcher) {
+        virNetError(VIR_ERR_RPC,
+                    _("unknown procedure: %d"),
+                    msg->header.proc);
+        goto error;
+    }
+
+    /* If the client is marked as needing auth, only allow the RPC
+     * procedures which do not themselves require authentication
+     */
+    if (virNetServerClientNeedAuth(client) &&
+        dispatcher->needAuth) {
+        /* Explicitly *NOT* calling  remoteDispatchAuthError() because
+           we want back-compatibility with libvirt clients which don't
+           support the VIR_ERR_AUTH_FAILED error code */
+        virNetError(VIR_ERR_RPC,
+                    "%s", _("authentication required"));
+        goto error;
+    }
+
+    /* Buffers for the decoded arguments and the return value, sized
+     * according to the procedure table entry */
+    if (VIR_ALLOC_N(arg, dispatcher->arg_len) < 0) {
+        virReportOOMError();
+        goto error;
+    }
+    if (VIR_ALLOC_N(ret, dispatcher->ret_len) < 0) {
+        virReportOOMError();
+        goto error;
+    }
+
+    if (virNetMessageDecodePayload(msg, dispatcher->arg_filter, arg) < 0)
+        goto error;
+
+    /*
+     * When the RPC handler is called:
+     *
+     *  - Server object is unlocked
+     *  - Client object is unlocked
+     *
+     * Without locking, it is safe to use:
+     *
+     *   'arg' and 'ret'
+     */
+    rv = (dispatcher->func)(server, client, &msg->header, &rerr, arg, ret);
+
+    /* The decoded arguments are released whatever the handler returned */
+    xdr_free(dispatcher->arg_filter, arg);
+
+    if (rv < 0)
+        goto error;
+
+    /* Return header. We're re-using same message object, so
+     * only need to tweak type/status fields */
+    /*msg->header.prog = msg->header.prog;*/
+    /*msg->header.vers = msg->header.vers;*/
+    /*msg->header.proc = msg->header.proc;*/
+    msg->header.type = VIR_NET_REPLY;
+    /*msg->header.serial = msg->header.serial;*/
+    msg->header.status = VIR_NET_OK;
+
+    /* On either encode failure the contents of 'ret' are released
+     * here; the buffer itself is freed under the error label */
+    if (virNetMessageEncodeHeader(msg) < 0) {
+        xdr_free(dispatcher->ret_filter, ret);
+        goto error;
+    }
+
+    if (virNetMessageEncodePayload(msg, dispatcher->ret_filter, ret) < 0) {
+        xdr_free(dispatcher->ret_filter, ret);
+        goto error;
+    }
+
+    xdr_free(dispatcher->ret_filter, ret);
+    VIR_FREE(arg);
+    VIR_FREE(ret);
+
+    /* Put reply on end of tx queue to send out  */
+    return virNetServerClientSendMessage(client, msg);
+
+error:
+    /* Bad stuff (de-)serializing message, but we have an
+     * RPC error message we can send back to the client.
+     * The return value is that of sending the error reply, so a
+     * failed call still yields 0 when the error could be sent */
+    rv = virNetServerProgramSendReplyError(prog, client, msg, &rerr, &msg->header);
+
+    VIR_FREE(arg);
+    VIR_FREE(ret);
+
+    return rv;
+}
+
+
+/*
+ * @prog: program whose number and version go into the header
+ * @client: the client to send the packet to
+ * @msg: message object that is overwritten with the stream packet
+ * @procedure: procedure number to put in the header
+ * @serial: serial number to put in the header
+ * @data: payload bytes, or NULL
+ * @len: number of bytes in @data
+ *
+ * Send a VIR_NET_STREAM packet to the client. The header status
+ * is derived from the arguments:
+ *
+ *   data != NULL, len > 0    => VIR_NET_CONTINUE (sending back data)
+ *   data != NULL, len == 0   => VIR_NET_CONTINUE (sending read EOF)
+ *   data == NULL             => VIR_NET_OK (finish handshake confirmation)
+ *
+ * Returns 0 on success, -1 if encoding or queueing failed
+ */
+int virNetServerProgramSendStreamData(virNetServerProgramPtr prog,
+                                      virNetServerClientPtr client,
+                                      virNetMessagePtr msg,
+                                      int procedure,
+                                      int serial,
+                                      const char *data,
+                                      size_t len)
+{
+    VIR_DEBUG("client=%p msg=%p data=%p len=%zu", client, msg, data, len);
+
+    /* The message object is reused, so every header field is set */
+    msg->header.prog = prog->program;
+    msg->header.vers = prog->version;
+    msg->header.proc = procedure;
+    msg->header.type = VIR_NET_STREAM;
+    msg->header.serial = serial;
+
+    if (data != NULL)
+        msg->header.status = VIR_NET_CONTINUE;
+    else
+        msg->header.status = VIR_NET_OK;
+
+    if (virNetMessageEncodeHeader(msg) < 0)
+        return -1;
+
+    /* A payload is only appended when there are bytes to send */
+    if (data != NULL && len > 0) {
+        if (virNetMessageEncodePayloadRaw(msg, data, len) < 0)
+            return -1;
+
+        VIR_DEBUG("Total %zu", msg->bufferOffset);
+    }
+
+    return virNetServerClientSendMessage(client, msg);
+}
+
+
+/*
+ * Drop one reference on @prog and free the object once the last
+ * reference is gone. Passing NULL is a no-op. The procedure table
+ * supplied at creation time is not freed here.
+ */
+void virNetServerProgramFree(virNetServerProgramPtr prog)
+{
+    if (prog == NULL)
+        return;
+
+    /* Logs the count as it was before this reference is dropped */
+    VIR_DEBUG("prog=%p refs=%d", prog, prog->refs);
+
+    if (--prog->refs > 0)
+        return;
+
+    VIR_FREE(prog);
+}
diff --git a/src/rpc/virnetserverprogram.h b/src/rpc/virnetserverprogram.h
new file mode 100644
index 0000000..b68a3ef
--- /dev/null
+++ b/src/rpc/virnetserverprogram.h
@@ -0,0 +1,107 @@
+/*
+ * virnetserverprogram.h: generic network RPC server program
+ *
+ * Copyright (C) 2006-2011 Red Hat, Inc.
+ * Copyright (C) 2006 Daniel P. Berrange
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
+ *
+ * Author: Daniel P. Berrange <berrange at redhat.com>
+ */
+
+#ifndef __VIR_NET_PROGRAM_H__
+# define __VIR_NET_PROGRAM_H__
+
+# include <stdbool.h>
+
+# include "virnetmessage.h"
+# include "virnetserverclient.h"
+
+typedef struct _virNetServer virNetServer;
+typedef virNetServer *virNetServerPtr;
+
+typedef struct _virNetServerService virNetServerService;
+typedef virNetServerService *virNetServerServicePtr;
+
+typedef struct _virNetServerProgram virNetServerProgram;
+typedef virNetServerProgram *virNetServerProgramPtr;
+
+typedef struct _virNetServerProgramProc virNetServerProgramProc;
+typedef virNetServerProgramProc *virNetServerProgramProcPtr;
+
+typedef struct _virNetServerProgramErrorHandler virNetServerProgramErrorHander;
+typedef virNetServerProgramErrorHander *virNetServerProgramErrorHanderPtr;
+
+/*
+ * Handler for one RPC procedure. The dispatch code in
+ * virnetserverprogram.c calls it with the header of the incoming
+ * message in @hdr, a zeroed error object in @rerr, the decoded
+ * call arguments in @args and a buffer for the return value in
+ * @ret. A negative return value makes the dispatcher send an error
+ * reply instead of encoding @ret.
+ */
+typedef int (*virNetServerProgramDispatchFunc)(virNetServerPtr server,
+                                               virNetServerClientPtr client,
+                                               virNetMessageHeaderPtr hdr,
+                                               virNetMessageErrorPtr rerr,
+                                               void *args,
+                                               void *ret);
+
+/* One entry of a program's procedure table */
+struct _virNetServerProgramProc {
+    virNetServerProgramDispatchFunc func; /* handler invoked for the call */
+    size_t arg_len; /* size in bytes of the buffer allocated for the decoded arguments */
+    xdrproc_t arg_filter; /* XDR routine used to decode and release the arguments */
+    size_t ret_len; /* size in bytes of the buffer allocated for the return value */
+    xdrproc_t ret_filter; /* XDR routine used to encode and release the return value */
+    bool needAuth; /* if true, the call is refused while the client still needs auth */
+};
+
+virNetServerProgramPtr virNetServerProgramNew(unsigned program,
+                                              unsigned version,
+                                              virNetServerProgramProcPtr procs,
+                                              size_t nprocs);
+
+int virNetServerProgramGetID(virNetServerProgramPtr prog);
+int virNetServerProgramGetVersion(virNetServerProgramPtr prog);
+
+void virNetServerProgramRef(virNetServerProgramPtr prog);
+
+int virNetServerProgramMatches(virNetServerProgramPtr prog,
+                               virNetMessagePtr msg);
+
+int virNetServerProgramDispatch(virNetServerProgramPtr prog,
+                                virNetServerPtr server,
+                                virNetServerClientPtr client,
+                                virNetMessagePtr msg);
+
+int virNetServerProgramSendReplyError(virNetServerProgramPtr prog,
+                                      virNetServerClientPtr client,
+                                      virNetMessagePtr msg,
+                                      virNetMessageErrorPtr rerr,
+                                      virNetMessageHeaderPtr req);
+
+int virNetServerProgramSendStreamError(virNetServerProgramPtr prog,
+                                       virNetServerClientPtr client,
+                                       virNetMessagePtr msg,
+                                       virNetMessageErrorPtr rerr,
+                                       int procedure,
+                                       int serial);
+
+int virNetServerProgramSendStreamData(virNetServerProgramPtr prog,
+                                      virNetServerClientPtr client,
+                                      virNetMessagePtr msg,
+                                      int procedure,
+                                      int serial,
+                                      const char *data,
+                                      size_t len);
+
+void virNetServerProgramFree(virNetServerProgramPtr prog);
+
+
+
+
+#endif /* __VIR_NET_PROGRAM_H__ */
diff --git a/src/rpc/virnetserverservice.c b/src/rpc/virnetserverservice.c
new file mode 100644
index 0000000..0cc65c3
--- /dev/null
+++ b/src/rpc/virnetserverservice.c
@@ -0,0 +1,247 @@
+/*
+ * virnetserverservice.c: generic network RPC server service
+ *
+ * Copyright (C) 2006-2011 Red Hat, Inc.
+ * Copyright (C) 2006 Daniel P. Berrange
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
+ *
+ * Author: Daniel P. Berrange <berrange at redhat.com>
+ */
+
+#include <config.h>
+
+#include "virnetserverservice.h"
+
+#include "memory.h"
+#include "virterror_internal.h"
+
+
+#define VIR_FROM_THIS VIR_FROM_RPC
+
+/*
+ * A service: a set of listening sockets plus the settings that are
+ * applied to every client accepted on them.
+ */
+struct _virNetServerService {
+    int refs; /* reference count; the object is freed when it drops to 0 */
+
+    size_t nsocks; /* number of entries in socks */
+    virNetSocketPtr *socks; /* listening sockets */
+
+    int auth; /* auth setting handed to virNetServerClientNew() */
+    bool readonly; /* readonly flag handed to virNetServerClientNew() */
+
+    virNetTLSContextPtr tls; /* TLS context handed to new clients; may be NULL */
+
+    virNetServerServiceDispatchFunc dispatchFunc; /* called for each accepted client */
+    void *dispatchOpaque; /* opaque pointer passed back to dispatchFunc */
+};
+
+
+
+/*
+ * @sock: the listening socket that has a pending connection
+ * @events: unused
+ * @opaque: the virNetServerServicePtr the socket belongs to
+ *
+ * I/O callback registered on every listening socket of a service.
+ * Accepts the pending connection, wraps it in a client object and
+ * hands that to the dispatcher registered on the service. If the
+ * dispatcher fails the client is closed. The reference held by
+ * this function is always dropped before returning.
+ */
+static void virNetServerServiceAccept(virNetSocketPtr sock,
+                                      int events ATTRIBUTE_UNUSED,
+                                      void *opaque)
+{
+    virNetServerServicePtr svc = opaque;
+    virNetServerClientPtr client = NULL;
+    virNetSocketPtr clientsock = NULL;
+
+    if (virNetSocketAccept(sock, &clientsock) < 0)
+        goto error;
+
+    if (!clientsock) /* Connection already went away */
+        goto cleanup;
+
+    if (!(client = virNetServerClientNew(clientsock,
+                                         svc->auth,
+                                         svc->readonly,
+                                         svc->tls)))
+        goto error;
+
+    if (!svc->dispatchFunc)
+        goto error;
+
+    if (svc->dispatchFunc(svc, client, svc->dispatchOpaque) < 0)
+        virNetServerClientClose(client);
+
+    virNetServerClientFree(client);
+
+cleanup:
+    return;
+
+error:
+    if (client) {
+        /* A client object exists but nobody will take it over, so
+         * close it and drop our reference rather than leaking it.
+         * NOTE(review): this assumes a successfully created client
+         * owns clientsock and releases it itself - confirm against
+         * virNetServerClientNew()/virNetServerClientFree(). */
+        virNetServerClientClose(client);
+        virNetServerClientFree(client);
+    } else {
+        virNetSocketFree(clientsock);
+    }
+}
+
+
+/*
+ * @nodename: host name or address to listen on
+ * @service: service name or port to listen on
+ * @auth: auth setting applied to accepted clients
+ * @readonly: readonly flag applied to accepted clients
+ * @tls: TLS context for accepted clients, or NULL
+ *
+ * Create a service listening on the TCP socket(s) obtained from
+ * virNetSocketNewListenTCP(). An extra reference is taken on @tls
+ * when it is non-NULL. The accept callback is registered on each
+ * socket with an empty event mask, so no clients are accepted
+ * until virNetServerServiceToggle() enables it.
+ *
+ * Returns the new service, or NULL on failure
+ */
+virNetServerServicePtr virNetServerServiceNewTCP(const char *nodename,
+                                                 const char *service,
+                                                 int auth,
+                                                 bool readonly,
+                                                 virNetTLSContextPtr tls)
+{
+    virNetServerServicePtr svc;
+    size_t n;
+
+    if (VIR_ALLOC(svc) < 0) {
+        virReportOOMError();
+        goto error;
+    }
+
+    svc->refs = 1;
+    svc->auth = auth;
+    svc->readonly = readonly;
+    svc->tls = tls;
+    if (tls)
+        virNetTLSContextRef(tls);
+
+    if (virNetSocketNewListenTCP(nodename, service,
+                                 &svc->socks, &svc->nsocks) < 0)
+        goto error;
+
+    for (n = 0 ; n < svc->nsocks ; n++) {
+        virNetSocketPtr lsock = svc->socks[n];
+
+        /* IO callback is initially disabled, until we're ready
+         * to deal with incoming clients */
+        if (virNetSocketListen(lsock) < 0 ||
+            virNetSocketAddIOCallback(lsock, 0,
+                                      virNetServerServiceAccept,
+                                      svc) < 0)
+            goto error;
+    }
+
+    return svc;
+
+error:
+    virNetServerServiceFree(svc);
+    return NULL;
+}
+
+
+/*
+ * @path: filesystem path of the UNIX socket to listen on
+ * @mask: passed through to virNetSocketNewListenUNIX()
+ * @grp: passed through to virNetSocketNewListenUNIX()
+ * @auth: auth setting applied to accepted clients
+ * @readonly: readonly flag applied to accepted clients
+ * @tls: TLS context for accepted clients, or NULL
+ *
+ * Create a service listening on a single UNIX domain socket. An
+ * extra reference is taken on @tls when it is non-NULL. The accept
+ * callback is registered with an empty event mask, so no clients
+ * are accepted until virNetServerServiceToggle() enables it.
+ *
+ * Returns the new service, or NULL on failure
+ */
+virNetServerServicePtr virNetServerServiceNewUNIX(const char *path,
+                                                  mode_t mask,
+                                                  gid_t grp,
+                                                  int auth,
+                                                  bool readonly,
+                                                  virNetTLSContextPtr tls)
+{
+    virNetServerServicePtr svc;
+    size_t i;
+
+    if (VIR_ALLOC(svc) < 0)
+        goto no_memory;
+
+    svc->refs = 1;
+    svc->auth = auth;
+    svc->readonly = readonly;
+    svc->tls = tls;
+    if (tls)
+        virNetTLSContextRef(tls);
+
+    /* Record the socket count only once the array really exists:
+     * virNetServerServiceFree() walks socks[0..nsocks) and would
+     * otherwise index a NULL array if this allocation failed */
+    if (VIR_ALLOC_N(svc->socks, 1) < 0)
+        goto no_memory;
+    svc->nsocks = 1;
+
+    if (virNetSocketNewListenUNIX(path,
+                                  mask,
+                                  grp,
+                                  &svc->socks[0]) < 0)
+        goto error;
+
+    for (i = 0 ; i < svc->nsocks ; i++) {
+        if (virNetSocketListen(svc->socks[i]) < 0)
+            goto error;
+
+        /* IO callback is initially disabled, until we're ready
+         * to deal with incoming clients */
+        if (virNetSocketAddIOCallback(svc->socks[i],
+                                      0,
+                                      virNetServerServiceAccept,
+                                      svc) < 0)
+            goto error;
+    }
+
+
+    return svc;
+
+no_memory:
+    virReportOOMError();
+error:
+    virNetServerServiceFree(svc);
+    return NULL;
+}
+
+
+/* Returns the auth setting the service was created with. */
+int virNetServerServiceGetAuth(virNetServerServicePtr svc)
+{
+    return svc->auth;
+}
+
+
+/* Returns the readonly flag the service was created with. */
+bool virNetServerServiceIsReadonly(virNetServerServicePtr svc)
+{
+    return svc->readonly;
+}
+
+
+/*
+ * Take an additional reference on @svc. Each reference is dropped
+ * again with virNetServerServiceFree(). The counter is a plain int
+ * incremented without any locking in this function.
+ */
+void virNetServerServiceRef(virNetServerServicePtr svc)
+{
+    svc->refs++;
+}
+
+
+/*
+ * @svc: the service to configure
+ * @func: callback invoked for every client accepted on @svc
+ * @opaque: pointer passed back to @func unchanged
+ *
+ * Register the callback that receives newly accepted clients,
+ * replacing any previously registered one. While no callback is
+ * set, accepted connections are discarded by the accept handler.
+ */
+void virNetServerServiceSetDispatcher(virNetServerServicePtr svc,
+                                      virNetServerServiceDispatchFunc func,
+                                      void *opaque)
+{
+    svc->dispatchFunc = func;
+    svc->dispatchOpaque = opaque;
+}
+
+
+/*
+ * Drop one reference on @svc. When the last reference goes away
+ * all listening sockets are freed, the reference on the TLS
+ * context is released and the service itself is freed. Passing
+ * NULL is a no-op.
+ */
+void virNetServerServiceFree(virNetServerServicePtr svc)
+{
+    size_t i; /* same type as svc->nsocks, avoids a signed/unsigned compare */
+
+    if (!svc)
+        return;
+
+    svc->refs--;
+    if (svc->refs > 0)
+        return;
+
+    for (i = 0 ; i < svc->nsocks ; i++)
+        virNetSocketFree(svc->socks[i]);
+    VIR_FREE(svc->socks);
+
+    virNetTLSContextFree(svc->tls);
+
+    VIR_FREE(svc);
+}
+
+/*
+ * @svc: the service to update
+ * @enabled: whether incoming connections should be accepted
+ *
+ * Enable or disable the accept callback on every listening socket
+ * of @svc by switching its event mask between
+ * VIR_EVENT_HANDLE_READABLE and 0.
+ */
+void virNetServerServiceToggle(virNetServerServicePtr svc,
+                               bool enabled)
+{
+    size_t i; /* same type as svc->nsocks, avoids a signed/unsigned compare */
+
+    for (i = 0 ; i < svc->nsocks ; i++)
+        virNetSocketUpdateIOCallback(svc->socks[i],
+                                     enabled ?
+                                     VIR_EVENT_HANDLE_READABLE :
+                                     0);
+}
diff --git a/src/rpc/virnetserverservice.h b/src/rpc/virnetserverservice.h
new file mode 100644
index 0000000..b8ccd55
--- /dev/null
+++ b/src/rpc/virnetserverservice.h
@@ -0,0 +1,65 @@
+/*
+ * virnetserverservice.h: generic network RPC server service
+ *
+ * Copyright (C) 2006-2011 Red Hat, Inc.
+ * Copyright (C) 2006 Daniel P. Berrange
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
+ *
+ * Author: Daniel P. Berrange <berrange at redhat.com>
+ */
+
+#ifndef __VIR_NET_SERVER_SERVICE_H__
+# define __VIR_NET_SERVER_SERVICE_H__
+
+# include "virnetserverprogram.h"
+
+/*
+ * Authentication schemes for a service.
+ * NOTE(review): presumably these are the values expected for the
+ * 'auth' argument of the constructors below - confirm with callers.
+ */
+enum {
+    VIR_NET_SERVER_SERVICE_AUTH_NONE = 0,
+    VIR_NET_SERVER_SERVICE_AUTH_SASL,
+    VIR_NET_SERVER_SERVICE_AUTH_POLKIT,
+};
+
+/*
+ * Callback receiving each client accepted on a service, together
+ * with the opaque pointer given to virNetServerServiceSetDispatcher().
+ * A negative return value makes the accept handler close the client.
+ * The accept handler drops its own reference on the client after the
+ * callback returns.
+ * NOTE(review): a callback that keeps the client presumably has to
+ * take its own reference - confirm against virNetServerClientRef().
+ */
+typedef int (*virNetServerServiceDispatchFunc)(virNetServerServicePtr svc,
+                                               virNetServerClientPtr client,
+                                               void *opaque);
+
+virNetServerServicePtr virNetServerServiceNewTCP(const char *nodename,
+                                                 const char *service,
+                                                 int auth,
+                                                 bool readonly,
+                                                 virNetTLSContextPtr tls);
+virNetServerServicePtr virNetServerServiceNewUNIX(const char *path,
+                                                  mode_t mask,
+                                                  gid_t grp,
+                                                  int auth,
+                                                  bool readonly,
+                                                  virNetTLSContextPtr tls);
+
+int virNetServerServiceGetAuth(virNetServerServicePtr svc);
+bool virNetServerServiceIsReadonly(virNetServerServicePtr svc);
+
+void virNetServerServiceRef(virNetServerServicePtr svc);
+
+void virNetServerServiceSetDispatcher(virNetServerServicePtr svc,
+                                      virNetServerServiceDispatchFunc func,
+                                      void *opaque);
+
+void virNetServerServiceFree(virNetServerServicePtr svc);
+
+void virNetServerServiceToggle(virNetServerServicePtr svc,
+                               bool enabled);
+
+#endif
-- 
1.7.4.4




More information about the libvir-list mailing list