[libvirt] [PATCH v3 1/5] Add a threadpool implementation
Eric Blake
eblake at redhat.com
Tue Nov 30 20:54:43 UTC 2010
On 11/30/2010 12:14 AM, Hu Tao wrote:
> ---
> src/Makefile.am | 1 +
> src/util/threadpool.c | 140 +++++++++++++++++++++++++++++++++++++++++++++++++
> src/util/threadpool.h | 35 ++++++++++++
> 3 files changed, 176 insertions(+), 0 deletions(-)
> create mode 100644 src/util/threadpool.c
> create mode 100644 src/util/threadpool.h
>
> diff --git a/src/Makefile.am b/src/Makefile.am
> index a9a1986..5febd76 100644
> --- a/src/Makefile.am
> +++ b/src/Makefile.am
> @@ -76,6 +76,7 @@ UTIL_SOURCES = \
> util/uuid.c util/uuid.h \
> util/util.c util/util.h \
> util/xml.c util/xml.h \
> + util/threadpool.c util/threadpool.h \
> util/virtaudit.c util/virtaudit.h \
> util/virterror.c util/virterror_internal.h
>
> diff --git a/src/util/threadpool.c b/src/util/threadpool.c
> new file mode 100644
> index 0000000..4bf0f8d
> --- /dev/null
> +++ b/src/util/threadpool.c
> @@ -0,0 +1,140 @@
Copyright header?
> +#include <config.h>
> +#include <stdio.h>
> +#include <stdlib.h>
> +#include <string.h>
> +
> +#include "threadpool.h"
> +
> +static void *workerHandleJob(void *data)
> +{
> + struct virData *localData = NULL;
> + struct virWorkerPool *pool = data;
> +
> + pthread_mutex_lock(&pool->mutex);
We should use virMutexLock here, so that this is also portable to mingw.
> +
> + while (1) {
> + while (!pool->quit && !pool->dataList) {
> + pool->nFreeWorker++;
> + pthread_cond_signal(&pool->worker_cond);
Likewise, virCondSignal here.
> + pthread_cond_wait(&pool->cond, &pool->mutex);
and virCondWait.
> + pool->nFreeWorker--;
> +
> + if (pool->nWorker > pool->nMaxWorker)
> + goto out;
> + }
> +
> + while ((localData = pool->dataList) != NULL) {
> + pool->dataList = pool->dataList->next;
> + localData->next = NULL;
> +
> + pthread_mutex_unlock(&pool->mutex);
> +
> + (pool->func)(localData->data);
> + free(localData);
VIR_FREE().
> +
> + pthread_mutex_lock(&pool->mutex);
> + }
> +
> + if (pool->quit)
> + break;
> + }
> +
> +out:
> + pool->nWorker--;
> + if (pool->nWorker == 0)
> + pthread_cond_signal(&pool->quit_cond);
> + pthread_mutex_unlock(&pool->mutex);
> +
> + return NULL;
> +}
> +
> +struct virWorkerPool *virWorkerPoolNew(int nWorker, int maxWorker, virWorkerFunc func)
> +{
> + struct virWorkerPool *pool;
> + pthread_t pid;
> + int i;
> +
> + if (nWorker < 0)
> + return NULL;
> +
> + if (nWorker > maxWorker)
> + return NULL;
daemon/libvirtd.c already has a notion of worker threads; I'm wondering
how much overlap there is between your implementation and that one. A
better proof that this would be a useful API addition would be to have
the next patch in the series convert libvirtd.c over to using this API.
> +
> + pool = malloc(sizeof(*pool));
Run 'make syntax-check' - it would have complained about this. Use
VIR_ALLOC or VIR_ALLOC_N instead of malloc.
> + if (!pool)
> + return NULL;
> +
> + memset(pool, 0, sizeof(*pool));
> + pool->func = func;
> + pthread_mutex_init(&pool->mutex, NULL);
virMutexInit()
> + pthread_cond_init(&pool->cond, NULL);
> + pthread_cond_init(&pool->worker_cond, NULL);
> + pthread_cond_init(&pool->quit_cond, NULL);
virCondInit()
> +
> + for (i = 0; i < nWorker; i++) {
> + pthread_create(&pid, NULL, workerHandleJob, pool);
virThreadCreate()
> + }
> +
> + pool->nFreeWorker = 0;
> + pool->nWorker = nWorker;
> + pool->nMaxWorker = maxWorker;
> +
> + return pool;
> +}
> +
> +void virWorkerPoolFree(struct virWorkerPool *pool)
> +{
> + pthread_mutex_lock(&pool->mutex);
> + pool->quit = 1;
Use <stdbool.h> and bool if a value will only ever be 0 or 1.
> + if (pool->nWorker > 0) {
> + pthread_cond_broadcast(&pool->cond);
> + pthread_cond_wait(&pool->quit_cond, &pool->mutex);
> + }
> + pthread_mutex_unlock(&pool->mutex);
> + free(pool);
VIR_FREE()
> +}
> +
> +int virWorkerPoolSendJob(struct virWorkerPool *pool, void *data)
> +{
> + pthread_t pid;
> + struct virData *localData;
> +
> + localData = malloc(sizeof(*localData));
VIR_ALLOC()
> + if (!localData)
> + return -1;
> +
> + localData->data = data;
> +
> + pthread_mutex_lock(&pool->mutex);
> + if (pool->quit) {
> + pthread_mutex_unlock(&pool->mutex);
> + free(localData);
> + return -1;
> + }
> +
> + localData->next = pool->dataList;
> + pool->dataList = localData;
> +
> + if (pool->nFreeWorker == 0 && pool->nWorker < pool->nMaxWorker) {
> + pthread_create(&pid, NULL, workerHandleJob, pool);
> + pool->nWorker++;
> + }
> +
> + pthread_cond_signal(&pool->cond);
> +
> + pthread_mutex_unlock(&pool->mutex);
> +
> + return 0;
> +}
> +
> +int virWorkerPoolSetMaxWorker(struct virWorkerPool *pool, int maxWorker)
> +{
> + if (maxWorker < 0)
> + return -1;
> +
> + pthread_mutex_lock(&pool->mutex);
> + pool->nMaxWorker = maxWorker;
> + pthread_mutex_unlock(&pool->mutex);
Does this do the right thing if maxWorker < pool->nMaxWorker, or does it
silently lose existing workers?
> +
> + return 0;
> +}
> diff --git a/src/util/threadpool.h b/src/util/threadpool.h
> new file mode 100644
> index 0000000..5ff3a6b
> --- /dev/null
> +++ b/src/util/threadpool.h
> @@ -0,0 +1,35 @@
Copyright header?
> +#ifndef __THREADPOOL_H__
> +#define __THREADPOOL_H__
Use of the __ namespace risks collision with the system; I'd feel better
if this were __VIR_THREADPOOL_H__.
> +
> +#include <pthread.h>
"threads.h", not <pthread.h>, so we can support mingw
> +
> +typedef void (*virWorkerFunc)(void *);
pthread_create() takes a function that can return void*. Should worker
functions be allowed to return a value?
> +
> +struct virData {
> + struct virData *next;
> +
> + void *data;
> +};
We've typically used typedefs to avoid having to type 'struct virData'
everywhere else.
> +
> +struct virWorkerPool {
> + int nWorker;
> + int nMaxWorker;
> + int nFreeWorker;
s/int/size_t/ when dealing with non-negative counts.
> +
> + int quit;
s/int/bool/
> +
> + virWorkerFunc func;
> + struct virData *dataList;
> +
> + pthread_mutex_t mutex;
> + pthread_cond_t cond;
> + pthread_cond_t worker_cond;
> + pthread_cond_t quit_cond;
virMutex, virCond
> +};
> +
> +struct virWorkerPool *virWorkerPoolNew(int nWorker, int nMaxWorker, virWorkerFunc func);
needs ATTRIBUTE_RETURN_CHECK.
> +void virWorkerPoolFree(struct virWorkerPool *pool);
> +int virWorkerPoolSendJob(struct virWorkerPool *wp, void *data);
ATTRIBUTE_NONNULL(1)
> +int virWorkerPoolSetMaxWorker(struct virWorkerPool *pool, int maxWorker);
ATTRIBUTE_NONNULL(1)
> +
> +#endif
--
Eric Blake eblake at redhat.com +1-801-349-2682
Libvirt virtualization library http://libvirt.org
-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 619 bytes
Desc: OpenPGP digital signature
URL: <http://listman.redhat.com/archives/libvir-list/attachments/20101130/dc1f0231/attachment-0001.sig>
More information about the libvir-list
mailing list