[Libvirt-cim] [PATCH 3 of 3] [RFC] Implement libvirt event callback management

Richard Maciel rmaciel at linux.vnet.ibm.com
Mon Jan 11 18:16:54 UTC 2010


# HG changeset patch
# User Hollis Blanchard <hollisb at us.ibm.com>
# Date 1250181138 25200
# Node ID 9c4cb3443e88714c2029cff3c37d59e3ab8a5a01
# Parent  22540c8901bd5a58a6631c6b449c92a3d0c5fd5b
[RFC] Implement libvirt event callback management.

Libvirt requires that users implement their own event monitoring and
callback-management infrastructure. Actually, two similar lists are needed: one
for file descriptors to monitor, and one for pending timers.

This patch starts exactly one event-monitoring thread per libvirt-cim instance.
Multiple provider threads in the same process will share this thread.

Signed-off-by: Hollis Blanchard <hollisb at us.ibm.com>

---
This builds but isn't tested, and may very well contain embarrassing list
manipulation or locking bugs. It really needs a consumer to test, i.e. for
someone to register a domain event callback.

Comments welcome.

Please CC me on replies.

diff -r 22540c8901bd -r 9c4cb3443e88 libxkutil/Makefile.am
--- a/libxkutil/Makefile.am	Sat Jul 25 16:19:43 2009 -0300
+++ b/libxkutil/Makefile.am	Thu Aug 13 09:32:18 2009 -0700
@@ -5,14 +5,14 @@
 CFLAGS += $(CFLAGS_STRICT)
 
 noinst_HEADERS = cs_util.h misc_util.h device_parsing.h xmlgen.h infostore.h \
-                 pool_parsing.h
+                 pool_parsing.h event.h
 
 lib_LTLIBRARIES = libxkutil.la
 
 AM_LDFLAGS = -lvirt -luuid
 
 libxkutil_la_SOURCES = cs_util_instance.c misc_util.c device_parsing.c \
-                       xmlgen.c infostore.c pool_parsing.c
+                       xmlgen.c infostore.c pool_parsing.c event.c
 
 noinst_PROGRAMS = xml_parse_test
 
diff -r 22540c8901bd -r 9c4cb3443e88 libxkutil/event.c
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libxkutil/event.c	Thu Aug 13 09:32:18 2009 -0700
@@ -0,0 +1,418 @@
+/*
+ * Copyright IBM Corp. 2009
+ *
+ * Authors:
+ *  Hollis Blanchard <hollisb at us.ibm.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#include <stdbool.h>
+#include <stdarg.h>
+#include <unistd.h>
+#include <pthread.h>
+#include <errno.h>
+#include <poll.h>
+#include <values.h>
+
+#include <libvirt/libvirt.h>
+#include <libvirt/virterror.h>
+
+#include "event.h"
+
+struct timer {
+        int id;
+        int timeout;
+        void *opaque;
+        virFreeCallback ff;
+        struct timer *next;
+        bool deleted;
+};
+
+struct watch {
+        int id;
+        int fd;
+        int events;
+        virEventHandleCallback cb;
+        void *opaque;
+        virFreeCallback ff;
+        struct watch *next;
+        bool deleted;
+};
+
+static pthread_t watch_thread_id;
+
+static int next_watch_id;
+static struct watch *watch_list;
+static int watch_count;
+static pthread_mutex_t watch_list_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+static int next_timer_id;
+static struct timer *timer_list;
+static pthread_mutex_t timer_list_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+
+static int eventAddHandle(int fd, int events, virEventHandleCallback cb,
+                          void *opaque, virFreeCallback ff)
+{
+        struct watch *watch;
+
+        CU_DEBUG("%s", __func__);
+
+        watch = malloc(sizeof(struct watch));
+        if (!watch)
+                return -ENOMEM;
+
+        CU_DEBUG("watch->fd = %d", fd);
+
+        watch->fd = fd;
+        watch->events = events;
+        watch->cb = cb;
+        watch->opaque = opaque;
+        watch->ff = ff;
+        watch->deleted = 0;
+
+        pthread_mutex_lock(&watch_list_mutex);
+
+        watch->id = next_watch_id++;
+        CU_DEBUG("watch->id = %d", watch->id);
+        watch->next = watch_list;
+        watch_list = watch;
+        watch_count++;
+
+        pthread_mutex_unlock(&watch_list_mutex);
+
+        return watch->id;
+}
+
+static void eventUpdateHandle(int id, int events)
+{
+        struct watch *cur;
+
+        CU_DEBUG("%s %d", __func__, id);
+
+        pthread_mutex_lock(&watch_list_mutex);
+
+        for (cur = watch_list; cur != NULL; cur = cur->next) {
+                if (cur->id == id) {
+                        cur->events = events;
+                        CU_DEBUG("id: %d, events: %d", cur->id, cur->events);
+                        break;
+                }
+        }
+
+        pthread_mutex_unlock(&watch_list_mutex);
+}
+
+/* To avoid locking problems, watches are just flagged here, and the memory is
+ * freed later. */
+static int eventRemoveHandle(int id)
+{
+        struct watch *cur;
+
+        CU_DEBUG("%s %d", __func__, id);
+
+        pthread_mutex_lock(&watch_list_mutex);
+
+        for (cur = watch_list; cur != NULL; cur = cur->next) {
+                if (cur->id == id) {
+                        cur->deleted = 1;
+                        break;
+                }
+        }
+
+        pthread_mutex_unlock(&watch_list_mutex);
+
+        return 0;
+}
+
+/* Delete all watches marked for deletion. */
+static void event_watch_free_deleted(void)
+{
+        struct watch *cur;
+        struct watch **link;
+
+        CU_DEBUG("%s", __func__);
+
+        pthread_mutex_lock(&watch_list_mutex);
+
+        cur = watch_list; 
+        link = &watch_list;
+        while (cur != NULL) {
+                struct watch *next = cur->next;
+
+                if (cur->deleted) {
+                        CU_DEBUG("Deleting id: %d", cur->id);
+                        *link = next;
+
+                        cur->ff(cur->opaque);
+                        free(cur);
+                        watch_count--;
+                } else
+                        link = &cur->next;
+
+                cur = next;
+        }
+
+        pthread_mutex_unlock(&watch_list_mutex);
+}
+
+static int eventAddTimeout(int timeout, virEventTimeoutCallback cb,
+                           void *opaque, virFreeCallback ff)
+{
+        struct timer *timer;
+
+        CU_DEBUG("%s", __func__);
+
+        timer = malloc(sizeof(struct timer));
+        if (!timer)
+                return -ENOMEM;
+
+        CU_DEBUG("timeout: %d", timeout);
+
+        timer->timeout = timeout;
+        timer->opaque = opaque;
+        timer->ff = ff;
+        timer->deleted = 0;
+
+        pthread_mutex_lock(&timer_list_mutex);
+
+        timer->id = next_timer_id++;
+        timer->next = timer_list;
+        timer_list = timer;
+        
+        pthread_mutex_unlock(&timer_list_mutex);
+
+        return timer->id;
+}
+
+static void eventUpdateTimeout(int id, int timeout)
+{
+        struct timer *cur;
+
+        CU_DEBUG("%s %d", __func__, id);
+
+        pthread_mutex_lock(&timer_list_mutex);
+
+        for (cur = timer_list; cur != NULL; cur = cur->next) {
+                if (cur->id == id) {
+                        cur->timeout = timeout;
+                        break;
+                }
+        }
+
+        pthread_mutex_unlock(&timer_list_mutex);
+}
+
+static int eventRemoveTimeout(int id)
+{
+        struct timer *cur;
+
+        CU_DEBUG("%s %d", __func__, id);
+
+        pthread_mutex_lock(&timer_list_mutex);
+
+        for (cur = timer_list; cur != NULL; cur = cur->next) {
+                if (cur->id == id) {
+                        cur->deleted = 1;
+                        break;
+                }
+        }
+
+        pthread_mutex_unlock(&timer_list_mutex);
+
+        return 0;
+}
+
+/* Delete all timers marked for deletion. */
+static void event_timer_free_deleted(void)
+{
+        struct timer *cur;
+        struct timer **link;
+
+        CU_DEBUG("%s", __func__);
+
+        pthread_mutex_lock(&timer_list_mutex);
+
+        cur = timer_list; 
+        link = &timer_list;
+        while (cur != NULL) {
+                struct timer *next = cur->next;
+
+                if (cur->deleted) {
+                        CU_DEBUG("deleted timer");
+                        *link = next;
+
+                        cur->ff(cur->opaque);
+                        free(cur);
+                } else
+                        link = &cur->next;
+
+                cur = next;
+        }
+
+        pthread_mutex_unlock(&timer_list_mutex);
+}
+
+
+
+static int poll_to_libvirt_events(int pevents)
+{
+        int vevents = 0;
+
+        if (pevents & POLLIN)
+                vevents |= VIR_EVENT_HANDLE_READABLE;
+
+        if (pevents & POLLOUT)
+                vevents |= VIR_EVENT_HANDLE_WRITABLE;
+
+        if (pevents & POLLERR)
+                vevents |= VIR_EVENT_HANDLE_ERROR;
+
+        if (pevents & POLLHUP)
+                vevents |= VIR_EVENT_HANDLE_HANGUP;
+
+        return vevents;
+}
+
+static int libvirt_to_poll_events(int vevents)
+{
+        int pevents = 0;
+
+        if (vevents & VIR_EVENT_HANDLE_READABLE)
+                pevents |= POLLIN;
+
+        if (vevents & VIR_EVENT_HANDLE_WRITABLE)
+                pevents |= POLLOUT;
+
+        if (vevents & VIR_EVENT_HANDLE_ERROR)
+                pevents |= POLLERR;
+
+        if (vevents & VIR_EVENT_HANDLE_HANGUP)
+                pevents |= POLLHUP;
+
+        return pevents;
+}
+
+static void invoke_callback(struct watch *watch, struct pollfd *pollfd)
+{
+        int vevents = poll_to_libvirt_events(watch->events);
+
+        CU_DEBUG("invoke_callback. events: %d", vevents);
+
+        watch->cb(watch->id, watch->fd, vevents, watch->opaque);
+}
+
+static int event_next_timeout(void)
+{
+        struct timer *cur;
+        int closest = 10000;
+
+        for (cur = timer_list; cur != NULL; cur = cur->next) {
+                CU_DEBUG("cur->timeout: %d", cur->timeout);
+                if (cur->timeout > 0 && cur->timeout < closest)
+                        closest = cur->timeout;
+        }
+
+        return closest;
+}
+
+/* One thread to watch all fds for all events for all libvirt threads. */
+static void *event_thread(void *ptr)
+{
+        while (1) {
+                struct watch *cur;
+                struct pollfd *pollfds;
+                struct pollfd *pollfd;
+                int timeout;
+                int i;
+                int pollrv;
+
+                CU_DEBUG("event_thread");
+
+                //pthread_mutex_lock(&watch_list_mutex);
+
+                pollfds = malloc(sizeof(struct pollfd) * watch_count);
+
+                /* fill in pollfds array from our watch list */
+                for (pollfd = &pollfds[0], cur = watch_list;
+                     cur != NULL;
+                     pollfd++, cur = cur->next) {
+                        CU_DEBUG("pollfd = %d", *pollfd);
+                        pollfd->fd = cur->fd;
+                        pollfd->events = libvirt_to_poll_events(cur->events);
+                }
+
+                //CU_DEBUG("event_next_timeout");
+
+                timeout = event_next_timeout();
+                
+                //CU_DEBUG("poll timeout: %d", timeout);
+
+                pollrv = poll(pollfds, watch_count, timeout);
+
+                //CU_DEBUG("pool");
+
+                /* invoke callbacks in case of event */
+                if (pollrv > 0) {
+                        CU_DEBUG("Got movement");
+                        for (i = 0; i < watch_count; i++) {
+                                for (cur = watch_list; cur != NULL; cur = cur->next)
+                                        if (cur->fd == pollfds[i].fd
+                                        && !cur->deleted) {
+                                                CU_DEBUG("Invoke call back fds: %d", pollfds[i]);
+                                                invoke_callback(cur, &pollfds[i]);
+                                                break;
+                                        }
+                        }
+                } else {
+                        CU_DEBUG("All clear: %d", pollrv);
+                }
+
+                //pthread_mutex_unlock(&watch_list_mutex);
+
+                free(pollfds);
+
+                event_watch_free_deleted();
+                event_timer_free_deleted();
+        }
+
+        return NULL;
+}
+
+void init_events(void)
+{
+        static pthread_mutex_t thread_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+        CU_DEBUG("%s", __func__);
+
+        pthread_mutex_lock(&thread_mutex);
+
+        if (!watch_thread_id) {
+                virEventRegisterImpl(eventAddHandle,
+                                     eventUpdateHandle,
+                                     eventRemoveHandle,
+                                     eventAddTimeout,
+                                     eventUpdateTimeout,
+                                     eventRemoveTimeout);
+
+                pthread_create(&watch_thread_id, NULL, event_thread, NULL);
+        }
+
+        pthread_mutex_unlock(&thread_mutex);
+}
diff -r 22540c8901bd -r 9c4cb3443e88 libxkutil/event.h
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libxkutil/event.h	Thu Aug 13 09:32:18 2009 -0700
@@ -0,0 +1,29 @@
+/*
+ * Copyright IBM Corp. 2009
+ *
+ * Authors:
+ *  Hollis Blanchard <hollisb at us.ibm.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
+ */
+
+#ifndef __EVENT_H
+#define __EVENT_H
+
+#include <libcmpiutil.h>
+
+void init_events(void);
+
+#endif /* __EVENT_H */
diff -r 22540c8901bd -r 9c4cb3443e88 libxkutil/misc_util.c
--- a/libxkutil/misc_util.c	Sat Jul 25 16:19:43 2009 -0300
+++ b/libxkutil/misc_util.c	Thu Aug 13 09:32:18 2009 -0700
@@ -38,6 +38,7 @@
 
 #include "misc_util.h"
 #include "cs_util.h"
+#include "event.h"
 
 #include <config.h>
 
@@ -489,6 +490,7 @@
 
 bool libvirt_cim_init(void)
 {
+        init_events();
         return virInitialize() == 0;
 }
 




More information about the Libvirt-cim mailing list