[libvirt] [[RFC] 3/8] Setup global and per-VM event queues. Also initialize per-VM queues when libvirt reconnects to an existing VM.

Prerna Saxena saxenap.ltc at gmail.com
Tue Oct 24 17:34:56 UTC 2017


Signed-off-by: Prerna Saxena <saxenap.ltc at gmail.com>
---
 src/conf/domain_conf.h  |   3 +
 src/qemu/qemu_conf.h    |   4 +
 src/qemu/qemu_driver.c  |   9 ++
 src/qemu/qemu_event.c   | 229 ++++++++++++++++++++++++++++++++++++++++++++++++
 src/qemu/qemu_event.h   |   1 -
 src/qemu/qemu_process.c |   2 +
 6 files changed, 247 insertions(+), 1 deletion(-)

diff --git a/src/conf/domain_conf.h b/src/conf/domain_conf.h
index a42efcf..7fe38e7 100644
--- a/src/conf/domain_conf.h
+++ b/src/conf/domain_conf.h
@@ -2496,6 +2496,9 @@ struct _virDomainObj {
 
     unsigned long long original_memlock; /* Original RLIMIT_MEMLOCK, zero if no
                                           * restore will be required later */
+
+    /* Pointer to per-VM Event Queue */
+    void *vmq;
 };
 
 typedef bool (*virDomainObjListACLFilter)(virConnectPtr conn,
diff --git a/src/qemu/qemu_conf.h b/src/qemu/qemu_conf.h
index 13b6f81..e63dc98 100644
--- a/src/qemu/qemu_conf.h
+++ b/src/qemu/qemu_conf.h
@@ -33,6 +33,7 @@
 # include "domain_conf.h"
 # include "snapshot_conf.h"
 # include "domain_event.h"
+# include "qemu_event.h"
 # include "virthread.h"
 # include "security/security_manager.h"
 # include "virpci.h"
@@ -235,6 +236,9 @@ struct _virQEMUDriver {
     /* Immutable pointer, self-locking APIs */
     virDomainObjListPtr domains;
 
+    /* Immutable pointer, contains Qemu Driver Event List */
+    virQemuEventList *ev_list;
+
     /* Immutable pointer */
     char *qemuImgBinary;
 
diff --git a/src/qemu/qemu_driver.c b/src/qemu/qemu_driver.c
index 7c6f167..8a005d0 100644
--- a/src/qemu/qemu_driver.c
+++ b/src/qemu/qemu_driver.c
@@ -52,6 +52,7 @@
 #include "qemu_command.h"
 #include "qemu_parse_command.h"
 #include "qemu_cgroup.h"
+#include "qemu_event.h"
 #include "qemu_hostdev.h"
 #include "qemu_hotplug.h"
 #include "qemu_monitor.h"
@@ -650,6 +651,14 @@ qemuStateInitialize(bool privileged,
     if (!(qemu_driver->domains = virDomainObjListNew()))
         goto error;
 
+    /* Init domain Async QMP events */
+    qemu_driver->ev_list = virQemuEventListInit();
+    if (!qemu_driver->ev_list) {
+        virReportSystemError(VIR_ERR_INTERNAL_ERROR, "%s",
+                             _("Unable to initialize QMP event queues"));
+        goto error;
+    }
+
     /* Init domain events */
     qemu_driver->domainEventState = virObjectEventStateNew();
     if (!qemu_driver->domainEventState)
diff --git a/src/qemu/qemu_event.c b/src/qemu/qemu_event.c
index e27ea0d..d52fad2 100644
--- a/src/qemu/qemu_event.c
+++ b/src/qemu/qemu_event.c
@@ -73,3 +73,232 @@ virQemuEventList* virQemuEventListInit(void)
 
     return ev_list;
 }
+
+/* Allocate an empty per-VM event queue and attach it to vm->vmq.
+ * Returns 0 on success; -1 if vm is NULL, allocation fails, or the
+ * queue mutex cannot be initialized (the queue is freed in that case). */
+int virQemuVmEventListInit(virDomainObjPtr vm)
+{
+    virQemuVmEventQueue *vmq = NULL;
+
+    if (!vm || VIR_ALLOC(vmq) < 0)
+        return -1;
+    vmq->head = vmq->last = NULL;
+
+    if (virMutexInit(&vmq->lock) < 0) {
+        VIR_FREE(vmq);
+        return -1;
+    }
+    vm->vmq = vmq;
+    return 0;
+}
+/**
+ * virEnqueueVMEvent()
+ * Adds a new event to:
+ * - the event queue for this VM (this assigns the event its ev_id)
+ * - Global event queue
+ *
+ * @qlist: global event list
+ * @ev: event to queue; ev->vm must already have a per-VM queue
+ *
+ * Takes a reference on ev->vm on success.
+ *
+ * Return : 0 (success)
+ *         -1 (failure)
+ */
+int virEnqueueVMEvent(virQemuEventList *qlist, qemuEventPtr ev)
+{
+    struct _qemuGlobalEventListElement *globalEntry = NULL;
+    virQemuVmEventQueue *vmq;
+    struct _qemuVmEventQueueElement *vmq_entry = NULL;
+
+    /* Checked separately: the later messages dereference ev and ev->vm */
+    if (!ev || !ev->vm) {
+        virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                       "No event or VM given. Dropping event");
+        return -1;
+    }
+
+    if (!qlist || !ev->vm->vmq) {
+        virReportError(VIR_ERR_INTERNAL_ERROR,
+                       "No queue list instantiated. "
+                       "Dropping event %d for Vm %s",
+                       ev->ev_type, ev->vm->def->name);
+        return -1;
+    }
+
+    if (VIR_ALLOC(globalEntry) < 0 || VIR_ALLOC(vmq_entry) < 0) {
+        virReportError(VIR_ERR_INTERNAL_ERROR,
+                       "Allocation error. "
+                       "Dropping event %d for Vm %s",
+                       ev->ev_type, ev->vm->def->name);
+        VIR_FREE(globalEntry);
+        return -1;
+    }
+
+    vmq_entry->ev = ev;
+    vmq_entry->next = NULL;
+
+    virObjectRef(ev->vm);
+    globalEntry->vm = ev->vm;
+    globalEntry->next = NULL;
+    globalEntry->prev = NULL;
+    /* Note that this order needs to be maintained
+     * for dequeue too else ABBA deadlocks will happen */
+
+    /* Insert into per-Vm queue */
+    vmq = ev->vm->vmq;
+
+    virMutexLock(&(vmq->lock));
+    if (vmq->last) {
+        vmq->last->next = vmq_entry;
+        vmq_entry->ev->ev_id = vmq->last->ev->ev_id + 1;
+    } else {
+        vmq->head = vmq_entry;
+        vmq_entry->ev->ev_id = 1;
+    }
+    vmq->last = vmq_entry;
+    globalEntry->ev_id = vmq_entry->ev->ev_id;
+    /* Insert the event into the global queue */
+    virMutexLock(&(qlist->lock));
+    if (qlist->last) {
+        qlist->last->next = globalEntry;
+        globalEntry->prev = qlist->last;
+    } else {
+        qlist->head = globalEntry;
+    }
+    qlist->last = globalEntry;
+    virMutexUnlock(&(qlist->lock));
+    virMutexUnlock(&(vmq->lock));
+
+    return 0;
+}
+
+/**
+ * virDequeueVMEvent: Dequeues the first event of this VM from :
+ *  - the global event table;
+ *  - the per-VM event table;
+ *
+ *  Needs to be called with VM lock held. Else the event is deleted forever and
+ *  cannot be picked up by any other worker thread.
+ *
+ *  Returns the dequeued event, or NULL when an argument or the per-VM
+ *  queue is missing, or when the VM has no pending event.
+ */
+qemuEventPtr virDequeueVMEvent(virQemuEventList *qlist, virDomainObjPtr vm)
+{
+    qemuEventPtr ret_ev;
+    struct _qemuVmEventQueue *cur_vmq;
+    struct _qemuVmEventQueueElement *vmq_entry;
+    struct _qemuGlobalEventListElement *iter;
+
+    if (!qlist || !vm || !vm->vmq) {
+        virReportError(VIR_ERR_INTERNAL_ERROR,
+                       "No queue list /VM/ event for this vm %s",
+                       vm ? vm->def->name : "(null)");
+        goto error;
+    }
+
+    cur_vmq = vm->vmq;
+
+    /* Lock order (per-VM queue, then global list) matches enqueue */
+    virMutexLock(&(cur_vmq->lock));
+    vmq_entry = cur_vmq->head;
+
+    if (vmq_entry == NULL) {
+        virMutexUnlock(&(cur_vmq->lock));
+        goto error;
+    }
+
+    /* Purge the event from global queue, and then from local queue.
+     * So that ev_ids are always consistent.
+     */
+    virMutexLock(&(qlist->lock));
+    for (iter = qlist->head; iter; iter = iter->next) {
+        /* Entries are queued with the event's own vm pointer, so compare
+         * pointers: def->uuid is raw bytes, not a string for STREQ. */
+        if (iter->vm != vmq_entry->ev->vm ||
+            iter->ev_id != vmq_entry->ev->ev_id)
+            continue;
+
+        if (iter->prev != NULL)
+            iter->prev->next = iter->next;
+        else
+            /* This was the first element */
+            qlist->head = iter->next;
+        if (iter->next != NULL)
+            iter->next->prev = iter->prev;
+        else
+            /* This was the last element */
+            qlist->last = iter->prev;
+        break;
+    }
+
+    /* Now remove this from per-Vm queue. Clear 'last' once the queue is
+     * empty, else the next enqueue would append to the freed entry. */
+    cur_vmq->head = vmq_entry->next;
+    if (cur_vmq->head == NULL)
+        cur_vmq->last = NULL;
+    virMutexUnlock(&(qlist->lock));
+
+    virMutexUnlock(&(cur_vmq->lock));
+
+    ret_ev = vmq_entry->ev;
+    free(vmq_entry);
+    free(iter);
+
+    return ret_ev;
+error:
+    return NULL;
+}
+
+/* Worker callback: walk the global event list under its lock, stop at the
+ * first VM for which virObjectTrylock() returns 0, then consume that VM's
+ * events and unlock it. opaque is the virQEMUDriverPtr; dummy is unused. */
+void
+virEventWorkerScanQueue(void *dummy ATTRIBUTE_UNUSED, void *opaque)
+{
+    virQEMUDriverPtr driver = opaque;
+    virQemuEventList *qlist = driver->ev_list;
+    struct _qemuGlobalEventListElement *globalEntry;
+    virDomainObjPtr vm = NULL;
+
+    /* dequeue unlinks/frees entries under qlist->lock, so walk it locked */
+    virMutexLock(&(qlist->lock));
+    globalEntry = qlist->head;
+    if (globalEntry)
+        VIR_WARN("Running event driver");
+
+    while (globalEntry) {
+        vm = globalEntry->vm;
+        if (vm != NULL && !virObjectTrylock(vm))
+            break;
+        // Todo:Clear events for irrelevant VMs
+        globalEntry = globalEntry->next;
+    }
+    virMutexUnlock(&(qlist->lock));
+
+    // Scanned the entire list, but no worthy event found. Exit now.
+    if (!globalEntry)
+        return;
+
+    virDomainConsumeVMEvents(vm, opaque);
+    virObjectUnlock(vm);
+}
+
+/* Called under the VM lock. Dequeues each pending event of vm, runs its
+ * handler (when set), then frees the event and drops one vm reference. */
+void virDomainConsumeVMEvents(virDomainObjPtr vm, void *opaque)
+{
+    virQEMUDriverPtr drv = opaque;
+    qemuEventPtr cur;
+
+    for (cur = virDequeueVMEvent(drv->ev_list, vm); cur != NULL;
+         cur = virDequeueVMEvent(drv->ev_list, vm)) {
+        VIR_WARN("Processing event %d vm %s", cur->ev_type, vm->def->name);
+        if (cur->handler)
+            (cur->handler)(cur, opaque);
+        free(cur);
+        virObjectUnref(vm);
+    }
+}
diff --git a/src/qemu/qemu_event.h b/src/qemu/qemu_event.h
index 9781795..4173834 100644
--- a/src/qemu/qemu_event.h
+++ b/src/qemu/qemu_event.h
@@ -219,6 +219,5 @@ int virQemuVmEventListInit(virDomainObjPtr vm);
 int virEnqueueVMEvent(virQemuEventList *qlist, qemuEventPtr ev);
 qemuEventPtr virDequeueVMEvent(virQemuEventList *qlist, virDomainObjPtr vm);
 void virEventWorkerScanQueue(void *dummy, void *opaque);
-void virEventRunHandler(qemuEventPtr ev, void *opaque);
 void virDomainConsumeVMEvents(virDomainObjPtr vm, void *opaque);
 #endif
diff --git a/src/qemu/qemu_process.c b/src/qemu/qemu_process.c
index 9f26dfc..8e6498e 100644
--- a/src/qemu/qemu_process.c
+++ b/src/qemu/qemu_process.c
@@ -6941,6 +6941,8 @@ qemuProcessReconnect(void *opaque)
         goto error;
     jobStarted = true;
 
+    if (virQemuVmEventListInit(obj) < 0)
+        goto error;
     /* XXX If we ever gonna change pid file pattern, come up with
      * some intelligence here to deal with old paths. */
     if (!(priv->pidfile = virPidFileBuildPath(cfg->stateDir, obj->def->name)))
-- 
2.9.5




More information about the libvir-list mailing list