[libvirt] [PATCH 6/6 (v2)] Allow non-blocking message sending on virNetClient

Jiri Denemark jdenemar at redhat.com
Tue Nov 15 15:20:50 UTC 2011


On Mon, Nov 14, 2011 at 12:03:52 +0000, Daniel P. Berrange wrote:
> From: "Daniel P. Berrange" <berrange at redhat.com>
> 
> Split the existing virNetClientSend into two parts
> virNetClientSend and virNetClientSendNoReply, instead
> of having a 'bool expectReply' parameter.

This is done by a separate patch, so remove the paragraph.

> Add a new virNetClientSendNonBlock which returns 2 on
> full send, 1 on partial send, 0 on no send, -1 on error
> 
> If a partial send occurs, then a subsequent call to any
> of the virNetClientSend* APIs will finish any outstanding
> I/O.
> 
> TODO: the virNetClientEvent event handler could be used
> to speed up completion of partial sends if an event loop
> is present.
> 
> In v2:
>    - Fix logic in virNetClientIOEventLoopRemoveNonBlocking
> 
> * src/rpc/virnetclientprogram.c, src/rpc/virnetclientstream.c:
>   Update for changed API

Neither of these is really changed by this patch.

> ---
>  src/rpc/virnetclient.c |  200 +++++++++++++++++++++++++++++++++++++++++++-----
>  src/rpc/virnetclient.h |    4 +
>  src/rpc/virnetsocket.c |   13 +++
>  src/rpc/virnetsocket.h |    1 +
>  4 files changed, 200 insertions(+), 18 deletions(-)
> 
> diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
> index 96d1886..17105fe 100644
> --- a/src/rpc/virnetclient.c
> +++ b/src/rpc/virnetclient.c
> @@ -55,6 +55,9 @@ struct _virNetClientCall {
>  
>      virNetMessagePtr msg;
>      bool expectReply;
> +    bool nonBlock;
> +    bool haveThread;
> +    bool sentSomeData;
>  
>      virCond cond;
>  
> @@ -86,7 +89,12 @@ struct _virNetClient {
>      int wakeupSendFD;
>      int wakeupReadFD;
>  
> -    /* List of threads currently waiting for dispatch */
> +    /*
> +     * List of calls currently waiting for dispatch
> +     * The calls should all have threads waiting for
> +     * them, except possibly the first call in the list
> +     * which might be a partially sent non-blocking call.
> +     */
>      virNetClientCallPtr waitDispatch;
>      /* True if a thread holds the buck */
>      bool haveTheBuck;
> @@ -648,7 +656,7 @@ virNetClientCallDispatchReply(virNetClientPtr client)
>      virNetClientCallPtr thecall;
>  
>      /* Ok, definitely got an RPC reply now find
> -       out who's been waiting for it */
> +       out which waiting call is associated with it */
>      thecall = client->waitDispatch;
>      while (thecall &&
>             !(thecall->msg->header.prog == client->msg.header.prog &&
> @@ -824,6 +832,8 @@ virNetClientIOWriteMessage(virNetClientPtr client,
>          ret = virNetSocketWrite(client->sock,
>                                  thecall->msg->buffer + thecall->msg->bufferOffset,
>                                  thecall->msg->bufferLength - thecall->msg->bufferOffset);
> +        if (ret || virNetSocketHasPendingData(client->sock))
> +            thecall->sentSomeData = true;

Should the above condition check for just ret > 0? It won't make any
difference in practise since we are in error condition anyway but it seems
more logical.

>          if (ret <= 0)
>              return ret;
>  
> @@ -1015,17 +1025,69 @@ static bool virNetClientIOEventLoopRemoveDone(virNetClientCallPtr call,
>          return false;
>  
>      /*
> -     * ...they won't actually wakeup until
> +     * ...if the call being removed from the list
> +     * still has a thread, then wake that thread up,
> +     * otherwise free the call. The latter should
> +     * only happen for calls without replies.
> +     *
> +     * ...the threads won't actually wakeup until
>       * we release our mutex a short while
>       * later...
>       */
> -    VIR_DEBUG("Waking up sleeping call %p", call);
> -    virCondSignal(&call->cond);
> +    if (call->haveThread) {
> +        VIR_DEBUG("Waking up sleep %p", call);
> +        virCondSignal(&call->cond);
> +    } else {
> +        if (call->expectReply)
> +            VIR_WARN("Got a call expecting a reply but without a waiting thread");
> +        ignore_value(virCondDestroy(&call->cond));
> +        VIR_FREE(call);

We should also free the associated message.

> +    }
>  
>      return true;
>  }
>  
>  
> +static bool virNetClientIOEventLoopRemoveNonBlocking(virNetClientCallPtr call,
> +                                                     void *opaque)
> +{
> +    virNetClientCallPtr thiscall = opaque;
> +
> +    if (call == thiscall)
> +        return false;
> +
> +    if (!call->nonBlock)
> +        return false;
> +
> +    if (call->sentSomeData) {
> +        /*
> +         * If some data has been sent we must keep it in the list,
> +         * but still wakeup any thread
> +         */
> +        if (call->haveThread) {
> +            VIR_DEBUG("Waking up sleep %p", call);
> +            virCondSignal(&call->cond);
> +        }
> +        return false;
> +    } else {
> +        /*
> +         * If no data has been sent, we can remove it from the list.
> +         * Wakup any thread, otherwise free the caller ourselves
> +         */
> +        if (call->haveThread) {
> +            VIR_DEBUG("Waking up sleep %p", call);
> +            virCondSignal(&call->cond);
> +        } else {
> +            if (call->expectReply)
> +                VIR_WARN("Got a call expecting a reply but without a waiting thread");
> +            ignore_value(virCondDestroy(&call->cond));
> +            VIR_FREE(call);

We should also free the associated message.

> +        }
> +        return true;
> +    }
> +}
> +
> +
>  static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetClientCallPtr thiscall)
>  {
>      VIR_DEBUG("Giving up the buck %p", thiscall);
> @@ -1033,19 +1095,29 @@ static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetCli
>      /* See if someone else is still waiting
>       * and if so, then pass the buck ! */
>      while (tmp) {
> -        if (tmp != thiscall) {
> +        if (tmp != thiscall && tmp->haveThread) {
>              VIR_DEBUG("Passing the buck to %p", tmp);
>              virCondSignal(&tmp->cond);
>              break;
>          }
>          tmp = tmp->next;
>      }
> +    VIR_DEBUG("No thread to pass the buck to");
> +}
> +
> +
> +static bool virNetClientIOEventLoopWantNonBlock(virNetClientCallPtr call, void *opaque ATTRIBUTE_UNUSED)
> +{
> +    return call->nonBlock;
>  }
>  
>  /*
>   * Process all calls pending dispatch/receive until we
>   * get a reply to our own call. Then quit and pass the buck
>   * to someone else.
> + *
> + * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true),
> + * 0 if nothing sent (only for nonBlock==true) and -1 on error
>   */
>  static int virNetClientIOEventLoop(virNetClientPtr client,
>                                     virNetClientCallPtr thiscall)
> @@ -1068,6 +1140,14 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
>          if (virNetSocketHasCachedData(client->sock))
>              timeout = 0;
>  
> +        /* If there are any non-blocking calls in the queue,
> +         * then we don't want to sleep in poll()
> +         */
> +        if (virNetClientCallMatchPredicate(client->waitDispatch,
> +                                           virNetClientIOEventLoopWantNonBlock,
> +                                           NULL))
> +            timeout = 0;
> +
>          fds[0].events = fds[0].revents = 0;
>          fds[1].events = fds[1].revents = 0;
>  
> @@ -1116,8 +1196,9 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
>          /* If we have existing SASL decoded data, pretend
>           * the socket became readable so we consume it
>           */
> -        if (virNetSocketHasCachedData(client->sock))
> +        if (virNetSocketHasCachedData(client->sock)) {
>              fds[0].revents |= POLLIN;
> +        }
>  
>          if (fds[1].revents) {
>              VIR_DEBUG("Woken up from poll by other thread");
> @@ -1129,6 +1210,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
>          }
>  
>          if (ret < 0) {
> +            /* XXX what's this dubious errno check doing ? */
>              if (errno == EWOULDBLOCK)
>                  continue;

Yeah, I don't understand this either. According to poll(2,3p) man pages, poll
doesn't seem to ever set EWOULDBLOCK to errno.

>              virReportSystemError(errno,
> @@ -1146,20 +1228,33 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
>                  goto error;
>          }
>  
> -        /* Iterate through waiting threads and if
> -         * any are complete then tell 'em to wakeup
> +        /* Iterate through waiting calls and if any are
> +         * complete, remove them from the dispatch list..
>           */
>          virNetClientCallRemovePredicate(&client->waitDispatch,
>                                          virNetClientIOEventLoopRemoveDone,
>                                          thiscall);
>  
> +        /* Iterate through waiting calls and if any are
> +         * non-blocking, remove them from the dispatch list...
> +         */
> +        virNetClientCallRemovePredicate(&client->waitDispatch,
> +                                        virNetClientIOEventLoopRemoveNonBlocking,
> +                                        thiscall);
> +
>          /* Now see if *we* are done */
>          if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
>              virNetClientCallRemove(&client->waitDispatch, thiscall);
>              virNetClientIOEventLoopPassTheBuck(client, thiscall);
> -            return 0;
> +            return 2;
>          }
>  
> +        /* We're not done, but we're non-blocking */
> +        if (thiscall->nonBlock) {
> +            VIR_DEBUG("Giving up the buck %p %p", thiscall, client->waitDispatch);

This debug line is redundant since virNetClientIOEventLoopPassTheBuck will
print similar message itself.

> +            virNetClientIOEventLoopPassTheBuck(client, thiscall);
> +            return thiscall->sentSomeData ? 1 : 0;
> +        }
>  
>          if (fds[0].revents & (POLLHUP | POLLERR)) {
>              virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
...

So to sum this up, when a non blocking message is being sent while another
thread has the buck, we put the message in the queue, wake up the dispatching
thread, which will try to send all it can without blocking and signal us back.
We than check whether the message was sent completely, partially or not at
all. In other words, we do the same as if no thread had the buck but instead
of doing it in our thread, we offload it to the dispatching thread.

In case my analysis is right, ACK with the nits fixed.

Jirka




More information about the libvir-list mailing list