[libvirt PATCH v2 34/56] rpc: convert RPC client to use GMainLoop instead of poll

Daniel P. Berrangé berrange at redhat.com
Wed Feb 5 17:18:03 UTC 2020


On Thu, Jan 30, 2020 at 03:51:05PM +0100, Pavel Hrdina wrote:
> On Tue, Jan 28, 2020 at 01:11:15PM +0000, Daniel P. Berrangé wrote:
> > To eliminate the dependancy on GNULIB's poll impl, we need
> > to change the RPC client code to use GMainLoop. We don't
> > really want to use GIOChannel, but it provides the most
> > convenient way to do socket event watches with Windows
> > portability. The other alternative would be to use GSocket
> > but that is a much more complex change affecting libvirt
> > more broadly.
> > 
> > Signed-off-by: Daniel P. Berrangé <berrange at redhat.com>
> > ---
> >  src/rpc/virnetclient.c | 215 ++++++++++++++++++++++-------------------
> >  1 file changed, 113 insertions(+), 102 deletions(-)
> > 
> > diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
> > index 031a99711f..9069c57113 100644
> > --- a/src/rpc/virnetclient.c
> > +++ b/src/rpc/virnetclient.c
> > @@ -800,11 +791,7 @@ static void virNetClientCloseInternal(virNetClientPtr client,
> >       * queue and close the client because we set client->wantClose.
> >       */
> 
> This comment should be probably updated to not reference threads.

I'm not sure what you mean here, as the comment looks still
accurate to me.

> 
> >      if (client->haveTheBuck) {
> > -        char ignore = 1;
> > -        size_t len = sizeof(ignore);
> > -
> > -        if (safewrite(client->wakeupSendFD, &ignore, len) != len)
> > -            VIR_ERROR(_("failed to wake up polling thread"));
> > +        g_main_loop_quit(client->eventLoop);
> >      } else {
> >          virNetClientIOEventLoopPassTheBuck(client, NULL);
> >      }
> > @@ -831,13 +818,70 @@ void virNetClientSetSASLSession(virNetClientPtr client,
> >  #endif
> >  
> >  
> > +static gboolean
> > +virNetClientIOEventTLS(int fd,
> > +                       GIOCondition ev,
> > +                       gpointer opaque);
> > +
> > +static gboolean
> > +virNetClientTLSHandshake(virNetClientPtr client)
> > +{
> > +    GIOCondition ev;
> > +    int ret;
> > +
> > +    ret = virNetTLSSessionHandshake(client->tls);
> > +
> > +    if (ret <= 0)
> > +        return FALSE;
> > +
> > +    if (virNetTLSSessionGetHandshakeStatus(client->tls) ==
> > +        VIR_NET_TLS_HANDSHAKE_RECVING)
> > +        ev = G_IO_IN;
> > +    else
> > +        ev = G_IO_OUT;
> > +
> > +    virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock),
> > +                               ev,
> > +                               client->eventCtx,
> > +                               virNetClientIOEventTLS, client, NULL);
> > +
> > +    return TRUE;
> > +}
> > +
> > +
> > +static gboolean
> > +virNetClientIOEventTLS(int fd G_GNUC_UNUSED,
> > +                       GIOCondition ev G_GNUC_UNUSED,
> > +                       gpointer opaque)
> > +{
> > +    virNetClientPtr client = opaque;
> > +
> > +    if (!virNetClientTLSHandshake(client))
> > +        g_main_loop_quit(client->eventLoop);
> > +
> > +    return G_SOURCE_REMOVE;
> > +}
> > +
> > +
> > +static gboolean
> > +virNetClientIOEventTLSConfirm(int fd G_GNUC_UNUSED,
> > +                              GIOCondition ev G_GNUC_UNUSED,
> > +                              gpointer opaque)
> > +{
> > +    virNetClientPtr client = opaque;
> > +
> > +    g_main_loop_quit(client->eventLoop);
> > +
> > +    return G_SOURCE_REMOVE;
> > +}
> > +
> > +
> >  int virNetClientSetTLSSession(virNetClientPtr client,
> >                                virNetTLSContextPtr tls)
> >  {
> >      int ret;
> >      char buf[1];
> >      int len;
> > -    struct pollfd fds[1];
> >  
> >  #ifndef WIN32
> >      sigset_t oldmask, blockedsigs;
> > @@ -860,22 +904,8 @@ int virNetClientSetTLSSession(virNetClientPtr client,
> >  
> >      virNetSocketSetTLSSession(client->sock, client->tls);
> >  
> > -    for (;;) {
> > -        ret = virNetTLSSessionHandshake(client->tls);
> > -
> > -        if (ret < 0)
> > -            goto error;
> > -        if (ret == 0)
> > -            break;
> > -
> > -        fds[0].fd = virNetSocketGetFD(client->sock);
> > -        fds[0].revents = 0;
> > -        if (virNetTLSSessionGetHandshakeStatus(client->tls) ==
> > -            VIR_NET_TLS_HANDSHAKE_RECVING)
> > -            fds[0].events = POLLIN;
> > -        else
> > -            fds[0].events = POLLOUT;
> > -
> > +    virResetLastError();
> > +    if (virNetClientTLSHandshake(client)) {
> >  #ifndef WIN32
> >          /* Block SIGWINCH from interrupting poll in curses programs,
> >           * then restore the original signal mask again immediately
> > @@ -885,16 +915,16 @@ int virNetClientSetTLSSession(virNetClientPtr client,
> >          ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask));
> >  #endif /* !WIN32 */
> >  
> > -    repoll:
> > -        ret = poll(fds, G_N_ELEMENTS(fds), -1);
> > -        if (ret < 0 && (errno == EAGAIN || errno == EINTR))
> > -            goto repoll;
> > +        g_main_loop_run(client->eventLoop);
> >  
> >  #ifndef WIN32
> >          ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL));
> >  #endif /* !WIN32 */
> >      }
> >  
> > +    if (virGetLastErrorCode() != VIR_ERR_OK)
> > +        goto error;
> > +
> >      ret = virNetTLSContextCheckCertificate(tls, client->tls);
> >  
> >      if (ret < 0)
> > @@ -904,19 +934,17 @@ int virNetClientSetTLSSession(virNetClientPtr client,
> >       * etc.  If we make the grade, it will send us a '\1' byte.
> >       */
> >  
> > -    fds[0].fd = virNetSocketGetFD(client->sock);
> > -    fds[0].revents = 0;
> > -    fds[0].events = POLLIN;
> > +    virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock),
> > +                               G_IO_IN,
> > +                               client->eventCtx,
> > +                               virNetClientIOEventTLSConfirm, client, NULL);
> >  
> >  #ifndef WIN32
> >      /* Block SIGWINCH from interrupting poll in curses programs */
> >      ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask));
> >  #endif /* !WIN32 */
> >  
> > -    repoll2:
> > -    ret = poll(fds, G_N_ELEMENTS(fds), -1);
> > -    if (ret < 0 && (errno == EAGAIN || errno == EINTR))
> > -        goto repoll2;
> > +    g_main_loop_run(client->eventLoop);
> >  
> >  #ifndef WIN32
> >      ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL));
> > @@ -1451,12 +1479,12 @@ virNetClientIOHandleInput(virNetClientPtr client)
> >  static bool virNetClientIOEventLoopPollEvents(virNetClientCallPtr call,
> >                                                void *opaque)
> >  {
> > -    struct pollfd *fd = opaque;
> > +    GIOCondition *ev = opaque;
> >  
> >      if (call->mode == VIR_NET_CLIENT_MODE_WAIT_RX)
> > -        fd->events |= POLLIN;
> > +        *ev |= G_IO_IN;
> >      if (call->mode == VIR_NET_CLIENT_MODE_WAIT_TX)
> > -        fd->events |= POLLOUT;
> > +        *ev |= G_IO_OUT;
> >  
> >      return false;
> >  }
> > @@ -1552,6 +1580,18 @@ virNetClientIOEventLoopPassTheBuck(virNetClientPtr client,
> >  }
> >  
> >  
> > +static gboolean
> > +virNetClientIOEventFD(int fd G_GNUC_UNUSED,
> > +                      GIOCondition ev,
> > +                      gpointer opaque)
> > +{
> > +    GIOCondition *rev = opaque;
> > +    *rev = ev;
> > +
> > +    return G_SOURCE_REMOVE;
> > +}
> > +
> > +
> >  /*
> >   * Process all calls pending dispatch/receive until we
> >   * get a reply to our own call. Then quit and pass the buck
> > @@ -1563,21 +1603,17 @@ virNetClientIOEventLoopPassTheBuck(virNetClientPtr client,
> >  static int virNetClientIOEventLoop(virNetClientPtr client,
> >                                     virNetClientCallPtr thiscall)
> >  {
> > -    struct pollfd fds[2];
> >      bool error = false;
> >      int closeReason;
> > -    int ret;
> > -
> > -    fds[0].fd = virNetSocketGetFD(client->sock);
> > -    fds[1].fd = client->wakeupReadFD;
> >  
> >      for (;;) {
> > -        char ignore;
> >  #ifndef WIN32
> >          sigset_t oldmask, blockedsigs;
> >  #endif /* !WIN32 */
> >          int timeout = -1;
> >          virNetMessagePtr msg = NULL;
> > +        GIOCondition ev = 0;
> > +        GIOCondition rev = 0;
> >  
> >          /* If we have existing SASL decoded data we don't want to sleep in
> >           * the poll(), just check if any other FDs are also ready.
> > @@ -1595,22 +1631,22 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
> >          if (timeout == -1)
> >              timeout = virKeepAliveTimeout(client->keepalive);
> >  
> > -        fds[0].events = fds[0].revents = 0;
> > -        fds[1].events = fds[1].revents = 0;
> > -
> > -        fds[1].events = POLLIN;
> > -
> >          /* Calculate poll events for calls */
> >          virNetClientCallMatchPredicate(client->waitDispatch,
> >                                         virNetClientIOEventLoopPollEvents,
> > -                                       &fds[0]);
> > +                                       &ev);
> >  
> >          /* We have to be prepared to receive stream data
> >           * regardless of whether any of the calls waiting
> >           * for dispatch are for streams.
> >           */
> >          if (client->nstreams)
> > -            fds[0].events |= POLLIN;
> > +            ev |= G_IO_IN;
> > +
> > +        virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock),
> > +                                   ev,
> > +                                   client->eventCtx,
> > +                                   virNetClientIOEventFD, &rev, NULL);
> >  
> >          /* Release lock while poll'ing so other threads
> >           * can stuff themselves on the queue */
> > @@ -1630,13 +1666,12 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
> >          sigaddset(&blockedsigs, SIGCHLD);
> >  # endif
> >          sigaddset(&blockedsigs, SIGPIPE);
> > +
> >          ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask));
> >  #endif /* !WIN32 */
> >  
> > -    repoll:
> > -        ret = poll(fds, G_N_ELEMENTS(fds), timeout);
> > -        if (ret < 0 && (errno == EAGAIN || errno == EINTR))
> > -            goto repoll;
> > +        while (!rev)
> > +            g_main_context_iteration(client->eventCtx, TRUE);
> 
> Is there a reason why we don't use g_main_loop_run() here and use
> g_main_loop_quit() in virNetClientIOEventFD() the same way we use it
> in virNetClientIOEventTLSConfirm() ?
> 
> If I'm looking at the code correctly the call to g_main_loop_quit() from
> virNetClientIO() where we want to force other threads from poll would be
> ignored by the g_main_context_iteration().  This would be a change in
> behavior from the old core where the write to "client->wakeupSendFD"
> would make the poll() function wake since it is listening on
> "client->wakeupReadFD" as well.

Yeah you are right here.


Regards,
Daniel
-- 
|: https://berrange.com      -o-    https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org         -o-            https://fstop138.berrange.com :|
|: https://entangle-photo.org    -o-    https://www.instagram.com/dberrange :|




More information about the libvir-list mailing list