[libvirt] [PATCH v2 1/5] Add a threadpool implementation
Hu Tao
hutao at cn.fujitsu.com
Wed Nov 24 03:53:58 UTC 2010
---
src/Makefile.am | 1 +
src/util/threadpool.c | 140 +++++++++++++++++++++++++++++++++++++++++++++++++
src/util/threadpool.h | 35 ++++++++++++
3 files changed, 176 insertions(+), 0 deletions(-)
create mode 100644 src/util/threadpool.c
create mode 100644 src/util/threadpool.h
diff --git a/src/Makefile.am b/src/Makefile.am
index a9a1986..5febd76 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -76,6 +76,7 @@ UTIL_SOURCES = \
util/uuid.c util/uuid.h \
util/util.c util/util.h \
util/xml.c util/xml.h \
+ util/threadpool.c util/threadpool.h \
util/virtaudit.c util/virtaudit.h \
util/virterror.c util/virterror_internal.h
diff --git a/src/util/threadpool.c b/src/util/threadpool.c
new file mode 100644
index 0000000..4bf0f8d
--- /dev/null
+++ b/src/util/threadpool.c
@@ -0,0 +1,140 @@
+#include <config.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "threadpool.h"
+
+/*
+ * Thread entry point for every worker of a pool.
+ *
+ * @data: the struct virWorkerPool this worker belongs to.
+ *
+ * The worker sleeps on pool->cond until a job is queued or the pool is
+ * told to quit, then drains pool->dataList, calling pool->func on each
+ * job with the mutex released.  It leaves when pool->quit is set (after
+ * draining the queue) or when it wakes up and finds more workers than
+ * pool->nMaxWorker allows.
+ *
+ * Always returns NULL.
+ */
+static void *workerHandleJob(void *data)
+{
+    struct virData *localData = NULL;
+    struct virWorkerPool *pool = data;
+
+    /* The creators discard the pthread_t, so nobody can ever join this
+     * thread; detach it so its resources are reclaimed when it returns. */
+    pthread_detach(pthread_self());
+
+    pthread_mutex_lock(&pool->mutex);
+
+    while (1) {
+        while (!pool->quit && !pool->dataList) {
+            pool->nFreeWorker++;
+            pthread_cond_signal(&pool->worker_cond);
+            pthread_cond_wait(&pool->cond, &pool->mutex);
+            pool->nFreeWorker--;
+
+            if (pool->nWorker > pool->nMaxWorker) {
+                /* This surplus worker may have consumed the single wakeup
+                 * sent for a freshly queued job; hand it on to another
+                 * idle worker so the job is not stranded. */
+                if (pool->dataList)
+                    pthread_cond_signal(&pool->cond);
+                goto out;
+            }
+        }
+
+        /* Pop jobs from the head of the list one at a time. */
+        while ((localData = pool->dataList) != NULL) {
+            pool->dataList = pool->dataList->next;
+            localData->next = NULL;
+
+            /* Run the callback unlocked so other threads can queue or
+             * take jobs in the meantime. */
+            pthread_mutex_unlock(&pool->mutex);
+
+            (pool->func)(localData->data);
+            free(localData);
+
+            pthread_mutex_lock(&pool->mutex);
+        }
+
+        if (pool->quit)
+            break;
+    }
+
+out:
+    pool->nWorker--;
+    /* The last worker to leave wakes whoever is waiting on quit_cond. */
+    if (pool->nWorker == 0)
+        pthread_cond_signal(&pool->quit_cond);
+    pthread_mutex_unlock(&pool->mutex);
+
+    return NULL;
+}
+
+/*
+ * Create a worker pool and start its initial worker threads.
+ *
+ * @nWorker:   number of threads to start immediately (must be >= 0).
+ * @maxWorker: upper limit on the number of workers (must be >= nWorker).
+ * @func:      callback invoked once for every job sent to the pool.
+ *
+ * Returns the new pool, or NULL if the arguments are invalid, memory or
+ * a synchronisation object cannot be allocated, or any of the initial
+ * threads cannot be created.  On failure nothing is left behind.
+ */
+struct virWorkerPool *virWorkerPoolNew(int nWorker, int maxWorker, virWorkerFunc func)
+{
+    struct virWorkerPool *pool;
+    pthread_t pid;
+    int i;
+
+    if (nWorker < 0 || nWorker > maxWorker)
+        return NULL;
+
+    pool = calloc(1, sizeof(*pool));
+    if (!pool)
+        return NULL;
+
+    /* Fill in everything the workers read before any of them exists. */
+    pool->func = func;
+    pool->nMaxWorker = maxWorker;
+
+    if (pthread_mutex_init(&pool->mutex, NULL) != 0)
+        goto undo_alloc;
+    if (pthread_cond_init(&pool->cond, NULL) != 0)
+        goto undo_mutex;
+    if (pthread_cond_init(&pool->worker_cond, NULL) != 0)
+        goto undo_cond;
+    if (pthread_cond_init(&pool->quit_cond, NULL) != 0)
+        goto undo_worker_cond;
+
+    /* Hold the mutex while spawning: new workers block on it, so they
+     * only ever observe a fully consistent nWorker. */
+    pthread_mutex_lock(&pool->mutex);
+
+    for (i = 0; i < nWorker; i++) {
+        if (pthread_create(&pid, NULL, workerHandleJob, pool) != 0)
+            break;
+        /* Count only threads that really exist; shutdown waits for this
+         * counter to reach zero. */
+        pool->nWorker++;
+    }
+
+    if (i < nWorker) {
+        /* A thread could not be started: ask the ones already running
+         * to quit and wait until the last of them has left. */
+        pool->quit = 1;
+        pthread_cond_broadcast(&pool->cond);
+        while (pool->nWorker > 0)
+            pthread_cond_wait(&pool->quit_cond, &pool->mutex);
+        pthread_mutex_unlock(&pool->mutex);
+        goto undo_quit_cond;
+    }
+
+    pthread_mutex_unlock(&pool->mutex);
+
+    return pool;
+
+undo_quit_cond:
+    pthread_cond_destroy(&pool->quit_cond);
+undo_worker_cond:
+    pthread_cond_destroy(&pool->worker_cond);
+undo_cond:
+    pthread_cond_destroy(&pool->cond);
+undo_mutex:
+    pthread_mutex_destroy(&pool->mutex);
+undo_alloc:
+    free(pool);
+    return NULL;
+}
+
+/*
+ * Shut a pool down and release it.
+ *
+ * @pool: pool to destroy; NULL is accepted and ignored.
+ *
+ * Marks the pool as quitting, wakes every idle worker and blocks until
+ * the worker count has dropped to zero.  Job nodes still queued at that
+ * point are freed; the caller-supplied payload pointers inside them are
+ * not touched.  The pool must not be used after this call.
+ */
+void virWorkerPoolFree(struct virWorkerPool *pool)
+{
+    struct virData *job;
+
+    if (!pool)
+        return;
+
+    pthread_mutex_lock(&pool->mutex);
+    pool->quit = 1;
+    if (pool->nWorker > 0)
+        pthread_cond_broadcast(&pool->cond);
+
+    /* Re-check the predicate after every wakeup: condition waits may
+     * return spuriously, and freeing the pool while a worker is still
+     * running would be a use-after-free. */
+    while (pool->nWorker > 0)
+        pthread_cond_wait(&pool->quit_cond, &pool->mutex);
+
+    /* Jobs can be left over when no worker was around to take them. */
+    while ((job = pool->dataList) != NULL) {
+        pool->dataList = job->next;
+        free(job);
+    }
+    pthread_mutex_unlock(&pool->mutex);
+
+    pthread_cond_destroy(&pool->quit_cond);
+    pthread_cond_destroy(&pool->worker_cond);
+    pthread_cond_destroy(&pool->cond);
+    pthread_mutex_destroy(&pool->mutex);
+    free(pool);
+}
+
+/*
+ * Queue one job for the pool.
+ *
+ * @pool: pool that should run the job.
+ * @data: opaque pointer handed to the pool's callback.
+ *
+ * The job is pushed at the head of the pending list.  If no worker is
+ * idle and the pool is below its worker limit, an extra worker thread
+ * is started.
+ *
+ * Returns 0 when the job has been queued.  Returns -1 when memory
+ * cannot be allocated, when the pool is shutting down, or when the pool
+ * has no worker at all and one cannot be created; in those cases the
+ * job is not queued.
+ */
+int virWorkerPoolSendJob(struct virWorkerPool *pool, void *data)
+{
+    pthread_t pid;
+    struct virData *localData;
+
+    localData = malloc(sizeof(*localData));
+    if (!localData)
+        return -1;
+
+    localData->data = data;
+
+    pthread_mutex_lock(&pool->mutex);
+    if (pool->quit)
+        goto reject;
+
+    localData->next = pool->dataList;
+    pool->dataList = localData;
+
+    if (pool->nFreeWorker == 0 && pool->nWorker < pool->nMaxWorker) {
+        if (pthread_create(&pid, NULL, workerHandleJob, pool) == 0) {
+            /* Count the worker only once it really exists; shutdown
+             * waits for this counter to reach zero. */
+            pool->nWorker++;
+        } else if (pool->nWorker == 0) {
+            /* Nobody would ever pick the job up: take it back off the
+             * head of the list and report the failure. */
+            pool->dataList = localData->next;
+            goto reject;
+        }
+        /* Otherwise an existing (busy) worker will get to the job. */
+    }
+
+    pthread_cond_signal(&pool->cond);
+
+    pthread_mutex_unlock(&pool->mutex);
+
+    return 0;
+
+reject:
+    pthread_mutex_unlock(&pool->mutex);
+    free(localData);
+    return -1;
+}
+
+/*
+ * Change the upper limit on the number of workers of a pool.
+ *
+ * @pool:      pool to adjust.
+ * @maxWorker: new limit; negative values are rejected.
+ *
+ * Only the limit is stored here; no worker is woken or stopped by this
+ * call.
+ *
+ * Returns 0 on success, -1 if maxWorker is negative.
+ */
+int virWorkerPoolSetMaxWorker(struct virWorkerPool *pool, int maxWorker)
+{
+    int rc = -1;
+
+    if (maxWorker >= 0) {
+        pthread_mutex_lock(&pool->mutex);
+        pool->nMaxWorker = maxWorker;
+        pthread_mutex_unlock(&pool->mutex);
+        rc = 0;
+    }
+
+    return rc;
+}
diff --git a/src/util/threadpool.h b/src/util/threadpool.h
new file mode 100644
index 0000000..5ff3a6b
--- /dev/null
+++ b/src/util/threadpool.h
@@ -0,0 +1,35 @@
+#ifndef __THREADPOOL_H__
+#define __THREADPOOL_H__
+
+#include <pthread.h>
+
+typedef void (*virWorkerFunc)(void *); /* job callback; receives the data pointer given to virWorkerPoolSendJob */
+
+struct virData { /* one node of a pool's singly linked list of pending jobs */
+ struct virData *next; /* following node, or NULL at the end of the list */
+
+ void *data; /* caller-supplied pointer, passed unchanged to the pool's virWorkerFunc */
+};
+
+struct virWorkerPool { /* state shared between a pool's worker threads and the threads using the pool */
+ int nWorker; /* number of worker threads counted as belonging to the pool */
+ int nMaxWorker; /* limit: SendJob starts a worker only below it; a woken worker above it exits */
+ int nFreeWorker; /* workers currently blocked waiting for a job */
+
+ int quit; /* set to 1 on shutdown: workers exit once the queue is empty, SendJob refuses new jobs */
+
+ virWorkerFunc func; /* callback run once per job, with the mutex released */
+ struct virData *dataList; /* pending jobs; pushed at the head and taken from the head */
+
+ pthread_mutex_t mutex; /* locked by the worker loop, SendJob, SetMaxWorker and Free around the counters, quit and dataList */
+ pthread_cond_t cond; /* signalled when a job is queued, broadcast on shutdown; idle workers wait on it */
+ pthread_cond_t worker_cond; /* signalled by a worker about to go idle; nothing in threadpool.c waits on it */
+ pthread_cond_t quit_cond; /* signalled by the last worker to exit; virWorkerPoolFree waits on it */
+};
+
+struct virWorkerPool *virWorkerPoolNew(int nWorker, int nMaxWorker, virWorkerFunc func); /* starts nWorker threads; NULL if nWorker < 0, nWorker > nMaxWorker, or allocation fails */
+void virWorkerPoolFree(struct virWorkerPool *pool); /* sets quit, waits for the workers to exit, then frees the pool */
+int virWorkerPoolSendJob(struct virWorkerPool *wp, void *data); /* 0 once queued; -1 if allocation fails or the pool is quitting */
+int virWorkerPoolSetMaxWorker(struct virWorkerPool *pool, int maxWorker); /* 0 on success; -1 if maxWorker is negative */
+
+#endif
--
1.7.3
--
Thanks,
Hu Tao
More information about the libvir-list
mailing list