[Libvir] PATCH 1/3: Split out simple event loop
Daniel P. Berrange
berrange at redhat.com
Mon Jun 18 18:36:20 UTC 2007
Attached is a new version of the patch which allocs/frees memory for storing
the callback data in blocks. It also adds copious comments and fixes a very
subtle bug in adding handles during dispatch. The printfs are changed into
qemudDebug()s and the nvmfds field is killed here.
On Mon, Jun 18, 2007 at 05:49:59AM -0400, Daniel Veillard wrote:
> On Sun, Jun 17, 2007 at 10:52:55PM +0100, Daniel P. Berrange wrote:
> > The following patch adds a qemud/event.c & qemud/event.h file providing a
> > general purpose event loop built around poll. Users register file handles
> > and associated callbacks, and / or timers. The qemud.c file is changed to
> > make use of these APIs for dealing with server, client, and VM file handles
> > and/or sockets. This decouples much of the QEMU VM I/O code from the main
> > qemud.c daemon code.
b/qemud/event.c | 460 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
b/qemud/event.h | 98 +++++++++++
qemud/Makefile.am | 3
qemud/internal.h | 1
qemud/qemud.c | 400 +++++++++++++++++++++++++---------------------
5 files changed, 774 insertions(+), 188 deletions(-)
Dan.
--
|=- Red Hat, Engineering, Emerging Technologies, Boston. +1 978 392 2496 -=|
|=- Perl modules: http://search.cpan.org/~danberr/ -=|
|=- Projects: http://freshmeat.net/~danielpb/ -=|
|=- GnuPG: 7D3B9505 F3C9 553F A1DA 4AC2 5648 23C1 B3DF F742 7D3B 9505 -=|
-------------- next part --------------
diff -r 56b36b784a1c qemud/Makefile.am
--- a/qemud/Makefile.am Mon Jun 18 11:01:38 2007 -0400
+++ b/qemud/Makefile.am Mon Jun 18 11:02:20 2007 -0400
@@ -16,7 +16,8 @@ libvirt_qemud_SOURCES = \
buf.c buf.h \
protocol.h protocol.c \
remote_protocol.h remote_protocol.c \
- remote.c
+ remote.c \
+ event.c event.h
#-D_XOPEN_SOURCE=600 -D_XOPEN_SOURCE_EXTENDED=1 -D_POSIX_C_SOURCE=199506L
libvirt_qemud_CFLAGS = \
-I$(top_srcdir)/include -I$(top_builddir)/include $(LIBXML_CFLAGS) \
diff -r 56b36b784a1c qemud/event.c
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/qemud/event.c Mon Jun 18 14:14:13 2007 -0400
@@ -0,0 +1,460 @@
+/*
+ * event.c: event loop for monitoring file handles
+ *
+ * Copyright (C) 2007 Daniel P. Berrange
+ * Copyright (C) 2007 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Author: Daniel P. Berrange <berrange at redhat.com>
+ */
+
+
+#include <stdlib.h>
+#include <string.h>
+#include <poll.h>
+#include <sys/time.h>
+#include <errno.h>
+
+#include "internal.h"
+#include "event.h"
+
+/* State for a single file handle being monitored */
+struct virEventHandle {
+ /* File descriptor copied into the pollfd array */
+ int fd;
+ /* Requested event mask (POLLnnn bits) copied into pollfd.events */
+ int events;
+ /* Callback invoked when poll() reports events for fd */
+ virEventHandleCallback cb;
+ /* Caller supplied pointer handed back to cb unchanged */
+ void *opaque;
+ /* Set non-zero by virEventRemoveHandle(); the slot is only
+  * physically dropped later by virEventCleanupHandles() */
+ int deleted;
+};
+
+/* State for a single timer being generated */
+struct virEventTimeout {
+ /* Identifier assigned from nextTimer when the timer is registered;
+  * this is the value matched by virEventRemoveTimeout() */
+ int timer;
+ /* Interval as passed to virEventAddTimeout() (documented in
+  * event.h as milliseconds); re-added to the current time after
+  * each dispatch */
+ int timeout;
+ /* Absolute expiry time in milliseconds, on the same scale as
+  * gettimeofday() (tv_sec*1000 + tv_usec/1000) */
+ unsigned long long expiresAt;
+ /* Callback invoked when the timer has expired */
+ virEventTimeoutCallback cb;
+ /* Caller supplied pointer handed back to cb unchanged */
+ void *opaque;
+ /* Set non-zero by virEventRemoveTimeout(); deleted timers are
+  * skipped by virEventDispatchTimeouts() */
+ int deleted;
+};
+
+/* Number of virEventHandle/virEventTimeout slots by which the
+ arrays are grown when full, and shrunk again by the cleanup
+ functions once more than this many slots are unused */
+#define EVENT_ALLOC_EXTENT 10
+
+/* State for the main event loop */
+struct virEventLoop {
+ /* Number of slots of 'handles' currently in use (including any
+  * that are flagged deleted but not yet cleaned up) */
+ int handlesCount;
+ /* Number of slots currently allocated in 'handles' */
+ int handlesAlloc;
+ /* realloc()-managed array of registered file handles */
+ struct virEventHandle *handles;
+ /* Number of slots of 'timeouts' currently in use (including any
+  * that are flagged deleted but not yet cleaned up) */
+ int timeoutsCount;
+ /* Number of slots currently allocated in 'timeouts' */
+ int timeoutsAlloc;
+ /* realloc()-managed array of registered timers */
+ struct virEventTimeout *timeouts;
+};
+
+/* Only have one event loop. Being a file-scope static it starts
+ * out zeroed: both arrays are NULL with zero slots allocated, and
+ * are grown on demand by the virEventAdd* functions */
+static struct virEventLoop eventLoop;
+
+/* Unique ID for the next timer to be registered. Post-incremented
+ * by virEventAddTimeout() for every timer it stores, so the first
+ * timer receives ID 0 */
+static int nextTimer = 0;
+
+
+/*
+ * virEventAddHandle: start monitoring a file handle.
+ *
+ * @fd: file descriptor to hand to poll()
+ * @events: POLLnnn bitmask to wait for
+ * @cb: function to invoke when an event is reported
+ * @opaque: pointer passed back to @cb untouched
+ *
+ * NB, this *must* remain safe to call from within a callback.
+ * New entries are therefore only ever appended at the end of
+ * the list; existing slots are never reordered here.
+ *
+ * Returns 0 upon success, -1 if the list could not be grown
+ */
+int virEventAddHandle(int fd, int events, virEventHandleCallback cb, void *opaque) {
+    struct virEventHandle *slot;
+
+    qemudDebug("Add handle %d %d %p %p\n", fd, events, cb, opaque);
+
+    /* Every allocated slot is in use: grow by one extent */
+    if (eventLoop.handlesCount == eventLoop.handlesAlloc) {
+        int grown = eventLoop.handlesAlloc + EVENT_ALLOC_EXTENT;
+        struct virEventHandle *bigger;
+
+        qemudDebug("Used %d handle slots, adding %d more\n",
+                   eventLoop.handlesAlloc, EVENT_ALLOC_EXTENT);
+        bigger = realloc(eventLoop.handles, sizeof(*bigger) * grown);
+        if (bigger == NULL)
+            return -1;
+
+        eventLoop.handles = bigger;
+        eventLoop.handlesAlloc = grown;
+    }
+
+    /* Fill in the first free slot, then publish it */
+    slot = &eventLoop.handles[eventLoop.handlesCount];
+    slot->fd = fd;
+    slot->events = events;
+    slot->cb = cb;
+    slot->opaque = opaque;
+    slot->deleted = 0;
+    eventLoop.handlesCount++;
+
+    return 0;
+}
+
+/*
+ * virEventRemoveHandle: stop monitoring a file handle.
+ *
+ * @fd: file descriptor previously passed to virEventAddHandle()
+ *
+ * NB, this *must* remain safe to call from within a callback,
+ * so the matching slot is merely flagged as deleted. The slot
+ * is reclaimed later by virEventCleanupHandles().
+ *
+ * Only the first live entry for @fd is flagged.
+ *
+ * Returns 0 if an entry was flagged, -1 if @fd is not registered
+ */
+int virEventRemoveHandle(int fd) {
+    int idx;
+
+    qemudDebug("Remove handle %d\n", fd);
+
+    for (idx = 0 ; idx < eventLoop.handlesCount ; idx++) {
+        struct virEventHandle *entry = &eventLoop.handles[idx];
+
+        /* Ignore slots already flagged, and other descriptors */
+        if (entry->deleted || entry->fd != fd)
+            continue;
+
+        qemudDebug("mark delete %d\n", idx);
+        entry->deleted = 1;
+        return 0;
+    }
+
+    return -1;
+}
+
+
+/*
+ * virEventAddTimeout: register a callback for a timer event.
+ *
+ * @timeout: interval between events (milliseconds, per event.h)
+ * @cb: function to invoke each time the timer expires
+ * @opaque: pointer passed back to @cb untouched
+ *
+ * NB, this *must* remain safe to call from within a callback.
+ * New entries are therefore only ever appended at the end of
+ * the list.
+ *
+ * The first expiry is scheduled one full interval from now,
+ * matching how virEventDispatchTimeouts() re-arms a timer
+ * after it has fired.
+ *
+ * Returns the ID of the new timer (the value to pass to
+ * virEventRemoveTimeout()), or -1 if the current time could
+ * not be read or the list could not be grown
+ */
+int virEventAddTimeout(int timeout, virEventTimeoutCallback cb, void *opaque) {
+    struct virEventTimeout *slot;
+    struct timeval tv;
+    unsigned long long now;
+
+    if (gettimeofday(&tv, NULL) < 0) {
+        return -1;
+    }
+    now = (((unsigned long long)tv.tv_sec)*1000) +
+        (((unsigned long long)tv.tv_usec)/1000);
+
+    /* Every allocated slot is in use: grow by one extent */
+    if (eventLoop.timeoutsCount == eventLoop.timeoutsAlloc) {
+        struct virEventTimeout *tmp;
+        qemudDebug("Used %d timeout slots, adding %d more\n",
+                   eventLoop.timeoutsAlloc, EVENT_ALLOC_EXTENT);
+        tmp = realloc(eventLoop.timeouts,
+                      sizeof(struct virEventTimeout) *
+                      (eventLoop.timeoutsAlloc + EVENT_ALLOC_EXTENT));
+        if (!tmp) {
+            return -1;
+        }
+        eventLoop.timeouts = tmp;
+        eventLoop.timeoutsAlloc += EVENT_ALLOC_EXTENT;
+    }
+
+    slot = &eventLoop.timeouts[eventLoop.timeoutsCount];
+    slot->timer = nextTimer++;
+    slot->timeout = timeout;
+    slot->cb = cb;
+    slot->opaque = opaque;
+    slot->deleted = 0;
+    /* Expire one interval from now, not immediately */
+    slot->expiresAt = now + timeout;
+
+    eventLoop.timeoutsCount++;
+
+    /* Hand the ID back so the caller is able to remove the timer */
+    return slot->timer;
+}
+
+/*
+ * virEventRemoveTimeout: unregister a timer.
+ *
+ * @timer: ID stored in the timer's slot by virEventAddTimeout()
+ *
+ * NB, this *must* remain safe to call from within a callback,
+ * so the matching slot is merely flagged as deleted. Actual
+ * removal from the list is done out-of-band after dispatch.
+ *
+ * Returns 0 if a timer was flagged, -1 if @timer is not registered
+ */
+int virEventRemoveTimeout(int timer) {
+    int idx;
+
+    for (idx = 0 ; idx < eventLoop.timeoutsCount ; idx++) {
+        struct virEventTimeout *entry = &eventLoop.timeouts[idx];
+
+        /* Ignore slots already flagged, and other timers */
+        if (entry->deleted || entry->timer != timer)
+            continue;
+
+        entry->deleted = 1;
+        return 0;
+    }
+
+    return -1;
+}
+
+/* Iterates over all registered timeouts and determines which
+ * will be the first to expire. Timers already flagged as
+ * deleted are ignored, since they will never be dispatched.
+ *
+ * @timeout: filled with the number of milliseconds poll()
+ *           should wait for the soonest timer, or -1 if no
+ *           timer is pending (poll() then blocks indefinitely)
+ *
+ * returns: 0 on success, -1 if the current time cannot be read
+ */
+static int virEventCalculateTimeout(int *timeout) {
+    /* 0 doubles as the "no timer found yet" marker */
+    unsigned long long then = 0;
+    int i;
+
+    /* Figure out if we need a timeout */
+    for (i = 0 ; i < eventLoop.timeoutsCount ; i++) {
+        if (eventLoop.timeouts[i].deleted)
+            continue;
+
+        if (then == 0 ||
+            eventLoop.timeouts[i].expiresAt < then)
+            then = eventLoop.timeouts[i].expiresAt;
+    }
+
+    /* Calculate how long we should wait for a timeout if needed */
+    if (then > 0) {
+        struct timeval tv;
+        unsigned long long now;
+
+        if (gettimeofday(&tv, NULL) < 0) {
+            return -1;
+        }
+        now = (((unsigned long long)tv.tv_sec)*1000) +
+            (((unsigned long long)tv.tv_usec)/1000);
+
+        /* Compare before subtracting: both values are unsigned,
+         * so an overdue timer must not be detected by looking
+         * at the sign of a wrapped difference */
+        if (then > now)
+            *timeout = (int)(then - now);
+        else if (then == now)
+            *timeout = 0;
+        else
+            *timeout = 1; /* already overdue: wait the minimum */
+    } else {
+        *timeout = -1;
+    }
+
+    return 0;
+}
+
+/*
+ * Allocate a pollfd array with one entry per slot of the
+ * registered file handle list, in the same order, so that
+ * index N of the result corresponds to eventLoop.handles[N].
+ * The caller must free the returned array.
+ *
+ * Slots flagged as deleted keep their position but are given
+ * a negative descriptor, which poll() ignores.
+ *
+ * returns: the pollfd array, or NULL if allocation failed
+ */
+static struct pollfd *virEventMakePollFDs(void) {
+    struct pollfd *fds;
+    /* Never request zero bytes: malloc(0) is allowed to return
+     * NULL, which callers would mistake for an error */
+    int nalloc = eventLoop.handlesCount > 0 ? eventLoop.handlesCount : 1;
+    int i;
+
+    /* Setup the poll file handle data structs */
+    if (!(fds = malloc(sizeof(*fds) * nalloc)))
+        return NULL;
+
+    for (i = 0 ; i < eventLoop.handlesCount ; i++) {
+        /* A deleted handle is never dispatched, so do not let it
+         * wake up poll() either */
+        fds[i].fd = eventLoop.handles[i].deleted ? -1 : eventLoop.handles[i].fd;
+        fds[i].events = eventLoop.handles[i].events;
+        fds[i].revents = 0;
+        qemudDebug("Wait for %d %d\n", eventLoop.handles[i].fd, eventLoop.handles[i].events);
+    }
+
+    return fds;
+}
+
+
+/*
+ * Walk the timer list and fire the callback of every timer
+ * whose expiry time has been reached, then re-arm that timer
+ * one interval after the time sampled at the start of this
+ * call. No attempt is made to 'catch up' if a timer fired
+ * later than it was scheduled.
+ *
+ * Callbacks are allowed to register new timers and to flag
+ * timers as deleted: only the timers present on entry are
+ * considered, and flagged ones are skipped.
+ *
+ * Returns 0 upon success, -1 if the current time cannot be read
+ */
+static int virEventDispatchTimeouts(void) {
+    /* Snapshot the count - callbacks may append further timers */
+    const int pending = eventLoop.timeoutsCount;
+    struct timeval tv;
+    unsigned long long now;
+    int idx;
+
+    if (gettimeofday(&tv, NULL) < 0)
+        return -1;
+
+    now = ((unsigned long long)tv.tv_sec) * 1000;
+    now += ((unsigned long long)tv.tv_usec) / 1000;
+
+    for (idx = 0 ; idx < pending ; idx++) {
+        virEventTimeoutCallback fire;
+
+        if (eventLoop.timeouts[idx].deleted ||
+            eventLoop.timeouts[idx].expiresAt > now)
+            continue;
+
+        fire = eventLoop.timeouts[idx].cb;
+        fire(eventLoop.timeouts[idx].timer,
+             eventLoop.timeouts[idx].opaque);
+
+        /* Index the array afresh: a callback that added a timer
+         * may have caused the array to be reallocated */
+        eventLoop.timeouts[idx].expiresAt =
+            now + eventLoop.timeouts[idx].timeout;
+    }
+
+    return 0;
+}
+
+
+/* Walk the file handle list alongside the poll() results and
+ * invoke the user supplied callback of every handle for which
+ * poll() reported at least one event.
+ *
+ * @fds: array filled in by poll(); entry N must describe
+ *       eventLoop.handles[N]
+ *
+ * Callbacks are allowed to register new handles and to flag
+ * handles as deleted: only the handles present on entry are
+ * considered, and flagged ones are skipped.
+ *
+ * Returns 0 (no failure is currently detected here)
+ */
+static int virEventDispatchHandles(struct pollfd *fds) {
+    /* Snapshot the count - callbacks may append further handles,
+     * for which there is no entry in fds */
+    const int polled = eventLoop.handlesCount;
+    int idx;
+
+    for (idx = 0 ; idx < polled ; idx++) {
+        if (eventLoop.handles[idx].deleted) {
+            qemudDebug("Skip deleted %d\n", eventLoop.handles[idx].fd);
+            continue;
+        }
+
+        /* Nothing happened on this descriptor */
+        if (!fds[idx].revents)
+            continue;
+
+        qemudDebug("Dispatch %d %d %p\n", fds[idx].fd, fds[idx].revents, eventLoop.handles[idx].opaque);
+        (eventLoop.handles[idx].cb)(fds[idx].fd, fds[idx].revents,
+                                    eventLoop.handles[idx].opaque);
+    }
+
+    return 0;
+}
+
+
+/* Used post dispatch to actually remove any timers that
+ * were previously flagged as deleted by virEventRemoveTimeout().
+ * Deferring the removal until dispatch has finished means
+ * slots never move while callbacks are being invoked.
+ *
+ * Once more than EVENT_ALLOC_EXTENT slots are unused, one
+ * extent of memory is handed back.
+ *
+ * Returns 0 upon success, -1 if shrinking the array failed
+ */
+static int virEventCleanupTimeouts(void) {
+    int i;
+
+    /* Remove deleted entries, shuffling down remaining
+     * entries as needed to form contiguous series
+     */
+    for (i = 0 ; i < eventLoop.timeoutsCount ; ) {
+        /* Consult the timer's own flag (the timeouts array,
+         * not the handles array) */
+        if (!eventLoop.timeouts[i].deleted) {
+            i++;
+            continue;
+        }
+
+        /* Close the gap; 'i' is not advanced because a new
+         * entry now lives at this index */
+        if ((i+1) < eventLoop.timeoutsCount) {
+            memmove(eventLoop.timeouts+i,
+                    eventLoop.timeouts+i+1,
+                    sizeof(struct virEventTimeout)*(eventLoop.timeoutsCount-(i+1)));
+        }
+        eventLoop.timeoutsCount--;
+    }
+
+    /* Release some memory if we've got a big chunk free */
+    if ((eventLoop.timeoutsAlloc - EVENT_ALLOC_EXTENT) > eventLoop.timeoutsCount) {
+        struct virEventTimeout *tmp;
+        qemudDebug("Releasing %d out of %d timeout slots used, releasing %d\n",
+                   eventLoop.timeoutsCount, eventLoop.timeoutsAlloc, EVENT_ALLOC_EXTENT);
+        tmp = realloc(eventLoop.timeouts,
+                      sizeof(struct virEventTimeout) *
+                      (eventLoop.timeoutsAlloc - EVENT_ALLOC_EXTENT));
+        if (!tmp) {
+            return -1;
+        }
+        eventLoop.timeouts = tmp;
+        eventLoop.timeoutsAlloc -= EVENT_ALLOC_EXTENT;
+    }
+    return 0;
+}
+
+/* Used post dispatch to actually remove any handles that
+ * were previously flagged as deleted by virEventRemoveHandle().
+ * Deferring the removal until dispatch has finished means
+ * slots never move while callbacks are being invoked.
+ *
+ * Once more than EVENT_ALLOC_EXTENT slots are unused, one
+ * extent of memory is handed back.
+ *
+ * Returns 0 upon success, -1 if shrinking the array failed
+ */
+static int virEventCleanupHandles(void) {
+    int src;
+    int dst = 0;
+
+    /* Compact the array in place: surviving entries keep their
+     * relative order and end up as one contiguous series at the
+     * front */
+    for (src = 0 ; src < eventLoop.handlesCount ; src++) {
+        if (eventLoop.handles[src].deleted)
+            continue;
+
+        if (dst != src)
+            eventLoop.handles[dst] = eventLoop.handles[src];
+        dst++;
+    }
+    eventLoop.handlesCount = dst;
+
+    /* Nothing to give back unless more than one extent is idle */
+    if ((eventLoop.handlesAlloc - EVENT_ALLOC_EXTENT) <= eventLoop.handlesCount)
+        return 0;
+
+    {
+        int shrunk = eventLoop.handlesAlloc - EVENT_ALLOC_EXTENT;
+        struct virEventHandle *smaller;
+
+        qemudDebug("Releasing %d out of %d handles slots used, releasing %d\n",
+                   eventLoop.handlesCount, eventLoop.handlesAlloc, EVENT_ALLOC_EXTENT);
+        smaller = realloc(eventLoop.handles, sizeof(*smaller) * shrunk);
+        if (smaller == NULL)
+            return -1;
+
+        eventLoop.handles = smaller;
+        eventLoop.handlesAlloc = shrunk;
+    }
+
+    return 0;
+}
+
+/*
+ * Run a single iteration of the event loop: build the poll
+ * set, block in poll() until at least one file handle has an
+ * event or the soonest timer is due, dispatch timers and then
+ * handles, and finally reclaim any entries that were flagged
+ * as deleted.
+ *
+ * A poll() interrupted by a signal (EINTR) is restarted.
+ * NB the timeout is not recomputed on restart, so the wait
+ * begins again with the originally calculated interval.
+ *
+ * Returns 0 upon success, -1 if any step failed
+ */
+int virEventRunOnce(void) {
+    struct pollfd *fds;
+    int ret, timeout, pollErrno;
+
+    if (!(fds = virEventMakePollFDs()))
+        return -1;
+
+    if (virEventCalculateTimeout(&timeout) < 0) {
+        free(fds);
+        return -1;
+    }
+
+ retry:
+    qemudDebug("Poll on %d handles %p timeout %d\n", eventLoop.handlesCount, fds, timeout);
+    ret = poll(fds, eventLoop.handlesCount, timeout);
+    /* Capture errno straight away: the debug logging below may
+     * itself make library calls that overwrite it */
+    pollErrno = errno;
+    qemudDebug("Poll got %d event\n", ret);
+    if (ret < 0) {
+        if (pollErrno == EINTR) {
+            goto retry;
+        }
+        free(fds);
+        return -1;
+    }
+
+    /* Timers are serviced on every wakeup, not just on timeout */
+    if (virEventDispatchTimeouts() < 0) {
+        free(fds);
+        return -1;
+    }
+
+    /* Only scan the handles if poll() reported some activity */
+    if (ret > 0 &&
+        virEventDispatchHandles(fds) < 0) {
+        free(fds);
+        return -1;
+    }
+    free(fds);
+
+    /* Dispatch is over, so flagged entries may now be dropped */
+    if (virEventCleanupTimeouts() < 0)
+        return -1;
+
+    if (virEventCleanupHandles() < 0)
+        return -1;
+
+    return 0;
+}
+/*
+ * Local variables:
+ * indent-tabs-mode: nil
+ * c-indent-level: 4
+ * c-basic-offset: 4
+ * tab-width: 4
+ * End:
+ */
diff -r 56b36b784a1c qemud/event.h
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/qemud/event.h Mon Jun 18 12:20:17 2007 -0400
@@ -0,0 +1,98 @@
+/*
+ * event.h: event loop for monitoring file handles
+ *
+ * Copyright (C) 2007 Daniel P. Berrange
+ * Copyright (C) 2007 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Author: Daniel P. Berrange <berrange at redhat.com>
+ */
+
+#ifndef __VIRTD_EVENT_H__
+#define __VIRTD_EVENT_H__
+
+
+/**
+ * virEventHandleCallback: callback for receiving file handle events
+ *
+ * @fd: file handle on which the event occurred
+ * @events: bitset of events from POLLnnn constants
+ * @opaque: user data registered with handle
+ */
+typedef void (*virEventHandleCallback)(int fd, int events, void *opaque);
+
+/**
+ * virEventAddHandle: register a callback for monitoring file handle events
+ *
+ * @fd: file handle to monitor for events
+ * @events: bitset of events to watch from POLLnnn constants
+ * @cb: callback to invoke when an event occurs
+ * @opaque: user data to pass to callback
+ *
+ * returns -1 if the file handle cannot be registered, 0 upon success
+ */
+int virEventAddHandle(int fd, int events, virEventHandleCallback cb, void *opaque);
+
+/**
+ * virEventRemoveHandle: unregister a callback from a file handle
+ *
+ * @fd: file handle to stop monitoring for events
+ *
+ * returns -1 if the file handle was not registered, 0 upon success
+ */
+int virEventRemoveHandle(int fd);
+
+/**
+ * virEventTimeoutCallback: callback for receiving timer events
+ *
+ * @timer: timer id emitting the event
+ * @opaque: user data registered with timer
+ */
+typedef void (*virEventTimeoutCallback)(int timer, void *opaque);
+
+/**
+ * virEventAddTimeout: register a callback for a timer event
+ *
+ * @timeout: timeout between events in milliseconds
+ * @cb: callback to invoke when an event occurs
+ * @opaque: user data to pass to callback
+ *
+ * returns -1 if the timer cannot be registered, a positive
+ * integer timer id upon success
+ *
+ * NOTE(review): confirm against event.c that the id (rather than
+ * a constant) is what is returned on success; ids are handed out
+ * starting from 0, so "positive" probably ought to read
+ * "non-negative".
+ */
+int virEventAddTimeout(int timeout, virEventTimeoutCallback cb, void *opaque);
+
+/**
+ * virEventRemoveTimeout: unregister a callback for a timer
+ *
+ * @timer: the timer id to remove
+ *
+ * returns -1 if the timer was not registered, 0 upon success
+ */
+int virEventRemoveTimeout(int timer);
+
+
+/**
+ * virEventRunOnce: run a single iteration of the event loop.
+ *
+ * Blocks the caller until at least one file handle has an
+ * event or the first timer expires.
+ *
+ * returns -1 if the event monitoring failed, 0 upon success
+ */
+int virEventRunOnce(void);
+
+#endif /* __VIRTD_EVENT_H__ */
diff -r 56b36b784a1c qemud/internal.h
--- a/qemud/internal.h Mon Jun 18 11:01:38 2007 -0400
+++ b/qemud/internal.h Mon Jun 18 14:20:02 2007 -0400
@@ -338,7 +338,6 @@ struct qemud_server {
int nclients;
struct qemud_client *clients;
int sigread;
- int nvmfds;
int nactivevms;
int ninactivevms;
struct qemud_vm *vms;
diff -r 56b36b784a1c qemud/qemud.c
--- a/qemud/qemud.c Mon Jun 18 11:01:38 2007 -0400
+++ b/qemud/qemud.c Mon Jun 18 14:19:58 2007 -0400
@@ -61,6 +61,7 @@
#include "driver.h"
#include "conf.h"
#include "iptables.h"
+#include "event.h"
static int godaemon = 0; /* -d: Be a daemon */
static int verbose = 0; /* -v: Verbose mode */
@@ -109,6 +110,13 @@ static void sig_handler(int sig) {
}
errno = origerrno;
}
+
+static void qemudDispatchVMEvent(int fd, int events, void *opaque);
+static void qemudDispatchClientEvent(int fd, int events, void *opaque);
+static void qemudDispatchServerEvent(int fd, int events, void *opaque);
+static int qemudRegisterClientEvent(struct qemud_server *server,
+ struct qemud_client *client,
+ int remove);
static int
remoteInitializeGnuTLS (void)
@@ -184,8 +192,10 @@ remoteInitializeGnuTLS (void)
return 0;
}
-static int qemudDispatchSignal(struct qemud_server *server)
-{
+static void qemudDispatchSignalEvent(int fd ATTRIBUTE_UNUSED,
+ int events ATTRIBUTE_UNUSED,
+ void *opaque) {
+ struct qemud_server *server = (struct qemud_server *)opaque;
unsigned char sigc;
struct qemud_vm *vm;
struct qemud_network *network;
@@ -194,7 +204,7 @@ static int qemudDispatchSignal(struct qe
if (read(server->sigread, &sigc, 1) != 1) {
qemudLog(QEMUD_ERR, "Failed to read from signal pipe: %s",
strerror(errno));
- return -1;
+ return;
}
ret = 0;
@@ -266,7 +276,8 @@ static int qemudDispatchSignal(struct qe
break;
}
- return ret;
+ if (ret != 0)
+ server->shutdown = 1;
}
static int qemudSetCloseExec(int fd) {
@@ -474,19 +485,16 @@ static int qemudListenUnix(struct qemud_
}
sock->readonly = readonly;
- sock->next = server->sockets;
- server->sockets = sock;
- server->nsockets++;
if ((sock->fd = socket(PF_UNIX, SOCK_STREAM, 0)) < 0) {
qemudLog(QEMUD_ERR, "Failed to create socket: %s",
strerror(errno));
- return -1;
+ goto cleanup;
}
if (qemudSetCloseExec(sock->fd) < 0 ||
qemudSetNonBlock(sock->fd) < 0)
- return -1;
+ goto cleanup;
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
@@ -502,17 +510,35 @@ static int qemudListenUnix(struct qemud_
if (bind(sock->fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
qemudLog(QEMUD_ERR, "Failed to bind socket to '%s': %s",
path, strerror(errno));
- return -1;
+ goto cleanup;
}
umask(oldmask);
if (listen(sock->fd, 30) < 0) {
qemudLog(QEMUD_ERR, "Failed to listen for connections on '%s': %s",
path, strerror(errno));
- return -1;
- }
+ goto cleanup;
+ }
+
+ if (virEventAddHandle(sock->fd,
+ POLLIN| POLLERR | POLLHUP,
+ qemudDispatchServerEvent,
+ server) < 0) {
+ qemudLog(QEMUD_ERR, "Failed to add server event callback");
+ goto cleanup;
+ }
+
+ sock->next = server->sockets;
+ server->sockets = sock;
+ server->nsockets++;
return 0;
+
+ cleanup:
+ if (sock->fd)
+ close(sock->fd);
+ free(sock);
+ return -1;
}
// See: http://people.redhat.com/drepper/userapi-ipv6.html
@@ -606,6 +632,15 @@ remoteListenTCP (struct qemud_server *se
"remoteListenTCP: listen: %s", strerror (errno));
return -1;
}
+
+ if (virEventAddHandle(sock->fd,
+ POLLIN| POLLERR | POLLHUP,
+ qemudDispatchServerEvent,
+ server) < 0) {
+ qemudLog(QEMUD_ERR, "Failed to add server event callback");
+ return -1;
+ }
+
}
return 0;
@@ -1026,11 +1061,15 @@ static int qemudDispatchServer(struct qe
if (!client->tls) {
client->mode = QEMUD_MODE_RX_HEADER;
client->bufferLength = QEMUD_PKT_HEADER_XDR_LEN;
+
+ if (qemudRegisterClientEvent (server, client, 0) < 0)
+ goto cleanup;
} else {
int ret;
client->session = remoteInitializeTLSSession ();
- if (client->session == NULL) goto tls_failed;
+ if (client->session == NULL)
+ goto cleanup;
gnutls_transport_set_ptr (client->session,
(gnutls_transport_ptr_t) (long) fd);
@@ -1040,16 +1079,22 @@ static int qemudDispatchServer(struct qe
if (ret == 0) {
/* Unlikely, but ... Next step is to check the certificate. */
if (remoteCheckAccess (client) == -1)
- goto tls_failed;
+ goto cleanup;
+
+ if (qemudRegisterClientEvent(server, client, 0) < 0)
+ goto cleanup;
} else if (ret == GNUTLS_E_INTERRUPTED || ret == GNUTLS_E_AGAIN) {
/* Most likely. */
client->mode = QEMUD_MODE_TLS_HANDSHAKE;
client->bufferLength = -1;
client->direction = gnutls_record_get_direction (client->session);
+
+ if (qemudRegisterClientEvent (server, client, 0) < 0)
+ goto cleanup;
} else {
qemudLog (QEMUD_ERR, "TLS handshake failed: %s",
gnutls_strerror (ret));
- goto tls_failed;
+ goto cleanup;
}
}
@@ -1059,7 +1104,7 @@ static int qemudDispatchServer(struct qe
return 0;
- tls_failed:
+ cleanup:
if (client->session) gnutls_deinit (client->session);
close (fd);
free (client);
@@ -1453,7 +1498,15 @@ int qemudStartVMDaemon(struct qemud_serv
server->ninactivevms--;
server->nactivevms++;
- server->nvmfds += 2;
+
+ virEventAddHandle(vm->stdout,
+ POLLIN | POLLERR | POLLHUP,
+ qemudDispatchVMEvent,
+ server);
+ virEventAddHandle(vm->stderr,
+ POLLIN | POLLERR | POLLHUP,
+ qemudDispatchVMEvent,
+ server);
ret = 0;
@@ -1497,6 +1550,8 @@ static void qemudDispatchClientFailure(s
tmp = tmp->next;
}
+ virEventRemoveHandle(client->fd);
+
if (client->tls && client->session) gnutls_deinit (client->session);
close(client->fd);
free(client);
@@ -1590,6 +1645,8 @@ static int qemudClientRead(struct qemud_
} else {
ret = gnutls_record_recv (client->session, data, len);
client->direction = gnutls_record_get_direction (client->session);
+ if (qemudRegisterClientEvent (server, client, 1) < 0)
+ qemudDispatchClientFailure (server, client);
if (ret <= 0) {
if (ret == 0 || (ret != GNUTLS_E_AGAIN &&
ret != GNUTLS_E_INTERRUPTED)) {
@@ -1655,6 +1712,11 @@ static void qemudDispatchClientRead(stru
xdr_destroy (&x);
+ if (qemudRegisterClientEvent(server, client, 1) < 0) {
+ qemudDispatchClientFailure(server, client);
+ return;
+ }
+
/* Fall through */
}
@@ -1679,6 +1741,8 @@ static void qemudDispatchClientRead(stru
if (remote && h.prog == REMOTE_PROGRAM) {
remoteDispatchClientRequest (server, client);
+ if (qemudRegisterClientEvent(server, client, 1) < 0)
+ qemudDispatchClientFailure(server, client);
} else if (!remote && h.prog == QEMUD_PROGRAM) {
qemud_packet_client p;
@@ -1689,6 +1753,9 @@ static void qemudDispatchClientRead(stru
}
qemudDispatchClientRequest(server, client, &p);
+
+ if (qemudRegisterClientEvent(server, client, 1) < 0)
+ qemudDispatchClientFailure(server, client);
} else {
/* An internal error. */
qemudDebug ("Not REMOTE_PROGRAM or QEMUD_PROGRAM");
@@ -1709,12 +1776,17 @@ static void qemudDispatchClientRead(stru
/* Finished. Next step is to check the certificate. */
if (remoteCheckAccess (client) == -1)
qemudDispatchClientFailure (server, client);
+ if (qemudRegisterClientEvent (server, client, 1) < 0)
+ qemudDispatchClientFailure (server, client);
} else if (ret != GNUTLS_E_AGAIN && ret != GNUTLS_E_INTERRUPTED) {
qemudLog (QEMUD_ERR, "TLS handshake failed: %s",
gnutls_strerror (ret));
qemudDispatchClientFailure (server, client);
- } else
+ } else {
client->direction = gnutls_record_get_direction (client->session);
+ if (qemudRegisterClientEvent (server ,client, 1) < 0)
+ qemudDispatchClientFailure (server, client);
+ }
break;
}
@@ -1745,6 +1817,8 @@ static int qemudClientWrite(struct qemud
} else {
ret = gnutls_record_send (client->session, data, len);
client->direction = gnutls_record_get_direction (client->session);
+ if (qemudRegisterClientEvent (server, client, 1) < 0)
+ qemudDispatchClientFailure (server, client);
if (ret < 0) {
if (ret != GNUTLS_E_INTERRUPTED && ret != GNUTLS_E_AGAIN) {
qemudLog (QEMUD_ERR, "gnutls_record_send: %s",
@@ -1772,6 +1846,9 @@ static void qemudDispatchClientWrite(str
client->bufferLength = QEMUD_PKT_HEADER_XDR_LEN;
client->bufferOffset = 0;
if (client->tls) client->direction = QEMUD_TLS_DIRECTION_READ;
+
+ if (qemudRegisterClientEvent (server, client, 1) < 0)
+ qemudDispatchClientFailure (server, client);
}
/* Still writing */
break;
@@ -1786,12 +1863,18 @@ static void qemudDispatchClientWrite(str
/* Finished. Next step is to check the certificate. */
if (remoteCheckAccess (client) == -1)
qemudDispatchClientFailure (server, client);
+
+ if (qemudRegisterClientEvent (server, client, 1))
+ qemudDispatchClientFailure (server, client);
} else if (ret != GNUTLS_E_AGAIN && ret != GNUTLS_E_INTERRUPTED) {
qemudLog (QEMUD_ERR, "TLS handshake failed: %s",
gnutls_strerror (ret));
qemudDispatchClientFailure (server, client);
- } else
+ } else {
client->direction = gnutls_record_get_direction (client->session);
+ if (qemudRegisterClientEvent (server, client, 1))
+ qemudDispatchClientFailure (server, client);
+ }
break;
}
@@ -1842,6 +1925,10 @@ int qemudShutdownVMDaemon(struct qemud_s
qemudVMData(server, vm, vm->stdout);
qemudVMData(server, vm, vm->stderr);
+
+ virEventRemoveHandle(vm->stdout);
+ virEventRemoveHandle(vm->stderr);
+
if (close(vm->logfile) < 0)
qemudLog(QEMUD_WARN, "Unable to close logfile %d: %s", errno, strerror(errno));
close(vm->stdout);
@@ -1852,7 +1939,6 @@ int qemudShutdownVMDaemon(struct qemud_s
vm->stdout = -1;
vm->stderr = -1;
vm->monitor = -1;
- server->nvmfds -= 2;
if (waitpid(vm->pid, NULL, WNOHANG) != vm->pid) {
kill(vm->pid, SIGKILL);
@@ -2340,94 +2426,104 @@ int qemudShutdownNetworkDaemon(struct qe
}
-static int qemudDispatchPoll(struct qemud_server *server, struct pollfd *fds) {
+/*
+ * Event loop callback for the stdout/stderr descriptors of a VM.
+ *
+ * @fd: descriptor on which the event was reported
+ * @events: event bitset reported for @fd
+ * @opaque: the struct qemud_server the handle was registered with
+ *
+ * Looks up the active VM owning @fd. When the reported events are
+ * exactly POLLIN the data is passed to qemudDispatchVMLog(); in
+ * every other case, or if that call returns non-zero,
+ * qemudDispatchVMFailure() is invoked. Events for a descriptor
+ * that matches no active VM are ignored.
+ */
+static void qemudDispatchVMEvent(int fd, int events, void *opaque) {
+    struct qemud_server *server = (struct qemud_server *)opaque;
+    struct qemud_vm *vm;
+
+    for (vm = server->vms ; vm ; vm = vm->next) {
+        if (!qemudIsActiveVM(vm))
+            continue;
+
+        if (vm->stdout == fd || vm->stderr == fd)
+            break;
+    }
+
+    /* No active VM owns this descriptor */
+    if (!vm)
+        return;
+
+    if (events != POLLIN ||
+        qemudDispatchVMLog(server, vm, fd) != 0)
+        qemudDispatchVMFailure(server, vm, fd);
+}
+
+/*
+ * Event loop callback for a client connection.
+ *
+ * @fd: descriptor on which the event was reported
+ * @events: event bitset reported for @fd
+ * @opaque: the struct qemud_server the handle was registered with
+ *
+ * Looks up the client owning @fd and routes the event: exactly
+ * POLLOUT goes to the write path, exactly POLLIN to the read
+ * path, and any other value (including combinations) is treated
+ * as a failure of the client. Events for a descriptor that
+ * matches no client are ignored.
+ */
+static void qemudDispatchClientEvent(int fd, int events, void *opaque) {
+    struct qemud_server *server = (struct qemud_server *)opaque;
+    struct qemud_client *client;
+
+    for (client = server->clients ; client ; client = client->next) {
+        if (client->fd == fd)
+            break;
+    }
+
+    /* No client owns this descriptor */
+    if (!client)
+        return;
+
+    switch (events) {
+    case POLLOUT:
+        qemudDispatchClientWrite(server, client);
+        break;
+    case POLLIN:
+        qemudDispatchClientRead(server, client);
+        break;
+    default:
+        qemudDispatchClientFailure(server, client);
+        break;
+    }
+}
+
+/*
+ * (Re-)register a client's descriptor with the event loop, using
+ * an event mask that reflects what the client is waiting for.
+ *
+ * @server: passed to the event loop as the callback's opaque data
+ * @client: client whose descriptor is to be monitored
+ * @removeFirst: non-zero to unregister the descriptor before
+ *               registering it again with the new mask
+ *
+ * For a TLS client the mask is chosen from client->direction
+ * (non-zero selects POLLOUT, zero POLLIN); otherwise POLLOUT is
+ * chosen while the client is in QEMUD_MODE_TX_PACKET and POLLIN
+ * in any other mode. POLLERR and POLLHUP are always included.
+ *
+ * Returns 0 upon success, -1 if removal or registration failed
+ */
+static int qemudRegisterClientEvent(struct qemud_server *server,
+                                    struct qemud_client *client,
+                                    int removeFirst) {
+    int mask = POLLERR | POLLHUP;
+    int wantWrite;
+
+    if (removeFirst && virEventRemoveHandle(client->fd) < 0)
+        return -1;
+
+    if (client->tls)
+        wantWrite = client->direction ? 1 : 0;
+    else
+        wantWrite = (client->mode == QEMUD_MODE_TX_PACKET);
+
+    if (wantWrite)
+        mask |= POLLOUT;
+    else
+        mask |= POLLIN;
+
+    if (virEventAddHandle(client->fd, mask,
+                          qemudDispatchClientEvent,
+                          server) < 0)
+        return -1;
+
+    return 0;
+}
+
+static void qemudDispatchServerEvent(int fd, int events, void *opaque) {
+ struct qemud_server *server = (struct qemud_server *)opaque;
struct qemud_socket *sock = server->sockets;
- struct qemud_client *client = server->clients;
- struct qemud_vm *vm;
- struct qemud_network *network;
- int ret = 0;
- int fd = 0;
-
- if (fds[fd++].revents && qemudDispatchSignal(server) < 0)
- return -1;
-
- if (server->shutdown)
- return 0;
-
- vm = server->vms;
+
+ while (sock) {
+ if (sock->fd == fd)
+ break;
+
+ sock = sock->next;
+ }
+
+ if (!sock)
+ return;
+
+ if (events)
+ qemudDispatchServer(server, sock);
+}
+
+
+static void qemudCleanupInactive(struct qemud_server *server) {
+ struct qemud_vm *vm = server->vms;
+ struct qemud_network *network = server->networks;
+
+ /* Cleanup any VMs which shutdown & dont have an associated
+ config file */
while (vm) {
struct qemud_vm *next = vm->next;
- int failed = 0,
- stdoutfd = vm->stdout,
- stderrfd = vm->stderr;
-
- if (!qemudIsActiveVM(vm)) {
- vm = next;
- continue;
- }
-
- if (stdoutfd != -1) {
- if (fds[fd].revents) {
- if (fds[fd].revents == POLLIN) {
- if (qemudDispatchVMLog(server, vm, fds[fd].fd) < 0)
- failed = 1;
- } else {
- if (qemudDispatchVMFailure(server, vm, fds[fd].fd) < 0)
- failed = 1;
- }
- }
- fd++;
- }
- if (stderrfd != -1) {
- if (!failed) {
- if (fds[fd].revents) {
- if (fds[fd].revents == POLLIN) {
- if (qemudDispatchVMLog(server, vm, fds[fd].fd) < 0)
- failed = 1;
- } else {
- if (qemudDispatchVMFailure(server, vm, fds[fd].fd) < 0)
- failed = 1;
- }
- }
- }
- fd++;
- }
- vm = next;
- if (failed)
- ret = -1; /* FIXME: the daemon shouldn't exit on failure here */
- }
- while (client) {
- struct qemud_client *next = client->next;
-
- assert (client->magic == QEMUD_CLIENT_MAGIC);
-
- if (fds[fd].revents) {
- qemudDebug("Poll data normal");
- if (fds[fd].revents == POLLOUT)
- qemudDispatchClientWrite(server, client);
- else if (fds[fd].revents == POLLIN)
- qemudDispatchClientRead(server, client);
- else
- qemudDispatchClientFailure(server, client);
- }
- fd++;
- client = next;
- }
- while (sock) {
- struct qemud_socket *next = sock->next;
- /* FIXME: the daemon shouldn't exit on error here */
- if (fds[fd].revents)
- if (qemudDispatchServer(server, sock) < 0)
- return -1;
- fd++;
- sock = next;
- }
-
- /* Cleanup any VMs which shutdown & dont have an associated
- config file */
- vm = server->vms;
- while (vm) {
- struct qemud_vm *next = vm->next;
if (!qemudIsActiveVM(vm) && !vm->configFile[0])
qemudRemoveInactiveVM(server, vm);
@@ -2436,7 +2532,6 @@ static int qemudDispatchPoll(struct qemu
}
/* Cleanup any networks too */
- network = server->networks;
while (network) {
struct qemud_network *next = network->next;
@@ -2446,91 +2541,16 @@ static int qemudDispatchPoll(struct qemu
network = next;
}
- return ret;
-}
-
-static void qemudPreparePoll(struct qemud_server *server, struct pollfd *fds) {
- int fd = 0;
- struct qemud_socket *sock;
- struct qemud_client *client;
- struct qemud_vm *vm;
-
- fds[fd].fd = server->sigread;
- fds[fd].events = POLLIN;
- fd++;
-
- for (vm = server->vms ; vm ; vm = vm->next) {
- if (!qemudIsActiveVM(vm))
- continue;
- if (vm->stdout != -1) {
- fds[fd].fd = vm->stdout;
- fds[fd].events = POLLIN | POLLERR | POLLHUP;
- fd++;
- }
- if (vm->stderr != -1) {
- fds[fd].fd = vm->stderr;
- fds[fd].events = POLLIN | POLLERR | POLLHUP;
- fd++;
- }
- }
- for (client = server->clients ; client ; client = client->next) {
- fds[fd].fd = client->fd;
- if (!client->tls) {
- /* Refuse to read more from client if tx is pending to
- rate limit */
- if (client->mode == QEMUD_MODE_TX_PACKET)
- fds[fd].events = POLLOUT | POLLERR | POLLHUP;
- else
- fds[fd].events = POLLIN | POLLERR | POLLHUP;
- } else {
- qemudDebug ("direction = %s",
- client->direction ? "WRITE" : "READ");
- fds[fd].events = client->direction ? POLLOUT : POLLIN;
- fds[fd].events |= POLLERR | POLLHUP;
- }
- fd++;
- }
- for (sock = server->sockets ; sock ; sock = sock->next) {
- fds[fd].fd = sock->fd;
- fds[fd].events = POLLIN;
- fd++;
- }
+ return;
}
static int qemudOneLoop(struct qemud_server *server) {
- int nfds = server->nsockets + server->nclients + server->nvmfds + 1; /* server->sigread */
- struct pollfd fds[nfds];
- int thistimeout = -1;
- int ret;
sig_atomic_t errors;
- /* If we have no clients or vms, then timeout after
- 30 seconds, letting daemon exit */
- if (timeout > 0 &&
- !server->nclients &&
- !server->nactivevms)
- thistimeout = timeout;
-
- qemudPreparePoll(server, fds);
-
- retry:
-
- if ((ret = poll(fds, nfds, thistimeout * 1000)) < 0) {
- if (errno == EINTR) {
- goto retry;
- }
- qemudLog(QEMUD_ERR, "Error polling on file descriptors: %s",
- strerror(errno));
- return -1;
- }
-
- /* Must have timed out */
- if (ret == 0) {
- qemudLog(QEMUD_INFO, "Timed out while polling on file descriptors");
- return -1;
- }
+ if (virEventRunOnce() < 0)
+ return -1;
/* Check for any signal handling errors and log them. */
errors = sig_errors;
@@ -2542,8 +2562,7 @@ static int qemudOneLoop(struct qemud_ser
return -1;
}
- if (qemudDispatchPoll(server, fds) < 0)
- return -1;
+ qemudCleanupInactive(server);
return 0;
}
@@ -2941,6 +2960,15 @@ int main(int argc, char **argv) {
goto error2;
}
+ if (virEventAddHandle(sigpipe[0],
+ POLLIN,
+ qemudDispatchSignalEvent,
+ server) < 0) {
+ qemudLog(QEMUD_ERR, "Failed to register callback for signal pipe");
+ ret = 3;
+ goto error2;
+ }
+
qemudRunLoop(server);
qemudCleanup(server);
More information about the libvir-list
mailing list