[Libguestfs] [PATCH libnbd 8/8] copy: Adaptive queue size

Richard W.M. Jones rjones at redhat.com
Sun Feb 20 18:56:57 UTC 2022


On Sun, Feb 20, 2022 at 02:14:03PM +0200, Nir Soffer wrote:
> Limit the size of the copy queue also by the number of queued bytes.
> This allows using many concurrent small requests, required to get good
> performance, but limiting the number of large requests that are actually
> faster with lower concurrency.
> 
> New --queue-size option added to control the maximum queue size. With 2
> MiB we can have 8 256 KiB requests per connection. The default queue
> size is 16 MiB, to match the default --requests value (64) with the
> default --request-size (256 KiB). Testing show that using more than 16
> 256 KiB requests with one connection do not improve anything.
> 
> The new option will simplify limiting memory usage when using large
> requests, like this change in virt-v2v:
> https://github.com/libguestfs/virt-v2v/commit/c943420219fa0ee971fc228aa4d9127c5ce973f7
> 
> I tested this change with 3 images:
> 
> - Fedora 35 + 3g of random data - hopefully simulates a real image
> - Fully allocated image - the special case when every read command is
>   converted to a write command.
> - Zero image - the special case when every read command is converted to
>   a zero command.
> 
> On 2 machines:
> 
> - laptop: Intel(R) Core(TM) i7-10850H CPU @ 2.70GHz, 12 cpus,
>   1.5 MiB L2 cache per 2 cpus, 12 MiB L3 cache.
> - server: Intel(R) Xeon(R) Gold 5218R CPU @ 2.10GHz, 80 cpus,
>   1 MiB L2 cache per cpu, 27.5 MiB L3 cache.
> 
> In all cases, both source and destination are served by qemu-nbd, using
> --cache=none --aio=native. Because qemu-nbd does not support MULTI_CON
> for writing, we are testing a single connection when copying an to
> qemu-nbd. I tested also copying to null: since in this case we use 4
> connections (these tests are marked with /ro).
> 
> Results for copying all images on all machines with nbdcopy v1.11.0 and
> this change. "before" and "after" are average time of 10 runs.
> 
> image       machine    before    after    queue size    improvement
> ===================================================================
> fedora      laptop      3.044    2.129           2m           +43%
> full        laptop      4.900    3.136           2m           +56%
> zero        laptop      3.147    2.624           2m           +20%
> -------------------------------------------------------------------
> fedora      server      2.324    2.189          16m            +6%
> full        server      3.521    3.380           8m            +4%
> zero        server      2.297    2.338          16m            -2%
> -------------------------------------------------------------------
> fedora/ro   laptop      2.040    1.663           1m           +23%
> fedora/ro   server      1.585    1.393           2m           +14%
> 
> Signed-off-by: Nir Soffer <nsoffer at redhat.com>
> ---
>  copy/main.c                 | 52 ++++++++++++++++++++++++-------------
>  copy/multi-thread-copying.c | 18 +++++++------
>  copy/nbdcopy.h              |  1 +
>  copy/nbdcopy.pod            | 12 +++++++--
>  4 files changed, 55 insertions(+), 28 deletions(-)
> 
> diff --git a/copy/main.c b/copy/main.c
> index 390de1eb..832f99da 100644
> --- a/copy/main.c
> +++ b/copy/main.c
> @@ -36,53 +36,55 @@
>  
>  #include <pthread.h>
>  
>  #include <libnbd.h>
>  
>  #include "ispowerof2.h"
>  #include "human-size.h"
>  #include "version.h"
>  #include "nbdcopy.h"
>  
> -bool allocated;                 /* --allocated flag */
> -unsigned connections = 4;       /* --connections */
> -bool destination_is_zero;       /* --destination-is-zero flag */
> -bool extents = true;            /* ! --no-extents flag */
> -bool flush;                     /* --flush flag */
> -unsigned max_requests = 64;     /* --requests */
> -bool progress;                  /* -p flag */
> -int progress_fd = -1;           /* --progress=FD */
> -unsigned request_size = 1<<18;  /* --request-size */
> -unsigned sparse_size = 4096;    /* --sparse */
> -bool synchronous;               /* --synchronous flag */
> -unsigned threads;               /* --threads */
> -struct rw *src, *dst;           /* The source and destination. */
> -bool verbose;                   /* --verbose flag */
> -
> -const char *prog;               /* program name (== basename argv[0]) */
> +bool allocated;                     /* --allocated flag */
> +unsigned connections = 4;           /* --connections */
> +bool destination_is_zero;           /* --destination-is-zero flag */
> +bool extents = true;                /* ! --no-extents flag */
> +bool flush;                         /* --flush flag */
> +unsigned max_requests = 64;         /* --requests */
> +bool progress;                      /* -p flag */
> +int progress_fd = -1;               /* --progress=FD */
> +unsigned request_size = 1<<18;      /* --request-size */
> +unsigned queue_size = 16<<20;       /* --queue-size */
> +unsigned sparse_size = 4096;        /* --sparse */
> +bool synchronous;                   /* --synchronous flag */
> +unsigned threads;                   /* --threads */
> +struct rw *src, *dst;               /* The source and destination. */
> +bool verbose;                       /* --verbose flag */
> +
> +const char *prog;                   /* program name (== basename argv[0]) */
>  
>  static bool is_nbd_uri (const char *s);
>  static struct rw *open_local (const char *filename, direction d);
>  static void print_rw (struct rw *rw, const char *prefix, FILE *fp);
>  
>  static void __attribute__((noreturn))
>  usage (FILE *fp, int exitcode)
>  {
>    fprintf (fp,
>  "\n"
>  "Copy to and from an NBD server:\n"
>  "\n"
>  "    nbdcopy [--allocated] [-C N|--connections=N]\n"
>  "            [--destination-is-zero|--target-is-zero] [--flush]\n"
>  "            [--no-extents] [-p|--progress|--progress=FD]\n"
> -"            [--request-size=N] [-R N|--requests=N] [-S N|--sparse=N]\n"
> -"            [--synchronous] [-T N|--threads=N] [-v|--verbose]\n"
> +"            [--request-size=N] [--queue-size=N] [-R N|--requests=N]\n"
> +"            [-S N|--sparse=N] [--synchronous] [-T N|--threads=N] \n"
> +"            [-v|--verbose]\n"
>  "            SOURCE DESTINATION\n"
>  "\n"
>  "    SOURCE, DESTINATION := - | FILE | DEVICE | NBD-URI | [ CMD ARGS ... ]\n"
>  "    DESTINATION += null:\n"
>  "\n"
>  "Other options:\n"
>  "\n"
>  "    nbdcopy --help\n"
>  "    nbdcopy -V|--version\n"
>  "\n"
> @@ -104,33 +106,35 @@ main (int argc, char *argv[])
>  {
>    enum {
>      HELP_OPTION = CHAR_MAX + 1,
>      LONG_OPTIONS,
>      SHORT_OPTIONS,
>      ALLOCATED_OPTION,
>      DESTINATION_IS_ZERO_OPTION,
>      FLUSH_OPTION,
>      NO_EXTENTS_OPTION,
>      REQUEST_SIZE_OPTION,
> +    QUEUE_SIZE_OPTION,
>      SYNCHRONOUS_OPTION,
>    };
>    const char *short_options = "C:pR:S:T:vV";
>    const struct option long_options[] = {
>      { "help",               no_argument,       NULL, HELP_OPTION },
>      { "long-options",       no_argument,       NULL, LONG_OPTIONS },
>      { "allocated",          no_argument,       NULL, ALLOCATED_OPTION },
>      { "connections",        required_argument, NULL, 'C' },
>      { "destination-is-zero",no_argument,       NULL, DESTINATION_IS_ZERO_OPTION },
>      { "flush",              no_argument,       NULL, FLUSH_OPTION },
>      { "no-extents",         no_argument,       NULL, NO_EXTENTS_OPTION },
>      { "progress",           optional_argument, NULL, 'p' },
>      { "request-size",       required_argument, NULL, REQUEST_SIZE_OPTION },
> +    { "queue-size",         required_argument, NULL, QUEUE_SIZE_OPTION },
>      { "requests",           required_argument, NULL, 'R' },
>      { "short-options",      no_argument,       NULL, SHORT_OPTIONS },
>      { "sparse",             required_argument, NULL, 'S' },
>      { "synchronous",        no_argument,       NULL, SYNCHRONOUS_OPTION },
>      { "target-is-zero",     no_argument,       NULL, DESTINATION_IS_ZERO_OPTION },
>      { "threads",            required_argument, NULL, 'T' },
>      { "verbose",            no_argument,       NULL, 'v' },
>      { "version",            no_argument,       NULL, 'V' },
>      { NULL }
>    };
> @@ -212,20 +216,28 @@ main (int argc, char *argv[])
>        }
>        if (request_size < MIN_REQUEST_SIZE || request_size > MAX_REQUEST_SIZE ||
>                !is_power_of_2 (request_size)) {
>          fprintf (stderr,
>                  "%s: --request-size: must be a power of 2 within %d-%d\n",
>                   prog, MIN_REQUEST_SIZE, MAX_REQUEST_SIZE);
>          exit (EXIT_FAILURE);
>        }
>        break;
>  
> +    case QUEUE_SIZE_OPTION:
> +      if (sscanf (optarg, "%u", &queue_size) != 1) {
> +        fprintf (stderr, "%s: --queue-size: could not parse: %s\n",
> +                 prog, optarg);
> +        exit (EXIT_FAILURE);
> +      }
> +      break;
> +
>      case 'R':
>        if (sscanf (optarg, "%u", &max_requests) != 1 || max_requests == 0) {
>          fprintf (stderr, "%s: --requests: could not parse: %s\n",
>                   prog, optarg);
>          exit (EXIT_FAILURE);
>        }
>        break;
>  
>      case 'S':
>        if (sscanf (optarg, "%u", &sparse_size) != 1) {
> @@ -360,20 +372,24 @@ main (int argc, char *argv[])
>    }
>  
>    if (synchronous)
>      connections = 1;
>  
>    if (connections < threads)
>      threads = connections;
>    if (threads < connections)
>      connections = threads;
>  
> +  /* Adapt queue to size to request size if needed. */
> +  if (request_size > queue_size)
> +    queue_size = request_size;
> +
>    /* Truncate the destination to the same size as the source.  Only
>     * has an effect on regular files.
>     */
>    if (dst->ops->truncate)
>      dst->ops->truncate (dst, src->size);
>  
>    /* Check if the source is bigger than the destination, since that
>     * would truncate (ie. lose) data.  Copying from smaller to larger
>     * is OK.
>     */
> diff --git a/copy/multi-thread-copying.c b/copy/multi-thread-copying.c
> index 620dc571..d9f17285 100644
> --- a/copy/multi-thread-copying.c
> +++ b/copy/multi-thread-copying.c
> @@ -123,21 +123,21 @@ multi_thread_copying (void)
>      if (err != 0) {
>        errno = err;
>        perror ("pthread_join");
>        exit (EXIT_FAILURE);
>      }
>    }
>  
>    free (workers);
>  }
>  
> -static void wait_for_request_slots (size_t index);
> +static void wait_for_request_slots (struct worker *worker);
>  static unsigned in_flight (size_t index);
>  static void poll_both_ends (size_t index);
>  static int finished_read (void *vp, int *error);
>  static int finished_command (void *vp, int *error);
>  static void free_command (struct command *command);
>  static void fill_dst_range_with_zeroes (struct command *command);
>  static struct command *create_command (uint64_t offset, size_t len, bool zero,
>                                         struct worker *worker);
>  
>  /* Tracking worker queue size.
> @@ -148,20 +148,21 @@ static struct command *create_command (uint64_t offset, size_t len, bool zero,
>   * subcommand in finished_read(), or when a write command completes in
>   * finished_command().
>   *
>   * Zero commands are not considered in the queue size since they have no
>   * payload.
>   */
>  
>  static inline void
>  increase_queue_size(struct worker *worker, size_t len)
>  {
> +  assert (worker->queue_size < queue_size);
>    worker->queue_size += len;
>  }
>  
>  static inline void
>  decrease_queue_size(struct worker *worker, size_t len)
>  {
>    assert (worker->queue_size >= len);
>    worker->queue_size -= len;
>  }
>  
> @@ -203,21 +204,21 @@ worker_thread (void *wp)
>           * requests.
>           */
>          while (exts.ptr[i].length > 0) {
>            len = exts.ptr[i].length;
>            if (len > request_size)
>              len = request_size;
>  
>            command = create_command (exts.ptr[i].offset, len,
>                                      false, w);
>  
> -          wait_for_request_slots (w->index);
> +          wait_for_request_slots (w);
>  
>            /* NOTE: Must increase the queue size after waiting. */
>            increase_queue_size (w, len);
>  
>            /* Begin the asynch read operation. */
>            src->ops->asynch_read (src, command,
>                                   (nbd_completion_callback) {
>                                     .callback = finished_read,
>                                     .user_data = command,
>                                   });
> @@ -233,36 +234,37 @@ worker_thread (void *wp)
>    }
>  
>    /* Wait for in flight NBD requests to finish. */
>    while (in_flight (w->index) > 0)
>      poll_both_ends (w->index);
>  
>    free (exts.ptr);
>    return NULL;
>  }
>  
> -/* If the number of requests in flight exceeds the limit, poll
> - * waiting for at least one request to finish.  This enforces
> - * the user --requests option.
> +/* If the number of requests in flight or the number of queued bytes
> + * exceed the limit, poll waiting for at least one request to finish.
> + * This enforces the user --requests and --queue-size options.
>   *
>   * NB: Unfortunately it's not possible to call this from a callback,
>   * since it will deadlock trying to grab the libnbd handle lock.  This
>   * means that although the worker thread calls this and enforces the
>   * limit, when we split up requests into subrequests (eg. doing
>   * sparseness detection) we will probably exceed the user request
>   * limit. XXX
>   */
>  static void
> -wait_for_request_slots (size_t index)
> +wait_for_request_slots (struct worker *worker)
>  {
> -  while (in_flight (index) >= max_requests)
> -    poll_both_ends (index);
> +  while (in_flight (worker->index) >= max_requests ||
> +         worker->queue_size >= queue_size)
> +    poll_both_ends (worker->index);
>  }
>  
>  /* Count the number of asynchronous commands in flight. */
>  static unsigned
>  in_flight (size_t index)
>  {
>    return src->ops->in_flight (src, index) + dst->ops->in_flight (dst, index);
>  }
>  
>  /* Poll (optional) NBD src and NBD dst, moving the state machine(s)
> diff --git a/copy/nbdcopy.h b/copy/nbdcopy.h
> index d6bd63f0..19797dfd 100644
> --- a/copy/nbdcopy.h
> +++ b/copy/nbdcopy.h
> @@ -226,20 +226,21 @@ extern void asynch_notify_read_write_not_supported (struct rw *rw,
>  
>  extern bool allocated;
>  extern unsigned connections;
>  extern bool destination_is_zero;
>  extern bool extents;
>  extern bool flush;
>  extern unsigned max_requests;
>  extern bool progress;
>  extern int progress_fd;
>  extern unsigned request_size;
> +extern unsigned queue_size;
>  extern unsigned sparse_size;
>  extern bool synchronous;
>  extern unsigned threads;
>  extern bool verbose;
>  
>  extern const char *prog;
>  
>  extern void progress_bar (off_t pos, int64_t size);
>  extern void synch_copying (void);
>  extern void multi_thread_copying (void);
> diff --git a/copy/nbdcopy.pod b/copy/nbdcopy.pod
> index f7100935..22ceca8e 100644
> --- a/copy/nbdcopy.pod
> +++ b/copy/nbdcopy.pod
> @@ -1,21 +1,22 @@
>  =head1 NAME
>  
>  nbdcopy - copy to and from an NBD server
>  
>  =head1 SYNOPSIS
>  
>   nbdcopy [--allocated] [-C N|--connections=N]
>           [--destination-is-zero|--target-is-zero] [--flush]
>           [--no-extents] [-p|--progress|--progress=FD]
> -         [--request-size=N] [-R N|--requests=N] [-S N|--sparse=N]
> -         [--synchronous] [-T N|--threads=N] [-v|--verbose]
> +         [--request-size=N] [--queue-size=N] [-R N|--requests=N]
> +         [-S N|--sparse=N] [--synchronous] [-T N|--threads=N]
> +         [-v|--verbose]
>           SOURCE DESTINATION
>  
>   SOURCE, DESTINATION := - | FILE | DEVICE | NBD-URI | [ CMD ARGS ... ]
>   DESTINATION += null:
>  
>   nbdcopy --help
>  
>   nbdcopy -V|--version
>  
>  =head1 EXAMPLES
> @@ -156,20 +157,27 @@ following shell commands:
>  
>  Set the maximum request size in bytes. The maximum value is 32 MiB,
>  specified by the NBD protocol.
>  
>  =item B<-R> N
>  
>  =item B<--requests=>N
>  
>  Set the maximum number of requests in flight per NBD connection.
>  
> +=item B<--queue-size=>N
> +
> +Set the maximum number of bytes to queue for in flight requests. The
> +default value is 16 MiB, allowing up to 64 256k requests per NBD
> +connection. If you use larger B<--request-size> you may want to increase
> +this value.
> +
>  =item B<-S> N
>  
>  =item B<--sparse=>N
>  
>  Detect all zero blocks of size N (bytes) and make them sparse on the
>  output.  You can also turn off sparse detection using S<I<-S 0>>.
>  The default is 4096 bytes.
>  
>  =item B<--synchronous>

Patch series looks pretty sensible to me.  Let's see if Eric has any
comments.

Should the queue size be represented by a size_t?  While it seems
unlikely now you'd want a queue of larger than 4GB, I guess there
might be a niche case for it (and maybe in 10 years we'll all have
massive laptops!)

Rich.

-- 
Richard Jones, Virtualization Group, Red Hat http://people.redhat.com/~rjones
Read my programming and virtualization blog: http://rwmj.wordpress.com
libguestfs lets you edit virtual machines.  Supports shell scripting,
bindings from many languages.  http://libguestfs.org




More information about the Libguestfs mailing list