[libvirt PATCH v2 34/56] rpc: convert RPC client to use GMainLoop instead of poll
Daniel P. Berrangé
berrange at redhat.com
Wed Feb 5 17:18:03 UTC 2020
On Thu, Jan 30, 2020 at 03:51:05PM +0100, Pavel Hrdina wrote:
> On Tue, Jan 28, 2020 at 01:11:15PM +0000, Daniel P. Berrangé wrote:
> > To eliminate the dependency on GNULIB's poll impl, we need
> > to change the RPC client code to use GMainLoop. We don't
> > really want to use GIOChannel, but it provides the most
> > convenient way to do socket event watches with Windows
> > portability. The other alternative would be to use GSocket
> > but that is a much more complex change affecting libvirt
> > more broadly.
> >
> > Signed-off-by: Daniel P. Berrangé <berrange at redhat.com>
> > ---
> > src/rpc/virnetclient.c | 215 ++++++++++++++++++++++-------------------
> > 1 file changed, 113 insertions(+), 102 deletions(-)
> >
> > diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
> > index 031a99711f..9069c57113 100644
> > --- a/src/rpc/virnetclient.c
> > +++ b/src/rpc/virnetclient.c
> > @@ -800,11 +791,7 @@ static void virNetClientCloseInternal(virNetClientPtr client,
> > * queue and close the client because we set client->wantClose.
> > */
>
> This comment should probably be updated to not reference threads.
I'm not sure what you mean here, as the comment still looks
accurate to me.
>
> > if (client->haveTheBuck) {
> > - char ignore = 1;
> > - size_t len = sizeof(ignore);
> > -
> > - if (safewrite(client->wakeupSendFD, &ignore, len) != len)
> > - VIR_ERROR(_("failed to wake up polling thread"));
> > + g_main_loop_quit(client->eventLoop);
> > } else {
> > virNetClientIOEventLoopPassTheBuck(client, NULL);
> > }
> > @@ -831,13 +818,70 @@ void virNetClientSetSASLSession(virNetClientPtr client,
> > #endif
> >
> >
> > +static gboolean
> > +virNetClientIOEventTLS(int fd,
> > + GIOCondition ev,
> > + gpointer opaque);
> > +
> > +static gboolean
> > +virNetClientTLSHandshake(virNetClientPtr client)
> > +{
> > + GIOCondition ev;
> > + int ret;
> > +
> > + ret = virNetTLSSessionHandshake(client->tls);
> > +
> > + if (ret <= 0)
> > + return FALSE;
> > +
> > + if (virNetTLSSessionGetHandshakeStatus(client->tls) ==
> > + VIR_NET_TLS_HANDSHAKE_RECVING)
> > + ev = G_IO_IN;
> > + else
> > + ev = G_IO_OUT;
> > +
> > + virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock),
> > + ev,
> > + client->eventCtx,
> > + virNetClientIOEventTLS, client, NULL);
> > +
> > + return TRUE;
> > +}
> > +
> > +
> > +static gboolean
> > +virNetClientIOEventTLS(int fd G_GNUC_UNUSED,
> > + GIOCondition ev G_GNUC_UNUSED,
> > + gpointer opaque)
> > +{
> > + virNetClientPtr client = opaque;
> > +
> > + if (!virNetClientTLSHandshake(client))
> > + g_main_loop_quit(client->eventLoop);
> > +
> > + return G_SOURCE_REMOVE;
> > +}
> > +
> > +
> > +static gboolean
> > +virNetClientIOEventTLSConfirm(int fd G_GNUC_UNUSED,
> > + GIOCondition ev G_GNUC_UNUSED,
> > + gpointer opaque)
> > +{
> > + virNetClientPtr client = opaque;
> > +
> > + g_main_loop_quit(client->eventLoop);
> > +
> > + return G_SOURCE_REMOVE;
> > +}
> > +
> > +
> > int virNetClientSetTLSSession(virNetClientPtr client,
> > virNetTLSContextPtr tls)
> > {
> > int ret;
> > char buf[1];
> > int len;
> > - struct pollfd fds[1];
> >
> > #ifndef WIN32
> > sigset_t oldmask, blockedsigs;
> > @@ -860,22 +904,8 @@ int virNetClientSetTLSSession(virNetClientPtr client,
> >
> > virNetSocketSetTLSSession(client->sock, client->tls);
> >
> > - for (;;) {
> > - ret = virNetTLSSessionHandshake(client->tls);
> > -
> > - if (ret < 0)
> > - goto error;
> > - if (ret == 0)
> > - break;
> > -
> > - fds[0].fd = virNetSocketGetFD(client->sock);
> > - fds[0].revents = 0;
> > - if (virNetTLSSessionGetHandshakeStatus(client->tls) ==
> > - VIR_NET_TLS_HANDSHAKE_RECVING)
> > - fds[0].events = POLLIN;
> > - else
> > - fds[0].events = POLLOUT;
> > -
> > + virResetLastError();
> > + if (virNetClientTLSHandshake(client)) {
> > #ifndef WIN32
> > /* Block SIGWINCH from interrupting poll in curses programs,
> > * then restore the original signal mask again immediately
> > @@ -885,16 +915,16 @@ int virNetClientSetTLSSession(virNetClientPtr client,
> > ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask));
> > #endif /* !WIN32 */
> >
> > - repoll:
> > - ret = poll(fds, G_N_ELEMENTS(fds), -1);
> > - if (ret < 0 && (errno == EAGAIN || errno == EINTR))
> > - goto repoll;
> > + g_main_loop_run(client->eventLoop);
> >
> > #ifndef WIN32
> > ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL));
> > #endif /* !WIN32 */
> > }
> >
> > + if (virGetLastErrorCode() != VIR_ERR_OK)
> > + goto error;
> > +
> > ret = virNetTLSContextCheckCertificate(tls, client->tls);
> >
> > if (ret < 0)
> > @@ -904,19 +934,17 @@ int virNetClientSetTLSSession(virNetClientPtr client,
> > * etc. If we make the grade, it will send us a '\1' byte.
> > */
> >
> > - fds[0].fd = virNetSocketGetFD(client->sock);
> > - fds[0].revents = 0;
> > - fds[0].events = POLLIN;
> > + virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock),
> > + G_IO_IN,
> > + client->eventCtx,
> > + virNetClientIOEventTLSConfirm, client, NULL);
> >
> > #ifndef WIN32
> > /* Block SIGWINCH from interrupting poll in curses programs */
> > ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask));
> > #endif /* !WIN32 */
> >
> > - repoll2:
> > - ret = poll(fds, G_N_ELEMENTS(fds), -1);
> > - if (ret < 0 && (errno == EAGAIN || errno == EINTR))
> > - goto repoll2;
> > + g_main_loop_run(client->eventLoop);
> >
> > #ifndef WIN32
> > ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL));
> > @@ -1451,12 +1479,12 @@ virNetClientIOHandleInput(virNetClientPtr client)
> > static bool virNetClientIOEventLoopPollEvents(virNetClientCallPtr call,
> > void *opaque)
> > {
> > - struct pollfd *fd = opaque;
> > + GIOCondition *ev = opaque;
> >
> > if (call->mode == VIR_NET_CLIENT_MODE_WAIT_RX)
> > - fd->events |= POLLIN;
> > + *ev |= G_IO_IN;
> > if (call->mode == VIR_NET_CLIENT_MODE_WAIT_TX)
> > - fd->events |= POLLOUT;
> > + *ev |= G_IO_OUT;
> >
> > return false;
> > }
> > @@ -1552,6 +1580,18 @@ virNetClientIOEventLoopPassTheBuck(virNetClientPtr client,
> > }
> >
> >
> > +static gboolean
> > +virNetClientIOEventFD(int fd G_GNUC_UNUSED,
> > + GIOCondition ev,
> > + gpointer opaque)
> > +{
> > + GIOCondition *rev = opaque;
> > + *rev = ev;
> > +
> > + return G_SOURCE_REMOVE;
> > +}
> > +
> > +
> > /*
> > * Process all calls pending dispatch/receive until we
> > * get a reply to our own call. Then quit and pass the buck
> > @@ -1563,21 +1603,17 @@ virNetClientIOEventLoopPassTheBuck(virNetClientPtr client,
> > static int virNetClientIOEventLoop(virNetClientPtr client,
> > virNetClientCallPtr thiscall)
> > {
> > - struct pollfd fds[2];
> > bool error = false;
> > int closeReason;
> > - int ret;
> > -
> > - fds[0].fd = virNetSocketGetFD(client->sock);
> > - fds[1].fd = client->wakeupReadFD;
> >
> > for (;;) {
> > - char ignore;
> > #ifndef WIN32
> > sigset_t oldmask, blockedsigs;
> > #endif /* !WIN32 */
> > int timeout = -1;
> > virNetMessagePtr msg = NULL;
> > + GIOCondition ev = 0;
> > + GIOCondition rev = 0;
> >
> > /* If we have existing SASL decoded data we don't want to sleep in
> > * the poll(), just check if any other FDs are also ready.
> > @@ -1595,22 +1631,22 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
> > if (timeout == -1)
> > timeout = virKeepAliveTimeout(client->keepalive);
> >
> > - fds[0].events = fds[0].revents = 0;
> > - fds[1].events = fds[1].revents = 0;
> > -
> > - fds[1].events = POLLIN;
> > -
> > /* Calculate poll events for calls */
> > virNetClientCallMatchPredicate(client->waitDispatch,
> > virNetClientIOEventLoopPollEvents,
> > - &fds[0]);
> > + &ev);
> >
> > /* We have to be prepared to receive stream data
> > * regardless of whether any of the calls waiting
> > * for dispatch are for streams.
> > */
> > if (client->nstreams)
> > - fds[0].events |= POLLIN;
> > + ev |= G_IO_IN;
> > +
> > + virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock),
> > + ev,
> > + client->eventCtx,
> > + virNetClientIOEventFD, &rev, NULL);
> >
> > /* Release lock while poll'ing so other threads
> > * can stuff themselves on the queue */
> > @@ -1630,13 +1666,12 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
> > sigaddset(&blockedsigs, SIGCHLD);
> > # endif
> > sigaddset(&blockedsigs, SIGPIPE);
> > +
> > ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask));
> > #endif /* !WIN32 */
> >
> > - repoll:
> > - ret = poll(fds, G_N_ELEMENTS(fds), timeout);
> > - if (ret < 0 && (errno == EAGAIN || errno == EINTR))
> > - goto repoll;
> > + while (!rev)
> > + g_main_context_iteration(client->eventCtx, TRUE);
>
> Is there a reason why we don't use g_main_loop_run() here and use
> g_main_loop_quit() in virNetClientIOEventFD() the same way we use it
> in virNetClientIOEventTLSConfirm()?
>
> If I'm looking at the code correctly, the call to g_main_loop_quit() from
> virNetClientIO(), where we want to force other threads out of poll, would
> be ignored by g_main_context_iteration(). This would be a change in
> behavior from the old code, where the write to "client->wakeupSendFD"
> would make the poll() function wake up, since it is listening on
> "client->wakeupReadFD" as well.
Yeah you are right here.
Regards,
Daniel
--
|: https://berrange.com -o- https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org -o- https://fstop138.berrange.com :|
|: https://entangle-photo.org -o- https://www.instagram.com/dberrange :|
More information about the libvir-list
mailing list