[libvirt PATCH 09/11] qemu: convert monitor to use the per-VM event loop

Daniel P. Berrangé berrange at redhat.com
Fri Feb 14 12:52:07 UTC 2020


This converts the QEMU monitor APIs to use the per-VM
event loop, which involves switching from the virEvent APIs
to the GMainContext / GSource APIs. Callers of qemuMonitorOpen()
now pass in the GMainContext of the VM's event thread.

A GSocket is used as a convenient way to create a GSource
for a socket, but is not yet used for actual I/O.

Signed-off-by: Daniel P. Berrangé <berrange at redhat.com>
---
 src/qemu/qemu_monitor.c      | 145 ++++++++++++++++-------------------
 src/qemu/qemu_monitor.h      |   3 +-
 src/qemu/qemu_process.c      |   6 +-
 tests/qemumonitortestutils.c |   1 +
 4 files changed, 71 insertions(+), 84 deletions(-)

diff --git a/src/qemu/qemu_monitor.c b/src/qemu/qemu_monitor.c
index bf53962872..d969853963 100644
--- a/src/qemu/qemu_monitor.c
+++ b/src/qemu/qemu_monitor.c
@@ -24,6 +24,7 @@
 #include <poll.h>
 #include <unistd.h>
 #include <fcntl.h>
+#include <gio/gio.h>
 
 #include "qemu_monitor.h"
 #include "qemu_monitor_text.h"
@@ -71,12 +72,9 @@ struct _qemuMonitor {
 
     int fd;
 
-    /* Represents the watch number to be used for updating and
-     * unregistering the monitor @fd for events in the event loop:
-     * > 0: valid watch number
-     * = 0: not registered
-     * < 0: an error occurred during the registration of @fd */
-    int watch;
+    GMainContext *context;
+    GSocket *socket;
+    GSource *watch;
 
     virDomainObjPtr vm;
 
@@ -226,6 +224,7 @@ qemuMonitorDispose(void *obj)
         (mon->cb->destroy)(mon, mon->vm, mon->callbackOpaque);
     virObjectUnref(mon->vm);
 
+    g_main_context_unref(mon->context);
     virResetError(&mon->lastError);
     virCondDestroy(&mon->notify);
     VIR_FREE(mon->buffer);
@@ -509,27 +508,16 @@ qemuMonitorIORead(qemuMonitorPtr mon)
 static void
 qemuMonitorUpdateWatch(qemuMonitorPtr mon)
 {
-    int events =
-        VIR_EVENT_HANDLE_HANGUP |
-        VIR_EVENT_HANDLE_ERROR;
-
-    if (!mon->watch)
-        return;
-
-    if (mon->lastError.code == VIR_ERR_OK) {
-        events |= VIR_EVENT_HANDLE_READABLE;
-
-        if ((mon->msg && mon->msg->txOffset < mon->msg->txLength) &&
-            !mon->waitGreeting)
-            events |= VIR_EVENT_HANDLE_WRITABLE;
-    }
-
-    virEventUpdateHandle(mon->watch, events);
+    qemuMonitorUnregister(mon);
+    if (mon->socket)
+        qemuMonitorRegister(mon);
 }
 
 
-static void
-qemuMonitorIO(int watch, int fd, int events, void *opaque)
+static gboolean
+qemuMonitorIO(GSocket *socket G_GNUC_UNUSED,
+              GIOCondition cond,
+              gpointer opaque)
 {
     qemuMonitorPtr mon = opaque;
     bool error = false;
@@ -541,39 +529,29 @@ qemuMonitorIO(int watch, int fd, int events, void *opaque)
     /* lock access to the monitor and protect fd */
     virObjectLock(mon);
 #if DEBUG_IO
-    VIR_DEBUG("Monitor %p I/O on watch %d fd %d events %d", mon, watch, fd, events);
+    VIR_DEBUG("Monitor %p I/O on socket %p cond %d", mon, socket, cond);
 #endif
-    if (mon->fd == -1 || mon->watch == 0) {
+    if (mon->fd == -1 || !mon->watch) {
         virObjectUnlock(mon);
         virObjectUnref(mon);
-        return;
+        return G_SOURCE_REMOVE;
     }
 
-    if (mon->fd != fd || mon->watch != watch) {
-        if (events & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR))
-            eof = true;
-        virReportError(VIR_ERR_INTERNAL_ERROR,
-                       _("event from unexpected fd %d!=%d / watch %d!=%d"),
-                       mon->fd, fd, mon->watch, watch);
-        error = true;
-    } else if (mon->lastError.code != VIR_ERR_OK) {
-        if (events & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR))
+    if (mon->lastError.code != VIR_ERR_OK) {
+        if (cond & (G_IO_HUP | G_IO_ERR))
             eof = true;
         error = true;
     } else {
-        if (events & VIR_EVENT_HANDLE_WRITABLE) {
+        if (cond & G_IO_OUT) {
             if (qemuMonitorIOWrite(mon) < 0) {
                 error = true;
                 if (errno == ECONNRESET)
                     hangup = true;
             }
-            events &= ~VIR_EVENT_HANDLE_WRITABLE;
         }
 
-        if (!error &&
-            events & VIR_EVENT_HANDLE_READABLE) {
+        if (!error && cond & G_IO_IN) {
             int got = qemuMonitorIORead(mon);
-            events &= ~VIR_EVENT_HANDLE_READABLE;
             if (got < 0) {
                 error = true;
                 if (errno == ECONNRESET)
@@ -581,37 +559,29 @@ qemuMonitorIO(int watch, int fd, int events, void *opaque)
             } else if (got == 0) {
                 eof = true;
             } else {
-                /* Ignore hangup/error events if we read some data, to
+                /* Ignore hangup/error conditions if we read some data, to
                  * give time for that data to be consumed */
-                events = 0;
+                cond = 0;
 
                 if (qemuMonitorIOProcess(mon) < 0)
                     error = true;
             }
         }
 
-        if (events & VIR_EVENT_HANDLE_HANGUP) {
+        if (cond & G_IO_HUP) {
             hangup = true;
             if (!error) {
                 virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
                                _("End of file from qemu monitor"));
                 eof = true;
-                events &= ~VIR_EVENT_HANDLE_HANGUP;
             }
         }
 
         if (!error && !eof &&
-            events & VIR_EVENT_HANDLE_ERROR) {
+            cond & G_IO_ERR) {
             virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
                            _("Invalid file descriptor while waiting for monitor"));
             eof = true;
-            events &= ~VIR_EVENT_HANDLE_ERROR;
-        }
-        if (!error && events) {
-            virReportError(VIR_ERR_INTERNAL_ERROR,
-                           _("Unhandled event %d for monitor fd %d"),
-                           events, mon->fd);
-            error = true;
         }
     }
 
@@ -679,16 +649,20 @@ qemuMonitorIO(int watch, int fd, int events, void *opaque)
         virObjectUnlock(mon);
         virObjectUnref(mon);
     }
+
+    return G_SOURCE_REMOVE;
 }
 
 
 static qemuMonitorPtr
 qemuMonitorOpenInternal(virDomainObjPtr vm,
                         int fd,
+                        GMainContext *context,
                         qemuMonitorCallbacksPtr cb,
                         void *opaque)
 {
     qemuMonitorPtr mon;
+    g_autoptr(GError) gerr = NULL;
 
     if (!cb->eofNotify) {
         virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
@@ -713,6 +687,7 @@ qemuMonitorOpenInternal(virDomainObjPtr vm,
         goto cleanup;
     }
     mon->fd = fd;
+    mon->context = g_main_context_ref(context);
     mon->vm = virObjectRef(vm);
     mon->waitGreeting = true;
     mon->cb = cb;
@@ -723,20 +698,17 @@ qemuMonitorOpenInternal(virDomainObjPtr vm,
                        "%s", _("Unable to set monitor close-on-exec flag"));
         goto cleanup;
     }
-    if (virSetNonBlock(mon->fd) < 0) {
+
+    mon->socket = g_socket_new_from_fd(fd, &gerr);
+    if (!mon->socket) {
         virReportError(VIR_ERR_INTERNAL_ERROR,
-                       "%s", _("Unable to put monitor into non-blocking mode"));
+                       _("Unable to create socket object: %s"),
+                       gerr->message);
         goto cleanup;
     }
 
-
     virObjectLock(mon);
-    if (!qemuMonitorRegister(mon)) {
-        virObjectUnlock(mon);
-        virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
-                       _("unable to register monitor events"));
-        goto cleanup;
-    }
+    qemuMonitorRegister(mon);
 
     PROBE(QEMU_MONITOR_NEW,
           "mon=%p refs=%d fd=%d",
@@ -782,6 +754,7 @@ qemuMonitorOpen(virDomainObjPtr vm,
                 virDomainChrSourceDefPtr config,
                 bool retry,
                 unsigned long long timeout,
+                GMainContext *context,
                 qemuMonitorCallbacksPtr cb,
                 void *opaque)
 {
@@ -815,7 +788,7 @@ qemuMonitorOpen(virDomainObjPtr vm,
         goto cleanup;
     }
 
-    ret = qemuMonitorOpenInternal(vm, fd, cb, opaque);
+    ret = qemuMonitorOpenInternal(vm, fd, context, cb, opaque);
  cleanup:
     if (!ret)
         VIR_FORCE_CLOSE(fd);
@@ -830,25 +803,32 @@ qemuMonitorOpen(virDomainObjPtr vm,
  *
  * Registers the monitor in the event loop. The caller has to hold the
  * lock for @mon.
- *
- * Returns true in case of success, false otherwise
  */
-bool
+void
 qemuMonitorRegister(qemuMonitorPtr mon)
 {
-    virObjectRef(mon);
-    if ((mon->watch = virEventAddHandle(mon->fd,
-                                        VIR_EVENT_HANDLE_HANGUP |
-                                        VIR_EVENT_HANDLE_ERROR |
-                                        VIR_EVENT_HANDLE_READABLE,
-                                        qemuMonitorIO,
-                                        mon,
-                                        virObjectFreeCallback)) < 0) {
-        virObjectUnref(mon);
-        return false;
+    GIOCondition cond = 0;
+
+    if (mon->lastError.code == VIR_ERR_OK) {
+        cond |= G_IO_IN;
+
+        if ((mon->msg && mon->msg->txOffset < mon->msg->txLength) &&
+            !mon->waitGreeting)
+            cond |= G_IO_OUT;
     }
 
-    return true;
+    mon->watch = g_socket_create_source(mon->socket,
+                                        cond,
+                                        NULL);
+
+    virObjectRef(mon);
+    g_source_set_callback(mon->watch,
+                          (GSourceFunc)qemuMonitorIO,
+                          mon,
+                          NULL);
+
+    g_source_attach(mon->watch,
+                    mon->context);
 }
 
 
@@ -856,8 +836,9 @@ void
 qemuMonitorUnregister(qemuMonitorPtr mon)
 {
     if (mon->watch) {
-        virEventRemoveHandle(mon->watch);
-        mon->watch = 0;
+        g_source_destroy(mon->watch);
+        g_source_unref(mon->watch);
+        mon->watch = NULL;
     }
 }
 
@@ -873,9 +854,11 @@ qemuMonitorClose(qemuMonitorPtr mon)
 
     qemuMonitorSetDomainLogLocked(mon, NULL, NULL, NULL);
 
-    if (mon->fd >= 0) {
+    if (mon->socket) {
         qemuMonitorUnregister(mon);
-        VIR_FORCE_CLOSE(mon->fd);
+        g_object_unref(mon->socket);
+        mon->socket = NULL;
+        mon->fd = -1;
     }
 
     /* In case another thread is waiting for its monitor command to be
diff --git a/src/qemu/qemu_monitor.h b/src/qemu/qemu_monitor.h
index c84cd425df..dd2aaa4691 100644
--- a/src/qemu/qemu_monitor.h
+++ b/src/qemu/qemu_monitor.h
@@ -391,11 +391,12 @@ qemuMonitorPtr qemuMonitorOpen(virDomainObjPtr vm,
                                virDomainChrSourceDefPtr config,
                                bool retry,
                                unsigned long long timeout,
+                               GMainContext *context,
                                qemuMonitorCallbacksPtr cb,
                                void *opaque)
     ATTRIBUTE_NONNULL(1) ATTRIBUTE_NONNULL(2) ATTRIBUTE_NONNULL(5);
 
-bool qemuMonitorRegister(qemuMonitorPtr mon)
+void qemuMonitorRegister(qemuMonitorPtr mon)
     ATTRIBUTE_NONNULL(1);
 void qemuMonitorUnregister(qemuMonitorPtr mon)
     ATTRIBUTE_NONNULL(1);
diff --git a/src/qemu/qemu_process.c b/src/qemu/qemu_process.c
index 7475813e9f..bc57474bdc 100644
--- a/src/qemu/qemu_process.c
+++ b/src/qemu/qemu_process.c
@@ -1976,6 +1976,7 @@ qemuConnectMonitor(virQEMUDriverPtr driver, virDomainObjPtr vm, int asyncJob,
                           priv->monConfig,
                           retry,
                           timeout,
+                          virEventThreadGetContext(priv->eventThread),
                           &monitorCallbacks,
                           driver);
 
@@ -8602,8 +8603,9 @@ qemuProcessQMPConnectMonitor(qemuProcessQMPPtr proc)
 
     proc->vm->pid = proc->pid;
 
-    if (!(proc->mon = qemuMonitorOpen(proc->vm, &monConfig, true,
-                                      0, &callbacks, NULL)))
+    if (!(proc->mon = qemuMonitorOpen(proc->vm, &monConfig, true, 0,
+                                      virEventThreadGetContext(proc->eventThread),
+                                      &callbacks, NULL)))
         goto cleanup;
 
     virObjectLock(proc->mon);
diff --git a/tests/qemumonitortestutils.c b/tests/qemumonitortestutils.c
index a1641050ea..3efdea9cce 100644
--- a/tests/qemumonitortestutils.c
+++ b/tests/qemumonitortestutils.c
@@ -1171,6 +1171,7 @@ qemuMonitorTestNew(virDomainXMLOptionPtr xmlopt,
                                       &src,
                                       true,
                                       0,
+                                      virEventThreadGetContext(test->eventThread),
                                       &qemuMonitorTestCallbacks,
                                       driver)))
         goto error;
-- 
2.24.1




More information about the libvir-list mailing list