[libvirt] [PATCH 1/3 v2] daemon: Create priority workers pool

Daniel P. Berrange berrange at redhat.com
Thu Aug 25 14:44:51 UTC 2011


On Tue, Aug 23, 2011 at 08:22:01PM +0200, Michal Privoznik wrote:
> This patch annotates APIs with low or high priority.
> In low set MUST be all APIs which might eventually access monitor
> (and thus block indefinitely). Other APIs may be marked as high
> priority. However, some must be (e.g. domainDestroy).
> 
> For high priority calls (HPC), there is new thread pool created.
> HPC tries to land in usual thread pool firstly. The condition here
> is it contains at least one free worker. As soon as it doesn't,
> HPCs are placed into the new pool. Therefore, only those APIs which
> are guaranteed to end in a reasonably small amount of time can be marked
> as HPC.
> 
> The size of this HPC pool is static, because HPC are expected to end
> quickly, therefore jobs assigned to this pool will be served quickly.
> It can be configured in libvirtd.conf via prio_workers variable.
> Default is set to 5.
> 
> To mark an API with low or high priority, append priority:{low|high} to
> its comment in src/remote/remote_protocol.x. This is similar to
> autogen|skipgen.
> ---
>  daemon/libvirtd.aug           |    1 +
>  daemon/libvirtd.c             |   10 +-
>  daemon/libvirtd.conf          |    6 +
>  daemon/remote.c               |   26 ++
>  daemon/remote.h               |    2 +
>  src/qemu/qemu_process.c       |    2 +-
>  src/remote/qemu_protocol.x    |   13 +-
>  src/remote/remote_protocol.x  |  544 +++++++++++++++++++++--------------------
>  src/rpc/gendispatch.pl        |   20 ++-
>  src/rpc/virnetserver.c        |   32 +++-
>  src/rpc/virnetserver.h        |    6 +-
>  src/rpc/virnetserverprogram.h |    1 +
>  src/util/threadpool.c         |   38 ++-
>  src/util/threadpool.h         |    1 +
>  14 files changed, 411 insertions(+), 291 deletions(-)
> 

> diff --git a/daemon/libvirtd.c b/daemon/libvirtd.c
> index 0530ba5..3a6233d 100644
> --- a/daemon/libvirtd.c
> +++ b/daemon/libvirtd.c
> @@ -1433,7 +1439,9 @@ int main(int argc, char **argv) {
>                                  config->max_clients,
>                                  config->mdns_adv ? config->mdns_name : NULL,
>                                  use_polkit_dbus,
> -                                remoteClientInitHook))) {
> +                                remoteClientInitHook,
> +                                config->prio_workers,
> +                                remoteGetProcPriority))) {
>          ret = VIR_DAEMON_ERR_INIT;
>          goto cleanup;
>      }

> diff --git a/daemon/remote.c b/daemon/remote.c
> index 0f088c6..9b1433b 100644
> --- a/daemon/remote.c
> +++ b/daemon/remote.c
> @@ -473,6 +473,32 @@ int remoteClientInitHook(virNetServerPtr srv ATTRIBUTE_UNUSED,
>      return 0;
>  }
>  
> +unsigned int remoteGetProcPriority(virNetMessageHeaderPtr hdr)
> +{
> +    u_int prog = hdr->prog;
> +    int proc = hdr->proc;
> +    virNetServerProgramProc *table = NULL;
> +    size_t max = 0;
> +
> +    switch (prog) {
> +        case REMOTE_PROGRAM:
> +            table = remoteProcs;
> +            max = remoteNProcs;
> +            break;
> +        case QEMU_PROGRAM:
> +            table = qemuProcs;
> +            max = qemuNProcs;
> +            break;
> +    }
> +
> +    if (!table || !max)
> +        return 0;
> +    if (proc >= max)
> +        return 0;
> +
> +    return table[proc].priority;
> +}

This is still not quite the right design. Since the priority information
is directly in the virNetServerProgramProc table, there should be no
need to pass in a function callback to virNetServerNew(). It already has
a list of programs, so it should be able to ask for the priority directly.
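
As a rough illustration of that direct lookup, here is a minimal sketch. The
Demo* types and demoProgramGetPriority() are hypothetical stand-ins invented
for this example, not the real libvirt structures; the only assumption taken
from the patch is that each procedure table entry carries a priority field.

```c
#include <stddef.h>

/* Hypothetical, simplified stand-ins for the RPC program tables. */
typedef struct {
    unsigned int priority;      /* 0 = low, 1 = high */
} DemoProc;

typedef struct {
    unsigned int program;       /* RPC program number */
    const DemoProc *procs;      /* per-procedure table */
    size_t nprocs;              /* number of entries in procs */
} DemoProgram;

/* Return the priority of procedure 'proc', or 0 (low) when the
 * program is missing or the procedure number is out of range. */
static unsigned int
demoProgramGetPriority(const DemoProgram *prog, int proc)
{
    if (!prog || proc < 0 || (size_t)proc >= prog->nprocs)
        return 0;
    return prog->procs[proc].priority;
}
```

With a helper of this shape owned by the program object, the server only
needs the program pointer and the procedure number from the message header.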


> diff --git a/src/rpc/virnetserver.c b/src/rpc/virnetserver.c
> index 1a49dbb..b8150b7 100644
> --- a/src/rpc/virnetserver.c
> +++ b/src/rpc/virnetserver.c
> @@ -182,6 +185,7 @@ static int virNetServerDispatchNewMessage(virNetServerClientPtr client,
>  {
>      virNetServerPtr srv = opaque;
>      virNetServerJobPtr job;
> +    bool priority = false;
>      int ret;
>  
>      VIR_DEBUG("server=%p client=%p message=%p",
> @@ -192,11 +196,25 @@ static int virNetServerDispatchNewMessage(virNetServerClientPtr client,
>          return -1;
>      }
>  
> +    if (srv->prio_func)
> +        priority = srv->prio_func(&msg->header);
> +

IIUC, the problem you're facing is that the 'virNetServerDispatchNewMessage'
method does not yet know what virNetServerProgramPtr is associated with this
RPC call, because that lookup is not done until the virNetServerHandleJob
method is run in the worker.

To address this, we should move that bit of code. So specifically:

  - virNetServerJobPtr struct should gain a virNetServerProgramPtr

  - virNetServerDispatchNewMessage should take the following code
    from virNetServerHandleJob

    for (i = 0 ; i < srv->nprograms ; i++) {
        if (virNetServerProgramMatches(srv->programs[i], job->msg)) {
            prog = srv->programs[i];
            break;
        }
    }

    if (!prog) {
        VIR_DEBUG("Cannot find program %d version %d",
                  job->msg->header.prog,
                  job->msg->header.vers);
        goto error;
    }

    virNetServerProgramRef(prog);

    and then store 'prog' in job->prog

  - virNetServerHandleJob should unref job->prog when it is done

  - virNetServerDispatchNewMessage can now simply call

        virNetServerProgramGetPriority(prog, msg->header.proc);

    to obtain the priority of the procedure.

We now do not need any special callback.
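
Put together, the steps above might look roughly like the following sketch.
All Demo* names are hypothetical simplifications made up for this example
(plain integer reference counting, no locking, no real message parsing); it
only shows the ordering: resolve the program at dispatch time, take a
reference, store it in the job, and drop it in the worker.

```c
#include <stdbool.h>
#include <stddef.h>

typedef struct {
    unsigned int program;
    unsigned int version;
    int refs;                         /* simplified reference count */
    const unsigned int *priorities;   /* priority per procedure */
    size_t nprocs;
} DemoProgram;

typedef struct {
    unsigned int prog;
    unsigned int vers;
    int proc;
} DemoHeader;

typedef struct {
    DemoHeader header;
    DemoProgram *prog;    /* resolved at dispatch, released by the worker */
} DemoJob;

typedef struct {
    DemoProgram **programs;
    size_t nprograms;
} DemoServer;

static bool
demoProgramMatches(const DemoProgram *prog, const DemoHeader *hdr)
{
    return prog->program == hdr->prog && prog->version == hdr->vers;
}

/* Dispatch side: find the program, reference it, report the priority. */
static int
demoDispatch(DemoServer *srv, DemoJob *job, unsigned int *priority)
{
    DemoProgram *prog = NULL;
    size_t i;

    for (i = 0; i < srv->nprograms; i++) {
        if (demoProgramMatches(srv->programs[i], &job->header)) {
            prog = srv->programs[i];
            break;
        }
    }
    if (!prog)
        return -1;                    /* unknown program/version */

    prog->refs++;
    job->prog = prog;

    *priority = 0;                    /* default to low priority */
    if (job->header.proc >= 0 && (size_t)job->header.proc < prog->nprocs)
        *priority = prog->priorities[job->header.proc];
    return 0;
}

/* Worker side: run the call (omitted), then release the program. */
static void
demoHandleJob(DemoJob *job)
{
    job->prog->refs--;
    job->prog = NULL;
}
```

The priority is known before the job is queued, so the choice between the
ordinary pool and the priority pool can be made without a callback.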


> diff --git a/src/util/threadpool.c b/src/util/threadpool.c
> index 8217591..ad2d249 100644
> --- a/src/util/threadpool.c
> +++ b/src/util/threadpool.c
> @@ -185,27 +185,41 @@ void virThreadPoolFree(virThreadPoolPtr pool)
>      VIR_FREE(pool);
>  }
>  
> +/*
> + * @only_if_free - place job in pool iff there is
> + * a free worker(s).
> + *
> + * Return: 0 on success,
> + * -1 if no free worker available but requested
> + * -2 otherwise
> + */
>  int virThreadPoolSendJob(virThreadPoolPtr pool,
> +                         bool only_if_free,
>                           void *jobData)
>  {
>      virThreadPoolJobPtr job;
> +    int ret = -2;
>  
>      virMutexLock(&pool->mutex);
>      if (pool->quit)
>          goto error;
>  
> -    if (pool->freeWorkers == 0 &&
> -        pool->nWorkers < pool->maxWorkers) {
> -        if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) {
> -            virReportOOMError();
> -            goto error;
> -        }
> +    if (pool->freeWorkers == 0) {
> +        if (pool->nWorkers < pool->maxWorkers) {
> +            if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) {
> +                virReportOOMError();
> +                goto error;
> +            }

From my previous mail, I think this needs to be

    if ((pool->freeWorkers - pool->jobQueueDepth) <= 0) {
       ....
    }


(where jobQueueDepth is a new counter we have to introduce)
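
One detail worth keeping in mind: if both counters are unsigned (assumed
here to be size_t), the subtraction wraps around instead of going negative,
so the test is safer written as a direct comparison. A minimal sketch, with
DemoPool and both helpers being hypothetical names for this example only:

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, trimmed-down view of the pool counters. */
typedef struct {
    size_t freeWorkers;      /* workers currently idle */
    size_t jobQueueDepth;    /* jobs queued but not yet picked up */
    size_t nWorkers;         /* workers currently started */
    size_t maxWorkers;       /* upper limit on workers */
} DemoPool;

/* True when every idle worker is already claimed by a queued job.
 * Same intent as (freeWorkers - jobQueueDepth) <= 0, but without
 * unsigned wraparound. */
static bool
demoPoolIsSaturated(const DemoPool *pool)
{
    return pool->freeWorkers <= pool->jobQueueDepth;
}

/* True when the pool is saturated and may still start a new worker. */
static bool
demoPoolShouldGrow(const DemoPool *pool)
{
    return demoPoolIsSaturated(pool) && pool->nWorkers < pool->maxWorkers;
}
```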

Daniel
-- 
|: http://berrange.com      -o-    http://www.flickr.com/photos/dberrange/ :|
|: http://libvirt.org              -o-             http://virt-manager.org :|
|: http://autobuild.org       -o-         http://search.cpan.org/~danberr/ :|
|: http://entangle-photo.org       -o-       http://live.gnome.org/gtk-vnc :|



