[libvirt] [PATCH v4 RESEND 4/4] Using threadpool API to manage qemud worker
Daniel P. Berrange
berrange at redhat.com
Thu Dec 2 12:42:19 UTC 2010
On Thu, Dec 02, 2010 at 03:30:23PM +0800, Hu Tao wrote:
> ---
> daemon/libvirtd.c | 172 +++++++++--------------------------------------------
> daemon/libvirtd.h | 4 +
> 2 files changed, 33 insertions(+), 143 deletions(-)
>
> diff --git a/daemon/libvirtd.c b/daemon/libvirtd.c
> index 791b3dc..dbd050a 100644
> --- a/daemon/libvirtd.c
> +++ b/daemon/libvirtd.c
> @@ -67,6 +67,7 @@
> #include "stream.h"
> #include "hooks.h"
> #include "virtaudit.h"
> +#include "threadpool.h"
> #ifdef HAVE_AVAHI
> # include "mdns.h"
> #endif
> @@ -248,7 +249,6 @@ static void sig_handler(int sig, siginfo_t * siginfo,
>
> static void qemudDispatchClientEvent(int watch, int fd, int events, void *opaque);
> static void qemudDispatchServerEvent(int watch, int fd, int events, void *opaque);
> -static int qemudStartWorker(struct qemud_server *server, struct qemud_worker *worker);
>
> void
> qemudClientMessageQueuePush(struct qemud_client_message **queue,
> @@ -1383,6 +1383,7 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
> client->auth = sock->auth;
> client->addr = addr;
> client->addrstr = addrstr;
> + client->server = server;
> addrstr = NULL;
This shouldn't be needed, as 'server' shoudl be passed into
the worker function via the 'void *opaque' parameter.
>
> for (i = 0 ; i < VIR_DOMAIN_EVENT_ID_LAST ; i++) {
> @@ -1458,19 +1459,6 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
>
> server->clients[server->nclients++] = client;
>
> - if (server->nclients > server->nactiveworkers &&
> - server->nactiveworkers < server->nworkers) {
> - for (i = 0 ; i < server->nworkers ; i++) {
> - if (!server->workers[i].hasThread) {
> - if (qemudStartWorker(server, &server->workers[i]) < 0)
> - return -1;
> - server->nactiveworkers++;
> - break;
> - }
> - }
> - }
> -
> -
> return 0;
>
> error:
> @@ -1534,100 +1522,27 @@ void qemudDispatchClientFailure(struct qemud_client *client) {
> VIR_FREE(client->addrstr);
> }
>
> -
> -/* Caller must hold server lock */
> -static struct qemud_client *qemudPendingJob(struct qemud_server *server)
> +static void qemudWorker(void *data, void *opaque ATTRIBUTE_UNUSED)
> {
> - int i;
> - for (i = 0 ; i < server->nclients ; i++) {
> - virMutexLock(&server->clients[i]->lock);
> - if (server->clients[i]->dx) {
> - /* Delibrately don't unlock client - caller wants the lock */
> - return server->clients[i];
> - }
> - virMutexUnlock(&server->clients[i]->lock);
> - }
> - return NULL;
> -}
> + struct qemud_client *client = data;
> + struct qemud_client_message *msg;
>
> -static void *qemudWorker(void *data)
> -{
> - struct qemud_worker *worker = data;
> - struct qemud_server *server = worker->server;
> + virMutexLock(&client->lock);
It is neccessary to hold the lock on 'server' before obtaining a
lock on 'client'. The server lock can be released again immediately
if no longer needed.
>
> - while (1) {
> - struct qemud_client *client = NULL;
> - struct qemud_client_message *msg;
> + /* Remove our message from dispatch queue while we use it */
> + msg = qemudClientMessageQueueServe(&client->dx);
>
> - virMutexLock(&server->lock);
> - while ((client = qemudPendingJob(server)) == NULL) {
> - if (worker->quitRequest ||
> - virCondWait(&server->job, &server->lock) < 0) {
> - virMutexUnlock(&server->lock);
> - return NULL;
> - }
> - }
> - if (worker->quitRequest) {
> - virMutexUnlock(&client->lock);
> - virMutexUnlock(&server->lock);
> - return NULL;
> - }
> - worker->processingCall = 1;
> - virMutexUnlock(&server->lock);
> -
> - /* We own a locked client now... */
> - client->refs++;
> -
> - /* Remove our message from dispatch queue while we use it */
> - msg = qemudClientMessageQueueServe(&client->dx);
> -
> - /* This function drops the lock during dispatch,
> - * and re-acquires it before returning */
> - if (remoteDispatchClientRequest (server, client, msg) < 0) {
> - VIR_FREE(msg);
> - qemudDispatchClientFailure(client);
> - client->refs--;
> - virMutexUnlock(&client->lock);
> - continue;
> - }
> -
> - client->refs--;
> - virMutexUnlock(&client->lock);
> -
> - virMutexLock(&server->lock);
> - worker->processingCall = 0;
> - virMutexUnlock(&server->lock);
> - }
> -}
> -
> -static int qemudStartWorker(struct qemud_server *server,
> - struct qemud_worker *worker) {
> - pthread_attr_t attr;
> - pthread_attr_init(&attr);
> - /* We want to join workers, so don't detach them */
> - /*pthread_attr_setdetachstate(&attr, 1);*/
> -
> - if (worker->hasThread)
> - return -1;
> -
> - worker->server = server;
> - worker->hasThread = 1;
> - worker->quitRequest = 0;
> - worker->processingCall = 0;
> -
> - if (pthread_create(&worker->thread,
> - &attr,
> - qemudWorker,
> - worker) != 0) {
> - worker->hasThread = 0;
> - worker->server = NULL;
> - return -1;
> + /* This function drops the lock during dispatch,
> + * and re-acquires it before returning */
> + if (remoteDispatchClientRequest (client->server, client, msg) < 0) {
> + VIR_FREE(msg);
> + qemudDispatchClientFailure(client);
> }
>
> - return 0;
> + client->refs--;
> + virMutexUnlock(&client->lock);
> }
>
> -
> /*
> * Read data into buffer using wire decoding (plain or TLS)
> *
> @@ -1857,8 +1772,11 @@ readmore:
> }
>
> /* Move completed message to the end of the dispatch queue */
> - if (msg)
> + if (msg) {
> + client->refs++;
> qemudClientMessageQueuePush(&client->dx, msg);
> + ignore_value(virThreadPoolSendJob(server->workerPool, client));
> + }
> client->nrequests++;
>
> /* Possibly need to create another receive buffer */
> @@ -1870,9 +1788,6 @@ readmore:
> client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
>
> qemudUpdateClientEvent(client);
> -
> - /* Tell one of the workers to get on with it... */
> - virCondSignal(&server->job);
> }
> }
> }
> @@ -2311,10 +2226,14 @@ static void *qemudRunLoop(void *opaque) {
> return NULL;
> }
>
> - for (i = 0 ; i < min_workers ; i++) {
> - if (qemudStartWorker(server, &server->workers[i]) < 0)
> - goto cleanup;
> - server->nactiveworkers++;
> + server->workerPool = virThreadPoolNew(min_workers,
> + max_workers,
> + qemudWorker,
> + NULL);
Should pass 'server' in here, instead of NULL.
> + if (!server->workerPool) {
> + VIR_ERROR0(_("Failed to create thread pool"));
> + virMutexUnlock(&server->lock);
> + return NULL;
> }
>
> for (;!server->quitEventThread;) {
A small change in that we no longer kill off idle worker threads,
but the improved simplicity of libvirtd code makes this a worthwhile
tradeoff. So looks good to me aside from the minor locking bug.
Regards,
Daniel
More information about the libvir-list
mailing list