[libvirt] [PATCH 1/8] Thread pool impl

Eric Blake eblake at redhat.com
Wed Dec 1 19:17:39 UTC 2010


On 12/01/2010 10:26 AM, Daniel P. Berrange wrote:
> From: Hu Tao <hutao at cn.fujitsu.com>

Not sure whether Daniel or Hu will be the next person to post an
improvement of this patch.  Some of my comments are repeated from Hu's
first attempt, although many of them have been addressed in updating to
libvirt APIs.

> 
> * src/util/threadpool.c, src/util/threadpool.h: Thread pool
>   implementation
> * src/Makefile.am: Build thread pool
> ---
>  src/Makefile.am       |    1 +
>  src/util/threadpool.c |  178 +++++++++++++++++++++++++++++++++++++++++++++++++
>  src/util/threadpool.h |   23 ++++++
>  3 files changed, 202 insertions(+), 0 deletions(-)
>  create mode 100644 src/util/threadpool.c
>  create mode 100644 src/util/threadpool.h

> +++ b/src/util/threadpool.c
> @@ -0,0 +1,178 @@

Still missing copyright blurb.

> +
> +struct _virThreadPool {
> +    int quit;

s/int/bool/ (which means including <stdbool.h> earlier)


> +virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
> +                                  size_t maxWorkers,
> +                                  virThreadPoolJobFunc func,
> +                                  void *opaque)
> +{
> +    virThreadPoolPtr pool;
> +    int i;

s/int/size_t/

> +void virThreadPoolFree(virThreadPoolPtr pool)
> +{

Add a NULL check first, so that this behaves like free():

    if (!pool)
        return;

> +    virMutexLock(&pool->mutex);

What happens if the reason we called this was because virMutexInit
failed during virThreadPoolNew?  Is it still safe to blindly call
virMutexLock on a broken mutex?

> +    pool->quit = 1;

s/1/true/

> +    if (pool->nWorkers > 0) {
> +        virCondBroadcast(&pool->cond);
> +        if (virCondWait(&pool->quit_cond, &pool->mutex) < 0)
> +        {}

Is it any better to use "ignore-value.h" and ignore_value(virCondWait),
instead of the empty if () {}?
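
A rough standalone sketch of what the ignore_value() approach would look like. The macro below is a simplified stand-in for gnulib's ignore_value() (renamed my_ignore_value to make that clear), and wait_for_workers() and shutdown_pool() are hypothetical names, not part of the patch:

```c
#include <assert.h>

/* Simplified stand-in for gnulib's ignore_value(); the real macro is
 * provided by "ignore-value.h". */
#if defined(__GNUC__)
# define my_ignore_value(x) \
    (__extension__ ({ __typeof__(x) v_ = (x); (void) v_; }))
#else
# define my_ignore_value(x) ((void) (x))
#endif

static int calls;

/* Hypothetical function whose result the compiler wants us to check,
 * standing in for virCondWait. */
#if defined(__GNUC__)
__attribute__((__warn_unused_result__))
#endif
static int wait_for_workers(void)
{
    calls++;
    return -1;
}

/* The call is still made; only its result is deliberately discarded. */
static void shutdown_pool(void)
{
    my_ignore_value(wait_for_workers());
}
```

Compared with the empty if () {}, this states the intent at the call site and does not leave a stray empty block behind.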

> +    }
> +    VIR_FREE(pool->workers);
> +    virMutexUnlock(&pool->mutex);

Missing cleanups for mutex, cond, and quit_cond (virMutexDestroy,
virCondDestroy).  Also, if you are forcefully quitting while jobs are
still unclaimed, does jobList need freeing as well, or is that
guaranteed to be taken care of by the virCondWait?
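
To make the cleanup ordering concrete, here is a minimal sketch using plain pthreads rather than the libvirt wrappers. The struct layout, pool_new(), and pool_free() are hypothetical, worker threads are not shown, and it assumes the pool owns the job nodes but not the job data:

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>

struct job { struct job *next; void *data; };

struct pool {
    bool quit;
    size_t nWorkers;            /* workers still running */
    struct job *jobList;        /* unclaimed jobs */
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    pthread_cond_t quit_cond;
};

/* Unwinds its own partial initialization, so pool_free() never sees a
 * pool with a broken mutex or condition variable. */
static struct pool *pool_new(void)
{
    struct pool *pool = calloc(1, sizeof(*pool));

    if (!pool)
        return NULL;
    if (pthread_mutex_init(&pool->mutex, NULL) != 0)
        goto err_free;
    if (pthread_cond_init(&pool->cond, NULL) != 0)
        goto err_mutex;
    if (pthread_cond_init(&pool->quit_cond, NULL) != 0)
        goto err_cond;
    return pool;

err_cond:
    pthread_cond_destroy(&pool->cond);
err_mutex:
    pthread_mutex_destroy(&pool->mutex);
err_free:
    free(pool);
    return NULL;
}

static void pool_free(struct pool *pool)
{
    struct job *job;

    if (!pool)
        return;

    pthread_mutex_lock(&pool->mutex);
    pool->quit = true;
    pthread_cond_broadcast(&pool->cond);
    /* Loop to tolerate spurious wakeups; workers (not shown) would
     * decrement nWorkers and signal quit_cond as they exit. */
    while (pool->nWorkers > 0)
        pthread_cond_wait(&pool->quit_cond, &pool->mutex);

    /* Jobs nobody claimed are still on the list; release the nodes. */
    while ((job = pool->jobList)) {
        pool->jobList = job->next;
        free(job);
    }
    pthread_mutex_unlock(&pool->mutex);

    /* Destroy the primitives only after unlocking. */
    pthread_mutex_destroy(&pool->mutex);
    pthread_cond_destroy(&pool->cond);
    pthread_cond_destroy(&pool->quit_cond);
    free(pool);
}
```

Whether the job data itself should be freed or handed back to the caller is a separate ownership question that this sketch does not settle.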

> +    VIR_FREE(pool);
> +}
> +
> +int virThreadPoolSendJob(virThreadPoolPtr pool,
> +                         void *jobData)
> +{
> +    virThreadPoolJobPtr job;
> +    virThreadPoolJobPtr tmp;
> +
> +    virMutexLock(&pool->mutex);
> +    if (pool->quit)
> +        goto error;
> +
> +    if (pool->freeWorkers == 0 &&
> +        pool->nWorkers < pool->maxWorkers) {
> +        if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) {
> +            virReportOOMError();
> +            goto error;
> +        }
> +
> +        if (virThreadCreate(&pool->workers[pool->nWorkers],
> +                            true,
> +                            virThreadPoolWorker,
> +                            pool) < 0)
> +            goto error;
> +        pool->nWorkers++;

Problem.  VIR_EXPAND_N already incremented pool->nWorkers, so
&pool->workers[pool->nWorkers] points one past the end of the array,
and the second increment leaves the count too high by one.  You either
need to use:

VIR_RESIZE_N(pool->workers, pool->maxWorkers, pool->nWorkers, 1)
pool->nWorkers++

or keep the VIR_EXPAND_N, but store the created thread in
pool->workers[pool->nWorkers - 1] and drop the extra increment.
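
A small standalone sketch of the second option. expand_n() is a hypothetical stand-in that mimics how VIR_EXPAND_N grows the array and bumps the count in one step, and plain ints stand in for the thread handles:

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for VIR_EXPAND_N: grows *arr by `add` zeroed
 * slots and also increases *count by `add`. */
static int expand_n(int **arr, size_t *count, size_t add)
{
    int *tmp = realloc(*arr, (*count + add) * sizeof(**arr));

    if (!tmp)
        return -1;
    memset(tmp + *count, 0, add * sizeof(*tmp));
    *arr = tmp;
    *count += add;
    return 0;
}

/* Hypothetical pool with int "thread ids" in place of virThread. */
struct pool {
    int *workers;
    size_t nWorkers;
};

/* Adds one worker, storing it in the slot that expand_n just created. */
static int pool_add_worker(struct pool *p, int id)
{
    if (expand_n(&p->workers, &p->nWorkers, 1) < 0)
        return -1;
    p->workers[p->nWorkers - 1] = id;   /* last slot, not [nWorkers] */
    return 0;                           /* no second nWorkers++ */
}
```

After two calls the count is 2 and both slots are filled, which is the invariant the patch as posted breaks.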

> +    }
> +
> +    if (VIR_ALLOC(job) < 0) {
> +        virReportOOMError();
> +        goto error;
> +    }
> +
> +    job->data = jobData;
> +
> +    tmp = pool->jobList;
> +    while (tmp && tmp->next)
> +        tmp = tmp->next;
> +    if (tmp)
> +        tmp->next = job;
> +    else
> +        pool->jobList = job;

Good - FIFO job ordering.  However, this is an O(n) list traversal on
every insertion.  Would it make sense to track both a list head and a
list tail pointer, giving O(1) job insertion?  Or is that extra
bookkeeping not worth it, given that the job queue will typically stay
small?
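
For reference, the head/tail bookkeeping is fairly small. A minimal sketch, with hypothetical struct and function names and no locking shown:

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical job node and queue; names are illustrative only. */
struct job {
    struct job *next;
    void *data;
};

struct job_queue {
    struct job *head;   /* next job to hand to a worker */
    struct job *tail;   /* last job queued; NULL when empty */
};

/* Append in O(1): link after tail instead of walking the list. */
static int job_queue_push(struct job_queue *q, void *data)
{
    struct job *job = calloc(1, sizeof(*job));

    if (!job)
        return -1;
    job->data = data;
    if (q->tail)
        q->tail->next = job;
    else
        q->head = job;
    q->tail = job;
    return 0;
}

/* Remove from the front in O(1); returns NULL if the queue is empty. */
static void *job_queue_pop(struct job_queue *q)
{
    struct job *job = q->head;
    void *data;

    if (!job)
        return NULL;
    q->head = job->next;
    if (!q->head)
        q->tail = NULL;     /* queue drained: reset tail too */
    data = job->data;
    free(job);
    return data;
}
```

The only subtle part is resetting tail when the last job is removed; otherwise the next push would link onto a freed node.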

> +
> +    virCondSignal(&pool->cond);
> +    virMutexUnlock(&pool->mutex);
> +
> +    return 0;
> +
> +error:
> +    virMutexUnlock(&pool->mutex);
> +    return -1;
> +}
> diff --git a/src/util/threadpool.h b/src/util/threadpool.h
> new file mode 100644
> index 0000000..093786f
> --- /dev/null
> +++ b/src/util/threadpool.h
> @@ -0,0 +1,23 @@

Missing copyright blurb.

> +#ifndef __THREADPOOL_H__
> +#define __THREADPOOL_H__

I'd prefer __VIR_THREADPOOL_H__, to avoid potential collisions in the __
namespace.

> +
> +#include <pthread.h>

Wrong header; this should be "threads.h"

> +
> +typedef struct _virThreadPool virThreadPool;
> +typedef virThreadPool *virThreadPoolPtr;
> +
> +typedef void (*virThreadPoolJobFunc)(void *jobdata, void *opaque);
> +
> +virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
> +                                  size_t maxWorkers,
> +                                  virThreadPoolJobFunc func,
> +                                  void *opaque);

ATTRIBUTE_NONNULL(3) ATTRIBUTE_RETURN_CHECK

> +
> +void virThreadPoolShutdown(virThreadPoolPtr pool);

ATTRIBUTE_NONNULL(1)

> +
> +void virThreadPoolFree(virThreadPoolPtr pool);

add this to the list of free-like functions in cfg.mk

> +
> +int virThreadPoolSendJob(virThreadPoolPtr pool,
> +                         void *jobdata);

ATTRIBUTE_NONNULL(1) ATTRIBUTE_NONNULL(2) ATTRIBUTE_RETURN_CHECK
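
As a sketch of how such annotations attach to a prototype, assuming a GCC-compatible compiler. The MY_ macros are stand-ins defined here for illustration rather than the libvirt definitions, and pool_send_job() is a hypothetical miniature of the real function:

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in macros; they expand to nothing on other compilers. */
#if defined(__GNUC__)
# define MY_ATTRIBUTE_NONNULL(m) __attribute__((__nonnull__(m)))
# define MY_ATTRIBUTE_RETURN_CHECK __attribute__((__warn_unused_result__))
#else
# define MY_ATTRIBUTE_NONNULL(m)
# define MY_ATTRIBUTE_RETURN_CHECK
#endif

/* Hypothetical miniature pool that only counts queued jobs. */
struct pool { size_t queued; };

/* The attributes go on the declaration, after the parameter list. */
static int pool_send_job(struct pool *pool, void *jobdata)
    MY_ATTRIBUTE_NONNULL(1) MY_ATTRIBUTE_NONNULL(2) MY_ATTRIBUTE_RETURN_CHECK;

static int pool_send_job(struct pool *pool, void *jobdata)
{
    (void) jobdata;     /* a real pool would queue this */
    pool->queued++;
    return 0;
}
```

With these in place, the compiler can warn when a caller passes a literal NULL for an annotated argument or discards the return value.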

-- 
Eric Blake   eblake at redhat.com    +1-801-349-2682
Libvirt virtualization library http://libvirt.org
