[Libvir] PATCH 1/3: Split out simple event loop

Daniel P. Berrange berrange at redhat.com
Mon Jun 18 18:36:20 UTC 2007


Attached is a new version of the patch which allocs/frees memory for storing
the callback data in blocks. It also adds copious comments and fixes a very
subtle bug in adding handles during dispatch. The printfs are changed into
qemudDebug()s and the nvmfds field is killed here.

On Mon, Jun 18, 2007 at 05:49:59AM -0400, Daniel Veillard wrote:
> On Sun, Jun 17, 2007 at 10:52:55PM +0100, Daniel P. Berrange wrote:
> > The following patch adds a qemud/event.c & qemud/event.h file providing a
> > general purpose event loop built around poll. Users register file handles
> > and associated callbacks, and / or timers. The qemud.c file is changed to
> > make use of these APIs for dealing with server, client, and VM file handles
> > and/or sockets. This decouples much of the QEMU VM I/O code from the main
> > qemud.c daemon code.

 b/qemud/event.c   |  460 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 b/qemud/event.h   |   98 +++++++++++
 qemud/Makefile.am |    3 
 qemud/internal.h  |    1 
 qemud/qemud.c     |  400 +++++++++++++++++++++++++---------------------
 5 files changed, 774 insertions(+), 188 deletions(-)

Dan.
-- 
|=- Red Hat, Engineering, Emerging Technologies, Boston.  +1 978 392 2496 -=|
|=-           Perl modules: http://search.cpan.org/~danberr/              -=|
|=-               Projects: http://freshmeat.net/~danielpb/               -=|
|=-  GnuPG: 7D3B9505   F3C9 553F A1DA 4AC2 5648 23C1 B3DF F742 7D3B 9505  -=| 
-------------- next part --------------
diff -r 56b36b784a1c qemud/Makefile.am
--- a/qemud/Makefile.am	Mon Jun 18 11:01:38 2007 -0400
+++ b/qemud/Makefile.am	Mon Jun 18 11:02:20 2007 -0400
@@ -16,7 +16,8 @@ libvirt_qemud_SOURCES = \
 		buf.c buf.h \
 		protocol.h protocol.c \
 		remote_protocol.h remote_protocol.c \
-		remote.c
+		remote.c \
+                event.c event.h
 #-D_XOPEN_SOURCE=600 -D_XOPEN_SOURCE_EXTENDED=1 -D_POSIX_C_SOURCE=199506L
 libvirt_qemud_CFLAGS = \
         -I$(top_srcdir)/include -I$(top_builddir)/include $(LIBXML_CFLAGS) \
diff -r 56b36b784a1c qemud/event.c
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/qemud/event.c	Mon Jun 18 14:14:13 2007 -0400
@@ -0,0 +1,460 @@
+/*
+ * event.c: event loop for monitoring file handles
+ *
+ * Copyright (C) 2007 Daniel P. Berrange
+ * Copyright (C) 2007 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
+ *
+ * Author: Daniel P. Berrange <berrange at redhat.com>
+ */
+
+
+#include <stdlib.h>
+#include <string.h>
+#include <poll.h>
+#include <sys/time.h>
+#include <errno.h>
+
+#include "internal.h"
+#include "event.h"
+
+/* State for a single file handle being monitored */
+struct virEventHandle {
+    int fd;                     /* file handle passed to poll() */
+    int events;                 /* event mask requested from poll() */
+    virEventHandleCallback cb;  /* invoked when poll() reports events on fd */
+    void *opaque;               /* caller data handed back to cb */
+    int deleted;                /* non-zero once removal has been requested */
+};
+
+/* State for a single timer being generated */
+struct virEventTimeout {
+    int timer;                    /* id taken from nextTimer at registration */
+    int timeout;                  /* interval added to 'now' after each dispatch */
+    unsigned long long expiresAt; /* next expiry, in gettimeofday() milliseconds */
+    virEventTimeoutCallback cb;   /* invoked when expiresAt has been reached */
+    void *opaque;                 /* caller data handed back to cb */
+    int deleted;                  /* non-zero once removal has been requested */
+};
+
+/* Number of virEventHandle/virEventTimeout slots by which the
+   arrays below are grown, and later shrunk, in one step */
+#define EVENT_ALLOC_EXTENT 10
+
+/* State for the main event loop */
+struct virEventLoop {
+    int handlesCount;                  /* slots of 'handles' in use */
+    int handlesAlloc;                  /* slots of 'handles' allocated */
+    struct virEventHandle *handles;    /* realloc()'d array of handles */
+    int timeoutsCount;                 /* slots of 'timeouts' in use */
+    int timeoutsAlloc;                 /* slots of 'timeouts' allocated */
+    struct virEventTimeout *timeouts;  /* realloc()'d array of timers */
+};
+
+/* Only have one event loop; static storage, so it starts out zeroed */
+static struct virEventLoop eventLoop;
+
+/* Unique ID for the next timer to be registered */
+static int nextTimer = 0;
+
+
+/*
+ * Append a file handle and its callback to the set polled by the
+ * event loop, growing the array by EVENT_ALLOC_EXTENT slots when full.
+ * NB, this may be invoked from within a callback, so existing
+ * entries keep their index - the new one always goes on the end.
+ * Returns 0 on success, -1 if the array could not be grown.
+ */
+int virEventAddHandle(int fd, int events, virEventHandleCallback cb, void *opaque) {
+    struct virEventHandle *slot;
+
+    qemudDebug("Add handle %d %d %p %p\n", fd, events, cb, opaque);
+    if (eventLoop.handlesAlloc == eventLoop.handlesCount) {
+        int wanted = eventLoop.handlesAlloc + EVENT_ALLOC_EXTENT;
+        struct virEventHandle *grown;
+        qemudDebug("Used %d handle slots, adding %d more\n",
+                   eventLoop.handlesAlloc, EVENT_ALLOC_EXTENT);
+        grown = realloc(eventLoop.handles, sizeof(struct virEventHandle) * wanted);
+        if (!grown)
+            return -1;
+        eventLoop.handlesAlloc = wanted;
+        eventLoop.handles = grown;
+    }
+
+    slot = eventLoop.handles + eventLoop.handlesCount++;
+    slot->fd = fd;
+    slot->events = events;
+    slot->cb = cb;
+    slot->opaque = opaque;
+    slot->deleted = 0;
+    return 0;
+}
+
+/*
+ * Flag the first live entry registered for @fd as deleted.
+ * NB, this may be invoked from within a callback, so the entry
+ * is only marked here; the array itself is compacted later on.
+ * Returns 0 if an entry was flagged, -1 if @fd was not found.
+ */
+int virEventRemoveHandle(int fd) {
+    int idx = 0;
+
+    qemudDebug("Remove handle %d\n", fd);
+    while (idx < eventLoop.handlesCount) {
+        struct virEventHandle *entry = eventLoop.handles + idx;
+        if (!entry->deleted && entry->fd == fd) {
+            qemudDebug("mark delete %d\n", idx);
+            entry->deleted = 1;
+            return 0;
+        }
+        idx++;
+    }
+    return -1;
+}
+
+
+/*
+ * Register a callback for a timer event, first due @timeout
+ * milliseconds from now and then after every further @timeout.
+ * NB, it *must* be safe to call this from within a callback
+ * For this reason we only ever append to existing list.
+ * Returns the new timer's id (the value to hand to
+ * virEventRemoveTimeout), or -1 on failure.
+ */
+int virEventAddTimeout(int timeout, virEventTimeoutCallback cb, void *opaque) {
+    struct virEventTimeout *slot;
+    struct timeval tv;
+
+    if (gettimeofday(&tv, NULL) < 0)
+        return -1;
+
+    if (eventLoop.timeoutsCount == eventLoop.timeoutsAlloc) {
+        struct virEventTimeout *tmp;
+        qemudDebug("Used %d timeout slots, adding %d more\n",
+                   eventLoop.timeoutsAlloc, EVENT_ALLOC_EXTENT);
+        tmp = realloc(eventLoop.timeouts, sizeof(*tmp) *
+                      (eventLoop.timeoutsAlloc + EVENT_ALLOC_EXTENT));
+        if (!tmp)
+            return -1;
+        eventLoop.timeouts = tmp;
+        eventLoop.timeoutsAlloc += EVENT_ALLOC_EXTENT;
+    }
+
+    slot = &eventLoop.timeouts[eventLoop.timeoutsCount++];
+    slot->timer = nextTimer++;
+    slot->timeout = timeout;
+    slot->cb = cb;
+    slot->opaque = opaque;
+    slot->deleted = 0;
+    /* First expiry is one full interval from now, not immediately */
+    slot->expiresAt = timeout + (((unsigned long long)tv.tv_sec)*1000) +
+        (((unsigned long long)tv.tv_usec)/1000);
+    /* Hand back the id so that the caller is able to remove the timer */
+    return slot->timer;
+}
+
+/*
+ * Flag the live timer whose id is @timer as deleted.
+ * NB, this may be invoked from within a callback, so the entry
+ * is only marked here; the array itself is compacted later on.
+ * Returns 0 if a timer was flagged, -1 if the id was not found.
+ */
+int virEventRemoveTimeout(int timer) {
+    int idx = 0;
+
+    while (idx < eventLoop.timeoutsCount) {
+        struct virEventTimeout *entry = eventLoop.timeouts + idx;
+        if (!entry->deleted && entry->timer == timer) {
+            entry->deleted = 1;
+            return 0;
+        }
+        idx++;
+    }
+    return -1;
+}
+
+/* Iterates over all live (non-deleted) timeouts and determines
+ * which will be the first to expire.
+ * @timeout: filled with milliseconds until the soonest timer is
+ *           due (1 if already overdue), or -1 if none is pending
+ * returns: 0 on success, -1 on error
+ */
+static int virEventCalculateTimeout(int *timeout) {
+    unsigned long long then = 0;
+    unsigned long long now;
+    struct timeval tv;
+    int i;
+
+    /* Find the earliest expiry amongst the live timers */
+    for (i = 0 ; i < eventLoop.timeoutsCount ; i++) {
+        if (eventLoop.timeouts[i].deleted)
+            continue;
+        if (then == 0 ||
+            eventLoop.timeouts[i].expiresAt < then)
+            then = eventLoop.timeouts[i].expiresAt;
+    }
+
+    /* No live timers, so poll() may block indefinitely */
+    if (then == 0) {
+        *timeout = -1;
+        return 0;
+    }
+
+    if (gettimeofday(&tv, NULL) < 0)
+        return -1;
+    now = (((unsigned long long)tv.tv_sec)*1000) +
+        (((unsigned long long)tv.tv_usec)/1000);
+
+    /* Compare before subtracting: both values are unsigned, so an
+     * overdue timer would otherwise wrap around to a huge number */
+    *timeout = then < now ? 1 : (int)(then - now);
+    return 0;
+}
+
+/*
+ * Allocate a pollfd array with one entry per slot in use in
+ * eventLoop.handles, in the same order. The caller must free it.
+ * returns: the pollfd array, or NULL on error
+ */
+static struct pollfd *virEventMakePollFDs(void) {
+    struct pollfd *fds;
+    int i;
+
+    /* malloc(0) is allowed to return NULL, which callers would take
+     * for an error - so always ask for at least one slot */
+    if (!(fds = malloc(sizeof(*fds) *
+                       (eventLoop.handlesCount ? eventLoop.handlesCount : 1))))
+        return NULL;
+
+    for (i = 0 ; i < eventLoop.handlesCount ; i++) {
+        fds[i].fd = eventLoop.handles[i].fd;
+        fds[i].events = eventLoop.handles[i].events;
+        fds[i].revents = 0;
+        qemudDebug("Wait for %d %d\n", eventLoop.handles[i].fd, eventLoop.handles[i].events);
+    }
+    return fds;
+}
+
+
+/*
+ * Iterate over all timers and determine if any have expired.
+ * Invoke the user supplied callback for each timer whose
+ * expiry time is met, and schedule the next timeout. Does
+ * not try to 'catch up' on time if the actual expiry time
+ * was later than the requested time.
+ *
+ * This method must cope with new timers being registered
+ * by a callback, and must skip any timers marked as deleted.
+ *
+ * Returns 0 upon success, -1 if gettimeofday() failed
+ */
+static int virEventDispatchTimeouts(void) {
+    struct timeval tv;
+    unsigned long long now;
+    int i;
+    /* Save this now - callbacks may append timers; those wait for the next call */
+    int ntimeouts = eventLoop.timeoutsCount;
+
+    if (gettimeofday(&tv, NULL) < 0) {
+        return -1;
+    }
+    now = (((unsigned long long)tv.tv_sec)*1000) +
+        (((unsigned long long)tv.tv_usec)/1000);
+
+    for (i = 0 ; i < ntimeouts ; i++) {
+        if (eventLoop.timeouts[i].deleted)
+            continue;
+        /* The array is indexed afresh on every access as cb may realloc() it */
+        if (eventLoop.timeouts[i].expiresAt <= now) {
+            (eventLoop.timeouts[i].cb)(eventLoop.timeouts[i].timer,
+                                       eventLoop.timeouts[i].opaque);
+            eventLoop.timeouts[i].expiresAt =
+                now + eventLoop.timeouts[i].timeout;
+        }
+    }
+    return 0;
+}
+
+
+/* Iterate over all file handles and invoke the user supplied
+ * callback for each one which has pending events listed in
+ * the poll() data.
+ * @fds: poll() results, where fds[i] describes eventLoop.handles[i];
+ *       needs an entry for every handle registered on entry
+ *
+ * This method must cope with new handles being registered
+ * by a callback, and must skip any handles marked as deleted.
+ * Currently always returns 0.
+ */
+static int virEventDispatchHandles(struct pollfd *fds) {
+    int i;
+    /* Save this now - it may be changed during dispatch */
+    int nhandles = eventLoop.handlesCount;
+
+    for (i = 0 ; i < nhandles ; i++) {
+        if (eventLoop.handles[i].deleted) {
+            qemudDebug("Skip deleted %d\n", eventLoop.handles[i].fd);
+            continue;
+        }
+
+        if (fds[i].revents) {
+            qemudDebug("Dispatch %d %d %p\n", fds[i].fd, fds[i].revents, eventLoop.handles[i].opaque);
+            (eventLoop.handles[i].cb)(fds[i].fd, fds[i].revents,
+                                      eventLoop.handles[i].opaque);
+        }
+    }
+
+    return 0;
+}
+
+
+/* Used post dispatch to actually remove any timers that
+ * were previously marked as deleted. This asynchronous
+ * cleanup is needed to make dispatch re-entrant safe.
+ * Returns 0 on success, -1 if shrinking the array failed.
+ */
+static int virEventCleanupTimeouts(void) {
+    int i;
+
+    /* Remove deleted entries, shuffling down remaining
+     * entries as needed to form contiguous series */
+    for (i = 0 ; i < eventLoop.timeoutsCount ; ) {
+        /* Test the timer entry itself, not the handle at the same index */
+        if (!eventLoop.timeouts[i].deleted) {
+            i++;
+            continue;
+        }
+        /* 'i' is not advanced: the following entry is moved into this slot */
+        if ((i+1) < eventLoop.timeoutsCount) {
+            memmove(eventLoop.timeouts+i,
+                    eventLoop.timeouts+i+1,
+                    sizeof(struct virEventTimeout)*(eventLoop.timeoutsCount-(i+1)));
+        }
+        eventLoop.timeoutsCount--;
+    }
+
+    /* Release some memory if we've got a big chunk free */
+    if ((eventLoop.timeoutsAlloc - EVENT_ALLOC_EXTENT) > eventLoop.timeoutsCount) {
+        struct virEventTimeout *tmp;
+        qemudDebug("Releasing %d out of %d timeout slots used, releasing %d\n",
+                   eventLoop.timeoutsCount, eventLoop.timeoutsAlloc, EVENT_ALLOC_EXTENT);
+        tmp = realloc(eventLoop.timeouts,
+                      sizeof(struct virEventTimeout) *
+                      (eventLoop.timeoutsAlloc - EVENT_ALLOC_EXTENT));
+        if (!tmp)
+            return -1;
+        eventLoop.timeouts = tmp;
+        eventLoop.timeoutsAlloc -= EVENT_ALLOC_EXTENT;
+    }
+    return 0;
+}
+
+/* Runs after dispatch to physically drop every handle which was
+ * flagged as deleted, then hands one extent of slots back to the
+ * allocator when more than that many are unused. Deferring removal
+ * to this point is what keeps dispatch safe against re-entrancy.
+ * Returns 0 on success, -1 if shrinking the array failed.
+ */
+static int virEventCleanupHandles(void) {
+    int pos = 0;
+    int reduced;
+
+    /* Squeeze out flagged entries, keeping the survivors in order */
+    while (pos < eventLoop.handlesCount) {
+        struct virEventHandle *entry = eventLoop.handles + pos;
+        int after = eventLoop.handlesCount - (pos + 1);
+
+        if (!entry->deleted) {
+            pos++;
+            continue;
+        }
+
+        /* Slide the tail down one slot; 'pos' then names the next entry */
+        if (after > 0)
+            memmove(entry, entry + 1, sizeof(struct virEventHandle) * after);
+        eventLoop.handlesCount--;
+    }
+
+    /* Give back an extent if more than that many slots are unused */
+    reduced = eventLoop.handlesAlloc - EVENT_ALLOC_EXTENT;
+    if (reduced > eventLoop.handlesCount) {
+        struct virEventHandle *shrunk;
+        qemudDebug("Releasing %d out of %d handles slots used, releasing %d\n",
+                   eventLoop.handlesCount, eventLoop.handlesAlloc, EVENT_ALLOC_EXTENT);
+        shrunk = realloc(eventLoop.handles, sizeof(struct virEventHandle) * reduced);
+        if (!shrunk)
+            return -1;
+        eventLoop.handles = shrunk;
+        eventLoop.handlesAlloc = reduced;
+    }
+    return 0;
+}
+
+/*
+ * Run a single iteration of the event loop, blocking until at least one
+ * file handle has an event, or a timer expires. Returns 0, or -1 on error
+ */
+int virEventRunOnce(void) {
+    struct pollfd *fds;
+    int ret, timeout;
+    int nfds = eventLoop.handlesCount; /* number of entries in fds */
+
+    if (!(fds = virEventMakePollFDs()))
+        return -1;
+    if (virEventCalculateTimeout(&timeout) < 0)
+        goto error;
+
+ retry:
+    qemudDebug("Poll on %d handles %p timeout %d\n", eventLoop.handlesCount, fds, timeout);
+    ret = poll(fds, nfds, timeout);
+    qemudDebug("Poll got %d event\n", ret);
+    if (ret < 0) {
+        if (errno == EINTR)
+            goto retry;
+        goto error;
+    }
+    if (virEventDispatchTimeouts() < 0)
+        goto error;
+
+    /* Timer callbacks may append handles; the handle dispatcher reads fds[i]
+     * for each one, so pad fds with "no event" entries to match the count */
+    if (eventLoop.handlesCount > nfds) {
+        struct pollfd *tmp = realloc(fds, sizeof(*fds) * eventLoop.handlesCount);
+        if (!tmp)
+            goto error;
+        fds = tmp;
+        memset(fds + nfds, 0, sizeof(*fds) * (eventLoop.handlesCount - nfds));
+    }
+    if (ret > 0 && virEventDispatchHandles(fds) < 0)
+        goto error;
+    free(fds);
+
+    if (virEventCleanupTimeouts() < 0 || virEventCleanupHandles() < 0)
+        return -1;
+    return 0;
+
+ error:
+    free(fds);
+    return -1;
+}
+/*
+ * Local variables:
+ *  indent-tabs-mode: nil
+ *  c-indent-level: 4
+ *  c-basic-offset: 4
+ *  tab-width: 4
+ * End:
+ */
diff -r 56b36b784a1c qemud/event.h
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/qemud/event.h	Mon Jun 18 12:20:17 2007 -0400
@@ -0,0 +1,98 @@
+/*
+ * event.h: event loop for monitoring file handles
+ *
+ * Copyright (C) 2007 Daniel P. Berrange
+ * Copyright (C) 2007 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
+ *
+ * Author: Daniel P. Berrange <berrange at redhat.com>
+ */
+
+#ifndef __VIRTD_EVENT_H__
+#define __VIRTD_EVENT_H__
+
+
+/**
+ * virEventHandleCallback: callback for receiving file handle events
+ *
+ * @fd: file handle on which the event occurred
+ * @events: bitset of events from POLLnnn constants
+ * @opaque: user data registered with handle
+ */
+typedef void (*virEventHandleCallback)(int fd, int events, void *opaque);
+
+/**
+ * virEventAddHandle: register a callback for monitoring file handle events
+ *
+ * @fd: file handle to monitor for events
+ * @events: bitset of events to watch from POLLnnn constants
+ * @cb: callback to invoke when an event occurs
+ * @opaque: user data to pass to callback
+ *
+ * returns -1 if the file handle cannot be registered, 0 upon success
+ */
+int virEventAddHandle(int fd, int events, virEventHandleCallback cb, void *opaque);
+
+/**
+ * virEventRemoveHandle: unregister a callback from a file handle
+ *
+ * @fd: file handle to stop monitoring for events
+ *
+ * returns -1 if the file handle was not registered, 0 upon success
+ */
+int virEventRemoveHandle(int fd);
+
+/**
+ * virEventTimeoutCallback: callback for receiving timer events
+ *
+ * @timer: timer id emitting the event
+ * @opaque: user data registered with timer
+ */
+typedef void (*virEventTimeoutCallback)(int timer, void *opaque);
+
+/**
+ * virEventAddTimeout: register a callback for a timer event
+ *
+ * @timeout: timeout between events in milliseconds
+ * @cb: callback to invoke when an event occurs
+ * @opaque: user data to pass to callback
+ *
+ * returns -1 if the timer cannot be registered, a non-negative
+ * integer timer id upon success
+ */
+int virEventAddTimeout(int timeout, virEventTimeoutCallback cb, void *opaque);
+
+/**
+ * virEventRemoveTimeout: unregister a callback for a timer
+ *
+ * @timer: the timer id to remove
+ *
+ * returns -1 if the timer was not registered, 0 upon success
+ */
+int virEventRemoveTimeout(int timer);
+
+
+/**
+ * virEventRunOnce: run a single iteration of the event loop.
+ *
+ * Blocks the caller until at least one file handle has an
+ * event or the first timer expires.
+ *
+ * returns -1 if the event monitoring failed
+ */
+int virEventRunOnce(void);
+
+#endif /* __VIRTD_EVENT_H__ */
diff -r 56b36b784a1c qemud/internal.h
--- a/qemud/internal.h	Mon Jun 18 11:01:38 2007 -0400
+++ b/qemud/internal.h	Mon Jun 18 14:20:02 2007 -0400
@@ -338,7 +338,6 @@ struct qemud_server {
     int nclients;
     struct qemud_client *clients;
     int sigread;
-    int nvmfds;
     int nactivevms;
     int ninactivevms;
     struct qemud_vm *vms;
diff -r 56b36b784a1c qemud/qemud.c
--- a/qemud/qemud.c	Mon Jun 18 11:01:38 2007 -0400
+++ b/qemud/qemud.c	Mon Jun 18 14:19:58 2007 -0400
@@ -61,6 +61,7 @@
 #include "driver.h"
 #include "conf.h"
 #include "iptables.h"
+#include "event.h"
 
 static int godaemon = 0;        /* -d: Be a daemon */
 static int verbose = 0;         /* -v: Verbose mode */
@@ -109,6 +110,13 @@ static void sig_handler(int sig) {
     }
     errno = origerrno;
 }
+
+static void qemudDispatchVMEvent(int fd, int events, void *opaque);
+static void qemudDispatchClientEvent(int fd, int events, void *opaque);
+static void qemudDispatchServerEvent(int fd, int events, void *opaque);
+static int qemudRegisterClientEvent(struct qemud_server *server,
+                                    struct qemud_client *client,
+                                    int remove);
 
 static int
 remoteInitializeGnuTLS (void)
@@ -184,8 +192,10 @@ remoteInitializeGnuTLS (void)
     return 0;
 }
 
-static int qemudDispatchSignal(struct qemud_server *server)
-{
+static void qemudDispatchSignalEvent(int fd ATTRIBUTE_UNUSED,
+                                     int events ATTRIBUTE_UNUSED,
+                                     void *opaque) {
+    struct qemud_server *server = (struct qemud_server *)opaque;
     unsigned char sigc;
     struct qemud_vm *vm;
     struct qemud_network *network;
@@ -194,7 +204,7 @@ static int qemudDispatchSignal(struct qe
     if (read(server->sigread, &sigc, 1) != 1) {
         qemudLog(QEMUD_ERR, "Failed to read from signal pipe: %s",
                  strerror(errno));
-        return -1;
+        return;
     }
 
     ret = 0;
@@ -266,7 +276,8 @@ static int qemudDispatchSignal(struct qe
         break;
     }
 
-    return ret;
+    if (ret != 0)
+        server->shutdown = 1;
 }
 
 static int qemudSetCloseExec(int fd) {
@@ -474,19 +485,16 @@ static int qemudListenUnix(struct qemud_
     }
 
     sock->readonly = readonly;
-    sock->next = server->sockets;
-    server->sockets = sock;
-    server->nsockets++;
 
     if ((sock->fd = socket(PF_UNIX, SOCK_STREAM, 0)) < 0) {
         qemudLog(QEMUD_ERR, "Failed to create socket: %s",
                  strerror(errno));
-        return -1;
+        goto cleanup;
     }
 
     if (qemudSetCloseExec(sock->fd) < 0 ||
         qemudSetNonBlock(sock->fd) < 0)
-        return -1;
+        goto cleanup;
 
     memset(&addr, 0, sizeof(addr));
     addr.sun_family = AF_UNIX;
@@ -502,17 +510,35 @@ static int qemudListenUnix(struct qemud_
     if (bind(sock->fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
         qemudLog(QEMUD_ERR, "Failed to bind socket to '%s': %s",
                  path, strerror(errno));
-        return -1;
+        goto cleanup;
     }
     umask(oldmask);
 
     if (listen(sock->fd, 30) < 0) {
         qemudLog(QEMUD_ERR, "Failed to listen for connections on '%s': %s",
                  path, strerror(errno));
-        return -1;
-    }
+        goto cleanup;
+    }
+
+    if (virEventAddHandle(sock->fd,
+                          POLLIN| POLLERR | POLLHUP,
+                          qemudDispatchServerEvent,
+                          server) < 0) {
+        qemudLog(QEMUD_ERR, "Failed to add server event callback");
+        goto cleanup;
+    }
+
+    sock->next = server->sockets;
+    server->sockets = sock;
+    server->nsockets++;
 
     return 0;
+
+ cleanup:
+    if (sock->fd)
+        close(sock->fd);
+    free(sock);
+    return -1;
 }
 
 // See: http://people.redhat.com/drepper/userapi-ipv6.html
@@ -606,6 +632,15 @@ remoteListenTCP (struct qemud_server *se
                       "remoteListenTCP: listen: %s", strerror (errno));
             return -1;
         }
+
+        if (virEventAddHandle(sock->fd,
+                              POLLIN| POLLERR | POLLHUP,
+                              qemudDispatchServerEvent,
+                              server) < 0) {
+            qemudLog(QEMUD_ERR, "Failed to add server event callback");
+            return -1;
+        }
+
     }
 
     return 0;
@@ -1026,11 +1061,15 @@ static int qemudDispatchServer(struct qe
     if (!client->tls) {
         client->mode = QEMUD_MODE_RX_HEADER;
         client->bufferLength = QEMUD_PKT_HEADER_XDR_LEN;
+
+        if (qemudRegisterClientEvent (server, client, 0) < 0)
+            goto cleanup;
     } else {
         int ret;
 
         client->session = remoteInitializeTLSSession ();
-        if (client->session == NULL) goto tls_failed;
+        if (client->session == NULL)
+            goto cleanup;
 
         gnutls_transport_set_ptr (client->session,
                                   (gnutls_transport_ptr_t) (long) fd);
@@ -1040,16 +1079,22 @@ static int qemudDispatchServer(struct qe
         if (ret == 0) {
             /* Unlikely, but ...  Next step is to check the certificate. */
             if (remoteCheckAccess (client) == -1)
-                goto tls_failed;
+                goto cleanup;
+
+            if (qemudRegisterClientEvent(server, client, 0) < 0)
+                goto cleanup;
         } else if (ret == GNUTLS_E_INTERRUPTED || ret == GNUTLS_E_AGAIN) {
             /* Most likely. */
             client->mode = QEMUD_MODE_TLS_HANDSHAKE;
             client->bufferLength = -1;
             client->direction = gnutls_record_get_direction (client->session);
+
+            if (qemudRegisterClientEvent (server, client, 0) < 0)
+                goto cleanup;
         } else {
             qemudLog (QEMUD_ERR, "TLS handshake failed: %s",
                       gnutls_strerror (ret));
-            goto tls_failed;
+            goto cleanup;
         }
     }
 
@@ -1059,7 +1104,7 @@ static int qemudDispatchServer(struct qe
 
     return 0;
 
- tls_failed:
+ cleanup:
     if (client->session) gnutls_deinit (client->session);
     close (fd);
     free (client);
@@ -1453,7 +1498,15 @@ int qemudStartVMDaemon(struct qemud_serv
 
         server->ninactivevms--;
         server->nactivevms++;
-        server->nvmfds += 2;
+
+        virEventAddHandle(vm->stdout,
+                          POLLIN | POLLERR | POLLHUP,
+                          qemudDispatchVMEvent,
+                          server);
+        virEventAddHandle(vm->stderr,
+                          POLLIN | POLLERR | POLLHUP,
+                          qemudDispatchVMEvent,
+                          server);
 
         ret = 0;
 
@@ -1497,6 +1550,8 @@ static void qemudDispatchClientFailure(s
         tmp = tmp->next;
     }
 
+    virEventRemoveHandle(client->fd);
+
     if (client->tls && client->session) gnutls_deinit (client->session);
     close(client->fd);
     free(client);
@@ -1590,6 +1645,8 @@ static int qemudClientRead(struct qemud_
     } else {
         ret = gnutls_record_recv (client->session, data, len);
         client->direction = gnutls_record_get_direction (client->session);
+        if (qemudRegisterClientEvent (server, client, 1) < 0)
+            qemudDispatchClientFailure (server, client);
         if (ret <= 0) {
             if (ret == 0 || (ret != GNUTLS_E_AGAIN &&
                              ret != GNUTLS_E_INTERRUPTED)) {
@@ -1655,6 +1712,11 @@ static void qemudDispatchClientRead(stru
 
         xdr_destroy (&x);
 
+        if (qemudRegisterClientEvent(server, client, 1) < 0) {
+            qemudDispatchClientFailure(server, client);
+            return;
+        }
+
         /* Fall through */
     }
 
@@ -1679,6 +1741,8 @@ static void qemudDispatchClientRead(stru
 
         if (remote && h.prog == REMOTE_PROGRAM) {
             remoteDispatchClientRequest (server, client);
+            if (qemudRegisterClientEvent(server, client, 1) < 0)
+                qemudDispatchClientFailure(server, client);
         } else if (!remote && h.prog == QEMUD_PROGRAM) {
             qemud_packet_client p;
 
@@ -1689,6 +1753,9 @@ static void qemudDispatchClientRead(stru
             }
 
             qemudDispatchClientRequest(server, client, &p);
+
+            if (qemudRegisterClientEvent(server, client, 1) < 0)
+                qemudDispatchClientFailure(server, client);
         } else {
             /* An internal error. */
             qemudDebug ("Not REMOTE_PROGRAM or QEMUD_PROGRAM");
@@ -1709,12 +1776,17 @@ static void qemudDispatchClientRead(stru
             /* Finished.  Next step is to check the certificate. */
             if (remoteCheckAccess (client) == -1)
                 qemudDispatchClientFailure (server, client);
+            if (qemudRegisterClientEvent (server, client, 1) < 0)
+                qemudDispatchClientFailure (server, client);
         } else if (ret != GNUTLS_E_AGAIN && ret != GNUTLS_E_INTERRUPTED) {
             qemudLog (QEMUD_ERR, "TLS handshake failed: %s",
                       gnutls_strerror (ret));
             qemudDispatchClientFailure (server, client);
-        } else
+        } else {
             client->direction = gnutls_record_get_direction (client->session);
+            if (qemudRegisterClientEvent (server ,client, 1) < 0)
+                qemudDispatchClientFailure (server, client);
+        }
 
         break;
     }
@@ -1745,6 +1817,8 @@ static int qemudClientWrite(struct qemud
     } else {
         ret = gnutls_record_send (client->session, data, len);
         client->direction = gnutls_record_get_direction (client->session);
+        if (qemudRegisterClientEvent (server, client, 1) < 0)
+            qemudDispatchClientFailure (server, client);
         if (ret < 0) {
             if (ret != GNUTLS_E_INTERRUPTED && ret != GNUTLS_E_AGAIN) {
                 qemudLog (QEMUD_ERR, "gnutls_record_send: %s",
@@ -1772,6 +1846,9 @@ static void qemudDispatchClientWrite(str
             client->bufferLength = QEMUD_PKT_HEADER_XDR_LEN;
             client->bufferOffset = 0;
             if (client->tls) client->direction = QEMUD_TLS_DIRECTION_READ;
+
+            if (qemudRegisterClientEvent (server, client, 1) < 0)
+                qemudDispatchClientFailure (server, client);
         }
         /* Still writing */
         break;
@@ -1786,12 +1863,18 @@ static void qemudDispatchClientWrite(str
             /* Finished.  Next step is to check the certificate. */
             if (remoteCheckAccess (client) == -1)
                 qemudDispatchClientFailure (server, client);
+
+            if (qemudRegisterClientEvent (server, client, 1))
+                qemudDispatchClientFailure (server, client);
         } else if (ret != GNUTLS_E_AGAIN && ret != GNUTLS_E_INTERRUPTED) {
             qemudLog (QEMUD_ERR, "TLS handshake failed: %s",
                       gnutls_strerror (ret));
             qemudDispatchClientFailure (server, client);
-        } else
+        } else {
             client->direction = gnutls_record_get_direction (client->session);
+            if (qemudRegisterClientEvent (server, client, 1))
+                qemudDispatchClientFailure (server, client);
+        }
 
         break;
     }
@@ -1842,6 +1925,10 @@ int qemudShutdownVMDaemon(struct qemud_s
 
     qemudVMData(server, vm, vm->stdout);
     qemudVMData(server, vm, vm->stderr);
+
+    virEventRemoveHandle(vm->stdout);
+    virEventRemoveHandle(vm->stderr);
+
     if (close(vm->logfile) < 0)
         qemudLog(QEMUD_WARN, "Unable to close logfile %d: %s", errno, strerror(errno));
     close(vm->stdout);
@@ -1852,7 +1939,6 @@ int qemudShutdownVMDaemon(struct qemud_s
     vm->stdout = -1;
     vm->stderr = -1;
     vm->monitor = -1;
-    server->nvmfds -= 2;
 
     if (waitpid(vm->pid, NULL, WNOHANG) != vm->pid) {
         kill(vm->pid, SIGKILL);
@@ -2340,94 +2426,104 @@ int qemudShutdownNetworkDaemon(struct qe
 }
 
 
-static int qemudDispatchPoll(struct qemud_server *server, struct pollfd *fds) {
+static void qemudDispatchVMEvent(int fd, int events, void *opaque) { /* fd event callback for a VM's stdout/stderr; registration is outside this excerpt */
+    struct qemud_server *server = (struct qemud_server *)opaque; /* opaque carries the qemud_server */
+    struct qemud_vm *vm = server->vms;
+    /* Walk the VM list for the active VM whose stdout or stderr is this fd. */
+    while (vm) {
+        if (qemudIsActiveVM(vm) &&
+            (vm->stdout == fd ||
+             vm->stderr == fd))
+            break;
+
+        vm = vm->next;
+    }
+    /* No active VM owns this fd: ignore the event. */
+    if (!vm)
+        return;
+    /* Mask is exactly POLLIN (no other bits): hand to the log handler; done if it returns 0. */
+    if (events == POLLIN &&
+        qemudDispatchVMLog(server, vm, fd) == 0)
+        return;
+    /* Any other event mask, or a non-zero result from qemudDispatchVMLog. */
+    qemudDispatchVMFailure(server, vm, fd);
+}
+
+static void qemudDispatchClientEvent(int fd, int events, void *opaque) { /* fd event callback installed by qemudRegisterClientEvent */
+    struct qemud_server *server = (struct qemud_server *)opaque; /* opaque is the server passed at registration */
+    struct qemud_client *client = server->clients;
+    /* Locate the client that owns this fd. */
+    while (client) {
+        if (client->fd == fd)
+            break;
+
+        client = client->next;
+    }
+    /* No client owns this fd: nothing to dispatch. */
+    if (!client)
+        return;
+    /* Exactly POLLOUT => write, exactly POLLIN => read, any other mask => client failure. */
+    if (events == POLLOUT)
+        qemudDispatchClientWrite(server, client);
+    else if (events == POLLIN)
+        qemudDispatchClientRead(server, client);
+    else
+        qemudDispatchClientFailure(server, client);
+}
+
+static int qemudRegisterClientEvent(struct qemud_server *server, /* returns 0 on success, -1 on failure */
+                                    struct qemud_client *client,
+                                    int removeFirst) { /* non-zero: remove the existing handle for client->fd first */
+    if (removeFirst)
+        if (virEventRemoveHandle(client->fd) < 0)
+            return -1;
+    /* TLS clients poll per client->direction; others poll for output only in QEMUD_MODE_TX_PACKET. */
+    if (client->tls) {
+        if (virEventAddHandle(client->fd,
+                              (client->direction ?
+                               POLLOUT : POLLIN) | POLLERR | POLLHUP,
+                              qemudDispatchClientEvent,
+                              server) < 0)
+            return -1; /* if a removal succeeded above, the fd now has no handle registered */
+    } else {
+        if (virEventAddHandle(client->fd,
+                              (client->mode == QEMUD_MODE_TX_PACKET ?
+                               POLLOUT : POLLIN) | POLLERR | POLLHUP,
+                              qemudDispatchClientEvent,
+                              server) < 0)
+            return -1; /* same caveat as the TLS branch */
+    }
+
+    return 0;
+}
+
+static void qemudDispatchServerEvent(int fd, int events, void *opaque) { /* fd event callback for an entry in server->sockets */
+    struct qemud_server *server = (struct qemud_server *)opaque; /* opaque carries the qemud_server */
     struct qemud_socket *sock = server->sockets;
-    struct qemud_client *client = server->clients;
-    struct qemud_vm *vm;
-    struct qemud_network *network;
-    int ret = 0;
-    int fd = 0;
-
-    if (fds[fd++].revents && qemudDispatchSignal(server) < 0)
-        return -1;
-
-    if (server->shutdown)
-        return 0;
-
-    vm = server->vms;
+    /* Find the socket entry that owns this fd. */
+    while (sock) {
+        if (sock->fd == fd)
+            break;
+
+        sock = sock->next;
+    }
+    /* No socket entry owns this fd: ignore the event. */
+    if (!sock)
+        return;
+    /* Any non-zero mask goes to qemudDispatchServer; its return value is not checked here. */
+    if (events)
+        qemudDispatchServer(server, sock);
+}
+
+
+static void qemudCleanupInactive(struct qemud_server *server) { /* called from qemudOneLoop after each virEventRunOnce() */
+    struct qemud_vm *vm = server->vms;
+    struct qemud_network *network = server->networks;
+
+    /* Clean up any VMs which have shut down & don't have an associated
+       config file */
     while (vm) {
         struct qemud_vm *next = vm->next;
-        int failed = 0,
-            stdoutfd = vm->stdout,
-            stderrfd = vm->stderr;
-
-        if (!qemudIsActiveVM(vm)) {
-            vm = next;
-            continue;
-        }
-
-        if (stdoutfd != -1) {
-            if (fds[fd].revents) {
-                if (fds[fd].revents == POLLIN) {
-                    if (qemudDispatchVMLog(server, vm, fds[fd].fd) < 0)
-                        failed = 1;
-                } else {
-                    if (qemudDispatchVMFailure(server, vm, fds[fd].fd) < 0)
-                        failed = 1;
-                }
-            }
-            fd++;
-        }
-        if (stderrfd != -1) {
-            if (!failed) {
-                if (fds[fd].revents) {
-                    if (fds[fd].revents == POLLIN) {
-                        if (qemudDispatchVMLog(server, vm, fds[fd].fd) < 0)
-                            failed = 1;
-                    } else {
-                        if (qemudDispatchVMFailure(server, vm, fds[fd].fd) < 0)
-                            failed = 1;
-                    }
-                }
-            }
-            fd++;
-        }
-        vm = next;
-        if (failed)
-            ret = -1; /* FIXME: the daemon shouldn't exit on failure here */
-    }
-    while (client) {
-        struct qemud_client *next = client->next;
-
-        assert (client->magic == QEMUD_CLIENT_MAGIC);
-
-        if (fds[fd].revents) {
-            qemudDebug("Poll data normal");
-            if (fds[fd].revents == POLLOUT)
-                qemudDispatchClientWrite(server, client);
-            else if (fds[fd].revents == POLLIN)
-                qemudDispatchClientRead(server, client);
-            else
-                qemudDispatchClientFailure(server, client);
-        }
-        fd++;
-        client = next;
-    }
-    while (sock) {
-        struct qemud_socket *next = sock->next;
-        /* FIXME: the daemon shouldn't exit on error here */
-        if (fds[fd].revents)
-            if (qemudDispatchServer(server, sock) < 0)
-                return -1;
-        fd++;
-        sock = next;
-    }
-
-    /* Cleanup any VMs which shutdown & dont have an associated
-       config file */
-    vm = server->vms;
-    while (vm) {
-        struct qemud_vm *next = vm->next;
 
         if (!qemudIsActiveVM(vm) && !vm->configFile[0])
             qemudRemoveInactiveVM(server, vm);
@@ -2436,7 +2532,6 @@ static int qemudDispatchPoll(struct qemu
     }
 
     /* Cleanup any networks too */
-    network = server->networks;
     while (network) {
         struct qemud_network *next = network->next;
 
@@ -2446,91 +2541,16 @@ static int qemudDispatchPoll(struct qemu
         network = next;
     }
 
-    return ret;
-}
-
-static void qemudPreparePoll(struct qemud_server *server, struct pollfd *fds) {
-    int  fd = 0;
-    struct qemud_socket *sock;
-    struct qemud_client *client;
-    struct qemud_vm *vm;
-
-    fds[fd].fd = server->sigread;
-    fds[fd].events = POLLIN;
-    fd++;
-
-    for (vm = server->vms ; vm ; vm = vm->next) {
-        if (!qemudIsActiveVM(vm))
-            continue;
-        if (vm->stdout != -1) {
-            fds[fd].fd = vm->stdout;
-            fds[fd].events = POLLIN | POLLERR | POLLHUP;
-            fd++;
-        }
-        if (vm->stderr != -1) {
-            fds[fd].fd = vm->stderr;
-            fds[fd].events = POLLIN | POLLERR | POLLHUP;
-            fd++;
-        }
-    }
-    for (client = server->clients ; client ; client = client->next) {
-        fds[fd].fd = client->fd;
-        if (!client->tls) {
-            /* Refuse to read more from client if tx is pending to
-               rate limit */
-            if (client->mode == QEMUD_MODE_TX_PACKET)
-                fds[fd].events = POLLOUT | POLLERR | POLLHUP;
-            else
-                fds[fd].events = POLLIN | POLLERR | POLLHUP;
-        } else {
-            qemudDebug ("direction = %s",
-                        client->direction ? "WRITE" : "READ");
-            fds[fd].events = client->direction ? POLLOUT : POLLIN;
-            fds[fd].events |= POLLERR | POLLHUP;
-        }
-        fd++;
-    }
-    for (sock = server->sockets ; sock ; sock = sock->next) {
-        fds[fd].fd = sock->fd;
-        fds[fd].events = POLLIN;
-        fd++;
-    }
+    return;
 }
 
 
 
 static int qemudOneLoop(struct qemud_server *server) {
-    int nfds = server->nsockets + server->nclients + server->nvmfds + 1; /* server->sigread */
-    struct pollfd fds[nfds];
-    int thistimeout = -1;
-    int ret;
     sig_atomic_t errors;
 
-    /* If we have no clients or vms, then timeout after
-       30 seconds, letting daemon exit */
-    if (timeout > 0 &&
-        !server->nclients &&
-        !server->nactivevms)
-        thistimeout = timeout;
-
-    qemudPreparePoll(server, fds);
-
- retry:
-
-    if ((ret = poll(fds, nfds, thistimeout * 1000)) < 0) {
-        if (errno == EINTR) {
-            goto retry;
-        }
-        qemudLog(QEMUD_ERR, "Error polling on file descriptors: %s",
-                 strerror(errno));
-        return -1;
-    }
-
-    /* Must have timed out */
-    if (ret == 0) {
-        qemudLog(QEMUD_INFO, "Timed out while polling on file descriptors");
-        return -1;
-    }
+    if (virEventRunOnce() < 0) /* NOTE(review): the old idle-exit timeout has no visible replacement here - confirm it is handled elsewhere */
+        return -1;
 
     /* Check for any signal handling errors and log them. */
     errors = sig_errors;
@@ -2542,8 +2562,7 @@ static int qemudOneLoop(struct qemud_ser
         return -1;
     }
 
-    if (qemudDispatchPoll(server, fds) < 0)
-        return -1;
+    qemudCleanupInactive(server);
 
     return 0;
 }
@@ -2941,6 +2960,15 @@ int main(int argc, char **argv) {
         goto error2;
     }
 
+    if (virEventAddHandle(sigpipe[0], /* presumably the read end of the signal pipe - TODO confirm */
+                          POLLIN,
+                          qemudDispatchSignalEvent, /* signals are now dispatched as an event-loop callback */
+                          server) < 0) {
+        qemudLog(QEMUD_ERR, "Failed to register callback for signal pipe");
+        ret = 3;
+        goto error2;
+    }
+
     qemudRunLoop(server);
 
     qemudCleanup(server);


More information about the libvir-list mailing list