[libvirt] [PATCH 01/10] util: Introduce thread queues

Jiri Denemark jdenemar at redhat.com
Thu May 21 22:42:34 UTC 2015


Our usage of pthread conditions does not allow a single thread to wait
for several events from different sources. This is because the condition
is bound to the source of the event. We can invert the usage by giving
each thread its own condition and providing APIs for registering this
thread condition with several sources. Each of the sources can then
signal the thread condition.

Thread queues also support several threads to be registered with a
single event source, which can either wakeup all waiting threads or just
the first one.

Signed-off-by: Jiri Denemark <jdenemar at redhat.com>
---
 po/POTFILES.in            |   1 +
 src/Makefile.am           |   2 +
 src/libvirt_private.syms  |  15 ++
 src/util/virthreadqueue.c | 343 ++++++++++++++++++++++++++++++++++++++++++++++
 src/util/virthreadqueue.h |  54 ++++++++
 5 files changed, 415 insertions(+)
 create mode 100644 src/util/virthreadqueue.c
 create mode 100644 src/util/virthreadqueue.h

diff --git a/po/POTFILES.in b/po/POTFILES.in
index bb0f6e1..edfd1cc 100644
--- a/po/POTFILES.in
+++ b/po/POTFILES.in
@@ -222,6 +222,7 @@ src/util/virstoragefile.c
 src/util/virstring.c
 src/util/virsysinfo.c
 src/util/virthreadjob.c
+src/util/virthreadqueue.c
 src/util/virtime.c
 src/util/virtpm.c
 src/util/virtypedparam.c
diff --git a/src/Makefile.am b/src/Makefile.am
index 579421d..c746ecd 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -162,6 +162,7 @@ UTIL_SOURCES =							\
 		util/virthread.c util/virthread.h		\
 		util/virthreadjob.c util/virthreadjob.h		\
 		util/virthreadpool.c util/virthreadpool.h	\
+		util/virthreadqueue.c util/virthreadqueue.h	\
 		util/virtime.h util/virtime.c			\
 		util/virtpm.h util/virtpm.c			\
 		util/virtypedparam.c util/virtypedparam.h	\
@@ -2169,6 +2170,7 @@ libvirt_setuid_rpc_client_la_SOURCES = 		\
 		util/virtime.c			\
 		util/virthread.c		\
 		util/virthreadjob.c		\
+		util/virthreadqueue.c		\
 		util/virtypedparam.c		\
 		util/viruri.c			\
 		util/virutil.c			\
diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms
index 6a95fb9..cb41d5c 100644
--- a/src/libvirt_private.syms
+++ b/src/libvirt_private.syms
@@ -2242,6 +2242,21 @@ virThreadPoolNewFull;
 virThreadPoolSendJob;
 
 
+# util/virthreadqueue.h
+virThreadCondInit;
+virThreadCondInvalidate;
+virThreadCondNew;
+virThreadCondWait;
+virThreadCondWaitUntil;
+virThreadQueueBroadcast;
+virThreadQueueFree;
+virThreadQueueIsEmpty;
+virThreadQueueNew;
+virThreadQueueRegister;
+virThreadQueueSignal;
+virThreadQueueUnregister;
+
+
 # util/virtime.h
 virTimeFieldsNow;
 virTimeFieldsNowRaw;
diff --git a/src/util/virthreadqueue.c b/src/util/virthreadqueue.c
new file mode 100644
index 0000000..c9908cf
--- /dev/null
+++ b/src/util/virthreadqueue.c
@@ -0,0 +1,343 @@
+/*
+ * virthreadqueue.c: code for managing queues of threads waiting for several
+ * conditions
+ *
+ * Copyright (C) 2015 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library.  If not, see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Author: Jiri Denemark <jdenemar at redhat.com>
+ */
+
+#include <config.h>
+
+#include "internal.h"
+#include "viralloc.h"
+#include "virerror.h"
+#include "virlog.h"
+#include "virobject.h"
+#include "virthread.h"
+#include "virthreadqueue.h"
+
+#define VIR_FROM_THIS VIR_FROM_THREAD
+VIR_LOG_INIT("util.threadqueue");
+
+
+typedef struct _virThreadQueueItem virThreadQueueItem;
+typedef virThreadQueueItem *virThreadQueueItemPtr;
+
+struct _virThreadQueue {
+    virThreadQueueItemPtr head;
+    virThreadQueueItemPtr tail;
+};
+
+struct _virThreadQueueItem {
+    virThreadQueueItemPtr prev;
+    virThreadQueueItemPtr next;
+    virThreadCondPtr cond;
+};
+
+struct _virThreadCond {
+    virObject parent;
+
+    bool valid;
+    virCond cond;
+};
+
+static virClassPtr virThreadCondClass;
+static virThreadLocal virThreadQueueCond;
+
+static void virThreadCondDispose(void *obj);
+
+
+static int
+virThreadQueueOnceInit(void)
+{
+    if (virThreadLocalInit(&virThreadQueueCond, NULL) < 0)
+        return -1;
+
+    virThreadCondClass = virClassNew(virClassForObject(),
+                                     "virThreadCond",
+                                     sizeof(virThreadCond),
+                                     virThreadCondDispose);
+    if (!virThreadCondClass)
+        return -1;
+
+    return 0;
+}
+
+VIR_ONCE_GLOBAL_INIT(virThreadQueue)
+
+
+virThreadCondPtr
+virThreadCondNew(void)
+{
+    virThreadCondPtr cond;
+
+    if (virThreadQueueInitialize() < 0 ||
+        !(cond = virObjectNew(virThreadCondClass)))
+        return NULL;
+
+    if (virCondInit(&cond->cond) < 0) {
+        virObjectUnref(cond);
+        return NULL;
+    }
+
+    cond->valid = true;
+    return cond;
+}
+
+
+static void
+virThreadCondDispose(void *obj)
+{
+    virThreadCondPtr cond = obj;
+
+    virCondDestroy(&cond->cond);
+}
+
+
+void
+virThreadCondInit(virThreadCondPtr cond)
+{
+    if (virThreadLocalSet(&virThreadQueueCond, (void *) cond) < 0) {
+        virReportSystemError(errno,
+                             _("cannot set per-thread condition for %llu"),
+                             virThreadSelfID());
+        virObjectUnref(cond);
+    }
+}
+
+
+static virThreadCondPtr
+virThreadCondGet(void)
+{
+    virThreadCondPtr cond;
+
+    if (virThreadQueueInitialize() < 0)
+        return NULL;
+
+    if (!(cond = virThreadLocalGet(&virThreadQueueCond))) {
+        virReportError(VIR_ERR_INTERNAL_ERROR,
+                       _("uninitialized per-thread condition for %llu"),
+                      virThreadSelfID());
+        return NULL;
+    }
+
+    return cond;
+}
+
+
+void
+virThreadCondInvalidate(void)
+{
+    virThreadCondPtr cond;
+
+    if (!(cond = virThreadCondGet()))
+        return;
+
+    cond->valid = false;
+    virObjectUnref(cond);
+}
+
+
+int
+virThreadCondWait(virMutexPtr mtx)
+{
+    virThreadCondPtr cond;
+
+    if (!(cond = virThreadCondGet()))
+        return -1;
+
+    if (virCondWait(&cond->cond, mtx) < 0) {
+        virReportSystemError(errno, "%s",
+                             _("failed to wait for thread condition"));
+        return -1;
+    }
+    return 0;
+}
+
+
+int
+virThreadCondWaitUntil(virMutexPtr mtx,
+                       unsigned long long whenms)
+{
+    virThreadCondPtr cond;
+
+    if (!(cond = virThreadCondGet()))
+        return -1;
+
+    if (virCondWaitUntil(&cond->cond, mtx, whenms) < 0 &&
+        errno != ETIMEDOUT) {
+        virReportSystemError(errno, "%s",
+                             _("failed to wait for thread condition"));
+        return -1;
+    }
+    return 0;
+}
+
+
+static virThreadQueueItemPtr
+virThreadQueueFind(virThreadQueuePtr queue,
+                   virThreadCondPtr cond)
+{
+    virThreadQueueItemPtr item;
+
+    for (item = queue->head; item; item = item->next) {
+        if (item->cond == cond)
+            return item;
+    }
+
+    return NULL;
+}
+
+
+static int
+virThreadQueueAdd(virThreadQueuePtr queue,
+                  virThreadCondPtr cond)
+{
+    virThreadQueueItemPtr item;
+
+    if (VIR_ALLOC(item) < 0)
+        return -1;
+
+    virObjectRef(cond);
+    item->cond = cond;
+
+    if (queue->tail)
+        queue->tail->next = item;
+    else
+        queue->head = item;
+    item->prev = queue->tail;
+    queue->tail = item;
+
+    return 0;
+}
+
+
+static void
+virThreadQueueRemove(virThreadQueuePtr queue,
+                     virThreadQueueItemPtr item)
+{
+    if (item->prev)
+        item->prev->next = item->next;
+    else
+        queue->head = item->next;
+
+    if (item->next)
+        item->next->prev = item->prev;
+    else
+        queue->tail = item->prev;
+
+    virObjectUnref(item->cond);
+    VIR_FREE(item);
+}
+
+
+static void
+virThreadQueueRemoveInvalid(virThreadQueuePtr queue)
+{
+    virThreadQueueItemPtr item;
+
+    item = queue->head;
+    while (item) {
+        virThreadQueueItemPtr next = item->next;
+
+        if (!item->cond->valid)
+            virThreadQueueRemove(queue, item);
+        item = next;
+    }
+}
+
+
+virThreadQueuePtr
+virThreadQueueNew(void)
+{
+    virThreadQueuePtr queue;
+
+    if (VIR_ALLOC(queue) < 0)
+        return NULL;
+
+    return queue;
+}
+
+
+void
+virThreadQueueFree(virThreadQueuePtr queue)
+{
+    if (!queue)
+        return;
+
+    while (queue->head)
+        virThreadQueueRemove(queue, queue->head);
+    VIR_FREE(queue);
+}
+
+
+bool
+virThreadQueueIsEmpty(virThreadQueuePtr queue)
+{
+    virThreadQueueRemoveInvalid(queue);
+    return !queue->head;
+}
+
+
+void
+virThreadQueueSignal(virThreadQueuePtr queue)
+{
+    virThreadQueueRemoveInvalid(queue);
+    if (queue->head)
+        virCondSignal(&queue->head->cond->cond);
+}
+
+
+void
+virThreadQueueBroadcast(virThreadQueuePtr queue)
+{
+    virThreadQueueItemPtr item;
+
+    virThreadQueueRemoveInvalid(queue);
+    for (item = queue->head; item; item = item->next)
+        virCondSignal(&item->cond->cond);
+}
+
+
+int
+virThreadQueueRegister(virThreadQueuePtr queue)
+{
+    virThreadCondPtr cond;
+
+    if (!(cond = virThreadCondGet()))
+        return -1;
+
+    if (virThreadQueueFind(queue, cond))
+        return 0;
+
+    return virThreadQueueAdd(queue, cond);
+}
+
+
+void
+virThreadQueueUnregister(virThreadQueuePtr queue)
+{
+    virThreadCondPtr cond;
+    virThreadQueueItemPtr item;
+
+    if (!(cond = virThreadCondGet()) ||
+        !(item = virThreadQueueFind(queue, cond)))
+        return;
+
+    virThreadQueueRemove(queue, item);
+}
diff --git a/src/util/virthreadqueue.h b/src/util/virthreadqueue.h
new file mode 100644
index 0000000..6ae8d9e
--- /dev/null
+++ b/src/util/virthreadqueue.h
@@ -0,0 +1,54 @@
+/*
+ * virthreadqueue.h: APIs for managing queues of threads waiting for several
+ * conditions
+ *
+ * Copyright (C) 2015 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library.  If not, see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Author: Jiri Denemark <jdenemar at redhat.com>
+ */
+
+#ifndef __VIR_THREAD_QUEUE_H__
+# define __VIR_THREAD_QUEUE_H__
+
+# include "virthread.h"
+
+
+typedef struct _virThreadCond virThreadCond;
+typedef virThreadCond *virThreadCondPtr;
+
+virThreadCondPtr virThreadCondNew(void);
+void virThreadCondInit(virThreadCondPtr cond);
+void virThreadCondInvalidate(void);
+int virThreadCondWait(virMutexPtr mtx)
+    ATTRIBUTE_RETURN_CHECK;
+int virThreadCondWaitUntil(virMutexPtr mtx,
+                           unsigned long long whenms)
+    ATTRIBUTE_RETURN_CHECK;
+
+typedef struct _virThreadQueue virThreadQueue;
+typedef virThreadQueue *virThreadQueuePtr;
+
+virThreadQueuePtr virThreadQueueNew(void);
+void virThreadQueueFree(virThreadQueuePtr queue);
+
+bool virThreadQueueIsEmpty(virThreadQueuePtr queue);
+void virThreadQueueSignal(virThreadQueuePtr queue);
+void virThreadQueueBroadcast(virThreadQueuePtr queue);
+int virThreadQueueRegister(virThreadQueuePtr queue);
+void virThreadQueueUnregister(virThreadQueuePtr queue);
+
+#endif
-- 
2.4.1




More information about the libvir-list mailing list