[Libguestfs] [PATCH 4/6] v2v: rhv-upload-plugin: Support multiple connections

Nir Soffer nirsof at gmail.com
Fri Jan 22 22:45:22 UTC 2021


Use multiple connections to imageio server to speed up the transfer.

Connections are managed via a thread-safe queue. Threads remove a
connection from the queue for every request, and put it back at the end
of the request. Only one thread can access a given connection at a
time.

Threads are accessing existing values in the handle dict, like
h["path"]. They may also modify h["failed"] on errors. These operations
are thread safe and do not require additional locking.

Sending a flush request is trickier; on the imageio side, we have one
qemu-nbd server with multiple connections. I'm not sure if sending one
flush command on one of the connections is good enough to flush all
commands, so we send a flush command on all connections in the flush
callback.

Since flush may be called when there are in-flight requests, we wait
until all in-flight requests are done before sending a flush. While
flushing, the connection pool is empty so new requests will block on the
queue until the flush is completed.

Closing is done in a similar way, waiting until all in-flight requests
are done and closing all connections.

Testing shows that requests are spread over 4 connections, but
performance is worse. Connection time increased from 5.36 seconds to
7.08 seconds (32% slower).

[connection 1 ops, 7.053520 s]
[dispatch 548 ops, 1.188910 s]
[write 469 ops, 1.014646 s, 332.75 MiB, 327.95 MiB/s]
[zero 77 ops, 0.058495 s, 1.13 GiB, 19.29 GiB/s]
[flush 2 ops, 0.000291 s]

[connection 1 ops, 7.085039 s]
[dispatch 548 ops, 1.097437 s]
[write 478 ops, 0.924214 s, 323.25 MiB, 349.76 MiB/s]
[zero 68 ops, 0.052265 s, 1.22 GiB, 23.43 GiB/s]
[flush 2 ops, 0.000258 s]

[connection 1 ops, 7.037253 s]
[dispatch 547 ops, 1.111386 s]
[write 477 ops, 0.959592 s, 343.25 MiB, 357.70 MiB/s]
[zero 68 ops, 0.047005 s, 1.20 GiB, 25.44 GiB/s]
[flush 2 ops, 0.000266 s]

[connection 1 ops, 7.045538 s]
[dispatch 548 ops, 1.191029 s]
[write 482 ops, 1.041125 s, 347.12 MiB, 333.41 MiB/s]
[zero 64 ops, 0.045171 s, 1.14 GiB, 25.16 GiB/s]
[flush 2 ops, 0.000210 s]

Signed-off-by: Nir Soffer <nsoffer at redhat.com>
---
 v2v/rhv-upload-plugin.py | 293 +++++++++++++++++++++++++--------------
 1 file changed, 187 insertions(+), 106 deletions(-)

diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py
index e639be0c..f3242f87 100644
--- a/v2v/rhv-upload-plugin.py
+++ b/v2v/rhv-upload-plugin.py
@@ -21,20 +21,28 @@ import functools
 import inspect
 import json
 import logging
+import queue
 import socket
 import ssl
 import sys
 import time
 
+from contextlib import contextmanager
 from http.client import HTTPSConnection, HTTPConnection
 from urllib.parse import urlparse
 
+import nbdkit
+
 import ovirtsdk4 as sdk
 import ovirtsdk4.types as types
 
 # Using version 2 supporting the buffer protocol for better performance.
 API_VERSION = 2
 
+# Maximum number of connections to imageio server. Based on testing with
+# imageio client, this gives the best performance.
+MAX_CONNECTIONS = 4
+
 # Timeout to wait for oVirt disks to change status, or the transfer
 # object to finish initializing [seconds].
 timeout = 5 * 60
@@ -61,6 +69,14 @@ def config_complete():
         raise RuntimeError("missing configuration parameters")
 
 
+def thread_model():
+    """
+    Using parallel model to speed up transfer with multiple connections to
+    imageio server.
+    """
+    return nbdkit.THREAD_MODEL_PARALLEL
+
+
 def debug(s):
     if params['verbose']:
         print(s, file=sys.stderr)
@@ -129,7 +145,7 @@ def open(readonly):
         # See https://bugzilla.redhat.com/1916176.
         http.close()
 
-        http = optimize_http(http, host, options)
+        pool = create_http_pool(destination_url, host, options)
     except:
         cancel_transfer(connection, transfer)
         raise
@@ -147,7 +163,8 @@ def open(readonly):
         'disk_id': disk.id,
         'transfer': transfer,
         'failed': False,
-        'http': http,
+        'pool': pool,
+        'connections': pool.qsize(),
         'path': destination_url.path,
     }
 
@@ -196,67 +213,65 @@ def request_failed(r, msg):
 @failing
 def pread(h, buf, offset, flags):
     count = len(buf)
-    http = h['http']
-
     headers = {"Range": "bytes=%d-%d" % (offset, offset + count - 1)}
-    http.request("GET", h['path'], headers=headers)
 
-    r = http.getresponse()
-    # 206 = HTTP Partial Content.
-    if r.status != 206:
-        request_failed(r,
-                       "could not read sector offset %d size %d" %
-                       (offset, count))
-
-    content_length = int(r.getheader("content-length"))
-    if content_length != count:
-        # Should never happen.
-        request_failed(r,
-                       "unexpected Content-Length offset %d size %d got %d" %
-                       (offset, count, content_length))
-
-    with memoryview(buf) as view:
-        got = 0
-        while got < count:
-            n = r.readinto(view[got:])
-            if n == 0:
-                request_failed(r,
-                               "short read offset %d size %d got %d" %
-                               (offset, count, got))
-            got += n
+    with http_context(h) as http:
+        http.request("GET", h['path'], headers=headers)
+
+        r = http.getresponse()
+        # 206 = HTTP Partial Content.
+        if r.status != 206:
+            request_failed(r,
+                           "could not read sector offset %d size %d" %
+                           (offset, count))
+
+        content_length = int(r.getheader("content-length"))
+        if content_length != count:
+            # Should never happen.
+            request_failed(r,
+                           "unexpected Content-Length offset %d size %d got %d" %
+                           (offset, count, content_length))
+
+        with memoryview(buf) as view:
+            got = 0
+            while got < count:
+                n = r.readinto(view[got:])
+                if n == 0:
+                    request_failed(r,
+                                   "short read offset %d size %d got %d" %
+                                   (offset, count, got))
+                got += n
 
 
 @failing
 def pwrite(h, buf, offset, flags):
-    http = h['http']
-
     count = len(buf)
 
-    http.putrequest("PUT", h['path'] + "?flush=n")
-    # The oVirt server only uses the first part of the range, and the
-    # content-length.
-    http.putheader("Content-Range", "bytes %d-%d/*" % (offset, offset + count - 1))
-    http.putheader("Content-Length", str(count))
-    http.endheaders()
+    with http_context(h) as http:
+        http.putrequest("PUT", h['path'] + "?flush=n")
+        # The oVirt server only uses the first part of the range, and the
+        # content-length.
+        http.putheader("Content-Range", "bytes %d-%d/*" %
+                       (offset, offset + count - 1))
+        http.putheader("Content-Length", str(count))
+        http.endheaders()
 
-    try:
-        http.send(buf)
-    except BrokenPipeError:
-        pass
+        try:
+            http.send(buf)
+        except BrokenPipeError:
+            pass
 
-    r = http.getresponse()
-    if r.status != 200:
-        request_failed(r,
-                       "could not write sector offset %d size %d" %
-                       (offset, count))
+        r = http.getresponse()
+        if r.status != 200:
+            request_failed(r,
+                           "could not write sector offset %d size %d" %
+                           (offset, count))
 
-    r.read()
+        r.read()
 
 
 @failing
 def zero(h, count, offset, flags):
-    http = h['http']
-
     # Unlike the trim and flush calls, there is no 'can_zero' method
     # so nbdkit could call this even if the server doesn't support
     # zeroing.  If this is the case we must emulate.
@@ -273,65 +288,66 @@ def zero(h, count, offset, flags):
     headers = {"Content-Type": "application/json",
                "Content-Length": str(len(buf))}
 
-    http.request("PATCH", h['path'], body=buf, headers=headers)
+    with http_context(h) as http:
+        http.request("PATCH", h['path'], body=buf, headers=headers)
 
-    r = http.getresponse()
-    if r.status != 200:
-        request_failed(r,
-                       "could not zero sector offset %d size %d" %
-                       (offset, count))
+        r = http.getresponse()
+        if r.status != 200:
+            request_failed(r,
+                           "could not zero sector offset %d size %d" %
+                           (offset, count))
 
-    r.read()
+        r.read()
 
 
 def emulate_zero(h, count, offset):
-    http = h['http']
+    with http_context(h) as http:
+        http.putrequest("PUT", h['path'])
+        http.putheader("Content-Range",
+                       "bytes %d-%d/*" % (offset, offset + count - 1))
+        http.putheader("Content-Length", str(count))
+        http.endheaders()
 
-    http.putrequest("PUT", h['path'])
-    http.putheader("Content-Range",
-                   "bytes %d-%d/*" % (offset, offset + count - 1))
-    http.putheader("Content-Length", str(count))
-    http.endheaders()
-
-    try:
-        buf = bytearray(128 * 1024)
-        while count > len(buf):
-            http.send(buf)
-            count -= len(buf)
-        http.send(memoryview(buf)[:count])
-    except BrokenPipeError:
-        pass
+        try:
+            buf = bytearray(128 * 1024)
+            while count > len(buf):
+                http.send(buf)
+                count -= len(buf)
+            http.send(memoryview(buf)[:count])
+        except BrokenPipeError:
+            pass
 
-    r = http.getresponse()
-    if r.status != 200:
-        request_failed(r,
-                       "could not write zeroes offset %d size %d" %
-                       (offset, count))
+        r = http.getresponse()
+        if r.status != 200:
+            request_failed(r,
+                           "could not write zeroes offset %d size %d" %
+                           (offset, count))
 
-    r.read()
+        r.read()
 
 
 @failing
 def flush(h, flags):
-    http = h['http']
-
     # Construct the JSON request for flushing.
     buf = json.dumps({'op': "flush"}).encode()
 
     headers = {"Content-Type": "application/json",
                "Content-Length": str(len(buf))}
 
-    http.request("PATCH", h['path'], body=buf, headers=headers)
+    # Wait until all inflight requests are completed, and send a flush request
+    # for all imageio connections.
 
-    r = http.getresponse()
-    if r.status != 200:
-        request_failed(r, "could not flush")
+    for http in iter_http_pool(h):
+        http.request("PATCH", h['path'], body=buf, headers=headers)
+
+        r = http.getresponse()
+        if r.status != 200:
+            request_failed(r, "could not flush")
 
-    r.read()
+        r.read()
 
 
 def close(h):
-    http = h['http']
     connection = h['connection']
     transfer = h['transfer']
     disk_id = h['disk_id']
@@ -342,7 +358,7 @@ def close(h):
     # plugin exits.
     sys.stderr.flush()
 
-    http.close()
+    close_http_pool(h)
 
     # If the connection failed earlier ensure we cancel the transfer. Canceling
     # the transfer will delete the disk.
@@ -647,6 +663,81 @@ def transfer_supports_format():
     return "format" in sig.parameters
 
 
+# Connection pool management
+
+
+def create_http_pool(url, host, options):
+    pool = queue.Queue()
+
+    count = min(options["max_readers"],
+                options["max_writers"],
+                MAX_CONNECTIONS)
+    debug("using %d connections" % count)
+
+    unix_socket = options["unix_socket"] if host is not None else None
+
+    for i in range(count):
+        http = create_http(url, unix_socket=unix_socket)
+        pool.put(http)
+
+    return pool
+
+
+@contextmanager
+def http_context(h):
+    """
+    Context manager yielding an imageio http connection from the pool. Blocks
+    until a connection is available.
+    """
+    pool = h["pool"]
+    http = pool.get()
+    try:
+        yield http
+    finally:
+        pool.put(http)
+
+
+def iter_http_pool(h):
+    """
+    Wait until all inflight requests are done, and iterate on imageio
+    connections.
+
+    The pool is empty during iteration. New requests issued during iteration
+    will block until iteration is done.
+    """
+    pool = h["pool"]
+    locked = []
+
+    # Lock the pool by taking the connection out.
+    while len(locked) < h["connections"]:
+        locked.append(pool.get())
+
+    try:
+        for http in locked:
+            yield http
+    finally:
+        # Unlock the pool by putting the connections back.
+        for http in locked:
+            pool.put(http)
+
+
+def close_http_pool(h):
+    """
+    Wait until all inflight requests are done, close all connections and remove
+    them from the pool.
+
+    No request can be served by the pool after this call.
+    """
+    pool = h["pool"]
+    locked = []
+
+    while len(locked) < h["connections"]:
+        locked.append(pool.get())
+
+    for http in locked:
+        http.close()
+
+
 # oVirt imageio operations
 
 
@@ -665,12 +756,20 @@ def parse_transfer_url(transfer):
         return urlparse(transfer.proxy_url)
 
 
-def create_http(url):
+def create_http(url, unix_socket=None):
     """
     Create http connection for transfer url.
 
     Returns HTTPConnection.
     """
+    if unix_socket:
+        debug("using unix socket %r" % unix_socket)
+        try:
+            return UnixHTTPConnection(unix_socket)
+        except Exception as e:
+            # Very unlikely, but we can recover by using https.
+            debug("cannot create unix socket connection: %s" % e)
+
     if url.scheme == "https":
         context = \
             ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH,
@@ -679,8 +778,10 @@ def create_http(url):
             context.check_hostname = False
             context.verify_mode = ssl.CERT_NONE
 
+        debug("using https connection")
         return HTTPSConnection(url.hostname, url.port, context=context)
     elif url.scheme == "http":
+        debug("using http connection")
         return HTTPConnection(url.hostname, url.port)
     else:
         raise RuntimeError("unknown URL scheme (%s)" % url.scheme)
@@ -718,23 +819,3 @@ def get_options(http, url):
     else:
         raise RuntimeError("could not use OPTIONS request: %d: %s" %
                            (r.status, r.reason))
-
-
-def optimize_http(http, host, options):
-    """
-    Return an optimized http connection using unix socket if we are connected
-    to imageio server on the local host and it features a unix socket.
-    """
-    unix_socket = options['unix_socket']
-
-    if host is not None and unix_socket is not None:
-        try:
-            http = UnixHTTPConnection(unix_socket)
-        except Exception as e:
-            # Very unlikely failure, but we can recover by using the https
-            # connection.
-            debug("cannot create unix socket connection, using https: %s" % e)
-        else:
-            debug("optimizing connection using unix socket %r" % unix_socket)
-
-    return http
-- 
2.26.2




More information about the Libguestfs mailing list