[Libguestfs] [PATCH libnbd v2 3/3] python: Add test for doing asynch copy from one handle to another.

Richard W.M. Jones rjones at redhat.com
Sun Aug 11 10:26:13 UTC 2019


---
 python/t/590-aio-copy.py | 123 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 123 insertions(+)

diff --git a/python/t/590-aio-copy.py b/python/t/590-aio-copy.py
new file mode 100644
index 0000000..999491b
--- /dev/null
+++ b/python/t/590-aio-copy.py
@@ -0,0 +1,123 @@
+# libnbd Python bindings
+# Copyright (C) 2010-2019 Red Hat Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+import select
+import nbd
+
+disk_size = 512 * 1024 * 1024  # size of each nbdkit disk, in bytes
+bs = 65536                     # largest buffer used for one request
+max_reads_in_flight = 16       # limit on reads outstanding on the source
+bytes_read = 0                 # running total, updated by read_completed
+bytes_written = 0              # running total, updated by write_completed
+
+def asynch_copy (src, dst):
+    """Copy the whole of src to dst using asynchronous commands.
+
+    src and dst are connected nbd.NBD handles.  Up to
+    max_reads_in_flight reads of at most bs bytes are issued at a
+    time and each completed read is written to dst at the same offset.
+    """
+    size = src.get_size ()
+
+    # This is our reading position in the source.
+    soff = 0
+
+    # Completed reads waiting to be written, as (buf, offset) pairs.
+    # Filled by read_completed, which is called when any pread from
+    # the source has completed.
+    writes = []
+    def read_completed (buf, offset, error):
+        global bytes_read
+        bytes_read += buf.size ()
+        writes.append ((buf, offset))
+        # By returning 1 here we auto-retire the pread command.
+        return 1
+
+    # This callback is called when any pwrite to the destination
+    # has completed.
+    def write_completed (buf, error):
+        global bytes_written
+        bytes_written += buf.size ()
+        buf.free ()
+        # By returning 1 here we auto-retire the pwrite command.
+        return 1
+
+    # The main loop runs until everything has been read, no completed
+    # read is still waiting to be written, and neither handle has
+    # commands in flight (so the final reads are never dropped).
+    while (soff < size or writes or
+           src.aio_in_flight () > 0 or dst.aio_in_flight () > 0):
+        # If we're able to submit more reads from the source
+        # then do so now.
+        if soff < size and src.aio_in_flight () < max_reads_in_flight:
+            bufsize = min (bs, size - soff)
+            buf = nbd.Buffer (bufsize)
+            # Bind buf and soff as lambda defaults so the callback
+            # keeps this iteration's values (closures bind late).
+            # https://stackoverflow.com/questions/2295290
+            src.aio_pread_callback (buf, soff,
+                                    lambda err, buf=buf, soff=soff:
+                                    read_completed (buf, soff, err))
+            soff += bufsize
+
+        # If there are any write commands waiting to be issued
+        # to the destination, send them now.
+        for buf, offset in writes:
+            # Bind buf as a lambda default, as for the reads above.
+            dst.aio_pwrite_callback (buf, offset,
+                                     lambda err, buf=buf:
+                                     write_completed (buf, err))
+        writes = []
+
+        # Poll each handle's socket for the events it is waiting on.
+        poll = select.poll ()
+        sfd = src.aio_get_fd ()
+        dfd = dst.aio_get_fd ()
+        for h, fd in ((src, sfd), (dst, dfd)):
+            direction = h.aio_get_direction ()
+            events = 0
+            if direction & nbd.AIO_DIRECTION_READ:
+                events |= select.POLLIN
+            if direction & nbd.AIO_DIRECTION_WRITE:
+                events |= select.POLLOUT
+            poll.register (fd, events)
+
+        for (fd, revents) in poll.poll ():
+            h = src if fd == sfd else dst
+            # The direction of each handle can change since we
+            # slept in the poll.
+            if revents & select.POLLIN and \
+               h.aio_get_direction () & nbd.AIO_DIRECTION_READ:
+                h.aio_notify_read ()
+            elif revents & select.POLLOUT and \
+                 h.aio_get_direction () & nbd.AIO_DIRECTION_WRITE:
+                h.aio_notify_write ()
+
+# Copy a read-only nbdkit pattern disk to an nbdkit memory disk.
+nbdkit = ["nbdkit", "-s", "--exit-with-parent"]
+src = nbd.NBD ()
+src.set_handle_name ("src")
+dst = nbd.NBD ()
+dst.set_handle_name ("dst")
+src.connect_command (nbdkit + ["-r", "pattern", "size=%d" % disk_size])
+dst.connect_command (nbdkit + ["memory", "size=%d" % disk_size])
+asynch_copy (src, dst)
+
+print ("bytes read: %d written: %d size: %d" %
+       (bytes_read, bytes_written, disk_size))
+assert bytes_read == disk_size
+assert bytes_written == disk_size
-- 
2.22.0




More information about the Libguestfs mailing list