[libvirt] [PATCH 3/6] Implement disk streaming in the qemu driver

Adam Litke agl at us.ibm.com
Wed Nov 17 19:14:00 UTC 2010


Add support for: starting/stopping full device streaming, streaming a single
sector, and getting the status of streaming.  These operations are done by
using the 'stream' and 'info stream' qemu monitor commands.

* src/qemu/qemu_driver.c src/qemu/qemu_monitor_text.[ch]: implement disk
  streaming by using the stream and info stream text monitor commands
* src/qemu/qemu_monitor_json.[ch]: implement commands using the qmp monitor

Signed-off-by: Adam Litke <agl at us.ibm.com>
---
 src/qemu/qemu_driver.c       |   77 ++++++++++++++++++++-
 src/qemu/qemu_monitor.c      |   41 +++++++++++
 src/qemu/qemu_monitor.h      |    6 ++
 src/qemu/qemu_monitor_json.c |  104 ++++++++++++++++++++++++++++
 src/qemu/qemu_monitor_json.h |    7 ++
 src/qemu/qemu_monitor_text.c |  156 ++++++++++++++++++++++++++++++++++++++++++
 src/qemu/qemu_monitor_text.h |    8 ++
 7 files changed, 397 insertions(+), 2 deletions(-)

diff --git a/src/qemu/qemu_driver.c b/src/qemu/qemu_driver.c
index dbde9e7..d7c049a 100644
--- a/src/qemu/qemu_driver.c
+++ b/src/qemu/qemu_driver.c
@@ -13143,6 +13143,79 @@ cleanup:
     return ret;
 }
 
+static unsigned long long
+qemudDomainStreamDisk (virDomainPtr dom, const char *path,
+                       unsigned long long offset, unsigned int flags)
+{
+    struct qemud_driver *driver = dom->conn->privateData;
+    virDomainObjPtr vm;
+    unsigned long long ret = -1;
+
+    qemuDriverLock(driver);
+    vm = virDomainFindByUUID(&driver->domains, dom->uuid);
+    qemuDriverUnlock(driver);
+
+    if (!vm) {
+        char uuidstr[VIR_UUID_STRING_BUFLEN];
+        virUUIDFormat(dom->uuid, uuidstr);
+        qemuReportError(VIR_ERR_NO_DOMAIN,
+                        _("no domain with matching uuid '%s'"), uuidstr);
+        goto cleanup;
+    }
+
+    if (virDomainObjIsActive(vm)) {
+        qemuDomainObjPrivatePtr priv = vm->privateData;
+        qemuDomainObjEnterMonitor(vm);
+        ret = qemuMonitorStreamDisk(priv->mon, path, offset, flags);
+        qemuDomainObjExitMonitor(vm);
+    } else {
+        qemuReportError(VIR_ERR_OPERATION_INVALID,
+                        "%s", _("domain is not running"));
+    }
+
+cleanup:
+    if (vm)
+        virDomainObjUnlock(vm);
+    return ret;
+}
+
+static int
+qemudDomainStreamDiskInfo (virDomainPtr dom, virStreamDiskStatePtr states,
+                           unsigned int nr_states,
+                           unsigned int flags ATTRIBUTE_UNUSED)
+{
+    struct qemud_driver *driver = dom->conn->privateData;
+    virDomainObjPtr vm;
+    unsigned int ret = -1;
+
+    qemuDriverLock(driver);
+    vm = virDomainFindByUUID(&driver->domains, dom->uuid);
+    qemuDriverUnlock(driver);
+
+    if (!vm) {
+        char uuidstr[VIR_UUID_STRING_BUFLEN];
+        virUUIDFormat(dom->uuid, uuidstr);
+        qemuReportError(VIR_ERR_NO_DOMAIN,
+                        _("no domain with matching uuid '%s'"), uuidstr);
+        goto cleanup;
+    }
+
+    if (virDomainObjIsActive(vm)) {
+        qemuDomainObjPrivatePtr priv = vm->privateData;
+        qemuDomainObjEnterMonitor(vm);
+        ret = qemuMonitorStreamDiskInfo(priv->mon, states, nr_states);
+        qemuDomainObjExitMonitor(vm);
+    } else {
+        qemuReportError(VIR_ERR_OPERATION_INVALID,
+                        "%s", _("domain is not running"));
+    }
+
+cleanup:
+    if (vm)
+        virDomainObjUnlock(vm);
+    return ret;
+}
+
 static int qemuDomainMonitorCommand(virDomainPtr domain, const char *cmd,
                                     char **result, unsigned int flags)
 {
@@ -13298,8 +13371,8 @@ static virDriver qemuDriver = {
     qemuDomainMonitorCommand, /* qemuDomainMonitorCommand */
     qemuDomainSetMemoryParameters, /* domainSetMemoryParameters */
     qemuDomainGetMemoryParameters, /* domainGetMemoryParameters */
-    NULL, /* domainStreamDisk */
-    NULL, /* domainStreamDiskInfo */
+    qemudDomainStreamDisk, /* domainStreamDisk */
+    qemudDomainStreamDiskInfo, /* domainStreamDiskInfo */
 };
 
 
diff --git a/src/qemu/qemu_monitor.c b/src/qemu/qemu_monitor.c
index 2366fdb..9169e23 100644
--- a/src/qemu/qemu_monitor.c
+++ b/src/qemu/qemu_monitor.c
@@ -1917,6 +1917,47 @@ int qemuMonitorDeleteSnapshot(qemuMonitorPtr mon, const char *name)
     return ret;
 }
 
+unsigned long long
+qemuMonitorStreamDisk(qemuMonitorPtr mon, const char *path,
+                      unsigned long long offset, unsigned int flags)
+{
+    unsigned long long ret;
+
+    DEBUG("mon=%p, path=%p, offset=%llu, flags=%u", mon, path, offset, flags);
+
+    if (!mon) {
+        qemuReportError(VIR_ERR_INVALID_ARG, "%s",
+                        _("monitor must not be NULL"));
+        return -1;
+    }
+
+    if (mon->json)
+        ret = qemuMonitorJSONStreamDisk(mon, path, offset, flags);
+    else
+        ret = qemuMonitorTextStreamDisk(mon, path, offset, flags);
+    return ret;
+}
+
+int qemuMonitorStreamDiskInfo(qemuMonitorPtr mon, virStreamDiskStatePtr states,
+                              unsigned int nr_states)
+{
+    int ret;
+
+    DEBUG("mon=%p, states=%p, nr_states=%u", mon, states, nr_states);
+
+    if (!mon) {
+        qemuReportError(VIR_ERR_INVALID_ARG, "%s",
+                        _("monitor must not be NULL"));
+        return -1;
+    }
+
+    if (mon->json)
+        ret = qemuMonitorJSONStreamDiskInfo(mon, states, nr_states);
+    else
+        ret = qemuMonitorTextStreamDiskInfo(mon, states, nr_states);
+    return ret;
+}
+
 int qemuMonitorArbitraryCommand(qemuMonitorPtr mon, const char *cmd, char **reply)
 {
     int ret;
diff --git a/src/qemu/qemu_monitor.h b/src/qemu/qemu_monitor.h
index 7d09145..719de76 100644
--- a/src/qemu/qemu_monitor.h
+++ b/src/qemu/qemu_monitor.h
@@ -389,6 +389,12 @@ int qemuMonitorCreateSnapshot(qemuMonitorPtr mon, const char *name);
 int qemuMonitorLoadSnapshot(qemuMonitorPtr mon, const char *name);
 int qemuMonitorDeleteSnapshot(qemuMonitorPtr mon, const char *name);
 
+unsigned long long
+qemuMonitorStreamDisk(qemuMonitorPtr mon, const char *path,
+                      unsigned long long offset, unsigned int flags);
+int qemuMonitorStreamDiskInfo(qemuMonitorPtr mon, virStreamDiskStatePtr states,
+                              unsigned int nr_states);
+
 int qemuMonitorArbitraryCommand(qemuMonitorPtr mon, const char *cmd, char **reply);
 
 /**
diff --git a/src/qemu/qemu_monitor_json.c b/src/qemu/qemu_monitor_json.c
index d2c6f0a..15e0c5b 100644
--- a/src/qemu/qemu_monitor_json.c
+++ b/src/qemu/qemu_monitor_json.c
@@ -2342,6 +2342,110 @@ int qemuMonitorJSONDeleteSnapshot(qemuMonitorPtr mon, const char *name)
     return ret;
 }
 
+static int qemuMonitorJSONExtractStreamState(virJSONValuePtr reply,
+                                             virStreamDiskStatePtr state)
+{
+    virJSONValuePtr data;
+    int ret = -1;
+    const char *path;
+    unsigned long long offset, size;
+
+    if (!(data = virJSONValueObjectGet(reply, "return"))) {
+        qemuReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                        _("stream reply was missing return data"));
+        goto cleanup;
+    }
+
+    if ((path = virJSONValueObjectGetString(data, "device"))) {
+        if (!virJSONValueObjectGetNumberUlong(data, "offset", &offset)) {
+            qemuReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                            _("stream reply was missing offset"));
+            goto cleanup;
+        }
+        if (!virJSONValueObjectGetNumberUlong(data, "size", &size)) {
+            qemuReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                            _("stream reply was missing size"));
+            goto cleanup;
+        }
+
+        memcpy(state->path, path, strlen(path));
+        state->offset = offset;
+        state->size = size;
+        ret = 1;
+    } else {
+        /* No currently active streams */
+        ret = 0;
+    }
+
+cleanup:
+    return ret;
+}
+
+unsigned long long
+qemuMonitorJSONStreamDisk(qemuMonitorPtr mon, const char *path,
+                          unsigned long long offset, unsigned int flags)
+{
+    virJSONValuePtr cmd = NULL;
+    virJSONValuePtr reply = NULL;
+    struct _virStreamDiskState state;
+    int rc;
+    unsigned long long ret = -1;
+
+    if (flags == VIR_STREAM_DISK_START)
+        cmd = qemuMonitorJSONMakeCommand("stream", "b:all", "true",
+                                         "s:device", path, NULL);
+    else if (flags == VIR_STREAM_DISK_STOP)
+        cmd = qemuMonitorJSONMakeCommand("stream", "b:stop", "true",
+                                         "s:device", path, NULL);
+    else if (flags == VIR_STREAM_DISK_ONE)
+        cmd = qemuMonitorJSONMakeCommand("stream", "s:device", path,
+                                         "i:offset", offset, NULL);
+    else
+        qemuReportError(VIR_ERR_INTERNAL_ERROR, "Invalid argument for flags: "
+                        "%u", flags);
+
+    if (!cmd)
+        return -1;
+
+    if (qemuMonitorJSONCommand(mon, cmd, &reply) < 0)
+        goto cleanup;
+    rc = qemuMonitorJSONExtractStreamState(reply, &state);
+    if (rc == 0 && (flags == VIR_STREAM_DISK_START ||
+                    flags == VIR_STREAM_DISK_STOP))
+        ret = 0;
+    if (rc == 1 && flags == VIR_STREAM_DISK_ONE)
+        ret = state.offset;
+
+cleanup:
+    virJSONValueFree(cmd);
+    virJSONValueFree(reply);
+    return ret;
+}
+
+int qemuMonitorJSONStreamDiskInfo(qemuMonitorPtr mon,
+                                  virStreamDiskStatePtr states,
+                                  unsigned int nr_states)
+{
+    virJSONValuePtr cmd = NULL;
+    virJSONValuePtr reply = NULL;
+    int ret = -1;
+
+    /* Qemu only supports one stream at a time */
+    nr_states = 1;
+
+    cmd = qemuMonitorJSONMakeCommand("query-stream", NULL);
+    if (!cmd)
+        return -1;
+
+    if (qemuMonitorJSONCommand(mon, cmd, &reply) < 0)
+        goto cleanup;
+    ret = qemuMonitorJSONExtractStreamState(reply, states);
+cleanup:
+    virJSONValueFree(cmd);
+    virJSONValueFree(reply);
+    return ret;
+}
+
 int qemuMonitorJSONArbitraryCommand(qemuMonitorPtr mon,
                                     const char *cmd_str,
                                     char **reply_str)
diff --git a/src/qemu/qemu_monitor_json.h b/src/qemu/qemu_monitor_json.h
index 94806c1..f4db2b4 100644
--- a/src/qemu/qemu_monitor_json.h
+++ b/src/qemu/qemu_monitor_json.h
@@ -196,6 +196,13 @@ int qemuMonitorJSONCreateSnapshot(qemuMonitorPtr mon, const char *name);
 int qemuMonitorJSONLoadSnapshot(qemuMonitorPtr mon, const char *name);
 int qemuMonitorJSONDeleteSnapshot(qemuMonitorPtr mon, const char *name);
 
+unsigned long long
+qemuMonitorJSONStreamDisk(qemuMonitorPtr mon, const char *path,
+                          unsigned long long offset, unsigned int flags);
+int qemuMonitorJSONStreamDiskInfo(qemuMonitorPtr mon,
+                                  virStreamDiskStatePtr states,
+                                  unsigned int nr_states);
+
 int qemuMonitorJSONArbitraryCommand(qemuMonitorPtr mon,
                                     const char *cmd_str,
                                     char **reply_str);
diff --git a/src/qemu/qemu_monitor_text.c b/src/qemu/qemu_monitor_text.c
index d7e128c..115b220 100644
--- a/src/qemu/qemu_monitor_text.c
+++ b/src/qemu/qemu_monitor_text.c
@@ -2569,6 +2569,162 @@ cleanup:
     return ret;
 }
 
+static int qemuMonitorParseStreamInfo(char *text,
+                                      virStreamDiskStatePtr state)
+{
+    char *p;
+    unsigned long long data;
+    unsigned int device_len;
+
+    memset(state->path, 0, VIR_STREAM_PATH_BUFLEN);
+    state->offset = 0;
+    state->size = 0;
+
+    if (strstr(text, "Device '") && strstr(text, "' not found")) {
+        qemuReportError(VIR_ERR_OPERATION_INVALID, "%s", _("Device not found"));
+        return -1;
+    }
+
+    if (strstr(text, "expects a sector size less than device length")) {
+        qemuReportError(VIR_ERR_OPERATION_INVALID, "%s",
+                        _("Offset parameter is greater than the device size"));
+        return -1;
+    }
+
+    if (strstr(text, "Device '") && strstr(text, "' is in use")) {
+        qemuReportError(VIR_ERR_OPERATION_FAILED, "%s",
+                        _("Another streaming operation is in progress"));
+        return -1;
+    }
+
+    if (strstr(text, "No active stream") || STREQ(text, ""))
+        return 0;
+
+    if ((text = STRSKIP(text, "Streaming device ")) == NULL)
+        return -1;
+
+    /* Parse the device path */
+    p = strstr(text, ": Completed ");
+    if (!p)
+        return -1;
+
+    device_len = (unsigned int)(p - text);
+    if (device_len >= VIR_STREAM_PATH_BUFLEN) {
+        qemuReportError(VIR_ERR_OPERATION_FAILED, "%s",
+                        "Device name is too long");
+       return -1;
+    }
+
+    if (sprintf((char *)&state->path, "%.*s", device_len, text) < 0) {
+        qemuReportError(VIR_ERR_OPERATION_FAILED, "%s",
+                        "Unable to store device name");
+        return -1;
+    }
+    text = p + 12; /* Skip over ": Completed " */
+
+    /* Parse the current sector offset */
+    if (virStrToLong_ull (text, &p, 10, &data))
+        return -1;
+    state->offset = (size_t) data;
+    text = p;
+
+    /* Parse the total number of sectors */
+    if (!STRPREFIX(text, " of "))
+        return -1;
+    text += 4;
+    if (virStrToLong_ull (text, &p, 10, &data))
+        return -1;
+    state->size = (size_t) data;
+    text = p;
+
+    /* Verify the ending */
+    if (!STRPREFIX(text, " sectors"))
+        return -1;
+
+    return 1;
+}
+
+unsigned long long
+qemuMonitorTextStreamDisk(qemuMonitorPtr mon, const char *path,
+                          unsigned long long offset, unsigned int flags)
+{
+    char *cmd;
+    char *reply = NULL;
+    int rc;
+    unsigned long long ret = -1;
+    virStreamDiskState state;
+
+    if (flags == VIR_STREAM_DISK_START)
+        rc = virAsprintf(&cmd, "stream -a %s", path);
+    else if (flags == VIR_STREAM_DISK_STOP)
+        rc = virAsprintf(&cmd, "stream -s %s", path);
+    else if (flags == VIR_STREAM_DISK_ONE)
+        rc = virAsprintf(&cmd, "stream %s %llu", path, offset);
+    else {
+        qemuReportError(VIR_ERR_OPERATION_INVALID, "%s%u",
+                        _("invalid value for flags: "), flags);
+        return -1;
+    }
+
+    if (rc < 0) {
+        virReportOOMError();
+        return -1;
+    }
+
+    if (qemuMonitorCommand(mon, cmd, &reply)) {
+        qemuReportError(VIR_ERR_OPERATION_FAILED,
+                         _("failed to perform stream command '%s'"),
+                         cmd);
+        goto cleanup;
+    }
+
+    rc = qemuMonitorParseStreamInfo(reply, &state);
+    if (rc == 0 && (flags == VIR_STREAM_DISK_START ||
+                    flags == VIR_STREAM_DISK_STOP))
+        ret = 0; /* A successful full disk start or stop produces no output */
+    if (rc == 1 && flags == VIR_STREAM_DISK_ONE)
+        ret = state.offset;
+
+cleanup:
+    VIR_FREE(cmd);
+    VIR_FREE(reply);
+    return ret;
+}
+
+int qemuMonitorTextStreamDiskInfo(qemuMonitorPtr mon,
+                                  virStreamDiskStatePtr states,
+                                  unsigned int nr_states)
+{
+    char *cmd;
+    char *reply = NULL;
+    int ret = -1;
+
+    /* Qemu only supports one stream at a time */
+    nr_states = 1;
+
+    if (virAsprintf(&cmd, "info stream") < 0) {
+        virReportOOMError();
+        return -1;
+    }
+
+    if (qemuMonitorCommand(mon, cmd, &reply)) {
+        qemuReportError(VIR_ERR_OPERATION_FAILED,
+                         _("failed to perform stream command '%s'"),
+                         cmd);
+        goto cleanup;
+    }
+
+    ret = qemuMonitorParseStreamInfo(reply, states);
+    if (ret == -1)
+        qemuReportError(VIR_ERR_OPERATION_FAILED,
+                        _("Failed to parse monitor output: '%s'"), reply);
+
+cleanup:
+    VIR_FREE(cmd);
+    VIR_FREE(reply);
+    return ret;
+}
+
 int qemuMonitorTextArbitraryCommand(qemuMonitorPtr mon, const char *cmd,
                                     char **reply)
 {
diff --git a/src/qemu/qemu_monitor_text.h b/src/qemu/qemu_monitor_text.h
index c017509..4d4aaa3 100644
--- a/src/qemu/qemu_monitor_text.h
+++ b/src/qemu/qemu_monitor_text.h
@@ -194,6 +194,14 @@ int qemuMonitorTextCreateSnapshot(qemuMonitorPtr mon, const char *name);
 int qemuMonitorTextLoadSnapshot(qemuMonitorPtr mon, const char *name);
 int qemuMonitorTextDeleteSnapshot(qemuMonitorPtr mon, const char *name);
 
+unsigned long long
+qemuMonitorTextStreamDisk(qemuMonitorPtr mon, const char *path,
+                          unsigned long long offset, unsigned int flags);
+int qemuMonitorTextStreamDiskInfo(qemuMonitorPtr mon,
+                                  virStreamDiskStatePtr states,
+                                  unsigned int nr_states);
+
+
 int qemuMonitorTextArbitraryCommand(qemuMonitorPtr mon, const char *cmd,
                                     char **reply);
 
-- 
1.7.3.2.164.g6f10c




More information about the libvir-list mailing list