[libvirt] [PATCH 8/8] test: Python Unittests for DomainBlockPull API

Adam Litke agl at us.ibm.com
Tue Jun 14 14:36:54 UTC 2011


*** Please do not consider for merging, example tests only ***

Here are the testcases I am currently running to verify the correctness of this
API.  These are continuing to evolve.

Signed-off-by: Adam Litke <agl at us.ibm.com>
---
 blockPull-test.py |  301 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 301 insertions(+), 0 deletions(-)
 create mode 100755 blockPull-test.py

diff --git a/blockPull-test.py b/blockPull-test.py
new file mode 100755
index 0000000..4001477
--- /dev/null
+++ b/blockPull-test.py
@@ -0,0 +1,301 @@
+#!/usr/bin/env python
+
+import sys
+import subprocess
+import time
+import unittest
+import re
+import threading
+import libvirt
+
+qemu_img_bin = "/home/aglitke/src/qemu/qemu-img"  # qemu-img binary run by qemu_img() and has_backing_file(); hard-coded path under /home/aglitke
+virsh_bin = "/home/aglitke/src/libvirt/tools/virsh"  # virsh binary run by virsh(); hard-coded path under /home/aglitke
+# Domain XML handed to createXML() in BlockPullTestCase.begin(): vda and vdb are QED images, vdc is raw, all under /tmp.
+dom_xml = """
+<domain type='kvm'>
+  <name>blockPull-test</name>
+  <memory>131072</memory>
+  <currentMemory>131072</currentMemory>
+  <vcpu>1</vcpu>
+  <os>
+    <type arch='x86_64' machine='pc-0.13'>hvm</type>
+    <boot dev='hd'/>
+  </os>
+  <features>
+    <acpi/>
+    <apic/>
+    <pae/>
+  </features>
+  <clock offset='utc'/>
+  <on_poweroff>destroy</on_poweroff>
+  <on_reboot>restart</on_reboot>
+  <on_crash>restart</on_crash>
+  <devices>
+    <emulator>/home/aglitke/src/qemu/x86_64-softmmu/qemu-system-x86_64</emulator>
+    <disk type='file' device='disk'>
+      <driver name='qemu' type='qed'/>
+      <source file='/tmp/disk1.qed' />
+      <target dev='vda' bus='virtio'/>
+    </disk>
+    <disk type='file' device='disk'>
+      <driver name='qemu' type='qed'/>
+      <source file='/tmp/disk2.qed' />
+      <target dev='vdb' bus='virtio'/>
+    </disk>
+    <disk type='file' device='disk'>
+      <driver name='qemu' type='raw'/>
+      <source file='/tmp/disk3.raw' />
+      <target dev='vdc' bus='virtio'/>
+    </disk>
+    <graphics type='vnc' port='-1' autoport='yes'/>
+  </devices>
+</domain>
+"""
+
+def qemu_img(*args):
+    global qemu_img_bin
+
+    devnull = open('/dev/null', 'r+')
+    return subprocess.call([qemu_img_bin] + list(args), stdin=devnull, stdout=devnull)
+
+def virsh(*args):
+    global virsh_bin
+
+    devnull = open('/dev/null', 'r+')
+    return subprocess.call([virsh_bin] + list(args),
+                           stdin=devnull, stdout=devnull, stderr=devnull)
+
+def make_baseimage(name, size_mb):
+    devnull = open('/dev/null', 'r+')
+    return subprocess.call(['dd', 'if=/dev/zero', "of=%s" % name, 'bs=1M',
+                            'count=%i' % size_mb], stdin=devnull, stdout=devnull, stderr=devnull)
+
+def has_backing_file(path):
+    global qemu_img_bin
+    p1 = subprocess.Popen([qemu_img_bin, "info", path],
+                          stdout=subprocess.PIPE).communicate()[0]
+    matches = re.findall("^backing file:", p1, re.M)
+    if len(matches) > 0:
+        return True
+    return False
+
+class BlockPullTestCase(unittest.TestCase):
+    def _error_handler(self, ctx, error, dummy=None):
+        pass
+
+    def create_disks(self, sparse):
+        self.disks = [ '/tmp/disk1.qed', '/tmp/disk2.qed', '/tmp/disk3.raw' ]
+        if sparse:
+            qemu_img('create', '-f', 'raw', '/tmp/backing1.img', '100M')
+            qemu_img('create', '-f', 'raw', '/tmp/backing2.img', '100M')
+        else:
+            make_baseimage('/tmp/backing1.img', 100)
+            make_baseimage('/tmp/backing2.img', 100)
+        qemu_img('create', '-f', 'qed', '-o', 'backing_file=/tmp/backing1.img,copy_on_read=on', self.disks[0])
+        qemu_img('create', '-f', 'qed', '-o', 'backing_file=/tmp/backing2.img,copy_on_read=on', self.disks[1])
+        qemu_img('create', '-f', 'raw', self.disks[2], '100M')
+
+    def begin(self, sparse=True):
+        global dom_xml
+
+        libvirt.registerErrorHandler(self._error_handler, None)
+        self.create_disks(sparse)
+        self.conn = libvirt.open('qemu:///system')
+        self.dom = self.conn.createXML(dom_xml, 0)
+
+    def end(self):
+        self.dom.destroy()
+        self.conn.close()
+
+class TestBasicErrors(BlockPullTestCase):
+    def setUp(self):
+        self.begin()
+
+    def tearDown(self):
+        self.end()
+
+    def test_bad_path(self):
+        try:
+            self.dom.blockPull('/dev/null', 0)
+        except libvirt.libvirtError, e:
+            self.assertEqual(libvirt.VIR_ERR_INVALID_ARG, e.get_error_code())
+        else:
+            e = self.conn.virConnGetLastError()
+            self.assertEqual(libvirt.VIR_ERR_INVALID_ARG, e[0])
+
+    def test_abort_no_stream(self):
+        try:
+            self.dom.blockPullAbort(self.disks[0], 0)
+        except libvirt.libvirtError, e:
+            self.assertEqual(libvirt.VIR_ERR_OPERATION_INVALID, e.get_error_code())
+        else:
+            e = self.conn.virConnGetLastError()
+            self.assertEqual(libvirt.VIR_ERR_OPERATION_INVALID, e[0])
+
+    def test_start_same_twice(self):
+        self.dom.blockPullAll(self.disks[0], 0)
+        try:
+            self.dom.blockPullAll(self.disks[0], 0)
+        except libvirt.libvirtError, e:
+            self.assertEqual(libvirt.VIR_ERR_OPERATION_FAILED, e.get_error_code())
+        else:
+            e = self.conn.virConnGetLastError()
+            self.assertEqual(libvirt.VIR_ERR_OPERATION_FAILED, e[0])
+
+    def test_unsupported_disk(self):
+        try:
+            self.dom.blockPull(self.disks[2], 0)
+        except libvirt.libvirtError, e:
+            self.assertEqual(libvirt.VIR_ERR_OPERATION_INVALID, e.get_error_code())
+        else:
+            e = self.conn.virConnGetLastError()
+            self.assertEqual(libvirt.VIR_ERR_OPERATION_INVALID, e[0])
+
+class TestBasicCommands(BlockPullTestCase):
+    def setUp(self):
+        pass
+
+    def tearDown(self):
+        self.end()
+
+    def test_start_stop(self):
+        self.begin(sparse=False)
+        self.dom.blockPullAll(self.disks[0], 0)
+        time.sleep(1)
+        self.assertIsNot(None, self.dom.blockPullInfo(self.disks[0], 0))
+        self.dom.blockPullAbort(self.disks[0], 0)
+        time.sleep(1)
+        self.assertIs(None, self.dom.blockPullInfo(self.disks[0], 0))
+
+        # Restarting the stream as incremental should not show up in
+        # blockPullInfo
+        self.dom.blockPull(self.disks[0], 0)
+        self.assertIs(None, self.dom.blockPullInfo(self.disks[0], 0))
+
+    def test_whole_disk(self):
+        self.begin()
+        self.assertTrue(has_backing_file(self.disks[0]))
+        self.dom.blockPullAll(self.disks[0], 0)
+        for i in xrange(1, 5):
+            if self.dom.blockPullInfo(self.disks[0], 0) is None:
+                break
+            time.sleep(1)
+        self.assertFalse(has_backing_file(self.disks[0]))
+
+    def test_incremental(self):
+        self.begin()
+        last_cur = -1
+        while True:
+            info = self.dom.blockPull(self.disks[0], 0)
+            if info is None or info['cur'] == info['end']:
+                break
+            self.assertGreater(info['cur'], last_cur)
+            last_cur = info['cur']
+        self.assertFalse(has_backing_file(self.disks[0]))
+
+    def test_interleaved(self):
+        self.begin()
+        disk_list = range(2)
+        while True:
+            for i in disk_list:
+                info = self.dom.blockPull(self.disks[i], 0)
+                if info is None or info['cur'] == info['end']:
+                    disk_list.remove(i)
+            if len(disk_list) == 0:
+                break
+        for i in range(2):
+            self.assertFalse(has_backing_file(self.disks[i]))
+
+    def test_two_disks_at_once(self):
+        self.begin()
+        disk_list = range(2)
+        for d in disk_list:
+            self.dom.blockPullAll(self.disks[d], 0)
+
+        for i in xrange(5):
+            for d in disk_list:
+                info = self.dom.blockPullInfo(self.disks[d], 0)
+                if info is None:
+                    disk_list.remove(d)
+            if len(disk_list) == 0:
+                break
+            time.sleep(1)
+        for d in range(2):
+            self.assertFalse(has_backing_file(self.disks[d]))
+
+class TestEvents(BlockPullTestCase):
+    def eventLoopRun(self):
+        while self.do_events:
+            libvirt.virEventRunDefaultImpl()
+
+    def eventLoopStart(self):
+        libvirt.virEventRegisterDefaultImpl()
+        self.eventLoopThread = threading.Thread(target=self.eventLoopRun, name="libvirtEventLoop")
+        self.eventLoopThread.setDaemon(True)
+        self.do_events = True
+        self.eventLoopThread.start()
+
+    def eventLoopStop(self):
+        self.do_events = False
+
+    def setUp(self):
+        self.eventLoopStart()
+
+    def tearDown(self):
+        self.end()
+
+    @staticmethod
+    def recordBlockPullEvent(conn, dom, path, status, inst):
+        inst.event = (dom, path, status)
+
+    def test_event_complete(self):
+        self.begin()
+        self.event = None
+        self.conn.domainEventRegisterAny(self.dom, libvirt.VIR_DOMAIN_EVENT_ID_BLOCK_PULL,
+                                            TestEvents.recordBlockPullEvent, self)
+        self.dom.blockPullAll(self.disks[0], 0)
+        for i in xrange(1, 5):
+            if self.event is not None:
+                break
+            time.sleep(1)
+        self.eventLoopStop()
+        self.assertIsNot(None, self.event)
+        self.assertFalse(has_backing_file(self.disks[0]))
+        self.assertEqual(self.event[1], self.disks[0])
+        self.assertEqual(self.event[2], libvirt.VIR_DOMAIN_BLOCK_PULL_COMPLETED)
+
+class TestVirsh(BlockPullTestCase):
+    def setUp(self):
+        pass
+
+    def tearDown(self):
+        self.end()
+
+    def test_blockpull(self):
+        self.begin()
+        info1 = self.dom.blockPull(self.disks[0], 0)
+        virsh('blockpull', self.dom.name(), self.disks[0])
+        info2 = self.dom.blockPull(self.disks[0], 0)
+        self.assertGreater(info2['cur'], info1['cur'])
+
+    def test_blockpull_all(self):
+        self.begin()
+        virsh('blockpull', self.dom.name(), '--all', self.disks[0])
+        for i in xrange(1, 5):
+            if self.dom.blockPullInfo(self.disks[0], 0) is None:
+                break
+            time.sleep(1)
+        self.assertFalse(has_backing_file(self.disks[0]))
+
+    def test_blockpull_abort(self):
+        self.begin(sparse=False)
+        self.dom.blockPullAll(self.disks[0], 0)
+        time.sleep(1)
+        self.assertIsNot(None, self.dom.blockPullInfo(self.disks[0], 0))
+        virsh('blockpull', self.dom.name(), '--abort', self.disks[0])
+        time.sleep(1)
+        self.assertIs(None, self.dom.blockPullInfo(self.disks[0], 0))
+        self.assertTrue(has_backing_file(self.disks[0]))
+
+if __name__ == '__main__':  # only when executed as a script, not on import
+    unittest.main()  # hand control to unittest's command-line test runner
-- 
1.7.3




More information about the libvir-list mailing list