[libvirt] [PATCH v6 4/4] Using threadpool API to manage qemud worker

Hu Tao hutao at cn.fujitsu.com
Wed Dec 8 06:19:23 UTC 2010


---
 daemon/libvirtd.c |  187 ++++++++---------------------------------------------
 daemon/libvirtd.h |   16 +----
 2 files changed, 30 insertions(+), 173 deletions(-)

diff --git a/daemon/libvirtd.c b/daemon/libvirtd.c
index 791b3dc..229c0cc 100644
--- a/daemon/libvirtd.c
+++ b/daemon/libvirtd.c
@@ -67,6 +67,7 @@
 #include "stream.h"
 #include "hooks.h"
 #include "virtaudit.h"
+#include "threadpool.h"
 #ifdef HAVE_AVAHI
 # include "mdns.h"
 #endif
@@ -248,7 +249,6 @@ static void sig_handler(int sig, siginfo_t * siginfo,
 
 static void qemudDispatchClientEvent(int watch, int fd, int events, void *opaque);
 static void qemudDispatchServerEvent(int watch, int fd, int events, void *opaque);
-static int qemudStartWorker(struct qemud_server *server, struct qemud_worker *worker);
 
 void
 qemudClientMessageQueuePush(struct qemud_client_message **queue,
@@ -842,18 +842,10 @@ static struct qemud_server *qemudInitialize(void) {
         VIR_FREE(server);
         return NULL;
     }
-    if (virCondInit(&server->job) < 0) {
-        VIR_ERROR0(_("cannot initialize condition variable"));
-        virMutexDestroy(&server->lock);
-        VIR_FREE(server);
-        return NULL;
-    }
 
     if (virEventInit() < 0) {
         VIR_ERROR0(_("Failed to initialize event system"));
         virMutexDestroy(&server->lock);
-        if (virCondDestroy(&server->job) < 0)
-        {}
         VIR_FREE(server);
         return NULL;
     }
@@ -1458,19 +1450,6 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
 
     server->clients[server->nclients++] = client;
 
-    if (server->nclients > server->nactiveworkers &&
-        server->nactiveworkers < server->nworkers) {
-        for (i = 0 ; i < server->nworkers ; i++) {
-            if (!server->workers[i].hasThread) {
-                if (qemudStartWorker(server, &server->workers[i]) < 0)
-                    return -1;
-                server->nactiveworkers++;
-                break;
-            }
-        }
-    }
-
-
     return 0;
 
 error:
@@ -1534,100 +1513,28 @@ void qemudDispatchClientFailure(struct qemud_client *client) {
     VIR_FREE(client->addrstr);
 }
 
-
-/* Caller must hold server lock */
-static struct qemud_client *qemudPendingJob(struct qemud_server *server)
-{
-    int i;
-    for (i = 0 ; i < server->nclients ; i++) {
-        virMutexLock(&server->clients[i]->lock);
-        if (server->clients[i]->dx) {
-            /* Delibrately don't unlock client - caller wants the lock */
-            return server->clients[i];
-        }
-        virMutexUnlock(&server->clients[i]->lock);
-    }
-    return NULL;
-}
-
-static void *qemudWorker(void *data)
+static void qemudWorker(void *data, void *opaque)
 {
-    struct qemud_worker *worker = data;
-    struct qemud_server *server = worker->server;
-
-    while (1) {
-        struct qemud_client *client = NULL;
-        struct qemud_client_message *msg;
-
-        virMutexLock(&server->lock);
-        while ((client = qemudPendingJob(server)) == NULL) {
-            if (worker->quitRequest ||
-                virCondWait(&server->job, &server->lock) < 0) {
-                virMutexUnlock(&server->lock);
-                return NULL;
-            }
-        }
-        if (worker->quitRequest) {
-            virMutexUnlock(&client->lock);
-            virMutexUnlock(&server->lock);
-            return NULL;
-        }
-        worker->processingCall = 1;
-        virMutexUnlock(&server->lock);
-
-        /* We own a locked client now... */
-        client->refs++;
-
-        /* Remove our message from dispatch queue while we use it */
-        msg = qemudClientMessageQueueServe(&client->dx);
-
-        /* This function drops the lock during dispatch,
-         * and re-acquires it before returning */
-        if (remoteDispatchClientRequest (server, client, msg) < 0) {
-            VIR_FREE(msg);
-            qemudDispatchClientFailure(client);
-            client->refs--;
-            virMutexUnlock(&client->lock);
-            continue;
-        }
-
-        client->refs--;
-        virMutexUnlock(&client->lock);
-
-        virMutexLock(&server->lock);
-        worker->processingCall = 0;
-        virMutexUnlock(&server->lock);
-    }
-}
-
-static int qemudStartWorker(struct qemud_server *server,
-                            struct qemud_worker *worker) {
-    pthread_attr_t attr;
-    pthread_attr_init(&attr);
-    /* We want to join workers, so don't detach them */
-    /*pthread_attr_setdetachstate(&attr, 1);*/
+    struct qemud_server *server = opaque;
+    struct qemud_client *client = data;
+    struct qemud_client_message *msg;
 
-    if (worker->hasThread)
-        return -1;
+    virMutexLock(&client->lock);
 
-    worker->server = server;
-    worker->hasThread = 1;
-    worker->quitRequest = 0;
-    worker->processingCall = 0;
+    /* Remove our message from dispatch queue while we use it */
+    msg = qemudClientMessageQueueServe(&client->dx);
 
-    if (pthread_create(&worker->thread,
-                       &attr,
-                       qemudWorker,
-                       worker) != 0) {
-        worker->hasThread = 0;
-        worker->server = NULL;
-        return -1;
+    /* This function drops the lock during dispatch,
+     * and re-acquires it before returning */
+    if (remoteDispatchClientRequest (server, client, msg) < 0) {
+        VIR_FREE(msg);
+        qemudDispatchClientFailure(client);
     }
 
-    return 0;
+    client->refs--;
+    virMutexUnlock(&client->lock);
 }
 
-
 /*
  * Read data into buffer using wire decoding (plain or TLS)
  *
@@ -1857,8 +1764,11 @@ readmore:
         }
 
         /* Move completed message to the end of the dispatch queue */
-        if (msg)
+        if (msg) {
+            client->refs++;
             qemudClientMessageQueuePush(&client->dx, msg);
+            ignore_value(virThreadPoolSendJob(server->workerPool, client));
+        }
         client->nrequests++;
 
         /* Possibly need to create another receive buffer */
@@ -1870,9 +1780,6 @@ readmore:
                 client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
 
             qemudUpdateClientEvent(client);
-
-            /* Tell one of the workers to get on with it... */
-            virCondSignal(&server->job);
         }
     }
 }
@@ -2305,18 +2212,16 @@ static void *qemudRunLoop(void *opaque) {
     if (min_workers > max_workers)
         max_workers = min_workers;
 
-    server->nworkers = max_workers;
-    if (VIR_ALLOC_N(server->workers, server->nworkers) < 0) {
-        VIR_ERROR0(_("Failed to allocate workers"));
+    server->workerPool = virThreadPoolNew(min_workers,
+                                          max_workers,
+                                          qemudWorker,
+                                          server);
+    if (!server->workerPool) {
+        VIR_ERROR0(_("Failed to create thread pool"));
+        virMutexUnlock(&server->lock);
         return NULL;
     }
 
-    for (i = 0 ; i < min_workers ; i++) {
-        if (qemudStartWorker(server, &server->workers[i]) < 0)
-            goto cleanup;
-        server->nactiveworkers++;
-    }
-
     for (;!server->quitEventThread;) {
         /* A shutdown timeout is specified, so check
          * if any drivers have active state, if not
@@ -2367,47 +2272,14 @@ static void *qemudRunLoop(void *opaque) {
                 goto reprocess;
             }
         }
-
-        /* If number of active workers exceeds both the min_workers
-         * threshold and the number of clients, then kill some
-         * off */
-        for (i = 0 ; (i < server->nworkers &&
-                      server->nactiveworkers > server->nclients &&
-                      server->nactiveworkers > min_workers) ; i++) {
-
-            if (server->workers[i].hasThread &&
-                !server->workers[i].processingCall) {
-                server->workers[i].quitRequest = 1;
-
-                virCondBroadcast(&server->job);
-                virMutexUnlock(&server->lock);
-                pthread_join(server->workers[i].thread, NULL);
-                virMutexLock(&server->lock);
-                server->workers[i].hasThread = 0;
-                server->nactiveworkers--;
-            }
-        }
     }
-
-cleanup:
-    for (i = 0 ; i < server->nworkers ; i++) {
-        if (!server->workers[i].hasThread)
-            continue;
-
-        server->workers[i].quitRequest = 1;
-        virCondBroadcast(&server->job);
-
-        virMutexUnlock(&server->lock);
-        pthread_join(server->workers[i].thread, NULL);
-        virMutexLock(&server->lock);
-        server->workers[i].hasThread = 0;
-    }
-    VIR_FREE(server->workers);
     for (i = 0; i < server->nclients; i++)
         qemudFreeClient(server->clients[i]);
     server->nclients = 0;
     VIR_SHRINK_N(server->clients, server->nclients_max, server->nclients_max);
 
+    virThreadPoolFree(server->workerPool);
+    server->workerPool = NULL;
     virMutexUnlock(&server->lock);
     return NULL;
 }
@@ -2475,9 +2347,6 @@ static void qemudCleanup(struct qemud_server *server) {
 
     virStateCleanup();
 
-    if (virCondDestroy(&server->job) < 0) {
-        ;
-    }
     virMutexDestroy(&server->lock);
 
     VIR_FREE(server);
diff --git a/daemon/libvirtd.h b/daemon/libvirtd.h
index af20e56..e4ee63b 100644
--- a/daemon/libvirtd.h
+++ b/daemon/libvirtd.h
@@ -49,6 +49,7 @@
 # include "logging.h"
 # include "threads.h"
 # include "network.h"
+# include "threadpool.h"
 
 # if WITH_DTRACE
 #  ifndef LIBVIRTD_PROBES_H
@@ -266,26 +267,13 @@ struct qemud_socket {
     struct qemud_socket *next;
 };
 
-struct qemud_worker {
-    pthread_t thread;
-    unsigned int hasThread :1;
-    unsigned int processingCall :1;
-    unsigned int quitRequest :1;
-
-    /* back-pointer to our server */
-    struct qemud_server *server;
-};
-
 /* Main server state */
 struct qemud_server {
     virMutex lock;
-    virCond job;
 
     int privileged;
 
-    size_t nworkers;
-    size_t nactiveworkers;
-    struct qemud_worker *workers;
+    virThreadPoolPtr workerPool;
     size_t nsockets;
     struct qemud_socket *sockets;
     size_t nclients;
-- 
1.7.3


-- 
Thanks,
Hu Tao




More information about the libvir-list mailing list