[Libguestfs] [PATCH libnbd 8/8] copy: Adaptive queue size

Nir Soffer nsoffer at redhat.com
Sun Feb 20 12:14:03 UTC 2022


Limit the size of the copy queue by the number of queued bytes, in
addition to the number of requests. This allows many concurrent small
requests, which are required for good performance, while limiting the
number of large requests, which are actually faster with lower
concurrency.

Add a new --queue-size option to control the maximum queue size. With 2
MiB we can have 8 requests of 256 KiB per connection. The default queue
size is 16 MiB, matching the default --requests value (64) multiplied by
the default --request-size (256 KiB). Testing shows that using more than
16 requests of 256 KiB with one connection does not improve anything.

The new option will simplify limiting memory usage when using large
requests, like this change in virt-v2v:
https://github.com/libguestfs/virt-v2v/commit/c943420219fa0ee971fc228aa4d9127c5ce973f7

I tested this change with 3 images:

- Fedora 35 + 3g of random data - hopefully simulates a real image
- Fully allocated image - the special case when every read command is
  converted to a write command.
- Zero image - the special case when every read command is converted to
  a zero command.

On 2 machines:

- laptop: Intel(R) Core(TM) i7-10850H CPU @ 2.70GHz, 12 cpus,
  1.5 MiB L2 cache per 2 cpus, 12 MiB L3 cache.
- server: Intel(R) Xeon(R) Gold 5218R CPU @ 2.10GHz, 80 cpus,
  1 MiB L2 cache per cpu, 27.5 MiB L3 cache.

In all cases, both source and destination are served by qemu-nbd, using
--cache=none --aio=native. Because qemu-nbd does not support MULTI_CONN
for writing, we test a single connection when copying to qemu-nbd. I
also tested copying to null:, since in this case we use 4 connections
(these tests are marked with /ro).

Results for copying all images on all machines with nbdcopy v1.11.0 and
with this change. "before" and "after" are the average times of 10 runs.

image       machine    before    after    queue size    improvement
===================================================================
fedora      laptop      3.044    2.129           2m           +43%
full        laptop      4.900    3.136           2m           +56%
zero        laptop      3.147    2.624           2m           +20%
-------------------------------------------------------------------
fedora      server      2.324    2.189          16m            +6%
full        server      3.521    3.380           8m            +4%
zero        server      2.297    2.338          16m            -2%
-------------------------------------------------------------------
fedora/ro   laptop      2.040    1.663           1m           +23%
fedora/ro   server      1.585    1.393           2m           +14%

Signed-off-by: Nir Soffer <nsoffer at redhat.com>
---
 copy/main.c                 | 52 ++++++++++++++++++++++++-------------
 copy/multi-thread-copying.c | 18 +++++++------
 copy/nbdcopy.h              |  1 +
 copy/nbdcopy.pod            | 12 +++++++--
 4 files changed, 55 insertions(+), 28 deletions(-)

diff --git a/copy/main.c b/copy/main.c
index 390de1eb..832f99da 100644
--- a/copy/main.c
+++ b/copy/main.c
@@ -36,53 +36,55 @@
 
 #include <pthread.h>
 
 #include <libnbd.h>
 
 #include "ispowerof2.h"
 #include "human-size.h"
 #include "version.h"
 #include "nbdcopy.h"
 
-bool allocated;                 /* --allocated flag */
-unsigned connections = 4;       /* --connections */
-bool destination_is_zero;       /* --destination-is-zero flag */
-bool extents = true;            /* ! --no-extents flag */
-bool flush;                     /* --flush flag */
-unsigned max_requests = 64;     /* --requests */
-bool progress;                  /* -p flag */
-int progress_fd = -1;           /* --progress=FD */
-unsigned request_size = 1<<18;  /* --request-size */
-unsigned sparse_size = 4096;    /* --sparse */
-bool synchronous;               /* --synchronous flag */
-unsigned threads;               /* --threads */
-struct rw *src, *dst;           /* The source and destination. */
-bool verbose;                   /* --verbose flag */
-
-const char *prog;               /* program name (== basename argv[0]) */
+bool allocated;                     /* --allocated flag */
+unsigned connections = 4;           /* --connections */
+bool destination_is_zero;           /* --destination-is-zero flag */
+bool extents = true;                /* ! --no-extents flag */
+bool flush;                         /* --flush flag */
+unsigned max_requests = 64;         /* --requests */
+bool progress;                      /* -p flag */
+int progress_fd = -1;               /* --progress=FD */
+unsigned request_size = 1<<18;      /* --request-size */
+unsigned queue_size = 16<<20;       /* --queue-size */
+unsigned sparse_size = 4096;        /* --sparse */
+bool synchronous;                   /* --synchronous flag */
+unsigned threads;                   /* --threads */
+struct rw *src, *dst;               /* The source and destination. */
+bool verbose;                       /* --verbose flag */
+
+const char *prog;                   /* program name (== basename argv[0]) */
 
 static bool is_nbd_uri (const char *s);
 static struct rw *open_local (const char *filename, direction d);
 static void print_rw (struct rw *rw, const char *prefix, FILE *fp);
 
 static void __attribute__((noreturn))
 usage (FILE *fp, int exitcode)
 {
   fprintf (fp,
 "\n"
 "Copy to and from an NBD server:\n"
 "\n"
 "    nbdcopy [--allocated] [-C N|--connections=N]\n"
 "            [--destination-is-zero|--target-is-zero] [--flush]\n"
 "            [--no-extents] [-p|--progress|--progress=FD]\n"
-"            [--request-size=N] [-R N|--requests=N] [-S N|--sparse=N]\n"
-"            [--synchronous] [-T N|--threads=N] [-v|--verbose]\n"
+"            [--request-size=N] [--queue-size=N] [-R N|--requests=N]\n"
+"            [-S N|--sparse=N] [--synchronous] [-T N|--threads=N] \n"
+"            [-v|--verbose]\n"
 "            SOURCE DESTINATION\n"
 "\n"
 "    SOURCE, DESTINATION := - | FILE | DEVICE | NBD-URI | [ CMD ARGS ... ]\n"
 "    DESTINATION += null:\n"
 "\n"
 "Other options:\n"
 "\n"
 "    nbdcopy --help\n"
 "    nbdcopy -V|--version\n"
 "\n"
@@ -104,33 +106,35 @@ main (int argc, char *argv[])
 {
   enum {
     HELP_OPTION = CHAR_MAX + 1,
     LONG_OPTIONS,
     SHORT_OPTIONS,
     ALLOCATED_OPTION,
     DESTINATION_IS_ZERO_OPTION,
     FLUSH_OPTION,
     NO_EXTENTS_OPTION,
     REQUEST_SIZE_OPTION,
+    QUEUE_SIZE_OPTION,
     SYNCHRONOUS_OPTION,
   };
   const char *short_options = "C:pR:S:T:vV";
   const struct option long_options[] = {
     { "help",               no_argument,       NULL, HELP_OPTION },
     { "long-options",       no_argument,       NULL, LONG_OPTIONS },
     { "allocated",          no_argument,       NULL, ALLOCATED_OPTION },
     { "connections",        required_argument, NULL, 'C' },
     { "destination-is-zero",no_argument,       NULL, DESTINATION_IS_ZERO_OPTION },
     { "flush",              no_argument,       NULL, FLUSH_OPTION },
     { "no-extents",         no_argument,       NULL, NO_EXTENTS_OPTION },
     { "progress",           optional_argument, NULL, 'p' },
     { "request-size",       required_argument, NULL, REQUEST_SIZE_OPTION },
+    { "queue-size",         required_argument, NULL, QUEUE_SIZE_OPTION },
     { "requests",           required_argument, NULL, 'R' },
     { "short-options",      no_argument,       NULL, SHORT_OPTIONS },
     { "sparse",             required_argument, NULL, 'S' },
     { "synchronous",        no_argument,       NULL, SYNCHRONOUS_OPTION },
     { "target-is-zero",     no_argument,       NULL, DESTINATION_IS_ZERO_OPTION },
     { "threads",            required_argument, NULL, 'T' },
     { "verbose",            no_argument,       NULL, 'v' },
     { "version",            no_argument,       NULL, 'V' },
     { NULL }
   };
@@ -212,20 +216,28 @@ main (int argc, char *argv[])
       }
       if (request_size < MIN_REQUEST_SIZE || request_size > MAX_REQUEST_SIZE ||
               !is_power_of_2 (request_size)) {
         fprintf (stderr,
                 "%s: --request-size: must be a power of 2 within %d-%d\n",
                  prog, MIN_REQUEST_SIZE, MAX_REQUEST_SIZE);
         exit (EXIT_FAILURE);
       }
       break;
 
+    case QUEUE_SIZE_OPTION:
+      if (sscanf (optarg, "%u", &queue_size) != 1) {
+        fprintf (stderr, "%s: --queue-size: could not parse: %s\n",
+                 prog, optarg);
+        exit (EXIT_FAILURE);
+      }
+      break;
+
     case 'R':
       if (sscanf (optarg, "%u", &max_requests) != 1 || max_requests == 0) {
         fprintf (stderr, "%s: --requests: could not parse: %s\n",
                  prog, optarg);
         exit (EXIT_FAILURE);
       }
       break;
 
     case 'S':
       if (sscanf (optarg, "%u", &sparse_size) != 1) {
@@ -360,20 +372,24 @@ main (int argc, char *argv[])
   }
 
   if (synchronous)
     connections = 1;
 
   if (connections < threads)
     threads = connections;
   if (threads < connections)
     connections = threads;
 
+  /* Adapt queue size to request size if needed. */
+  if (request_size > queue_size)
+    queue_size = request_size;
+
   /* Truncate the destination to the same size as the source.  Only
    * has an effect on regular files.
    */
   if (dst->ops->truncate)
     dst->ops->truncate (dst, src->size);
 
   /* Check if the source is bigger than the destination, since that
    * would truncate (ie. lose) data.  Copying from smaller to larger
    * is OK.
    */
diff --git a/copy/multi-thread-copying.c b/copy/multi-thread-copying.c
index 620dc571..d9f17285 100644
--- a/copy/multi-thread-copying.c
+++ b/copy/multi-thread-copying.c
@@ -123,21 +123,21 @@ multi_thread_copying (void)
     if (err != 0) {
       errno = err;
       perror ("pthread_join");
       exit (EXIT_FAILURE);
     }
   }
 
   free (workers);
 }
 
-static void wait_for_request_slots (size_t index);
+static void wait_for_request_slots (struct worker *worker);
 static unsigned in_flight (size_t index);
 static void poll_both_ends (size_t index);
 static int finished_read (void *vp, int *error);
 static int finished_command (void *vp, int *error);
 static void free_command (struct command *command);
 static void fill_dst_range_with_zeroes (struct command *command);
 static struct command *create_command (uint64_t offset, size_t len, bool zero,
                                        struct worker *worker);
 
 /* Tracking worker queue size.
@@ -148,20 +148,21 @@ static struct command *create_command (uint64_t offset, size_t len, bool zero,
  * subcommand in finished_read(), or when a write command completes in
  * finished_command().
  *
  * Zero commands are not considered in the queue size since they have no
  * payload.
  */
 
 static inline void
 increase_queue_size(struct worker *worker, size_t len)
 {
+  assert (worker->queue_size < queue_size);
   worker->queue_size += len;
 }
 
 static inline void
 decrease_queue_size(struct worker *worker, size_t len)
 {
   assert (worker->queue_size >= len);
   worker->queue_size -= len;
 }
 
@@ -203,21 +204,21 @@ worker_thread (void *wp)
          * requests.
          */
         while (exts.ptr[i].length > 0) {
           len = exts.ptr[i].length;
           if (len > request_size)
             len = request_size;
 
           command = create_command (exts.ptr[i].offset, len,
                                     false, w);
 
-          wait_for_request_slots (w->index);
+          wait_for_request_slots (w);
 
           /* NOTE: Must increase the queue size after waiting. */
           increase_queue_size (w, len);
 
           /* Begin the asynch read operation. */
           src->ops->asynch_read (src, command,
                                  (nbd_completion_callback) {
                                    .callback = finished_read,
                                    .user_data = command,
                                  });
@@ -233,36 +234,37 @@ worker_thread (void *wp)
   }
 
   /* Wait for in flight NBD requests to finish. */
   while (in_flight (w->index) > 0)
     poll_both_ends (w->index);
 
   free (exts.ptr);
   return NULL;
 }
 
-/* If the number of requests in flight exceeds the limit, poll
- * waiting for at least one request to finish.  This enforces
- * the user --requests option.
+/* If the number of requests in flight or the number of queued bytes
+ * exceed the limit, poll waiting for at least one request to finish.
+ * This enforces the user --requests and --queue-size options.
  *
  * NB: Unfortunately it's not possible to call this from a callback,
  * since it will deadlock trying to grab the libnbd handle lock.  This
  * means that although the worker thread calls this and enforces the
  * limit, when we split up requests into subrequests (eg. doing
  * sparseness detection) we will probably exceed the user request
  * limit. XXX
  */
 static void
-wait_for_request_slots (size_t index)
+wait_for_request_slots (struct worker *worker)
 {
-  while (in_flight (index) >= max_requests)
-    poll_both_ends (index);
+  while (in_flight (worker->index) >= max_requests ||
+         worker->queue_size >= queue_size)
+    poll_both_ends (worker->index);
 }
 
 /* Count the number of asynchronous commands in flight. */
 static unsigned
 in_flight (size_t index)
 {
   return src->ops->in_flight (src, index) + dst->ops->in_flight (dst, index);
 }
 
 /* Poll (optional) NBD src and NBD dst, moving the state machine(s)
diff --git a/copy/nbdcopy.h b/copy/nbdcopy.h
index d6bd63f0..19797dfd 100644
--- a/copy/nbdcopy.h
+++ b/copy/nbdcopy.h
@@ -226,20 +226,21 @@ extern void asynch_notify_read_write_not_supported (struct rw *rw,
 
 extern bool allocated;
 extern unsigned connections;
 extern bool destination_is_zero;
 extern bool extents;
 extern bool flush;
 extern unsigned max_requests;
 extern bool progress;
 extern int progress_fd;
 extern unsigned request_size;
+extern unsigned queue_size;
 extern unsigned sparse_size;
 extern bool synchronous;
 extern unsigned threads;
 extern bool verbose;
 
 extern const char *prog;
 
 extern void progress_bar (off_t pos, int64_t size);
 extern void synch_copying (void);
 extern void multi_thread_copying (void);
diff --git a/copy/nbdcopy.pod b/copy/nbdcopy.pod
index f7100935..22ceca8e 100644
--- a/copy/nbdcopy.pod
+++ b/copy/nbdcopy.pod
@@ -1,21 +1,22 @@
 =head1 NAME
 
 nbdcopy - copy to and from an NBD server
 
 =head1 SYNOPSIS
 
  nbdcopy [--allocated] [-C N|--connections=N]
          [--destination-is-zero|--target-is-zero] [--flush]
          [--no-extents] [-p|--progress|--progress=FD]
-         [--request-size=N] [-R N|--requests=N] [-S N|--sparse=N]
-         [--synchronous] [-T N|--threads=N] [-v|--verbose]
+         [--request-size=N] [--queue-size=N] [-R N|--requests=N]
+         [-S N|--sparse=N] [--synchronous] [-T N|--threads=N]
+         [-v|--verbose]
          SOURCE DESTINATION
 
  SOURCE, DESTINATION := - | FILE | DEVICE | NBD-URI | [ CMD ARGS ... ]
  DESTINATION += null:
 
  nbdcopy --help
 
  nbdcopy -V|--version
 
 =head1 EXAMPLES
@@ -156,20 +157,27 @@ following shell commands:
 
 Set the maximum request size in bytes. The maximum value is 32 MiB,
 specified by the NBD protocol.
 
 =item B<-R> N
 
 =item B<--requests=>N
 
 Set the maximum number of requests in flight per NBD connection.
 
+=item B<--queue-size=>N
+
+Set the maximum number of bytes to queue for in-flight requests. The
+default is 16 MiB, allowing up to 64 requests of 256 KiB per NBD
+connection. It is raised to B<--request-size> if smaller. If you use a
+larger B<--request-size> you may want to increase this value.
+
 =item B<-S> N
 
 =item B<--sparse=>N
 
 Detect all zero blocks of size N (bytes) and make them sparse on the
 output.  You can also turn off sparse detection using S<I<-S 0>>.
 The default is 4096 bytes.
 
 =item B<--synchronous>
 
-- 
2.35.1




More information about the Libguestfs mailing list