[Libguestfs] [PATCH 5/5] output/rhv-upload-plugin: Keep connections alive

Nir Soffer nsoffer at redhat.com
Tue Jan 4 21:10:46 UTC 2022


On Tue, Jan 4, 2022 at 6:29 PM Nir Soffer <nsoffer at redhat.com> wrote:
>
> On Tue, Jan 4, 2022 at 6:02 PM Richard W.M. Jones <rjones at redhat.com> wrote:
> >
> > On Sat, Dec 18, 2021 at 10:36:33PM +0200, Nir Soffer wrote:
> > > When importing from vddk, nbdcopy may be blocked for few minutes(!)
> > > trying to get extents. While nbdcopy is blocked, imageio server closes
> > > the idle connections. When we finally get a request from nbdcopy, we
> > > fail to detect that the connection was closed.
> > >
> > > Detecting a closed connection is hard and racy. In the good case, we get
> > > a BrokenPipe error. In the bad case, imageio closed the socket right
> > > after we sent a request, and we get an invalid status line. When using
> > > imageio proxy, we may get http error (e.g. 500) if the proxy connection
> > > to imageio server on the host was closed.
> > >
> > > Even worse, when we find that the connection was closed, it is not safe
> > > to reopen the connection, since qemu-nbd does not ensure yet that data
> > > written to the previous connection will be flushed when we flush the new
> > > connection.
> > >
> > > Fix the issue by keeping the connections alive. A pool keeper thread
> > > sends a flush request on idle connection every ~30 seconds. This also
> > > improves data integrity and efficiency, using idle time to flush written
> > > data.
> > >
> > > Fixes https://bugzilla.redhat.com/2032324
> >
> > Ideally imageio would not just time out after such a short time when a
> > client has connections open.  (Do we actually hold the TCP-level
> > connection open during this time?)
> >
> > Is there a no-op ping-like request that we can send?
>
> We don't have no-op request, but OPTIONS can be used
> for that. It is has tiny json response that can be droped when
> you use it as a "ping".
>
> Using options can simplify the change, since we don't have to
> report errors in OPTIONS, they are not critical.
>
> >  If TCP-level
> > connection is open, can we enable TCP keepalives?
>
> We can but I think the timeouts are too long, and it is not the right way
> to keep a connection open. You want to do this in the application level,
> this way you verify the entire stack on each ping.

And even if we want to use TCP keepalive - it will not help, since
current imageio still timeout on reading from the client socket
after 60 seconds. The only way to avoid the timeout is to send
some request before that timeout.

In ovirt 4.5 this is not an issue, imageio uses the inactivity_timeout
defined when creating an image transfer.

Nir

>
> >
> > Rich.
> >
> > >  output/rhv-upload-plugin.py | 71 +++++++++++++++++++++++++++++++++++++
> > >  1 file changed, 71 insertions(+)
> > >
> > > diff --git a/output/rhv-upload-plugin.py b/output/rhv-upload-plugin.py
> > > index 8d088c4e..172da358 100644
> > > --- a/output/rhv-upload-plugin.py
> > > +++ b/output/rhv-upload-plugin.py
> > > @@ -13,50 +13,60 @@
> > >  # GNU General Public License for more details.
> > >  #
> > >  # You should have received a copy of the GNU General Public License along
> > >  # with this program; if not, write to the Free Software Foundation, Inc.,
> > >  # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
> > >
> > >  import json
> > >  import queue
> > >  import socket
> > >  import ssl
> > > +import threading
> > >  import time
> > >
> > >  from contextlib import contextmanager
> > >  from http.client import HTTPSConnection, HTTPConnection
> > >  from urllib.parse import urlparse
> > >
> > >  import nbdkit
> > >
> > >  # Using version 2 supporting the buffer protocol for better performance.
> > >  API_VERSION = 2
> > >
> > >  # Maximum number of connection to imageio server. Based on testing with imageio
> > >  # client, this give best performance.
> > >  MAX_CONNECTIONS = 4
> > >
> > > +# Maximum idle time allowed for imageio connections.
> > > +IDLE_TIMEOUT = 30
> > > +
> > >  # Required parameters.
> > >  size = None
> > >  url = None
> > >
> > >  # Optional parameters.
> > >  cafile = None
> > >  insecure = False
> > >  is_ovirt_host = False
> > >
> > >  # List of options read from imageio server.
> > >  options = None
> > >
> > >  # Pool of HTTP connections.
> > >  pool = None
> > >
> > > +# Set when plugin is cleaning up.
> > > +done = threading.Event()
> > > +
> > > +# Set when periodic flush request fails.
> > > +pool_error = None
> > > +
> > >
> > >  # Parse parameters.
> > >  def config(key, value):
> > >      global cafile, url, is_ovirt_host, insecure, size
> > >
> > >      if key == "cafile":
> > >          cafile = value
> > >      elif key == "insecure":
> > >          insecure = value.lower() in ['true', '1']
> > >      elif key == "is_ovirt_host":
> > > @@ -84,25 +94,31 @@ def after_fork():
> > >      options = get_options(http, url)
> > >      http.close()
> > >
> > >      nbdkit.debug("imageio features: flush=%(can_flush)r "
> > >                   "zero=%(can_zero)r unix_socket=%(unix_socket)r "
> > >                   "max_readers=%(max_readers)r max_writers=%(max_writers)r"
> > >                   % options)
> > >
> > >      pool = create_http_pool(url, options)
> > >
> > > +    t = threading.Thread(target=pool_keeper, name="poolkeeper")
> > > +    t.daemon = True
> > > +    t.start()
> > > +
> > >
> > >  # This function is not actually defined before nbdkit 1.28, but it
> > >  # doesn't particularly matter if we don't close the pool because
> > >  # clients should call flush().
> > >  def cleanup():
> > > +    nbdkit.debug("cleaning up")
> > > +    done.set()
> > >      close_http_pool(pool)
> > >
> > >
> > >  def thread_model():
> > >      """
> > >      Using parallel model to speed up transfer with multiple connections to
> > >      imageio server.
> > >      """
> > >      return nbdkit.THREAD_MODEL_PARALLEL
> > >
> > > @@ -272,20 +288,23 @@ def emulate_zero(h, count, offset, flags):
> > >          r = http.getresponse()
> > >          if r.status != 200:
> > >              request_failed(r,
> > >                             "could not write zeroes offset %d size %d" %
> > >                             (offset, count))
> > >
> > >          r.read()
> > >
> > >
> > >  def flush(h, flags):
> > > +    if pool_error:
> > > +        raise pool_error
> > > +
> > >      # Wait until all inflight requests are completed, and send a flush
> > >      # request for all imageio connections.
> > >      locked = []
> > >
> > >      # Lock the pool by taking all connections out.
> > >      while len(locked) < pool.maxsize:
> > >          locked.append(pool.get())
> > >
> > >      try:
> > >          for item in locked:
> > > @@ -348,26 +367,78 @@ def create_http_pool(url, options):
> > >
> > >      pool = queue.Queue(count)
> > >
> > >      for i in range(count):
> > >          http = create_http(url, unix_socket=unix_socket)
> > >          pool.put(PoolItem(http))
> > >
> > >      return pool
> > >
> > >
> > > +def pool_keeper():
> > > +    """
> > > +    Thread flushing idle connections, keeping them alive.
> > > +
> > > +    If a connection does not send any request for 60 seconds, imageio
> > > +    server closes the connection. Recovering from closed connection is
> > > +    hard and unsafe, so this thread ensure that connections never
> > > +    becomes idle by sending a flush request if the connection is idle
> > > +    for too much time.
> > > +
> > > +    In normal conditions, all connections are busy most of the time, so
> > > +    the keeper will find no idle connections. If there short delays in
> > > +    nbdcopy, the keeper will find some idle connections, but will
> > > +    quickly return them back to the pool. In the pathological case when
> > > +    nbdcopy is blocked for 3 minutes on vddk input, the keeper will send
> > > +    a flush request on all connections every ~30 seconds, until nbdcopy
> > > +    starts communicating again.
> > > +    """
> > > +    global pool_error
> > > +
> > > +    nbdkit.debug("pool keeper started")
> > > +
> > > +    while not done.wait(IDLE_TIMEOUT / 2):
> > > +        idle = []
> > > +
> > > +        while True:
> > > +            try:
> > > +                idle.append(pool.get_nowait())
> > > +            except queue.Empty:
> > > +                break
> > > +
> > > +        if idle:
> > > +            now = time.monotonic()
> > > +            for item in idle:
> > > +                if item.last_used and now - item.last_used > IDLE_TIMEOUT:
> > > +                    nbdkit.debug("Flushing idle connection")
> > > +                    try:
> > > +                        send_flush(item.http)
> > > +                        item.last_used = now
> > > +                    except Exception as e:
> > > +                        # We will report this error on the next request.
> > > +                        pool_error = e
> > > +                        item.last_used = None
> > > +
> > > +                pool.put(item)
> > > +
> > > +    nbdkit.debug("pool keeper stopped")
> > > +
> > > +
> > >  @contextmanager
> > >  def http_context(pool):
> > >      """
> > >      Context manager yielding an imageio http connection from the pool. Blocks
> > >      until a connection is available.
> > >      """
> > > +    if pool_error:
> > > +        raise pool_error
> > > +
> > >      item = pool.get()
> > >      try:
> > >          yield item.http
> > >      finally:
> > >          item.last_used = time.monotonic()
> > >          pool.put(item)
> > >
> > >
> > >  def close_http_pool(pool):
> > >      """
> > > --
> > > 2.33.1
> >
> > --
> > Richard Jones, Virtualization Group, Red Hat http://people.redhat.com/~rjones
> > Read my programming and virtualization blog: http://rwmj.wordpress.com
> > virt-df lists disk usage of guests without needing to install any
> > software inside the virtual machine.  Supports Linux and Windows.
> > http://people.redhat.com/~rjones/virt-df/
> >




More information about the Libguestfs mailing list