[Libguestfs] [PATCH 5/5] output/rhv-upload-plugin: Keep connections alive

Richard W.M. Jones rjones at redhat.com
Tue Feb 8 15:24:33 UTC 2022


On Sat, Dec 18, 2021 at 10:36:33PM +0200, Nir Soffer wrote:
> When importing from vddk, nbdcopy may be blocked for few minutes(!)
> trying to get extents. While nbdcopy is blocked, imageio server closes
> the idle connections. When we finally get a request from nbdcopy, we
> fail to detect that the connection was closed.
> 
> Detecting a closed connection is hard and racy. In the good case, we get
> a BrokenPipe error. In the bad case, imageio closed the socket right
> after we sent a request, and we get an invalid status line. When using
> imageio proxy, we may get http error (e.g. 500) if the proxy connection
> to imageio server on the host was closed.
> 
> Even worse, when we find that the connection was closed, it is not safe
> to reopen the connection, since qemu-nbd does not ensure yet that data
> written to the previous connection will be flushed when we flush the new
> connection.
> 
> Fix the issue by keeping the connections alive. A pool keeper thread
> sends a flush request on idle connection every ~30 seconds. This also
> improves data integrity and efficiency, using idle time to flush written
> data.
> 
> Fixes https://bugzilla.redhat.com/2032324
> ---
>  output/rhv-upload-plugin.py | 71 +++++++++++++++++++++++++++++++++++++
>  1 file changed, 71 insertions(+)
> 
> diff --git a/output/rhv-upload-plugin.py b/output/rhv-upload-plugin.py
> index 8d088c4e..172da358 100644
> --- a/output/rhv-upload-plugin.py
> +++ b/output/rhv-upload-plugin.py
> @@ -13,50 +13,60 @@
>  # GNU General Public License for more details.
>  #
>  # You should have received a copy of the GNU General Public License along
>  # with this program; if not, write to the Free Software Foundation, Inc.,
>  # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
>  
>  import json
>  import queue
>  import socket
>  import ssl
> +import threading
>  import time
>  
>  from contextlib import contextmanager
>  from http.client import HTTPSConnection, HTTPConnection
>  from urllib.parse import urlparse
>  
>  import nbdkit
>  
>  # Using version 2 supporting the buffer protocol for better performance.
>  API_VERSION = 2
>  
>  # Maximum number of connection to imageio server. Based on testing with imageio
>  # client, this give best performance.
>  MAX_CONNECTIONS = 4
>  
> +# Maximum idle time allowed for imageio connections.
> +IDLE_TIMEOUT = 30
> +
>  # Required parameters.
>  size = None
>  url = None
>  
>  # Optional parameters.
>  cafile = None
>  insecure = False
>  is_ovirt_host = False
>  
>  # List of options read from imageio server.
>  options = None
>  
>  # Pool of HTTP connections.
>  pool = None
>  
> +# Set when plugin is cleaning up.
> +done = threading.Event()
> +
> +# Set when periodic flush request fails.
> +pool_error = None
> +
>  
>  # Parse parameters.
>  def config(key, value):
>      global cafile, url, is_ovirt_host, insecure, size
>  
>      if key == "cafile":
>          cafile = value
>      elif key == "insecure":
>          insecure = value.lower() in ['true', '1']
>      elif key == "is_ovirt_host":
> @@ -84,25 +94,31 @@ def after_fork():
>      options = get_options(http, url)
>      http.close()
>  
>      nbdkit.debug("imageio features: flush=%(can_flush)r "
>                   "zero=%(can_zero)r unix_socket=%(unix_socket)r "
>                   "max_readers=%(max_readers)r max_writers=%(max_writers)r"
>                   % options)
>  
>      pool = create_http_pool(url, options)
>  
> +    t = threading.Thread(target=pool_keeper, name="poolkeeper")
> +    t.daemon = True
> +    t.start()
> +
>  
>  # This function is not actually defined before nbdkit 1.28, but it
>  # doesn't particularly matter if we don't close the pool because
>  # clients should call flush().
>  def cleanup():
> +    nbdkit.debug("cleaning up")
> +    done.set()
>      close_http_pool(pool)
>  
>  
>  def thread_model():
>      """
>      Using parallel model to speed up transfer with multiple connections to
>      imageio server.
>      """
>      return nbdkit.THREAD_MODEL_PARALLEL
>  
> @@ -272,20 +288,23 @@ def emulate_zero(h, count, offset, flags):
>          r = http.getresponse()
>          if r.status != 200:
>              request_failed(r,
>                             "could not write zeroes offset %d size %d" %
>                             (offset, count))
>  
>          r.read()
>  
>  
>  def flush(h, flags):
> +    if pool_error:
> +        raise pool_error
> +
>      # Wait until all inflight requests are completed, and send a flush
>      # request for all imageio connections.
>      locked = []
>  
>      # Lock the pool by taking all connections out.
>      while len(locked) < pool.maxsize:
>          locked.append(pool.get())
>  
>      try:
>          for item in locked:
> @@ -348,26 +367,78 @@ def create_http_pool(url, options):
>  
>      pool = queue.Queue(count)
>  
>      for i in range(count):
>          http = create_http(url, unix_socket=unix_socket)
>          pool.put(PoolItem(http))
>  
>      return pool
>  
>  
> +def pool_keeper():
> +    """
> +    Thread flushing idle connections, keeping them alive.
> +
> +    If a connection does not send any request for 60 seconds, imageio
> +    server closes the connection. Recovering from closed connection is
> +    hard and unsafe, so this thread ensure that connections never
> +    becomes idle by sending a flush request if the connection is idle
> +    for too much time.
> +
> +    In normal conditions, all connections are busy most of the time, so
> +    the keeper will find no idle connections. If there short delays in
> +    nbdcopy, the keeper will find some idle connections, but will
> +    quickly return them back to the pool. In the pathological case when
> +    nbdcopy is blocked for 3 minutes on vddk input, the keeper will send
> +    a flush request on all connections every ~30 seconds, until nbdcopy
> +    starts communicating again.
> +    """
> +    global pool_error
> +
> +    nbdkit.debug("pool keeper started")
> +
> +    while not done.wait(IDLE_TIMEOUT / 2):
> +        idle = []
> +
> +        while True:
> +            try:
> +                idle.append(pool.get_nowait())
> +            except queue.Empty:
> +                break
> +
> +        if idle:
> +            now = time.monotonic()
> +            for item in idle:
> +                if item.last_used and now - item.last_used > IDLE_TIMEOUT:
> +                    nbdkit.debug("Flushing idle connection")
> +                    try:
> +                        send_flush(item.http)
> +                        item.last_used = now
> +                    except Exception as e:
> +                        # We will report this error on the next request.
> +                        pool_error = e
> +                        item.last_used = None
> +
> +                pool.put(item)
> +
> +    nbdkit.debug("pool keeper stopped")
> +
> +
>  @contextmanager
>  def http_context(pool):
>      """
>      Context manager yielding an imageio http connection from the pool. Blocks
>      until a connection is available.
>      """
> +    if pool_error:
> +        raise pool_error
> +
>      item = pool.get()
>      try:
>          yield item.http
>      finally:
>          item.last_used = time.monotonic()
>          pool.put(item)
>  
>  
>  def close_http_pool(pool):

ACK 4 & 5.

Rich.

-- 
Richard Jones, Virtualization Group, Red Hat http://people.redhat.com/~rjones
Read my programming and virtualization blog: http://rwmj.wordpress.com
virt-top is 'top' for virtual machines.  Tiny program with many
powerful monitoring features, net stats, disk stats, logging, etc.
http://people.redhat.com/~rjones/virt-top




More information about the Libguestfs mailing list