[Libguestfs] [PATCH libnbd 8/8] copy: Adaptive queue size
Richard W.M. Jones
rjones at redhat.com
Sun Feb 20 18:56:57 UTC 2022
On Sun, Feb 20, 2022 at 02:14:03PM +0200, Nir Soffer wrote:
> Limit the size of the copy queue also by the number of queued bytes.
> This allows using many concurrent small requests, required to get good
> performance, but limiting the number of large requests that are actually
> faster with lower concurrency.
>
> A new --queue-size option is added to control the maximum queue size.
> With 2 MiB we can have 8 requests of 256 KiB per connection. The default
> queue size is 16 MiB, to match the default --requests value (64) with the
> default --request-size (256 KiB). Testing shows that using more than 16
> requests of 256 KiB with one connection does not improve performance.
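
A minimal sketch of the arithmetic above, assuming the worker may start a
new request only while both the request count and the queued byte count
are below their limits (this mirrors the condition in
wait_for_request_slots() later in the patch). The names struct limits,
can_start_request() and max_concurrent_requests() are hypothetical and
are not part of nbdcopy:

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical per-connection limits, standing in for the values of
 * --requests and --queue-size.
 */
struct limits {
  unsigned max_requests;   /* maximum number of requests in flight */
  size_t queue_size;       /* maximum number of queued bytes */
};

/* A new request may start only if both limits still have room. */
static bool
can_start_request (const struct limits *l, unsigned in_flight, size_t queued)
{
  return in_flight < l->max_requests && queued < l->queue_size;
}

/* Count how many requests of request_size bytes can be in flight at
 * once before one of the two limits blocks the next request.
 */
static unsigned
max_concurrent_requests (const struct limits *l, size_t request_size)
{
  unsigned n = 0;
  size_t queued = 0;

  while (can_start_request (l, n, queued)) {
    queued += request_size;
    n++;
  }
  return n;
}
```

Under these assumptions a 2 MiB queue admits 8 requests of 256 KiB, the
default 16 MiB queue admits 64, and a 16 MiB queue with 4 MiB requests
admits only 4, which is how large requests end up with lower concurrency.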
>
> The new option will simplify limiting memory usage when using large
> requests, like this change in virt-v2v:
> https://github.com/libguestfs/virt-v2v/commit/c943420219fa0ee971fc228aa4d9127c5ce973f7
>
> I tested this change with 3 images:
>
> - Fedora 35 + 3g of random data - hopefully simulates a real image
> - Fully allocated image - the special case when every read command is
> converted to a write command.
> - Zero image - the special case when every read command is converted to
> a zero command.
>
> On 2 machines:
>
> - laptop: Intel(R) Core(TM) i7-10850H CPU @ 2.70GHz, 12 cpus,
> 1.5 MiB L2 cache per 2 cpus, 12 MiB L3 cache.
> - server: Intel(R) Xeon(R) Gold 5218R CPU @ 2.10GHz, 80 cpus,
> 1 MiB L2 cache per cpu, 27.5 MiB L3 cache.
>
> In all cases, both source and destination are served by qemu-nbd, using
> --cache=none --aio=native. Because qemu-nbd does not support MULTI_CONN
> for writing, we are testing a single connection when copying to
> qemu-nbd. I also tested copying to null: since in this case we use 4
> connections (these tests are marked with /ro).
>
> Results for copying all images on all machines with nbdcopy v1.11.0 and
> this change. "before" and "after" are the average times of 10 runs.
>
> image       machine   before   after   queue size   improvement
> ===================================================================
> fedora      laptop    3.044    2.129   2m           +43%
> full        laptop    4.900    3.136   2m           +56%
> zero        laptop    3.147    2.624   2m           +20%
> -------------------------------------------------------------------
> fedora      server    2.324    2.189   16m          +6%
> full        server    3.521    3.380   8m           +4%
> zero        server    2.297    2.338   16m          -2%
> -------------------------------------------------------------------
> fedora/ro   laptop    2.040    1.663   1m           +23%
> fedora/ro   server    1.585    1.393   2m           +14%
>
> Signed-off-by: Nir Soffer <nsoffer at redhat.com>
> ---
> copy/main.c | 52 ++++++++++++++++++++++++-------------
> copy/multi-thread-copying.c | 18 +++++++------
> copy/nbdcopy.h | 1 +
> copy/nbdcopy.pod | 12 +++++++--
> 4 files changed, 55 insertions(+), 28 deletions(-)
>
> diff --git a/copy/main.c b/copy/main.c
> index 390de1eb..832f99da 100644
> --- a/copy/main.c
> +++ b/copy/main.c
> @@ -36,53 +36,55 @@
>
> #include <pthread.h>
>
> #include <libnbd.h>
>
> #include "ispowerof2.h"
> #include "human-size.h"
> #include "version.h"
> #include "nbdcopy.h"
>
> -bool allocated; /* --allocated flag */
> -unsigned connections = 4; /* --connections */
> -bool destination_is_zero; /* --destination-is-zero flag */
> -bool extents = true; /* ! --no-extents flag */
> -bool flush; /* --flush flag */
> -unsigned max_requests = 64; /* --requests */
> -bool progress; /* -p flag */
> -int progress_fd = -1; /* --progress=FD */
> -unsigned request_size = 1<<18; /* --request-size */
> -unsigned sparse_size = 4096; /* --sparse */
> -bool synchronous; /* --synchronous flag */
> -unsigned threads; /* --threads */
> -struct rw *src, *dst; /* The source and destination. */
> -bool verbose; /* --verbose flag */
> -
> -const char *prog; /* program name (== basename argv[0]) */
> +bool allocated; /* --allocated flag */
> +unsigned connections = 4; /* --connections */
> +bool destination_is_zero; /* --destination-is-zero flag */
> +bool extents = true; /* ! --no-extents flag */
> +bool flush; /* --flush flag */
> +unsigned max_requests = 64; /* --requests */
> +bool progress; /* -p flag */
> +int progress_fd = -1; /* --progress=FD */
> +unsigned request_size = 1<<18; /* --request-size */
> +unsigned queue_size = 16<<20; /* --queue-size */
> +unsigned sparse_size = 4096; /* --sparse */
> +bool synchronous; /* --synchronous flag */
> +unsigned threads; /* --threads */
> +struct rw *src, *dst; /* The source and destination. */
> +bool verbose; /* --verbose flag */
> +
> +const char *prog; /* program name (== basename argv[0]) */
>
> static bool is_nbd_uri (const char *s);
> static struct rw *open_local (const char *filename, direction d);
> static void print_rw (struct rw *rw, const char *prefix, FILE *fp);
>
> static void __attribute__((noreturn))
> usage (FILE *fp, int exitcode)
> {
> fprintf (fp,
> "\n"
> "Copy to and from an NBD server:\n"
> "\n"
> " nbdcopy [--allocated] [-C N|--connections=N]\n"
> " [--destination-is-zero|--target-is-zero] [--flush]\n"
> " [--no-extents] [-p|--progress|--progress=FD]\n"
> -" [--request-size=N] [-R N|--requests=N] [-S N|--sparse=N]\n"
> -" [--synchronous] [-T N|--threads=N] [-v|--verbose]\n"
> +" [--request-size=N] [--queue-size=N] [-R N|--requests=N]\n"
> +" [-S N|--sparse=N] [--synchronous] [-T N|--threads=N]\n"
> +" [-v|--verbose]\n"
> " SOURCE DESTINATION\n"
> "\n"
> " SOURCE, DESTINATION := - | FILE | DEVICE | NBD-URI | [ CMD ARGS ... ]\n"
> " DESTINATION += null:\n"
> "\n"
> "Other options:\n"
> "\n"
> " nbdcopy --help\n"
> " nbdcopy -V|--version\n"
> "\n"
> @@ -104,33 +106,35 @@ main (int argc, char *argv[])
> {
> enum {
> HELP_OPTION = CHAR_MAX + 1,
> LONG_OPTIONS,
> SHORT_OPTIONS,
> ALLOCATED_OPTION,
> DESTINATION_IS_ZERO_OPTION,
> FLUSH_OPTION,
> NO_EXTENTS_OPTION,
> REQUEST_SIZE_OPTION,
> + QUEUE_SIZE_OPTION,
> SYNCHRONOUS_OPTION,
> };
> const char *short_options = "C:pR:S:T:vV";
> const struct option long_options[] = {
> { "help", no_argument, NULL, HELP_OPTION },
> { "long-options", no_argument, NULL, LONG_OPTIONS },
> { "allocated", no_argument, NULL, ALLOCATED_OPTION },
> { "connections", required_argument, NULL, 'C' },
> { "destination-is-zero",no_argument, NULL, DESTINATION_IS_ZERO_OPTION },
> { "flush", no_argument, NULL, FLUSH_OPTION },
> { "no-extents", no_argument, NULL, NO_EXTENTS_OPTION },
> { "progress", optional_argument, NULL, 'p' },
> { "request-size", required_argument, NULL, REQUEST_SIZE_OPTION },
> + { "queue-size", required_argument, NULL, QUEUE_SIZE_OPTION },
> { "requests", required_argument, NULL, 'R' },
> { "short-options", no_argument, NULL, SHORT_OPTIONS },
> { "sparse", required_argument, NULL, 'S' },
> { "synchronous", no_argument, NULL, SYNCHRONOUS_OPTION },
> { "target-is-zero", no_argument, NULL, DESTINATION_IS_ZERO_OPTION },
> { "threads", required_argument, NULL, 'T' },
> { "verbose", no_argument, NULL, 'v' },
> { "version", no_argument, NULL, 'V' },
> { NULL }
> };
> @@ -212,20 +216,28 @@ main (int argc, char *argv[])
> }
> if (request_size < MIN_REQUEST_SIZE || request_size > MAX_REQUEST_SIZE ||
> !is_power_of_2 (request_size)) {
> fprintf (stderr,
> "%s: --request-size: must be a power of 2 within %d-%d\n",
> prog, MIN_REQUEST_SIZE, MAX_REQUEST_SIZE);
> exit (EXIT_FAILURE);
> }
> break;
>
> + case QUEUE_SIZE_OPTION:
> + if (sscanf (optarg, "%u", &queue_size) != 1) {
> + fprintf (stderr, "%s: --queue-size: could not parse: %s\n",
> + prog, optarg);
> + exit (EXIT_FAILURE);
> + }
> + break;
> +
> case 'R':
> if (sscanf (optarg, "%u", &max_requests) != 1 || max_requests == 0) {
> fprintf (stderr, "%s: --requests: could not parse: %s\n",
> prog, optarg);
> exit (EXIT_FAILURE);
> }
> break;
>
> case 'S':
> if (sscanf (optarg, "%u", &sparse_size) != 1) {
> @@ -360,20 +372,24 @@ main (int argc, char *argv[])
> }
>
> if (synchronous)
> connections = 1;
>
> if (connections < threads)
> threads = connections;
> if (threads < connections)
> connections = threads;
>
> + /* Adapt queue size to request size if needed. */
> + if (request_size > queue_size)
> + queue_size = request_size;
> +
> /* Truncate the destination to the same size as the source. Only
> * has an effect on regular files.
> */
> if (dst->ops->truncate)
> dst->ops->truncate (dst, src->size);
>
> /* Check if the source is bigger than the destination, since that
> * would truncate (ie. lose) data. Copying from smaller to larger
> * is OK.
> */
> diff --git a/copy/multi-thread-copying.c b/copy/multi-thread-copying.c
> index 620dc571..d9f17285 100644
> --- a/copy/multi-thread-copying.c
> +++ b/copy/multi-thread-copying.c
> @@ -123,21 +123,21 @@ multi_thread_copying (void)
> if (err != 0) {
> errno = err;
> perror ("pthread_join");
> exit (EXIT_FAILURE);
> }
> }
>
> free (workers);
> }
>
> -static void wait_for_request_slots (size_t index);
> +static void wait_for_request_slots (struct worker *worker);
> static unsigned in_flight (size_t index);
> static void poll_both_ends (size_t index);
> static int finished_read (void *vp, int *error);
> static int finished_command (void *vp, int *error);
> static void free_command (struct command *command);
> static void fill_dst_range_with_zeroes (struct command *command);
> static struct command *create_command (uint64_t offset, size_t len, bool zero,
> struct worker *worker);
>
> /* Tracking worker queue size.
> @@ -148,20 +148,21 @@ static struct command *create_command (uint64_t offset, size_t len, bool zero,
> * subcommand in finished_read(), or when a write command completes in
> * finished_command().
> *
> * Zero commands are not considered in the queue size since they have no
> * payload.
> */
>
> static inline void
> increase_queue_size(struct worker *worker, size_t len)
> {
> + assert (worker->queue_size < queue_size);
> worker->queue_size += len;
> }
>
> static inline void
> decrease_queue_size(struct worker *worker, size_t len)
> {
> assert (worker->queue_size >= len);
> worker->queue_size -= len;
> }
>
> @@ -203,21 +204,21 @@ worker_thread (void *wp)
> * requests.
> */
> while (exts.ptr[i].length > 0) {
> len = exts.ptr[i].length;
> if (len > request_size)
> len = request_size;
>
> command = create_command (exts.ptr[i].offset, len,
> false, w);
>
> - wait_for_request_slots (w->index);
> + wait_for_request_slots (w);
>
> /* NOTE: Must increase the queue size after waiting. */
> increase_queue_size (w, len);
>
> /* Begin the asynch read operation. */
> src->ops->asynch_read (src, command,
> (nbd_completion_callback) {
> .callback = finished_read,
> .user_data = command,
> });
> @@ -233,36 +234,37 @@ worker_thread (void *wp)
> }
>
> /* Wait for in flight NBD requests to finish. */
> while (in_flight (w->index) > 0)
> poll_both_ends (w->index);
>
> free (exts.ptr);
> return NULL;
> }
>
> -/* If the number of requests in flight exceeds the limit, poll
> - * waiting for at least one request to finish. This enforces
> - * the user --requests option.
> +/* If the number of requests in flight or the number of queued bytes
> + * exceed the limit, poll waiting for at least one request to finish.
> + * This enforces the user --requests and --queue-size options.
> *
> * NB: Unfortunately it's not possible to call this from a callback,
> * since it will deadlock trying to grab the libnbd handle lock. This
> * means that although the worker thread calls this and enforces the
> * limit, when we split up requests into subrequests (eg. doing
> * sparseness detection) we will probably exceed the user request
> * limit. XXX
> */
> static void
> -wait_for_request_slots (size_t index)
> +wait_for_request_slots (struct worker *worker)
> {
> - while (in_flight (index) >= max_requests)
> - poll_both_ends (index);
> + while (in_flight (worker->index) >= max_requests ||
> + worker->queue_size >= queue_size)
> + poll_both_ends (worker->index);
> }
>
> /* Count the number of asynchronous commands in flight. */
> static unsigned
> in_flight (size_t index)
> {
> return src->ops->in_flight (src, index) + dst->ops->in_flight (dst, index);
> }
>
> /* Poll (optional) NBD src and NBD dst, moving the state machine(s)
> diff --git a/copy/nbdcopy.h b/copy/nbdcopy.h
> index d6bd63f0..19797dfd 100644
> --- a/copy/nbdcopy.h
> +++ b/copy/nbdcopy.h
> @@ -226,20 +226,21 @@ extern void asynch_notify_read_write_not_supported (struct rw *rw,
>
> extern bool allocated;
> extern unsigned connections;
> extern bool destination_is_zero;
> extern bool extents;
> extern bool flush;
> extern unsigned max_requests;
> extern bool progress;
> extern int progress_fd;
> extern unsigned request_size;
> +extern unsigned queue_size;
> extern unsigned sparse_size;
> extern bool synchronous;
> extern unsigned threads;
> extern bool verbose;
>
> extern const char *prog;
>
> extern void progress_bar (off_t pos, int64_t size);
> extern void synch_copying (void);
> extern void multi_thread_copying (void);
> diff --git a/copy/nbdcopy.pod b/copy/nbdcopy.pod
> index f7100935..22ceca8e 100644
> --- a/copy/nbdcopy.pod
> +++ b/copy/nbdcopy.pod
> @@ -1,21 +1,22 @@
> =head1 NAME
>
> nbdcopy - copy to and from an NBD server
>
> =head1 SYNOPSIS
>
> nbdcopy [--allocated] [-C N|--connections=N]
> [--destination-is-zero|--target-is-zero] [--flush]
> [--no-extents] [-p|--progress|--progress=FD]
> - [--request-size=N] [-R N|--requests=N] [-S N|--sparse=N]
> - [--synchronous] [-T N|--threads=N] [-v|--verbose]
> + [--request-size=N] [--queue-size=N] [-R N|--requests=N]
> + [-S N|--sparse=N] [--synchronous] [-T N|--threads=N]
> + [-v|--verbose]
> SOURCE DESTINATION
>
> SOURCE, DESTINATION := - | FILE | DEVICE | NBD-URI | [ CMD ARGS ... ]
> DESTINATION += null:
>
> nbdcopy --help
>
> nbdcopy -V|--version
>
> =head1 EXAMPLES
> @@ -156,20 +157,27 @@ following shell commands:
>
> Set the maximum request size in bytes. The maximum value is 32 MiB,
> specified by the NBD protocol.
>
> =item B<-R> N
>
> =item B<--requests=>N
>
> Set the maximum number of requests in flight per NBD connection.
>
> +=item B<--queue-size=>N
> +
> +Set the maximum number of bytes to queue for in flight requests. The
> +default value is 16 MiB, allowing up to 64 requests of 256 KiB per NBD
> +connection. If you use a larger B<--request-size> you may want to
> +increase this value.
> +
> =item B<-S> N
>
> =item B<--sparse=>N
>
> Detect all zero blocks of size N (bytes) and make them sparse on the
> output. You can also turn off sparse detection using S<I<-S 0>>.
> The default is 4096 bytes.
>
> =item B<--synchronous>
Patch series looks pretty sensible to me. Let's see if Eric has any
comments.

Should the queue size be represented by a size_t? While it seems
unlikely now you'd want a queue larger than 4GB, I guess there
might be a niche case for it (and maybe in 10 years we'll all have
massive laptops!)
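
To illustrate the size_t question: a rough sketch of parsing a byte
count into a size_t instead of using sscanf with "%u", so values of
4 GiB and above are accepted on platforms where size_t is 64 bits.
parse_size_arg() is a hypothetical helper, not something that exists in
nbdcopy, and it only handles plain decimal numbers:

```c
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical helper: parse a plain decimal byte count into *out.
 * Returns false for empty strings, signs, trailing garbage, or
 * values that do not fit in size_t.
 */
static bool
parse_size_arg (const char *s, size_t *out)
{
  char *end;
  unsigned long long v;

  /* strtoull accepts leading whitespace and signs; reject them here. */
  if (s[0] < '0' || s[0] > '9')
    return false;

  errno = 0;
  v = strtoull (s, &end, 10);
  if (errno != 0 || *end != '\0')
    return false;

  /* Only relevant where size_t is narrower than unsigned long long. */
  if (v > SIZE_MAX)
    return false;

  *out = (size_t) v;
  return true;
}
```

If the global became a size_t, the per-worker counter it is compared
against in wait_for_request_slots() would need the same type so the
comparison stays consistent.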
Rich.
--
Richard Jones, Virtualization Group, Red Hat http://people.redhat.com/~rjones
Read my programming and virtualization blog: http://rwmj.wordpress.com
libguestfs lets you edit virtual machines. Supports shell scripting,
bindings from many languages. http://libguestfs.org