[libvirt] [PATCH 05/10] qemu: Use thread queues for synchronous block jobs

Jiri Denemark jdenemar at redhat.com
Thu May 21 22:42:38 UTC 2015


By switching block jobs to use thread queues and thread conditions, we
can drop some pretty complicated code in NBD storage migration.
Moreover, we are getting ready for migration events (to replace our 50ms
polling on query-migrate). The ultimate goal is to have a single loop
waiting (using virThreadCondWait) for any migration-related event
(changed status of a migration, disk mirror events, internal abort
requests, ...). This patch makes NBD storage migration ready for this:
first we call a QMP command to start or cancel drive mirrors on all disks
we are interested in, and then we wait for a single condition which is
signaled on any event related to any of the mirrors.

Signed-off-by: Jiri Denemark <jdenemar at redhat.com>
---
 po/POTFILES.in            |   1 -
 src/qemu/qemu_blockjob.c  | 139 ++-------------------
 src/qemu/qemu_blockjob.h  |  12 +-
 src/qemu/qemu_domain.c    |   5 +-
 src/qemu/qemu_domain.h    |   4 +-
 src/qemu/qemu_driver.c    |  24 ++--
 src/qemu/qemu_migration.c | 299 ++++++++++++++++++++++++++--------------------
 src/qemu/qemu_process.c   |   9 +-
 8 files changed, 201 insertions(+), 292 deletions(-)

diff --git a/po/POTFILES.in b/po/POTFILES.in
index edfd1cc..9108ccf 100644
--- a/po/POTFILES.in
+++ b/po/POTFILES.in
@@ -112,7 +112,6 @@ src/parallels/parallels_utils.h
 src/parallels/parallels_storage.c
 src/phyp/phyp_driver.c
 src/qemu/qemu_agent.c
-src/qemu/qemu_blockjob.c
 src/qemu/qemu_capabilities.c
 src/qemu/qemu_cgroup.c
 src/qemu/qemu_command.c
diff --git a/src/qemu/qemu_blockjob.c b/src/qemu/qemu_blockjob.c
index 605c2a5..e97f449 100644
--- a/src/qemu/qemu_blockjob.c
+++ b/src/qemu/qemu_blockjob.c
@@ -204,19 +204,17 @@ qemuBlockJobEventProcess(virQEMUDriverPtr driver,
  *
  * During a synchronous block job, a block job event for @disk
  * will not be processed asynchronously. Instead, it will be
- * processed only when qemuBlockJobSyncWait* or
- * qemuBlockJobSyncEnd is called.
+ * processed only when qemuBlockJobUpdate or qemuBlockJobSyncEnd
+ * is called.
  */
 void
 qemuBlockJobSyncBegin(virDomainDiskDefPtr disk)
 {
     qemuDomainDiskPrivatePtr diskPriv = QEMU_DOMAIN_DISK_PRIVATE(disk);
 
-    if (diskPriv->blockJobSync)
-        VIR_WARN("Disk %s already has synchronous block job",
-                 disk->dst);
-
-    diskPriv->blockJobSync = true;
+    VIR_DEBUG("disk=%s", disk->dst);
+    virThreadQueueRegister(diskPriv->blockJobQueue);
+    diskPriv->blockJobStatus = -1;
 }
 
 
@@ -225,135 +223,16 @@ qemuBlockJobSyncBegin(virDomainDiskDefPtr disk)
  * @driver: qemu driver
  * @vm: domain
  * @disk: domain disk
- * @ret_status: pointer to virConnectDomainEventBlockJobStatus
  *
  * End a synchronous block job for @disk. Any pending block job event
- * for the disk is processed, and its status is recorded in the
- * virConnectDomainEventBlockJobStatus field pointed to by
- * @ret_status.
+ * for the disk is processed.
  */
 void
 qemuBlockJobSyncEnd(virQEMUDriverPtr driver,
                     virDomainObjPtr vm,
-                    virDomainDiskDefPtr disk,
-                    virConnectDomainEventBlockJobStatus *ret_status)
+                    virDomainDiskDefPtr disk)
 {
-    qemuDomainDiskPrivatePtr diskPriv = QEMU_DOMAIN_DISK_PRIVATE(disk);
-
-    if (diskPriv->blockJobSync && diskPriv->blockJobStatus != -1) {
-        if (ret_status)
-            *ret_status = diskPriv->blockJobStatus;
-        qemuBlockJobUpdate(driver, vm, disk);
-        diskPriv->blockJobStatus = -1;
-    }
-    diskPriv->blockJobSync = false;
-}
-
-
-/**
- * qemuBlockJobSyncWaitWithTimeout:
- * @driver: qemu driver
- * @vm: domain
- * @disk: domain disk
- * @timeout: timeout in milliseconds
- * @ret_status: pointer to virConnectDomainEventBlockJobStatus
- *
- * Wait up to @timeout milliseconds for a block job event for @disk.
- * If an event is received it is processed, and its status is recorded
- * in the virConnectDomainEventBlockJobStatus field pointed to by
- * @ret_status.
- *
- * If @timeout is not 0, @vm will be unlocked while waiting for the event.
- *
- * Returns 0 if an event was received or the timeout expired,
- *        -1 otherwise.
- */
-int
-qemuBlockJobSyncWaitWithTimeout(virQEMUDriverPtr driver,
-                                virDomainObjPtr vm,
-                                virDomainDiskDefPtr disk,
-                                unsigned long long timeout,
-                                virConnectDomainEventBlockJobStatus *ret_status)
-{
-    qemuDomainDiskPrivatePtr diskPriv = QEMU_DOMAIN_DISK_PRIVATE(disk);
-
-    if (!diskPriv->blockJobSync) {
-        virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
-                       _("No current synchronous block job"));
-        return -1;
-    }
-
-    while (diskPriv->blockJobSync && diskPriv->blockJobStatus == -1) {
-        int r;
-
-        if (!virDomainObjIsActive(vm)) {
-            virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
-                           _("guest unexpectedly quit"));
-            diskPriv->blockJobSync = false;
-            return -1;
-        }
-
-        if (timeout == (unsigned long long)-1) {
-            r = virCondWait(&diskPriv->blockJobSyncCond, &vm->parent.lock);
-        } else if (timeout) {
-            unsigned long long now;
-            if (virTimeMillisNow(&now) < 0) {
-                virReportSystemError(errno, "%s",
-                                     _("Unable to get current time"));
-                return -1;
-            }
-            r = virCondWaitUntil(&diskPriv->blockJobSyncCond,
-                                 &vm->parent.lock,
-                                 now + timeout);
-            if (r < 0 && errno == ETIMEDOUT)
-                return 0;
-        } else {
-            errno = ETIMEDOUT;
-            return 0;
-        }
-
-        if (r < 0) {
-            diskPriv->blockJobSync = false;
-            virReportSystemError(errno, "%s",
-                                 _("Unable to wait on block job sync "
-                                   "condition"));
-            return -1;
-        }
-    }
-
-    if (ret_status)
-        *ret_status = diskPriv->blockJobStatus;
+    VIR_DEBUG("disk=%s", disk->dst);
     qemuBlockJobUpdate(driver, vm, disk);
-    diskPriv->blockJobStatus = -1;
-
-    return 0;
-}
-
-
-/**
- * qemuBlockJobSyncWait:
- * @driver: qemu driver
- * @vm: domain
- * @disk: domain disk
- * @ret_status: pointer to virConnectDomainEventBlockJobStatus
- *
- * Wait for a block job event for @disk. If an event is received it
- * is processed, and its status is recorded in the
- * virConnectDomainEventBlockJobStatus field pointed to by
- * @ret_status.
- *
- * @vm will be unlocked while waiting for the event.
- *
- * Returns 0 if an event was received,
- *        -1 otherwise.
- */
-int
-qemuBlockJobSyncWait(virQEMUDriverPtr driver,
-                     virDomainObjPtr vm,
-                     virDomainDiskDefPtr disk,
-                     virConnectDomainEventBlockJobStatus *ret_status)
-{
-    return qemuBlockJobSyncWaitWithTimeout(driver, vm, disk,
-                                           (unsigned long long)-1,
-                                           ret_status);
+    virThreadQueueUnregister(QEMU_DOMAIN_DISK_PRIVATE(disk)->blockJobQueue);
 }
diff --git a/src/qemu/qemu_blockjob.h b/src/qemu/qemu_blockjob.h
index 81e893e..775ce95 100644
--- a/src/qemu/qemu_blockjob.h
+++ b/src/qemu/qemu_blockjob.h
@@ -37,16 +37,6 @@ void qemuBlockJobEventProcess(virQEMUDriverPtr driver,
 void qemuBlockJobSyncBegin(virDomainDiskDefPtr disk);
 void qemuBlockJobSyncEnd(virQEMUDriverPtr driver,
                          virDomainObjPtr vm,
-                         virDomainDiskDefPtr disk,
-                         virConnectDomainEventBlockJobStatus *ret_status);
-int qemuBlockJobSyncWaitWithTimeout(virQEMUDriverPtr driver,
-                                    virDomainObjPtr vm,
-                                    virDomainDiskDefPtr disk,
-                                    unsigned long long timeout,
-                                    virConnectDomainEventBlockJobStatus *ret_status);
-int qemuBlockJobSyncWait(virQEMUDriverPtr driver,
-                         virDomainObjPtr vm,
-                         virDomainDiskDefPtr disk,
-                         virConnectDomainEventBlockJobStatus *ret_status);
+                         virDomainDiskDefPtr disk);
 
 #endif /* __QEMU_BLOCKJOB_H__ */
diff --git a/src/qemu/qemu_domain.c b/src/qemu/qemu_domain.c
index db8554b..c25d5a5 100644
--- a/src/qemu/qemu_domain.c
+++ b/src/qemu/qemu_domain.c
@@ -441,8 +441,7 @@ qemuDomainDiskPrivateNew(void)
     if (!(priv = virObjectNew(qemuDomainDiskPrivateClass)))
         return NULL;
 
-    if (virCondInit(&priv->blockJobSyncCond) < 0) {
-        virReportSystemError(errno, "%s", _("Failed to initialize condition"));
+    if (!(priv->blockJobQueue = virThreadQueueNew())) {
         virObjectUnref(priv);
         return NULL;
     }
@@ -455,7 +454,7 @@ qemuDomainDiskPrivateDispose(void *obj)
 {
     qemuDomainDiskPrivatePtr priv = obj;
 
-    virCondDestroy(&priv->blockJobSyncCond);
+    virThreadQueueFree(priv->blockJobQueue);
 }
 
 
diff --git a/src/qemu/qemu_domain.h b/src/qemu/qemu_domain.h
index a6df199..2117a3d 100644
--- a/src/qemu/qemu_domain.h
+++ b/src/qemu/qemu_domain.h
@@ -35,6 +35,7 @@
 # include "qemu_capabilities.h"
 # include "virchrdev.h"
 # include "virobject.h"
+# include "virthreadqueue.h"
 
 # define QEMU_DOMAIN_FORMAT_LIVE_FLAGS      \
     (VIR_DOMAIN_XML_SECURE |                \
@@ -214,10 +215,9 @@ struct _qemuDomainDiskPrivate {
     bool blockjob;
 
     /* for some synchronous block jobs, we need to notify the owner */
-    virCond blockJobSyncCond;
+    virThreadQueuePtr blockJobQueue;
     int blockJobType;   /* type of the block job from the event */
     int blockJobStatus; /* status of the finished block job */
-    bool blockJobSync; /* the block job needs synchronized termination */
 
     bool migrating; /* the disk is being migrated */
 };
diff --git a/src/qemu/qemu_driver.c b/src/qemu/qemu_driver.c
index aa0acde..bce631f 100644
--- a/src/qemu/qemu_driver.c
+++ b/src/qemu/qemu_driver.c
@@ -16679,10 +16679,8 @@ qemuDomainBlockJobAbort(virDomainPtr dom,
         goto endjob;
     }
 
-    if (modern && !async) {
-        /* prepare state for event delivery */
+    if (modern && !async)
         qemuBlockJobSyncBegin(disk);
-    }
 
     if (pivot) {
         if ((ret = qemuDomainBlockPivot(driver, vm, device, disk)) < 0)
@@ -16730,21 +16728,21 @@ qemuDomainBlockJobAbort(virDomainPtr dom,
                                      VIR_DOMAIN_BLOCK_JOB_TYPE_PULL,
                                      VIR_DOMAIN_BLOCK_JOB_CANCELED);
         } else {
-            virConnectDomainEventBlockJobStatus status = -1;
-            if (qemuBlockJobSyncWait(driver, vm, disk, &status) < 0) {
-                ret = -1;
-            } else if (status == VIR_DOMAIN_BLOCK_JOB_FAILED) {
-                virReportError(VIR_ERR_OPERATION_FAILED,
-                               _("failed to terminate block job on disk '%s'"),
-                               disk->dst);
-                ret = -1;
+            qemuDomainDiskPrivatePtr diskPriv = QEMU_DOMAIN_DISK_PRIVATE(disk);
+            qemuBlockJobUpdate(driver, vm, disk);
+            while (diskPriv->blockjob) {
+                if (virThreadCondWait(&vm->parent.lock) < 0) {
+                    ret = -1;
+                    goto endjob;
+                }
+                qemuBlockJobUpdate(driver, vm, disk);
             }
         }
     }
 
  endjob:
-    if (disk && QEMU_DOMAIN_DISK_PRIVATE(disk)->blockJobSync)
-        qemuBlockJobSyncEnd(driver, vm, disk, NULL);
+    if (disk)
+        qemuBlockJobSyncEnd(driver, vm, disk);
     qemuDomainObjEndJob(driver, vm);
 
  cleanup:
diff --git a/src/qemu/qemu_migration.c b/src/qemu/qemu_migration.c
index b2c4559..f866ac5 100644
--- a/src/qemu/qemu_migration.c
+++ b/src/qemu/qemu_migration.c
@@ -1720,7 +1720,7 @@ qemuMigrationStopNBDServer(virQEMUDriverPtr driver,
 
 
 /**
- * qemuMigrationCheckDriveMirror:
+ * qemuMigrationDriveMirrorReady:
  * @driver: qemu driver
  * @vm: domain
  *
@@ -1733,111 +1733,148 @@ qemuMigrationStopNBDServer(virQEMUDriverPtr driver,
  *        -1 on error.
  */
 static int
-qemuMigrationCheckDriveMirror(virQEMUDriverPtr driver,
+qemuMigrationDriveMirrorReady(virQEMUDriverPtr driver,
                               virDomainObjPtr vm)
 {
     size_t i;
-    int ret = 1;
+    size_t notReady = 0;
+    int status;
 
     for (i = 0; i < vm->def->ndisks; i++) {
         virDomainDiskDefPtr disk = vm->def->disks[i];
         qemuDomainDiskPrivatePtr diskPriv = QEMU_DOMAIN_DISK_PRIVATE(disk);
 
-        if (!diskPriv->migrating || !diskPriv->blockJobSync)
+        if (!diskPriv->migrating)
             continue;
 
-        /* process any pending event */
-        if (qemuBlockJobSyncWaitWithTimeout(driver, vm, disk,
-                                            0ull, NULL) < 0)
-            return -1;
-
-        switch (disk->mirrorState) {
-        case VIR_DOMAIN_DISK_MIRROR_STATE_NONE:
-            ret = 0;
-            break;
-        case VIR_DOMAIN_DISK_MIRROR_STATE_ABORT:
+        status = qemuBlockJobUpdate(driver, vm, disk);
+        if (status == VIR_DOMAIN_BLOCK_JOB_FAILED) {
             virReportError(VIR_ERR_OPERATION_FAILED,
                            _("migration of disk %s failed"),
                            disk->dst);
             return -1;
         }
+
+        if (disk->mirrorState != VIR_DOMAIN_DISK_MIRROR_STATE_READY)
+            notReady++;
     }
 
-    return ret;
+    if (notReady) {
+        VIR_DEBUG("Waiting for %zu disk mirrors to get ready", notReady);
+        return 0;
+    } else {
+        VIR_DEBUG("All disk mirrors are ready");
+        return 1;
+    }
 }
 
 
-/**
- * qemuMigrationCancelOneDriveMirror:
- * @driver: qemu driver
- * @vm: domain
+/*
+ * If @failed is not NULL, the function will report an error and set @failed
+ * to true in case a block job fails. This way we can properly abort migration
+ * in case some block job failed once all memory has already been transferred.
  *
- * Cancel all drive-mirrors started by qemuMigrationDriveMirror.
- * Any pending block job events for the mirrored disks will be
- * processed.
- *
- * Returns 0 on success, -1 otherwise.
+ * Returns 1 if all mirrors are gone,
+ *         0 if some mirrors are still active,
+ *         -1 on error.
  */
 static int
-qemuMigrationCancelOneDriveMirror(virQEMUDriverPtr driver,
+qemuMigrationDriveMirrorCancelled(virQEMUDriverPtr driver,
                                   virDomainObjPtr vm,
-                                  virDomainDiskDefPtr disk)
+                                  bool *failed)
 {
-    qemuDomainObjPrivatePtr priv = vm->privateData;
-    char *diskAlias = NULL;
-    int ret = -1;
+    size_t i;
+    size_t active = 0;
+    int status;
 
-    /* No need to cancel if mirror already aborted */
-    if (disk->mirrorState == VIR_DOMAIN_DISK_MIRROR_STATE_ABORT) {
-        ret = 0;
-    } else {
-        virConnectDomainEventBlockJobStatus status = -1;
+    for (i = 0; i < vm->def->ndisks; i++) {
+        virDomainDiskDefPtr disk = vm->def->disks[i];
+        qemuDomainDiskPrivatePtr diskPriv = QEMU_DOMAIN_DISK_PRIVATE(disk);
 
-        if (virAsprintf(&diskAlias, "%s%s",
-                        QEMU_DRIVE_HOST_PREFIX, disk->info.alias) < 0)
-            goto cleanup;
+        if (!diskPriv->migrating)
+            continue;
 
-        if (qemuDomainObjEnterMonitorAsync(driver, vm,
-                                           QEMU_ASYNC_JOB_MIGRATION_OUT) < 0)
-            goto endjob;
-        ret = qemuMonitorBlockJobCancel(priv->mon, diskAlias, true);
-        if (qemuDomainObjExitMonitor(driver, vm) < 0)
-            goto endjob;
-
-        if (ret < 0) {
-            virDomainBlockJobInfo info;
-
-            /* block-job-cancel can fail if QEMU simultaneously
-             * aborted the job; probe for it again to detect this */
-            if (qemuMonitorBlockJobInfo(priv->mon, diskAlias,
-                                        &info, NULL) == 0) {
-                ret = 0;
-            } else {
+        status = qemuBlockJobUpdate(driver, vm, disk);
+        switch (status) {
+        case VIR_DOMAIN_BLOCK_JOB_FAILED:
+            if (failed) {
                 virReportError(VIR_ERR_OPERATION_FAILED,
-                               _("could not cancel migration of disk %s"),
+                               _("migration of disk %s failed"),
                                disk->dst);
+                *failed = true;
             }
+            /* fallthrough */
+        case VIR_DOMAIN_BLOCK_JOB_CANCELED:
+        case VIR_DOMAIN_BLOCK_JOB_COMPLETED:
+            qemuBlockJobSyncEnd(driver, vm, disk);
+            diskPriv->migrating = false;
+            break;
 
-            goto endjob;
+        default:
+            active++;
         }
+    }
 
-        /* Mirror may become ready before cancellation takes
-         * effect; loop if we get that event first */
-        do {
-            ret = qemuBlockJobSyncWait(driver, vm, disk, &status);
-            if (ret < 0) {
-                VIR_WARN("Unable to wait for block job on %s to cancel",
-                         diskAlias);
-                goto endjob;
-            }
-        } while (status == VIR_DOMAIN_BLOCK_JOB_READY);
+    if (active) {
+        VIR_DEBUG("Waiting for %zu disk mirrors to finish", active);
+        return 0;
+    } else {
+        if (failed && *failed)
+            VIR_DEBUG("All disk mirrors are gone; some of them failed");
+        else
+            VIR_DEBUG("All disk mirrors are gone");
+        return 1;
     }
+}
+
+
+/*
+ * Returns 0 on success,
+ *         1 when job already completed, or failed/was cancelled and !failNoJob,
+ *         -1 on error or when job failed/was cancelled and failNoJob is true.
+ */
+static int
+qemuMigrationCancelOneDriveMirror(virQEMUDriverPtr driver,
+                                  virDomainObjPtr vm,
+                                  virDomainDiskDefPtr disk,
+                                  bool failNoJob)
+{
+    qemuDomainObjPrivatePtr priv = vm->privateData;
+    char *diskAlias = NULL;
+    int ret = -1;
+    int status;
+    int rv;
+
+    status = qemuBlockJobUpdate(driver, vm, disk);
+    switch (status) {
+    case VIR_DOMAIN_BLOCK_JOB_FAILED:
+    case VIR_DOMAIN_BLOCK_JOB_CANCELED:
+        if (failNoJob) {
+            virReportError(VIR_ERR_OPERATION_FAILED,
+                           _("migration of disk %s failed"),
+                           disk->dst);
+            return -1;
+        }
+        return 1;
+
+    case VIR_DOMAIN_BLOCK_JOB_COMPLETED:
+        return 1;
+    }
+
+    if (virAsprintf(&diskAlias, "%s%s",
+                    QEMU_DRIVE_HOST_PREFIX, disk->info.alias) < 0)
+        return -1;
+
+    if (qemuDomainObjEnterMonitorAsync(driver, vm,
+                                       QEMU_ASYNC_JOB_MIGRATION_OUT) < 0)
+        goto cleanup;
+
+    rv = qemuMonitorBlockJobCancel(priv->mon, diskAlias, true);
 
- endjob:
-    qemuBlockJobSyncEnd(driver, vm, disk, NULL);
+    if (qemuDomainObjExitMonitor(driver, vm) < 0 || rv < 0)
+        goto cleanup;
 
-    if (disk->mirrorState == VIR_DOMAIN_DISK_MIRROR_STATE_ABORT)
-        disk->mirrorState = VIR_DOMAIN_DISK_MIRROR_STATE_NONE;
+    ret = 0;
 
  cleanup:
     VIR_FREE(diskAlias);
@@ -1849,6 +1886,7 @@ qemuMigrationCancelOneDriveMirror(virQEMUDriverPtr driver,
  * qemuMigrationCancelDriveMirror:
  * @driver: qemu driver
  * @vm: domain
+ * @check: if true, report an error when any of the mirrors fails
  *
  * Cancel all drive-mirrors started by qemuMigrationDriveMirror.
  * Any pending block job events for the affected disks will be
@@ -1858,28 +1896,48 @@ qemuMigrationCancelOneDriveMirror(virQEMUDriverPtr driver,
  */
 static int
 qemuMigrationCancelDriveMirror(virQEMUDriverPtr driver,
-                               virDomainObjPtr vm)
+                               virDomainObjPtr vm,
+                               bool check)
 {
     virErrorPtr err = NULL;
-    int ret = 0;
+    int ret = -1;
     size_t i;
+    int rv;
+    bool failed = false;
+    bool *failedPtr = check ? &failed : NULL;
+
+    VIR_DEBUG("Cancelling drive mirrors for domain %s", vm->def->name);
 
     for (i = 0; i < vm->def->ndisks; i++) {
         virDomainDiskDefPtr disk = vm->def->disks[i];
         qemuDomainDiskPrivatePtr diskPriv = QEMU_DOMAIN_DISK_PRIVATE(disk);
 
-        if (!diskPriv->migrating || !diskPriv->blockJobSync)
+        if (!diskPriv->migrating)
             continue;
 
-        if (qemuMigrationCancelOneDriveMirror(driver, vm, disk) < 0) {
-            ret = -1;
-            if (!err)
-                err = virSaveLastError();
+        rv = qemuMigrationCancelOneDriveMirror(driver, vm, disk, check);
+        if (rv != 0) {
+            if (rv < 0) {
+                if (!err)
+                    err = virSaveLastError();
+                failed = true;
+            }
+            qemuBlockJobSyncEnd(driver, vm, disk);
+            diskPriv->migrating = false;
         }
+    }
 
-        diskPriv->migrating = false;
+    while ((rv = qemuMigrationDriveMirrorCancelled(driver, vm,
+                                                   failedPtr)) != 1) {
+        if (failed && !err)
+            err = virSaveLastError();
+        if (rv < 0 || virThreadCondWait(&vm->parent.lock) < 0)
+            goto cleanup;
     }
 
+    ret = failed ? -1 : 0;
+
+ cleanup:
     if (err) {
         virSetError(err);
         virFreeError(err);
@@ -1924,6 +1982,9 @@ qemuMigrationDriveMirror(virQEMUDriverPtr driver,
     char *nbd_dest = NULL;
     char *hoststr = NULL;
     unsigned int mirror_flags = VIR_DOMAIN_BLOCK_REBASE_REUSE_EXT;
+    int rv;
+
+    VIR_DEBUG("Starting drive mirrors for domain %s", vm->def->name);
 
     /* steal NBD port and thus prevent its propagation back to destination */
     port = mig->nbd->port;
@@ -1950,60 +2011,46 @@ qemuMigrationDriveMirror(virQEMUDriverPtr driver,
             !virDomainDiskGetSource(disk))
             continue;
 
-        VIR_FREE(diskAlias);
-        VIR_FREE(nbd_dest);
         if ((virAsprintf(&diskAlias, "%s%s",
                          QEMU_DRIVE_HOST_PREFIX, disk->info.alias) < 0) ||
             (virAsprintf(&nbd_dest, "nbd:%s:%d:exportname=%s",
                          hoststr, port, diskAlias) < 0))
             goto cleanup;
 
+        if (qemuDomainObjEnterMonitorAsync(driver, vm,
+                                           QEMU_ASYNC_JOB_MIGRATION_OUT) < 0)
+            goto cleanup;
+
         qemuBlockJobSyncBegin(disk);
-
-        if (qemuDomainObjEnterMonitorAsync(driver, vm,
-                                           QEMU_ASYNC_JOB_MIGRATION_OUT) < 0) {
-            qemuBlockJobSyncEnd(driver, vm, disk, NULL);
-            goto cleanup;
-        }
-
         mon_ret = qemuMonitorDriveMirror(priv->mon, diskAlias, nbd_dest,
                                          NULL, speed, 0, 0, mirror_flags);
+        VIR_FREE(diskAlias);
+        VIR_FREE(nbd_dest);
 
         if (qemuDomainObjExitMonitor(driver, vm) < 0 || mon_ret < 0) {
-            qemuBlockJobSyncEnd(driver, vm, disk, NULL);
+            qemuBlockJobSyncEnd(driver, vm, disk);
             goto cleanup;
         }
         diskPriv->migrating = true;
     }
 
-    /* Wait for each disk to become ready in turn, but check the status
-     * for *all* mirrors to determine if any have aborted. */
-    for (i = 0; i < vm->def->ndisks; i++) {
-        virDomainDiskDefPtr disk = vm->def->disks[i];
-        qemuDomainDiskPrivatePtr diskPriv = QEMU_DOMAIN_DISK_PRIVATE(disk);
-
-        if (!diskPriv->migrating)
-            continue;
-
-        while (disk->mirrorState != VIR_DOMAIN_DISK_MIRROR_STATE_READY) {
-            /* The following check should be race free as long as the variable
-             * is set only with domain object locked. And here we have the
-             * domain object locked too. */
-            if (priv->job.asyncAbort) {
-                priv->job.current->type = VIR_DOMAIN_JOB_CANCELLED;
-                virReportError(VIR_ERR_OPERATION_ABORTED, _("%s: %s"),
-                               qemuDomainAsyncJobTypeToString(priv->job.asyncJob),
-                               _("canceled by client"));
-                goto cleanup;
-            }
-
-            if (qemuBlockJobSyncWaitWithTimeout(driver, vm, disk,
-                                                500ull, NULL) < 0)
-                goto cleanup;
-
-            if (qemuMigrationCheckDriveMirror(driver, vm) < 0)
-                goto cleanup;
+    while ((rv = qemuMigrationDriveMirrorReady(driver, vm)) != 1) {
+        unsigned long long now;
+
+        if (rv < 0)
+            goto cleanup;
+
+        if (priv->job.asyncAbort) {
+            priv->job.current->type = VIR_DOMAIN_JOB_CANCELLED;
+            virReportError(VIR_ERR_OPERATION_ABORTED, _("%s: %s"),
+                           qemuDomainAsyncJobTypeToString(priv->job.asyncJob),
+                           _("canceled by client"));
+            goto cleanup;
         }
+
+        if (virTimeMillisNow(&now) < 0 ||
+            virThreadCondWaitUntil(&vm->parent.lock, now + 500) < 0)
+            goto cleanup;
     }
 
     /* Okay, all disks are ready. Modify migrate_flags */
@@ -2436,7 +2483,8 @@ qemuMigrationWaitForCompletion(virQEMUDriverPtr driver,
                                virDomainObjPtr vm,
                                qemuDomainAsyncJob asyncJob,
                                virConnectPtr dconn,
-                               bool abort_on_error)
+                               bool abort_on_error,
+                               bool storage)
 {
     qemuDomainObjPrivatePtr priv = vm->privateData;
     qemuDomainJobInfoPtr jobInfo = priv->job.current;
@@ -2466,6 +2514,10 @@ qemuMigrationWaitForCompletion(virQEMUDriverPtr driver,
         if (qemuMigrationUpdateJobStatus(driver, vm, job, asyncJob) == -1)
             break;
 
+        if (storage &&
+            qemuMigrationDriveMirrorReady(driver, vm) < 0)
+            break;
+
         /* cancel migration if disk I/O error is emitted while migrating */
         if (abort_on_error &&
             virDomainObjGetState(vm, &pauseReason) == VIR_DOMAIN_PAUSED &&
@@ -3541,7 +3593,7 @@ qemuMigrationConfirmPhase(virQEMUDriverPtr driver,
         virErrorPtr orig_err = virSaveLastError();
 
         /* cancel any outstanding NBD jobs */
-        qemuMigrationCancelDriveMirror(driver, vm);
+        qemuMigrationCancelDriveMirror(driver, vm, false);
 
         virSetError(orig_err);
         virFreeError(orig_err);
@@ -4083,20 +4135,12 @@ qemuMigrationRun(virQEMUDriverPtr driver,
 
     rc = qemuMigrationWaitForCompletion(driver, vm,
                                         QEMU_ASYNC_JOB_MIGRATION_OUT,
-                                        dconn, abort_on_error);
+                                        dconn, abort_on_error, !!mig->nbd);
     if (rc == -2)
         goto cancel;
     else if (rc == -1)
         goto cleanup;
 
-    /* Confirm state of drive mirrors */
-    if (mig->nbd) {
-        if (qemuMigrationCheckDriveMirror(driver, vm) != 1) {
-            ret = -1;
-            goto cancel;
-        }
-    }
-
     /* When migration completed, QEMU will have paused the
      * CPUs for us, but unless we're using the JSON monitor
      * we won't have been notified of this, so might still
@@ -4120,7 +4164,7 @@ qemuMigrationRun(virQEMUDriverPtr driver,
 
     /* cancel any outstanding NBD jobs */
     if (mig && mig->nbd) {
-        if (qemuMigrationCancelDriveMirror(driver, vm) < 0)
+        if (qemuMigrationCancelDriveMirror(driver, vm, !!ret) < 0)
             ret = -1;
     }
 
@@ -5574,7 +5618,8 @@ qemuMigrationToFile(virQEMUDriverPtr driver, virDomainObjPtr vm,
     if (rc < 0)
         goto cleanup;
 
-    rc = qemuMigrationWaitForCompletion(driver, vm, asyncJob, NULL, false);
+    rc = qemuMigrationWaitForCompletion(driver, vm, asyncJob,
+                                        NULL, false, false);
 
     if (rc < 0) {
         if (rc == -2) {
diff --git a/src/qemu/qemu_process.c b/src/qemu/qemu_process.c
index 9c5d0f4..b66502c 100644
--- a/src/qemu/qemu_process.c
+++ b/src/qemu/qemu_process.c
@@ -1000,11 +1000,11 @@ qemuProcessHandleBlockJob(qemuMonitorPtr mon ATTRIBUTE_UNUSED,
         goto error;
     diskPriv = QEMU_DOMAIN_DISK_PRIVATE(disk);
 
-    if (diskPriv->blockJobSync) {
+    if (!virThreadQueueIsEmpty(diskPriv->blockJobQueue)) {
         diskPriv->blockJobType = type;
         diskPriv->blockJobStatus = status;
-        /* We have an SYNC API waiting for this event, dispatch it back */
-        virCondSignal(&diskPriv->blockJobSyncCond);
+        /* We have a SYNC API waiting for this event, dispatch it back */
+        virThreadQueueBroadcast(diskPriv->blockJobQueue);
     } else {
         /* there is no waiting SYNC API, dispatch the update to a thread */
         if (VIR_ALLOC(processEvent) < 0)
@@ -5060,8 +5060,7 @@ void qemuProcessStop(virQEMUDriverPtr driver,
     for (i = 0; i < vm->def->ndisks; i++) {
         qemuDomainDiskPrivatePtr diskPriv =
             QEMU_DOMAIN_DISK_PRIVATE(vm->def->disks[i]);
-        if (diskPriv->blockJobSync && diskPriv->blockJobStatus == -1)
-            virCondSignal(&diskPriv->blockJobSyncCond);
+        virThreadQueueBroadcast(diskPriv->blockJobQueue);
     }
 
     if ((logfile = qemuDomainCreateLog(driver, vm, true)) < 0) {
-- 
2.4.1




More information about the libvir-list mailing list