[Libguestfs] [PATCH 5/5] output/rhv-upload-plugin: Keep connections alive

Nir Soffer nsoffer at redhat.com
Sat Dec 18 20:36:33 UTC 2021


When importing from vddk, nbdcopy may be blocked for a few minutes(!)
trying to get extents. While nbdcopy is blocked, the imageio server
closes the idle connections. When we finally get a request from nbdcopy,
we fail to detect that the connection was closed.

Detecting a closed connection is hard and racy. In the good case, we get
a BrokenPipe error. In the bad case, imageio closed the socket right
after we sent a request, and we get an invalid status line. When using
the imageio proxy, we may get an HTTP error (e.g. 500) if the proxy
connection to the imageio server on the host was closed.

Even worse, when we find that the connection was closed, it is not safe
to reopen the connection, since qemu-nbd does not yet ensure that data
written to the previous connection will be flushed when we flush the new
connection.

Fix the issue by keeping the connections alive. A pool keeper thread
sends a flush request on idle connections every ~30 seconds. This also
improves data integrity and efficiency, using idle time to flush written
data.

Fixes https://bugzilla.redhat.com/2032324
---
 output/rhv-upload-plugin.py | 71 +++++++++++++++++++++++++++++++++++++
 1 file changed, 71 insertions(+)

diff --git a/output/rhv-upload-plugin.py b/output/rhv-upload-plugin.py
index 8d088c4e..172da358 100644
--- a/output/rhv-upload-plugin.py
+++ b/output/rhv-upload-plugin.py
@@ -13,50 +13,60 @@
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License along
 # with this program; if not, write to the Free Software Foundation, Inc.,
 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 
 import json
 import queue
 import socket
 import ssl
+import threading
 import time
 
 from contextlib import contextmanager
 from http.client import HTTPSConnection, HTTPConnection
 from urllib.parse import urlparse
 
 import nbdkit
 
 # Using version 2 supporting the buffer protocol for better performance.
 API_VERSION = 2
 
 # Maximum number of connection to imageio server. Based on testing with imageio
 # client, this give best performance.
 MAX_CONNECTIONS = 4
 
+# Maximum idle time allowed for imageio connections.
+IDLE_TIMEOUT = 30
+
 # Required parameters.
 size = None
 url = None
 
 # Optional parameters.
 cafile = None
 insecure = False
 is_ovirt_host = False
 
 # List of options read from imageio server.
 options = None
 
 # Pool of HTTP connections.
 pool = None
 
+# Set when plugin is cleaning up.
+done = threading.Event()
+
+# Set when periodic flush request fails.
+pool_error = None
+
 
 # Parse parameters.
 def config(key, value):
     global cafile, url, is_ovirt_host, insecure, size
 
     if key == "cafile":
         cafile = value
     elif key == "insecure":
         insecure = value.lower() in ['true', '1']
     elif key == "is_ovirt_host":
@@ -84,25 +94,31 @@ def after_fork():
     options = get_options(http, url)
     http.close()
 
     nbdkit.debug("imageio features: flush=%(can_flush)r "
                  "zero=%(can_zero)r unix_socket=%(unix_socket)r "
                  "max_readers=%(max_readers)r max_writers=%(max_writers)r"
                  % options)
 
     pool = create_http_pool(url, options)
 
+    t = threading.Thread(target=pool_keeper, name="poolkeeper")
+    t.daemon = True
+    t.start()
+
 
 # This function is not actually defined before nbdkit 1.28, but it
 # doesn't particularly matter if we don't close the pool because
 # clients should call flush().
 def cleanup():
+    nbdkit.debug("cleaning up")
+    done.set()
     close_http_pool(pool)
 
 
 def thread_model():
     """
     Using parallel model to speed up transfer with multiple connections to
     imageio server.
     """
     return nbdkit.THREAD_MODEL_PARALLEL
 
@@ -272,20 +288,23 @@ def emulate_zero(h, count, offset, flags):
         r = http.getresponse()
         if r.status != 200:
             request_failed(r,
                            "could not write zeroes offset %d size %d" %
                            (offset, count))
 
         r.read()
 
 
 def flush(h, flags):
+    if pool_error:
+        raise pool_error
+
     # Wait until all inflight requests are completed, and send a flush
     # request for all imageio connections.
     locked = []
 
     # Lock the pool by taking all connections out.
     while len(locked) < pool.maxsize:
         locked.append(pool.get())
 
     try:
         for item in locked:
@@ -348,26 +367,78 @@ def create_http_pool(url, options):
 
     pool = queue.Queue(count)
 
     for i in range(count):
         http = create_http(url, unix_socket=unix_socket)
         pool.put(PoolItem(http))
 
     return pool
 
 
+def pool_keeper():
+    """
+    Thread flushing idle connections, keeping them alive.
+
+    If a connection does not send any request for 60 seconds, the imageio
+    server closes the connection. Recovering from a closed connection is
+    hard and unsafe, so this thread ensures that connections never
+    become idle by sending a flush request if the connection is idle
+    for too long.
+
+    In normal conditions, all connections are busy most of the time, so
+    the keeper will find no idle connections. If there are short delays in
+    nbdcopy, the keeper will find some idle connections, but will
+    quickly return them to the pool. In the pathological case when
+    nbdcopy is blocked for 3 minutes on vddk input, the keeper will send
+    a flush request on all connections every ~30 seconds, until nbdcopy
+    starts communicating again.
+    """
+    global pool_error
+
+    nbdkit.debug("pool keeper started")
+
+    while not done.wait(IDLE_TIMEOUT / 2):
+        idle = []
+
+        while True:
+            try:
+                idle.append(pool.get_nowait())
+            except queue.Empty:
+                break
+
+        if idle:
+            now = time.monotonic()
+            for item in idle:
+                if item.last_used and now - item.last_used > IDLE_TIMEOUT:
+                    nbdkit.debug("Flushing idle connection")
+                    try:
+                        send_flush(item.http)
+                        item.last_used = now
+                    except Exception as e:
+                        # We will report this error on the next request.
+                        pool_error = e
+                        item.last_used = None
+
+                pool.put(item)
+
+    nbdkit.debug("pool keeper stopped")
+
+
 @contextmanager
 def http_context(pool):
     """
     Context manager yielding an imageio http connection from the pool. Blocks
     until a connection is available.
     """
+    if pool_error:
+        raise pool_error
+
     item = pool.get()
     try:
         yield item.http
     finally:
         item.last_used = time.monotonic()
         pool.put(item)
 
 
 def close_http_pool(pool):
     """
-- 
2.33.1




More information about the Libguestfs mailing list