[Libvirt-cim] [PATCH] [RFC] Implement libvirt event callback management

Hollis Blanchard hollisb at us.ibm.com
Thu Aug 13 17:56:45 UTC 2009


# HG changeset patch
# User Hollis Blanchard <hollisb at us.ibm.com>
# Date 1250181138 25200
# Node ID 36aa26ca62cabda72e027f075889c7a0973a4e6b
# Parent  c345ed88f03432da53efee161d505bd64c02c32e
[RFC] Implement libvirt event callback management.

Libvirt requires that users implement their own event monitoring and
callback-management infrastructure. Actually, two similar lists are needed: one
for file descriptors to monitor, and one for pending timers.

This patch starts exactly one event-monitoring thread per libvirt-cim instance.
Multiple provider threads in the same process will share this thread.

Signed-off-by: Hollis Blanchard <hollisb at us.ibm.com>

---
This builds but isn't tested, and may very well contain embarrassing list
manipulation or locking bugs. It really needs a consumer to test, i.e. for
someone to register a domain event callback.

Comments welcome.

Please CC me on replies.

diff --git a/libxkutil/Makefile.am b/libxkutil/Makefile.am
--- a/libxkutil/Makefile.am
+++ b/libxkutil/Makefile.am
@@ -5,14 +5,14 @@ SUBDIRS = tests
 CFLAGS += $(CFLAGS_STRICT)
 
 noinst_HEADERS = cs_util.h misc_util.h device_parsing.h xmlgen.h infostore.h \
-                 pool_parsing.h
+                 pool_parsing.h event.h
 
 lib_LTLIBRARIES = libxkutil.la
 
 AM_LDFLAGS = -lvirt -luuid
 
 libxkutil_la_SOURCES = cs_util_instance.c misc_util.c device_parsing.c \
-                       xmlgen.c infostore.c pool_parsing.c
+                       xmlgen.c infostore.c pool_parsing.c event.c
 
 noinst_PROGRAMS = xml_parse_test
 
diff --git a/libxkutil/event.c b/libxkutil/event.c
new file mode 100644
--- /dev/null
+++ b/libxkutil/event.c
@@ -0,0 +1,367 @@
+/*
+ * Copyright IBM Corp. 2009
+ *
+ * Authors:
+ *  Hollis Blanchard <hollisb at us.ibm.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#include <stdbool.h>
+#include <stdarg.h>
+#include <unistd.h>
+#include <pthread.h>
+#include <errno.h>
+#include <poll.h>
+#include <values.h>
+
+#include <libvirt/libvirt.h>
+#include <libvirt/virterror.h>
+
+#include "event.h"
+
+/* One registered timeout.  Nodes form a singly linked list headed by
+ * timer_list.  Note that there is no member holding the timeout callback. */
+struct timer {
+        struct timer *next;     /* following node in timer_list, or NULL */
+        int id;                 /* identifier handed back by eventAddTimeout */
+        int timeout;            /* value set by eventAddTimeout/eventUpdateTimeout */
+        bool deleted;           /* set by eventRemoveTimeout; node is freed later */
+        void *opaque;           /* argument passed to ff */
+        virFreeCallback ff;     /* invoked with opaque when the node is freed */
+};
+
+/* One registered file descriptor watch.  Nodes form a singly linked list
+ * headed by watch_list. */
+struct watch {
+        struct watch *next;             /* following node in watch_list, or NULL */
+        int id;                         /* identifier handed back by eventAddHandle */
+        int fd;                         /* descriptor handed to poll() */
+        int events;                     /* requested VIR_EVENT_HANDLE_* bits */
+        bool deleted;                   /* set by eventRemoveHandle; node is freed later */
+        virEventHandleCallback cb;      /* called by the event thread for this watch */
+        void *opaque;                   /* argument passed to cb and ff */
+        virFreeCallback ff;             /* invoked with opaque when the node is freed */
+};
+
+/* Thread created by init_events() to run event_thread(). */
+static pthread_t watch_thread_id;
+
+/* Watch list: head pointer, number of nodes, and the next id to hand out. */
+static pthread_mutex_t watch_list_mutex = PTHREAD_MUTEX_INITIALIZER;
+static struct watch *watch_list = NULL;
+static int watch_count = 0;
+static int next_watch_id = 0;
+
+/* Timer list: head pointer and the next id to hand out. */
+static pthread_mutex_t timer_list_mutex = PTHREAD_MUTEX_INITIALIZER;
+static struct timer *timer_list = NULL;
+static int next_timer_id = 0;
+
+
+/* Register a file descriptor watch (libvirt "add handle" callback).
+ *
+ * @fd:     descriptor to monitor
+ * @events: VIR_EVENT_HANDLE_* bits of interest
+ * @cb:     function the event thread calls for this watch
+ * @opaque: argument passed to @cb and @ff
+ * @ff:     called with @opaque when the watch is finally freed
+ *
+ * Returns the new watch id (>= 0), or -1 if memory could not be allocated.
+ */
+static int eventAddHandle(int fd, int events, virEventHandleCallback cb,
+                          void *opaque, virFreeCallback ff)
+{
+        struct watch *watch;
+        int id;
+
+        CU_DEBUG("%s", __func__);
+
+        /* calloc, not malloc: 'deleted' must start out false, otherwise the
+         * event thread may free a brand-new watch on its next pass. */
+        watch = calloc(1, sizeof(*watch));
+        if (watch == NULL)
+                return -1;
+
+        watch->fd = fd;
+        watch->events = events;
+        watch->cb = cb;
+        watch->opaque = opaque;
+        watch->ff = ff;
+
+        pthread_mutex_lock(&watch_list_mutex);
+
+        /* Allocate the id under the lock so concurrent registrations cannot
+         * receive the same value. */
+        id = next_watch_id++;
+        watch->id = id;
+
+        watch->next = watch_list;
+        watch_list = watch;
+        watch_count++;
+
+        pthread_mutex_unlock(&watch_list_mutex);
+
+        /* Return the local copy: once the lock is dropped the node belongs
+         * to the list and must not be dereferenced here. */
+        return id;
+}
+
+/* Change the set of events monitored for watch @id (libvirt "update handle"
+ * callback).  @events is a mask of VIR_EVENT_HANDLE_* bits.  Unknown ids are
+ * silently ignored. */
+static void eventUpdateHandle(int id, int events)
+{
+        struct watch *cur;
+
+        CU_DEBUG("%s %d", __func__, id);
+
+        /* The event thread unlinks and frees nodes under this mutex, so the
+         * list must not be walked without holding it. */
+        pthread_mutex_lock(&watch_list_mutex);
+
+        for (cur = watch_list; cur != NULL; cur = cur->next) {
+                if (cur->id == id) {
+                        cur->events = events;
+                        break;
+                }
+        }
+
+        pthread_mutex_unlock(&watch_list_mutex);
+}
+
+/* Mark watch @id for removal (libvirt "remove handle" callback).
+ *
+ * The watch is only flagged here; it is unlinked and its memory released
+ * later by event_watch_free_deleted() on the event thread.
+ *
+ * Returns 0 if the watch was found and flagged, -1 if no watch has this id.
+ */
+static int eventRemoveHandle(int id)
+{
+        struct watch *cur;
+        int ret = -1;
+
+        CU_DEBUG("%s %d", __func__, id);
+
+        /* Hold the mutex while walking: nodes are unlinked and freed under
+         * it, so an unlocked traversal could touch freed memory. */
+        pthread_mutex_lock(&watch_list_mutex);
+
+        for (cur = watch_list; cur != NULL; cur = cur->next) {
+                if (cur->id == id) {
+                        cur->deleted = true;
+                        ret = 0;
+                        break;
+                }
+        }
+
+        pthread_mutex_unlock(&watch_list_mutex);
+
+        return ret;
+}
+
+/* Delete all watches marked for deletion.
+ *
+ * Flagged nodes are unlinked from watch_list while holding the list mutex,
+ * then their free callbacks run and their memory is released after the mutex
+ * has been dropped, so a free callback never executes with the list locked.
+ */
+static void event_watch_free_deleted(void)
+{
+        struct watch *cur;
+        struct watch **link;
+        struct watch *doomed = NULL;
+
+        CU_DEBUG("%s", __func__);
+
+        pthread_mutex_lock(&watch_list_mutex);
+
+        link = &watch_list;
+        while ((cur = *link) != NULL) {
+                if (cur->deleted) {
+                        /* Unlink and push onto the private 'doomed' list. */
+                        *link = cur->next;
+                        cur->next = doomed;
+                        doomed = cur;
+                        watch_count--;
+                } else {
+                        link = &cur->next;
+                }
+        }
+
+        pthread_mutex_unlock(&watch_list_mutex);
+
+        while (doomed != NULL) {
+                cur = doomed;
+                doomed = cur->next;
+
+                /* A registrant may supply no free callback at all. */
+                if (cur->ff != NULL)
+                        cur->ff(cur->opaque);
+                free(cur);
+        }
+}
+
+/* Register a timer (libvirt "add timeout" callback).
+ *
+ * @timeout: timer value, stored as given
+ * @cb:      timeout callback; NOT retained, because struct timer has no
+ *           member for it, so this implementation cannot fire timers yet
+ * @opaque:  argument passed to @ff
+ * @ff:      called with @opaque when the timer is finally freed
+ *
+ * Returns the new timer id (>= 0), or -1 if memory could not be allocated.
+ */
+static int eventAddTimeout(int timeout, virEventTimeoutCallback cb,
+                           void *opaque, virFreeCallback ff)
+{
+        struct timer *timer;
+        int id;
+
+        CU_DEBUG("%s", __func__);
+
+        /* calloc, not malloc: 'deleted' must start out false, otherwise the
+         * event thread may free a brand-new timer on its next pass. */
+        timer = calloc(1, sizeof(*timer));
+        if (timer == NULL)
+                return -1;
+
+        timer->timeout = timeout;
+        timer->opaque = opaque;
+        timer->ff = ff;
+
+        pthread_mutex_lock(&timer_list_mutex);
+
+        /* Allocate the id under the lock so concurrent registrations cannot
+         * receive the same value. */
+        id = next_timer_id++;
+        timer->id = id;
+
+        timer->next = timer_list;
+        timer_list = timer;
+
+        pthread_mutex_unlock(&timer_list_mutex);
+
+        /* Return the local copy: once the lock is dropped the node belongs
+         * to the list and must not be dereferenced here. */
+        return id;
+}
+
+/* Change the timeout value of timer @id (libvirt "update timeout" callback).
+ * Unknown ids are silently ignored. */
+static void eventUpdateTimeout(int id, int timeout)
+{
+        struct timer *cur;
+
+        CU_DEBUG("%s %d", __func__, id);
+
+        /* The event thread unlinks and frees nodes under this mutex, so the
+         * list must not be walked without holding it. */
+        pthread_mutex_lock(&timer_list_mutex);
+
+        for (cur = timer_list; cur != NULL; cur = cur->next) {
+                if (cur->id == id) {
+                        cur->timeout = timeout;
+                        break;
+                }
+        }
+
+        pthread_mutex_unlock(&timer_list_mutex);
+}
+
+/* Mark timer @id for removal (libvirt "remove timeout" callback).
+ *
+ * The timer is only flagged here; it is unlinked and its memory released
+ * later by event_timer_free_deleted() on the event thread.
+ *
+ * Returns 0 if the timer was found and flagged, -1 if no timer has this id.
+ */
+static int eventRemoveTimeout(int id)
+{
+        struct timer *cur;
+        int ret = -1;
+
+        CU_DEBUG("%s %d", __func__, id);
+
+        /* Hold the mutex while walking: nodes are unlinked and freed under
+         * it, so an unlocked traversal could touch freed memory. */
+        pthread_mutex_lock(&timer_list_mutex);
+
+        for (cur = timer_list; cur != NULL; cur = cur->next) {
+                if (cur->id == id) {
+                        cur->deleted = true;
+                        ret = 0;
+                        break;
+                }
+        }
+
+        pthread_mutex_unlock(&timer_list_mutex);
+
+        return ret;
+}
+
+/* Delete all timers marked for deletion.
+ *
+ * Flagged nodes are unlinked from timer_list while holding the list mutex,
+ * then their free callbacks run and their memory is released after the mutex
+ * has been dropped, so a free callback never executes with the list locked.
+ */
+static void event_timer_free_deleted(void)
+{
+        struct timer *cur;
+        struct timer **link;
+        struct timer *doomed = NULL;
+
+        CU_DEBUG("%s", __func__);
+
+        pthread_mutex_lock(&timer_list_mutex);
+
+        link = &timer_list;
+        while ((cur = *link) != NULL) {
+                if (cur->deleted) {
+                        /* Unlink and push onto the private 'doomed' list. */
+                        *link = cur->next;
+                        cur->next = doomed;
+                        doomed = cur;
+                } else {
+                        link = &cur->next;
+                }
+        }
+
+        pthread_mutex_unlock(&timer_list_mutex);
+
+        while (doomed != NULL) {
+                cur = doomed;
+                doomed = cur->next;
+
+                /* A registrant may supply no free callback at all. */
+                if (cur->ff != NULL)
+                        cur->ff(cur->opaque);
+                free(cur);
+        }
+}
+
+
+
+/* Translate a poll() event mask into the matching VIR_EVENT_HANDLE_* mask.
+ * Only POLLIN, POLLOUT, POLLERR and POLLHUP are translated; any other bit in
+ * @pevents is dropped. */
+static int poll_to_libvirt_events(int pevents)
+{
+        int result = 0;
+
+        result |= (pevents & POLLIN)  ? VIR_EVENT_HANDLE_READABLE : 0;
+        result |= (pevents & POLLOUT) ? VIR_EVENT_HANDLE_WRITABLE : 0;
+        result |= (pevents & POLLERR) ? VIR_EVENT_HANDLE_ERROR    : 0;
+        result |= (pevents & POLLHUP) ? VIR_EVENT_HANDLE_HANGUP   : 0;
+
+        return result;
+}
+
+/* Translate a VIR_EVENT_HANDLE_* mask into the matching poll() event mask.
+ * Bits other than READABLE, WRITABLE, ERROR and HANGUP are dropped. */
+static int libvirt_to_poll_events(int vevents)
+{
+        static const struct {
+                int vbit;
+                int pbit;
+        } map[] = {
+                { VIR_EVENT_HANDLE_READABLE, POLLIN  },
+                { VIR_EVENT_HANDLE_WRITABLE, POLLOUT },
+                { VIR_EVENT_HANDLE_ERROR,    POLLERR },
+                { VIR_EVENT_HANDLE_HANGUP,   POLLHUP },
+        };
+        int result = 0;
+        size_t k;
+
+        for (k = 0; k < sizeof(map) / sizeof(map[0]); k++) {
+                if (vevents & map[k].vbit)
+                        result |= map[k].pbit;
+        }
+
+        return result;
+}
+
+/* Deliver the events poll() reported for @pollfd to @watch's callback.
+ *
+ * The events passed on are the ones that actually occurred
+ * (pollfd->revents), not the ones that were merely requested.  An invalid
+ * descriptor (POLLNVAL) is reported as an error.  If nothing occurred the
+ * callback is not invoked.
+ */
+static void invoke_callback(struct watch *watch, struct pollfd *pollfd)
+{
+        int vevents = poll_to_libvirt_events(pollfd->revents);
+
+        if (pollfd->revents & POLLNVAL)
+                vevents |= VIR_EVENT_HANDLE_ERROR;
+
+        if (vevents == 0)
+                return;
+
+        watch->cb(watch->id, watch->fd, vevents, watch->opaque);
+}
+
+/* Return the smallest timeout among the active timers, for use as the
+ * timeout argument of poll().
+ *
+ * Timers flagged as deleted and timers with a negative timeout (libvirt uses
+ * -1 for a disabled timer) are ignored.  Returns -1, which poll() treats as
+ * "no timeout", when there is no active timer.
+ */
+static int event_next_timeout(void)
+{
+        struct timer *cur;
+        int closest = -1;
+
+        /* Timers are added and freed under this mutex. */
+        pthread_mutex_lock(&timer_list_mutex);
+
+        for (cur = timer_list; cur != NULL; cur = cur->next) {
+                if (cur->deleted || cur->timeout < 0)
+                        continue;
+
+                if (closest < 0 || cur->timeout < closest)
+                        closest = cur->timeout;
+        }
+
+        pthread_mutex_unlock(&timer_list_mutex);
+
+        return closest;
+}
+
+/* Upper bound, in milliseconds, on a single poll() sleep, so that watches
+ * registered or updated while the thread is blocked are picked up. */
+enum { EVENT_POLL_MAX_MS = 1000 };
+
+/* One thread to watch all fds for all events for all libvirt threads.
+ *
+ * Each iteration:
+ *   1. snapshots the live watches into a pollfd array (under the list mutex),
+ *   2. sleeps in poll() for at most EVENT_POLL_MAX_MS,
+ *   3. calls invoke_callback() for every watch whose descriptor reported
+ *      events, with no lock held so callbacks may call back into the
+ *      add/update/remove functions,
+ *   4. frees watches and timers that were flagged for deletion.
+ *
+ * Watch nodes are freed only in step 4, by this thread, so the node pointers
+ * saved in step 1 remain valid throughout step 3.
+ *
+ * Timer callbacks are not dispatched: struct timer stores no callback.
+ *
+ * @ptr is unused.  The function never returns.
+ */
+static void *event_thread(void *ptr)
+{
+        (void)ptr;
+
+        while (1) {
+                struct pollfd *pollfds;
+                struct watch **watches;
+                struct watch *cur;
+                int count;
+                int nready;
+                int timeout;
+                int i;
+
+                pthread_mutex_lock(&watch_list_mutex);
+
+                /* +1 so that an empty list still yields a valid allocation. */
+                pollfds = calloc((size_t)watch_count + 1, sizeof(*pollfds));
+                watches = calloc((size_t)watch_count + 1, sizeof(*watches));
+                if (pollfds == NULL || watches == NULL) {
+                        pthread_mutex_unlock(&watch_list_mutex);
+                        free(pollfds);
+                        free(watches);
+                        sleep(1);
+                        continue;
+                }
+
+                /* fill in pollfds array from our watch list */
+                count = 0;
+                for (cur = watch_list;
+                     cur != NULL && count < watch_count;
+                     cur = cur->next) {
+                        if (cur->deleted)
+                                continue;
+
+                        watches[count] = cur;
+                        pollfds[count].fd = cur->fd;
+                        pollfds[count].events =
+                                libvirt_to_poll_events(cur->events);
+                        count++;
+                }
+
+                pthread_mutex_unlock(&watch_list_mutex);
+
+                timeout = event_next_timeout();
+                if (timeout < 0 || timeout > EVENT_POLL_MAX_MS)
+                        timeout = EVENT_POLL_MAX_MS;
+
+                nready = poll(pollfds, (nfds_t)count, timeout);
+                if (nready < 0) {
+                        if (errno != EINTR) {
+                                CU_DEBUG("poll failed: %s", strerror(errno));
+                                /* Do not spin on a persistent failure. */
+                                sleep(1);
+                        }
+                } else if (nready > 0) {
+                        /* invoke callbacks */
+                        for (i = 0; i < count; i++) {
+                                if (pollfds[i].revents == 0)
+                                        continue;
+                                /* An earlier callback may have removed it. */
+                                if (watches[i]->deleted)
+                                        continue;
+
+                                invoke_callback(watches[i], &pollfds[i]);
+                        }
+                }
+
+                free(watches);
+                free(pollfds);
+
+                event_watch_free_deleted();
+                event_timer_free_deleted();
+        }
+
+        return NULL;
+}
+
+/* Register this file's event callbacks with libvirt and start the single
+ * event-monitoring thread.
+ *
+ * Safe to call from several threads and more than once: registration happens
+ * once, and the thread is created once.  If thread creation fails, the
+ * failure is logged and the next call tries again.
+ */
+void init_events(void)
+{
+        static pthread_mutex_t thread_mutex = PTHREAD_MUTEX_INITIALIZER;
+        /* Explicit flags: a pthread_t is an opaque type and cannot portably
+         * be tested for "not yet created". */
+        static bool registered;
+        static bool thread_started;
+        int rc;
+
+        CU_DEBUG("%s", __func__);
+
+        pthread_mutex_lock(&thread_mutex);
+
+        if (!registered) {
+                virEventRegisterImpl(eventAddHandle,
+                                     eventUpdateHandle,
+                                     eventRemoveHandle,
+                                     eventAddTimeout,
+                                     eventUpdateTimeout,
+                                     eventRemoveTimeout);
+                registered = true;
+        }
+
+        if (!thread_started) {
+                rc = pthread_create(&watch_thread_id, NULL, event_thread, NULL);
+                if (rc == 0)
+                        thread_started = true;
+                else
+                        CU_DEBUG("Failed to start event thread: %s",
+                                 strerror(rc));
+        }
+
+        pthread_mutex_unlock(&thread_mutex);
+}
diff --git a/libxkutil/event.h b/libxkutil/event.h
new file mode 100644
--- /dev/null
+++ b/libxkutil/event.h
@@ -0,0 +1,29 @@
+/*
+ * Copyright IBM Corp. 2009
+ *
+ * Authors:
+ *  Hollis Blanchard <hollisb at us.ibm.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
+ */
+
+#ifndef __EVENT_H
+#define __EVENT_H
+
+#include <libcmpiutil.h>
+
+/* Register the event callbacks implemented in event.c with libvirt (via
+ * virEventRegisterImpl) and create the event-monitoring thread.  The work is
+ * serialized by a mutex; a call made after the thread exists does nothing
+ * further. */
+void init_events(void);
+
+#endif /* __EVENT_H */
diff --git a/libxkutil/misc_util.c b/libxkutil/misc_util.c
--- a/libxkutil/misc_util.c
+++ b/libxkutil/misc_util.c
@@ -38,6 +38,7 @@
 
 #include "misc_util.h"
 #include "cs_util.h"
+#include "event.h"
 
 #include <config.h>
 
@@ -490,6 +491,7 @@ bool parse_instanceid(const CMPIObjectPa
 
 bool libvirt_cim_init(void)
 {
+        init_events();
         return virInitialize() == 0;
 }
 




More information about the Libvirt-cim mailing list