[libvirt] [PATCH 1/8] Thread pool impl
Hu Tao
hutao at cn.fujitsu.com
Thu Dec 2 01:43:12 UTC 2010
On Wed, Dec 01, 2010 at 05:26:27PM +0000, Daniel P. Berrange wrote:
> From: Hu Tao <hutao at cn.fujitsu.com>
>
> * src/util/threadpool.c, src/util/threadpool.h: Thread pool
> implementation
> * src/Makefile.am: Build thread pool
> ---
> src/Makefile.am | 1 +
> src/util/threadpool.c | 178 +++++++++++++++++++++++++++++++++++++++++++++++++
> src/util/threadpool.h | 23 ++++++
> 3 files changed, 202 insertions(+), 0 deletions(-)
> create mode 100644 src/util/threadpool.c
> create mode 100644 src/util/threadpool.h
>
> diff --git a/src/Makefile.am b/src/Makefile.am
> index a9a1986..d71c644 100644
> --- a/src/Makefile.am
> +++ b/src/Makefile.am
> @@ -73,6 +73,7 @@ UTIL_SOURCES = \
> util/threads.c util/threads.h \
> util/threads-pthread.h \
> util/threads-win32.h \
> + util/threadpool.c util/threadpool.h \
> util/uuid.c util/uuid.h \
> util/util.c util/util.h \
> util/xml.c util/xml.h \
> diff --git a/src/util/threadpool.c b/src/util/threadpool.c
> new file mode 100644
> index 0000000..cf998bf
> --- /dev/null
> +++ b/src/util/threadpool.c
> @@ -0,0 +1,178 @@
> +
> +#include <config.h>
> +
> +#include "threadpool.h"
> +#include "memory.h"
> +#include "threads.h"
> +#include "virterror_internal.h"
> +
> +#define VIR_FROM_THIS VIR_FROM_NONE
> +
> +typedef struct _virThreadPoolJob virThreadPoolJob;
> +typedef virThreadPoolJob *virThreadPoolJobPtr;
> +
> +struct _virThreadPoolJob {
> + virThreadPoolJobPtr next;
> +
> + void *data;
> +};
> +
> +struct _virThreadPool {
> + int quit;
> +
> + virThreadPoolJobFunc jobFunc;
> + void *jobOpaque;
> + virThreadPoolJobPtr jobList;
> +
> + virMutex mutex;
> + virCond cond;
> + virCond quit_cond;
> +
> + size_t maxWorkers;
> + size_t freeWorkers;
> + size_t nWorkers;
> + virThreadPtr workers;
> +};
> +
> +static void virThreadPoolWorker(void *opaque)
> +{
> + virThreadPoolPtr pool = opaque;
> +
> + virMutexLock(&pool->mutex);
> +
> + while (1) {
> + while (!pool->quit &&
> + !pool->jobList) {
> + pool->freeWorkers++;
> + if (virCondWait(&pool->cond, &pool->mutex) < 0) {
> + pool->freeWorkers--;
> + break;
> + }
> + pool->freeWorkers--;
> + }
> +
> + if (pool->quit)
> + break;
> +
> + virThreadPoolJobPtr job = pool->jobList;
> + pool->jobList = pool->jobList->next;
> + job->next = NULL;
> +
> + virMutexUnlock(&pool->mutex);
> + (pool->jobFunc)(job->data, pool->jobOpaque);
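This could race: jobFunc is called after pool->mutex has been released, so
several workers may run it concurrently with the same jobOpaque pointer.
That is only safe if jobFunc itself takes a lock to protect whatever
jobOpaque points to.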
> + VIR_FREE(job);
> + virMutexLock(&pool->mutex);
> + }
> +
> + pool->nWorkers--;
> + if (pool->nWorkers == 0)
> + virCondSignal(&pool->quit_cond);
> + virMutexUnlock(&pool->mutex);
> +}
> +
> +virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
> + size_t maxWorkers,
> + virThreadPoolJobFunc func,
> + void *opaque)
> +{
> + virThreadPoolPtr pool;
> + int i;
> +
> + if (VIR_ALLOC(pool) < 0) {
> + virReportOOMError();
> + return NULL;
> + }
> +
> + pool->jobFunc = func;
> + pool->jobOpaque = opaque;
> +
> + if (virMutexInit(&pool->mutex) < 0)
> + goto error;
> + if (virCondInit(&pool->cond) < 0)
> + goto error;
> + if (virCondInit(&pool->quit_cond) < 0)
> + goto error;
> +
> + if (VIR_ALLOC_N(pool->workers, minWorkers) < 0)
> + goto error;
> +
> + pool->maxWorkers = maxWorkers;
> + for (i = 0; i < minWorkers; i++) {
> + if (virThreadCreate(&pool->workers[i],
> + true,
> + virThreadPoolWorker,
> + pool) < 0)
> + goto error;
> + pool->nWorkers++;
> + }
More than maxWorkers threads will be created if minWorkers > maxWorkers,
because this loop is bounded only by minWorkers and never checks it
against maxWorkers.
> +
> + return pool;
> +
> +error:
> + virThreadPoolFree(pool);
> + return NULL;
> +}
> +
> +void virThreadPoolFree(virThreadPoolPtr pool)
> +{
> + virMutexLock(&pool->mutex);
> + pool->quit = 1;
> + if (pool->nWorkers > 0) {
> + virCondBroadcast(&pool->cond);
> + if (virCondWait(&pool->quit_cond, &pool->mutex) < 0)
> + {}
> + }
> + VIR_FREE(pool->workers);
> + virMutexUnlock(&pool->mutex);
> + VIR_FREE(pool);
> +}
> +
> +int virThreadPoolSendJob(virThreadPoolPtr pool,
> + void *jobData)
> +{
> + virThreadPoolJobPtr job;
> + virThreadPoolJobPtr tmp;
> +
> + virMutexLock(&pool->mutex);
> + if (pool->quit)
> + goto error;
> +
> + if (pool->freeWorkers == 0 &&
> + pool->nWorkers < pool->maxWorkers) {
> + if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) {
> + virReportOOMError();
> + goto error;
> + }
> +
> + if (virThreadCreate(&pool->workers[pool->nWorkers],
> + true,
> + virThreadPoolWorker,
> + pool) < 0)
> + goto error;
> + pool->nWorkers++;
> + }
> +
> + if (VIR_ALLOC(job) < 0) {
> + virReportOOMError();
> + goto error;
> + }
> +
> + job->data = jobData;
> +
> + tmp = pool->jobList;
> + while (tmp && tmp->next)
> + tmp = tmp->next;
> + if (tmp)
> + tmp->next = job;
> + else
> + pool->jobList = job;
> +
> + virCondSignal(&pool->cond);
> + virMutexUnlock(&pool->mutex);
> +
> + return 0;
> +
> +error:
> + virMutexUnlock(&pool->mutex);
> + return -1;
> +}
> diff --git a/src/util/threadpool.h b/src/util/threadpool.h
> new file mode 100644
> index 0000000..093786f
> --- /dev/null
> +++ b/src/util/threadpool.h
> @@ -0,0 +1,23 @@
> +#ifndef __THREADPOOL_H__
> +#define __THREADPOOL_H__
> +
> +#include <pthread.h>
> +
> +typedef struct _virThreadPool virThreadPool;
> +typedef virThreadPool *virThreadPoolPtr;
> +
> +typedef void (*virThreadPoolJobFunc)(void *jobdata, void *opaque);
> +
> +virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
> + size_t maxWorkers,
> + virThreadPoolJobFunc func,
> + void *opaque);
> +
> +void virThreadPoolShutdown(virThreadPoolPtr pool);
> +
> +void virThreadPoolFree(virThreadPoolPtr pool);
> +
> +int virThreadPoolSendJob(virThreadPoolPtr pool,
> + void *jobdata);
> +
> +#endif
> --
> 1.7.2.3
More information about the libvir-list
mailing list