[Libguestfs] [PATCH disk-sync 5/5] Convert disk_sync inner loop to asynchronous.

Richard W.M. Jones rjones at redhat.com
Tue Aug 27 09:00:59 UTC 2019


On Mon, Aug 26, 2019 at 01:36:41PM +0200, Martin Kletzander wrote:
> On Thu, Aug 22, 2019 at 03:39:35PM +0100, Richard W.M. Jones wrote:
> >Previously the inner loop would issue nbd.pread() requests
> >synchronously, meaning that we would issue a request for each data
> >block from the nbdkit server (which would in turn synchronously
> >request the data from VMware) and wait until nbdkit replies before
> >continuing.
> >
> >This converts the inner loop so it issues as many pread requests
> >asynchronously to nbdkit as the server can handle (any extra are queued
> >in nbd_handle).  The server will answer these in parallel and possibly
> >out of order.
> >
> >This results in somewhat better throughput (for me: 13 minutes down to
> >5 minutes for an "initial" run).  Although unfortunately we are still
> >limited by VDDK's braindead threading model.
> >---
> >wrapper/disk_sync.py | 55 +++++++++++++++++++++++++++++++-------------
> >1 file changed, 39 insertions(+), 16 deletions(-)
> >
> >diff --git a/wrapper/disk_sync.py b/wrapper/disk_sync.py
> >index e655ead..e854009 100644
> >--- a/wrapper/disk_sync.py
> >+++ b/wrapper/disk_sync.py
> >@@ -409,6 +409,26 @@ def get_block_status(nbd_handle, extent):
> >    return blocks
> >
> >
> >+# This is called back when nbd_aio_pread completes.
> >+def read_completed(fd, buf, offset, err):
> >+    logging.debug('Writing %d B to offset %d B' % (buf.size(), offset))
> >+    os.pwrite(fd, buf.to_bytearray(), offset)
> >+    # By returning 1 here we auto-retire the aio_pread command.
> >+    return 1
> >+
> >+
> >+# Process any AIO requests without blocking.
> >+def process_aio_requests(nbd_handle):
> >+    while nbd_handle.poll(0) == 1:
> >+        pass
> >+
> >+
> >+# Block until all AIO commands on the handle have finished.
> >+def wait_for_aio_commands_to_finish(nbd_handle):
> >+    while nbd_handle.aio_in_flight() > 0:
> >+        nbd_handle.poll(-1)
> >+
> >+
> >def sync_data():
> >    state = State().instance
> >    for key, disk in state['disks'].items():
> >@@ -491,25 +511,28 @@ def sync_data():
> >                              (block['length'], block['offset']))
> >                # Optimize for memory usage, maybe?
> >                os.pwrite(fd, [0] * block['length'], block['offset'])
> >-                copied += block['length']
> >-                disk['progress']['copied'] = copied
> >-                state.write()
> >            else:
> >-                wrote = 0
> >-                while wrote < block['length']:
> >-                    length = min(block['length'] - wrote, MAX_PREAD_LEN)
> >-                    offset = block['offset'] + wrote
> >+                count = 0
> >+                while count < block['length']:
> >+                    length = min(block['length'] - count, MAX_PREAD_LEN)
> >+                    offset = block['offset'] + count
> >+
> >                    logging.debug('Reading %d B from offset %d B' %
> >                                  (length, offset))
> >-                    # Ideally use mmap() without any temporary buffer
> >-                    data = nbd_handle.pread(length, offset)
> >-                    logging.debug('Writing %d B to offset %d B' %
> >-                                  (length, offset))
> >-                    os.pwrite(fd, data, offset)
> >-                    copied += length
> >-                    wrote += length
> >-                    disk['progress']['copied'] = copied
> >-                    state.write()
> >+                    buf = nbd.Buffer(length)
> >+                    nbd_handle.aio_pread(
> >+                        buf, offset,
> >+                        lambda err, fd=fd, buf=buf, offset=offset:
> >+                        read_completed(fd, buf, offset, err))
> 
> If the order of parameters is changed, there is no need for the anonymous
> function here, but that's just a small thing I noticed.

The default-argument bindings (fd=fd, buf=buf, offset=offset) seem to
be needed to work around late binding of closure variables in Python
(a bug, or more properly a "serious implementation flaw"):

https://stackoverflow.com/questions/2295290
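
To illustrate the point, here is a minimal standalone sketch (not part of
the patch; the offsets and the completed() helper are made up for the
example). It compares a plain lambda, a lambda with default arguments as
used in the patch, and functools.partial, which is presumably what
reordering the parameters would allow:

```python
from functools import partial

# Late binding: each lambda looks up `offset` when it is called,
# not when it is created, so all three see the final loop value.
late = []
for offset in (0, 512, 1024):
    late.append(lambda err: (offset, err))

# Default arguments are evaluated when the lambda is defined, so each
# callback keeps the value `offset` had in its own iteration.
bound = []
for offset in (0, 512, 1024):
    bound.append(lambda err, offset=offset: (offset, err))


# Hypothetical callback with `err` as the last parameter, so the
# leading arguments can be bound early with functools.partial.
def completed(offset, err):
    return (offset, err)


partials = [partial(completed, offset) for offset in (0, 512, 1024)]

late_results = [cb(0) for cb in late]
bound_results = [cb(0) for cb in bound]
partial_results = [cb(0) for cb in partials]

print(late_results)     # [(1024, 0), (1024, 0), (1024, 0)]
print(bound_results)    # [(0, 0), (512, 0), (1024, 0)]
print(partial_results)  # [(0, 0), (512, 0), (1024, 0)]
```

Without the defaults, every completion callback would write at the last
offset computed by the loop.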

> >+                    count += length
> >+
> >+                    process_aio_requests(nbd_handle)
> 
> In order to allow fewer requests in flight, would it be enough to just do
> something like this here (similarly to wait_for_aio_commands_to_finish)?
> 
>    while nbd_handle.aio_in_flight() > NUM_IN_FLIGHT:
>        nbd_handle.poll(-1)

Yes, we could do this.
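
A rough sketch of how that cap behaves, so it can be tried without a
server. FakeHandle is a hypothetical stand-in that mimics only
aio_in_flight() and poll(); its aio_pread() has a simplified signature
(the real call also takes an nbd.Buffer), and the value of NUM_IN_FLIGHT
is an arbitrary assumption:

```python
import collections

NUM_IN_FLIGHT = 4  # assumed cap, not a value from the patch


class FakeHandle:
    """Hypothetical stand-in for nbd.NBD; completes commands in FIFO order."""

    def __init__(self):
        self.pending = collections.deque()
        self.peak = 0  # highest number of commands queued at once

    def aio_pread(self, offset, callback):
        # Simplified: the real aio_pread also takes a buffer argument.
        self.pending.append((offset, callback))
        self.peak = max(self.peak, len(self.pending))

    def aio_in_flight(self):
        return len(self.pending)

    def poll(self, timeout):
        # Complete one command per call; return 1 if something happened.
        if not self.pending:
            return 0
        offset, callback = self.pending.popleft()
        callback(0)
        return 1


def limit_in_flight(handle, limit):
    """Block until at most `limit` commands remain in flight."""
    while handle.aio_in_flight() > limit:
        handle.poll(-1)


handle = FakeHandle()
done = []
for offset in range(0, 10 * 512, 512):
    handle.aio_pread(offset, lambda err, offset=offset: done.append(offset))
    limit_in_flight(handle, NUM_IN_FLIGHT)

# Drain whatever is left, as wait_for_aio_commands_to_finish() does.
limit_in_flight(handle, 0)
print(len(done), handle.peak)
```

Because the check runs after each request is issued and uses ">", up to
NUM_IN_FLIGHT + 1 commands can be queued for a moment before the loop
blocks.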

The eventual solution may be to replace the whole loop with a polling
loop, but that would be a more invasive change.  For an example of the
polling-loop approach, see:

https://github.com/libguestfs/libnbd/blob/7ef893735937cd7ae62d0b41171ec14195ef2710/examples/threaded-reads-and-writes.c#L234-L307

> Also, I presume all of the locking is left to libnbd to be done (and
> as you can see I don't concern myself with any locking in the whole
> file), but if that was to be improved, is there some python part
> that would require it?  For example when cleaning up the code?

There's a lock automatically held whenever any method on nbd_handle is
called.  I don't believe any other locking is needed.  In any case
it's all single-threaded: libnbd does not create any threads itself.

Rich.

-- 
Richard Jones, Virtualization Group, Red Hat http://people.redhat.com/~rjones
Read my programming and virtualization blog: http://rwmj.wordpress.com
virt-df lists disk usage of guests without needing to install any
software inside the virtual machine.  Supports Linux and Windows.
http://people.redhat.com/~rjones/virt-df/
