[libvirt] [PATCH v4 RESEND 4/4] Using threadpool API to manage qemud worker

Daniel P. Berrange berrange at redhat.com
Thu Dec 2 12:42:19 UTC 2010


On Thu, Dec 02, 2010 at 03:30:23PM +0800, Hu Tao wrote:
> ---
>  daemon/libvirtd.c |  172 +++++++++--------------------------------------------
>  daemon/libvirtd.h |    4 +
>  2 files changed, 33 insertions(+), 143 deletions(-)
> 
> diff --git a/daemon/libvirtd.c b/daemon/libvirtd.c
> index 791b3dc..dbd050a 100644
> --- a/daemon/libvirtd.c
> +++ b/daemon/libvirtd.c
> @@ -67,6 +67,7 @@
>  #include "stream.h"
>  #include "hooks.h"
>  #include "virtaudit.h"
> +#include "threadpool.h"
>  #ifdef HAVE_AVAHI
>  # include "mdns.h"
>  #endif
> @@ -248,7 +249,6 @@ static void sig_handler(int sig, siginfo_t * siginfo,
>  
>  static void qemudDispatchClientEvent(int watch, int fd, int events, void *opaque);
>  static void qemudDispatchServerEvent(int watch, int fd, int events, void *opaque);
> -static int qemudStartWorker(struct qemud_server *server, struct qemud_worker *worker);
>  
>  void
>  qemudClientMessageQueuePush(struct qemud_client_message **queue,
> @@ -1383,6 +1383,7 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
>      client->auth = sock->auth;
>      client->addr = addr;
>      client->addrstr = addrstr;
> +    client->server = server;
>      addrstr = NULL;

This shouldn't be needed, as 'server' should be passed into
the worker function via the 'void *opaque' parameter.

>  
>      for (i = 0 ; i < VIR_DOMAIN_EVENT_ID_LAST ; i++) {
> @@ -1458,19 +1459,6 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
>  
>      server->clients[server->nclients++] = client;
>  
> -    if (server->nclients > server->nactiveworkers &&
> -        server->nactiveworkers < server->nworkers) {
> -        for (i = 0 ; i < server->nworkers ; i++) {
> -            if (!server->workers[i].hasThread) {
> -                if (qemudStartWorker(server, &server->workers[i]) < 0)
> -                    return -1;
> -                server->nactiveworkers++;
> -                break;
> -            }
> -        }
> -    }
> -
> -
>      return 0;
>  
>  error:
> @@ -1534,100 +1522,27 @@ void qemudDispatchClientFailure(struct qemud_client *client) {
>      VIR_FREE(client->addrstr);
>  }
>  
> -
> -/* Caller must hold server lock */
> -static struct qemud_client *qemudPendingJob(struct qemud_server *server)
> +static void qemudWorker(void *data, void *opaque ATTRIBUTE_UNUSED)
>  {
> -    int i;
> -    for (i = 0 ; i < server->nclients ; i++) {
> -        virMutexLock(&server->clients[i]->lock);
> -        if (server->clients[i]->dx) {
> -            /* Delibrately don't unlock client - caller wants the lock */
> -            return server->clients[i];
> -        }
> -        virMutexUnlock(&server->clients[i]->lock);
> -    }
> -    return NULL;
> -}
> +    struct qemud_client *client = data;
> +    struct qemud_client_message *msg;
>  
> -static void *qemudWorker(void *data)
> -{
> -    struct qemud_worker *worker = data;
> -    struct qemud_server *server = worker->server;
> +    virMutexLock(&client->lock);

It is necessary to hold the lock on 'server' before obtaining a
lock on 'client'. The server lock can be released again immediately
if no longer needed.

>  
> -    while (1) {
> -        struct qemud_client *client = NULL;
> -        struct qemud_client_message *msg;
> +    /* Remove our message from dispatch queue while we use it */
> +    msg = qemudClientMessageQueueServe(&client->dx);
>  
> -        virMutexLock(&server->lock);
> -        while ((client = qemudPendingJob(server)) == NULL) {
> -            if (worker->quitRequest ||
> -                virCondWait(&server->job, &server->lock) < 0) {
> -                virMutexUnlock(&server->lock);
> -                return NULL;
> -            }
> -        }
> -        if (worker->quitRequest) {
> -            virMutexUnlock(&client->lock);
> -            virMutexUnlock(&server->lock);
> -            return NULL;
> -        }
> -        worker->processingCall = 1;
> -        virMutexUnlock(&server->lock);
> -
> -        /* We own a locked client now... */
> -        client->refs++;
> -
> -        /* Remove our message from dispatch queue while we use it */
> -        msg = qemudClientMessageQueueServe(&client->dx);
> -
> -        /* This function drops the lock during dispatch,
> -         * and re-acquires it before returning */
> -        if (remoteDispatchClientRequest (server, client, msg) < 0) {
> -            VIR_FREE(msg);
> -            qemudDispatchClientFailure(client);
> -            client->refs--;
> -            virMutexUnlock(&client->lock);
> -            continue;
> -        }
> -
> -        client->refs--;
> -        virMutexUnlock(&client->lock);
> -
> -        virMutexLock(&server->lock);
> -        worker->processingCall = 0;
> -        virMutexUnlock(&server->lock);
> -    }
> -}
> -
> -static int qemudStartWorker(struct qemud_server *server,
> -                            struct qemud_worker *worker) {
> -    pthread_attr_t attr;
> -    pthread_attr_init(&attr);
> -    /* We want to join workers, so don't detach them */
> -    /*pthread_attr_setdetachstate(&attr, 1);*/
> -
> -    if (worker->hasThread)
> -        return -1;
> -
> -    worker->server = server;
> -    worker->hasThread = 1;
> -    worker->quitRequest = 0;
> -    worker->processingCall = 0;
> -
> -    if (pthread_create(&worker->thread,
> -                       &attr,
> -                       qemudWorker,
> -                       worker) != 0) {
> -        worker->hasThread = 0;
> -        worker->server = NULL;
> -        return -1;
> +    /* This function drops the lock during dispatch,
> +     * and re-acquires it before returning */
> +    if (remoteDispatchClientRequest (client->server, client, msg) < 0) {
> +        VIR_FREE(msg);
> +        qemudDispatchClientFailure(client);
>      }
>  
> -    return 0;
> +    client->refs--;
> +    virMutexUnlock(&client->lock);
>  }
>  
> -
>  /*
>   * Read data into buffer using wire decoding (plain or TLS)
>   *
> @@ -1857,8 +1772,11 @@ readmore:
>          }
>  
>          /* Move completed message to the end of the dispatch queue */
> -        if (msg)
> +        if (msg) {
> +            client->refs++;
>              qemudClientMessageQueuePush(&client->dx, msg);
> +            ignore_value(virThreadPoolSendJob(server->workerPool, client));
> +        }
>          client->nrequests++;
>  
>          /* Possibly need to create another receive buffer */
> @@ -1870,9 +1788,6 @@ readmore:
>                  client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
>  
>              qemudUpdateClientEvent(client);
> -
> -            /* Tell one of the workers to get on with it... */
> -            virCondSignal(&server->job);
>          }
>      }
>  }
> @@ -2311,10 +2226,14 @@ static void *qemudRunLoop(void *opaque) {
>          return NULL;
>      }
>  
> -    for (i = 0 ; i < min_workers ; i++) {
> -        if (qemudStartWorker(server, &server->workers[i]) < 0)
> -            goto cleanup;
> -        server->nactiveworkers++;
> +    server->workerPool = virThreadPoolNew(min_workers,
> +                                          max_workers,
> +                                          qemudWorker,
> +                                          NULL);

Should pass 'server' in here, instead of NULL.

> +    if (!server->workerPool) {
> +        VIR_ERROR0(_("Failed to create thread pool"));
> +        virMutexUnlock(&server->lock);
> +        return NULL;
>      }
>  
>      for (;!server->quitEventThread;) {


This is a small change in behaviour, in that we no longer kill off idle
worker threads, but the improved simplicity of libvirtd code makes this a
worthwhile tradeoff. So looks good to me aside from the minor locking bug.

Regards,
Daniel




More information about the libvir-list mailing list