[libvirt PATCH 11/11] qemu: convert agent to use the per-VM event loop

Daniel P. Berrangé berrange at redhat.com
Fri Feb 14 12:52:09 UTC 2020


This converts the QEMU agent APIs to use the per-VM
event loop, which involves switching from virEvent APIs
to GMainContext / GSource APIs.

A GSocket is used as a convenient way to create a GSource
for a socket, but is not yet used for actual I/O.

Signed-off-by: Daniel P. Berrangé <berrange at redhat.com>
---
 src/qemu/qemu_agent.c        | 146 +++++++++++++++++++----------------
 src/qemu/qemu_agent.h        |   1 +
 src/qemu/qemu_process.c      |   1 +
 tests/qemumonitortestutils.c |   1 +
 4 files changed, 84 insertions(+), 65 deletions(-)

diff --git a/src/qemu/qemu_agent.c b/src/qemu/qemu_agent.c
index da1081b60b..ecc9eb23d1 100644
--- a/src/qemu/qemu_agent.c
+++ b/src/qemu/qemu_agent.c
@@ -25,6 +25,7 @@
 #include <unistd.h>
 #include <fcntl.h>
 #include <sys/time.h>
+#include <gio/gio.h>
 
 #include "qemu_agent.h"
 #include "qemu_domain.h"
@@ -100,7 +101,10 @@ struct _qemuAgent {
     virCond notify;
 
     int fd;
-    int watch;
+
+    GMainContext *context;
+    GSocket *socket;
+    GSource *watch;
 
     bool running;
 
@@ -171,6 +175,7 @@ static void qemuAgentDispose(void *obj)
         (agent->cb->destroy)(agent, agent->vm);
     virCondDestroy(&agent->notify);
     VIR_FREE(agent->buffer);
+    g_main_context_unref(agent->context);
     virResetError(&agent->lastError);
 }
 
@@ -187,13 +192,6 @@ qemuAgentOpenUnix(const char *socketpath)
         return -1;
     }
 
-    if (virSetNonBlock(agentfd) < 0) {
-        virReportSystemError(errno, "%s",
-                             _("Unable to put monitor "
-                               "into non-blocking mode"));
-        goto error;
-    }
-
     if (virSetCloseExec(agentfd) < 0) {
         virReportSystemError(errno, "%s",
                              _("Unable to set agent "
@@ -497,28 +495,62 @@ qemuAgentIORead(qemuAgentPtr agent)
 }
 
 
-static void qemuAgentUpdateWatch(qemuAgentPtr agent)
-{
-    int events =
-        VIR_EVENT_HANDLE_HANGUP |
-        VIR_EVENT_HANDLE_ERROR;
+static gboolean
+qemuAgentIO(GSocket *socket,
+            GIOCondition cond,
+            gpointer opaque);
 
-    if (!agent->watch)
-        return;
+
+static void
+qemuAgentRegister(qemuAgentPtr agent)
+{
+    GIOCondition cond = 0;
 
     if (agent->lastError.code == VIR_ERR_OK) {
-        events |= VIR_EVENT_HANDLE_READABLE;
+        cond |= G_IO_IN;
 
         if (agent->msg && agent->msg->txOffset < agent->msg->txLength)
-            events |= VIR_EVENT_HANDLE_WRITABLE;
+            cond |= G_IO_OUT;
     }
 
-    virEventUpdateHandle(agent->watch, events);
+    agent->watch = g_socket_create_source(agent->socket,
+                                        cond,
+                                        NULL);
+
+    virObjectRef(agent);
+    g_source_set_callback(agent->watch,
+                          (GSourceFunc)qemuAgentIO,
+                          agent,
+                          NULL);
+
+    g_source_attach(agent->watch,
+                    agent->context);
 }
 
 
 static void
-qemuAgentIO(int watch, int fd, int events, void *opaque)
+qemuAgentUnregister(qemuAgentPtr agent)
+{
+    if (agent->watch) {
+        g_source_destroy(agent->watch);
+        g_source_unref(agent->watch);
+        agent->watch = NULL;
+    }
+}
+
+
+static void qemuAgentUpdateWatch(qemuAgentPtr agent)
+{
+    qemuAgentUnregister(agent);
+    if (agent->socket)
+        qemuAgentRegister(agent);
+}
+
+
+static gboolean
+qemuAgentIO(GSocket *socket G_GNUC_UNUSED,
+            GIOCondition cond,
+            gpointer opaque)
 {
     qemuAgentPtr agent = opaque;
     bool error = false;
@@ -528,45 +560,36 @@ qemuAgentIO(int watch, int fd, int events, void *opaque)
     /* lock access to the agent and protect fd */
     virObjectLock(agent);
 #if DEBUG_IO
-    VIR_DEBUG("Agent %p I/O on watch %d fd %d events %d", agent, watch, fd, events);
+    VIR_DEBUG("Agent %p I/O on watch %p socket %p cond %d", agent, agent->watch, agent->socket, cond);
 #endif
 
-    if (agent->fd == -1 || agent->watch == 0) {
+    if (agent->fd == -1 || !agent->watch) {
         virObjectUnlock(agent);
         virObjectUnref(agent);
-        return;
+        return G_SOURCE_REMOVE;
     }
 
-    if (agent->fd != fd || agent->watch != watch) {
-        if (events & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR))
-            eof = true;
-        virReportError(VIR_ERR_INTERNAL_ERROR,
-                       _("event from unexpected fd %d!=%d / watch %d!=%d"),
-                       agent->fd, fd, agent->watch, watch);
-        error = true;
-    } else if (agent->lastError.code != VIR_ERR_OK) {
-        if (events & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR))
+    if (agent->lastError.code != VIR_ERR_OK) {
+        if (cond & (G_IO_HUP | G_IO_ERR))
             eof = true;
         error = true;
     } else {
-        if (events & VIR_EVENT_HANDLE_WRITABLE) {
+        if (cond & G_IO_OUT) {
             if (qemuAgentIOWrite(agent) < 0)
                 error = true;
-            events &= ~VIR_EVENT_HANDLE_WRITABLE;
         }
 
         if (!error &&
-            events & VIR_EVENT_HANDLE_READABLE) {
+            cond & G_IO_IN) {
             int got = qemuAgentIORead(agent);
-            events &= ~VIR_EVENT_HANDLE_READABLE;
             if (got < 0) {
                 error = true;
             } else if (got == 0) {
                 eof = true;
             } else {
-                /* Ignore hangup/error events if we read some data, to
+                /* Ignore hangup/error conditions if we read some data, to
                  * give time for that data to be consumed */
-                events = 0;
+                cond = 0;
 
                 if (qemuAgentIOProcess(agent) < 0)
                     error = true;
@@ -574,25 +597,17 @@ qemuAgentIO(int watch, int fd, int events, void *opaque)
         }
 
         if (!error &&
-            events & VIR_EVENT_HANDLE_HANGUP) {
+            cond & G_IO_HUP) {
             virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
                            _("End of file from agent socket"));
             eof = true;
-            events &= ~VIR_EVENT_HANDLE_HANGUP;
         }
 
         if (!error && !eof &&
-            events & VIR_EVENT_HANDLE_ERROR) {
+            cond & G_IO_ERR) {
             virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
                            _("Invalid file descriptor while waiting for agent"));
             eof = true;
-            events &= ~VIR_EVENT_HANDLE_ERROR;
-        }
-        if (!error && events) {
-            virReportError(VIR_ERR_INTERNAL_ERROR,
-                           _("Unhandled event %d for agent fd %d"),
-                           events, agent->fd);
-            error = true;
         }
     }
 
@@ -648,15 +663,19 @@ qemuAgentIO(int watch, int fd, int events, void *opaque)
         virObjectUnlock(agent);
         virObjectUnref(agent);
     }
+
+    return G_SOURCE_REMOVE;
 }
 
 
 qemuAgentPtr
 qemuAgentOpen(virDomainObjPtr vm,
               const virDomainChrSourceDef *config,
+              GMainContext *context,
               qemuAgentCallbacksPtr cb)
 {
     qemuAgentPtr agent;
+    g_autoptr(GError) gerr = NULL;
 
     if (!cb || !cb->eofNotify) {
         virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
@@ -692,22 +711,20 @@ qemuAgentOpen(virDomainObjPtr vm,
     if (agent->fd == -1)
         goto cleanup;
 
-    virObjectRef(agent);
-    if ((agent->watch = virEventAddHandle(agent->fd,
-                                        VIR_EVENT_HANDLE_HANGUP |
-                                        VIR_EVENT_HANDLE_ERROR |
-                                        VIR_EVENT_HANDLE_READABLE,
-                                        qemuAgentIO,
-                                        agent,
-                                        virObjectFreeCallback)) < 0) {
-        virObjectUnref(agent);
-        virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
-                       _("unable to register agent events"));
+    agent->context = g_main_context_ref(context);
+
+    agent->socket = g_socket_new_from_fd(agent->fd, &gerr);
+    if (!agent->socket) {
+        virReportError(VIR_ERR_INTERNAL_ERROR,
+                       _("Unable to create socket object: %s"),
+                       gerr->message);
         goto cleanup;
     }
 
+    qemuAgentRegister(agent);
+
     agent->running = true;
-    VIR_DEBUG("New agent %p fd =%d watch=%d", agent, agent->fd, agent->watch);
+    VIR_DEBUG("New agent %p fd=%d", agent, agent->fd);
 
     return agent;
 
@@ -762,12 +779,11 @@ void qemuAgentClose(qemuAgentPtr agent)
 
     virObjectLock(agent);
 
-    if (agent->fd >= 0) {
-        if (agent->watch) {
-            virEventRemoveHandle(agent->watch);
-            agent->watch = 0;
-        }
-        VIR_FORCE_CLOSE(agent->fd);
+    if (agent->socket) {
+        qemuAgentUnregister(agent);
+        g_object_unref(agent->socket);
+        agent->socket = NULL;
+        agent->fd = -1;
     }
 
     qemuAgentNotifyCloseLocked(agent);
diff --git a/src/qemu/qemu_agent.h b/src/qemu/qemu_agent.h
index 5656fe60ff..d4d8615323 100644
--- a/src/qemu/qemu_agent.h
+++ b/src/qemu/qemu_agent.h
@@ -41,6 +41,7 @@ struct _qemuAgentCallbacks {
 
 qemuAgentPtr qemuAgentOpen(virDomainObjPtr vm,
                            const virDomainChrSourceDef *config,
+                           GMainContext *context,
                            qemuAgentCallbacksPtr cb);
 
 void qemuAgentClose(qemuAgentPtr mon);
diff --git a/src/qemu/qemu_process.c b/src/qemu/qemu_process.c
index bc57474bdc..cf2b5c260c 100644
--- a/src/qemu/qemu_process.c
+++ b/src/qemu/qemu_process.c
@@ -236,6 +236,7 @@ qemuConnectAgent(virQEMUDriverPtr driver, virDomainObjPtr vm)
 
     agent = qemuAgentOpen(vm,
                           config->source,
+                          virEventThreadGetContext(priv->eventThread),
                           &agentCallbacks);
 
     virObjectLock(vm);
diff --git a/tests/qemumonitortestutils.c b/tests/qemumonitortestutils.c
index 3efdea9cce..1eb06a0e54 100644
--- a/tests/qemumonitortestutils.c
+++ b/tests/qemumonitortestutils.c
@@ -1411,6 +1411,7 @@ qemuMonitorTestNewAgent(virDomainXMLOptionPtr xmlopt)
 
     if (!(test->agent = qemuAgentOpen(test->vm,
                                       &src,
+                                      virEventThreadGetContext(test->eventThread),
                                       &qemuMonitorTestAgentCallbacks)))
         goto error;
 
-- 
2.24.1




More information about the libvir-list mailing list