[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

[Libguestfs] [nbdkit PATCH 6/6] Add --threads option for supporting true parallel requests



It's finally time to implement one of the TODO items: we want to
support a thread pool of parallel readers from the client, in
order to allow multiple in-flight operations with potential
out-of-order completion.  We also need at least one plugin that
supports parallel processing for testing the option; the file
plugin fits the bill.

Add and document a new command line option, -t/--threads=N,
which controls how many threads to create per connection (although
we only ever spawn multiple threads if the plugin is parallel,
since otherwise, at most one thread is running at a time anyway).
Setting -t 1 forces a parallel plugin to behave as if serialized;
setting it to other values allows tuning for performance.  The
default of 16 matches the choice of MAX_NBD_REQUESTS used in qemu.

One easy way to test:
term1$ echo hello > junk
term1$ ./nbdkit -f -v -r file file=junk rdelay=2s wdelay=1s
term2$ qemu-io -f raw nbd://localhost:10809/ --trace='nbd_*' \
  -c 'aio_read 0 1' -c 'aio_write -P 0x6c 2 2' -c 'aio_flush'

If the write completes before the read, then nbdkit was properly
handling things in parallel with out-of-order replies.

Signed-off-by: Eric Blake <eblake redhat com>
---
 TODO                |  7 -------
 docs/nbdkit.pod     | 12 +++++++++++-
 nbdkit.in           |  2 +-
 plugins/file/file.c |  2 +-
 src/connections.c   | 10 +++++++---
 src/internal.h      |  2 ++
 src/main.c          | 20 ++++++++++++++++++--
 src/plugins.c       |  8 ++++++++
 8 files changed, 48 insertions(+), 15 deletions(-)

diff --git a/TODO b/TODO
index 6c5bb5b..db7469b 100644
--- a/TODO
+++ b/TODO
@@ -12,10 +12,3 @@
 * Glance and/or cinder plugins.

 * Performance - measure and improve it.
-
-* Implement true parallel request handling.  Currently
-  NBDKIT_THREAD_MODEL_SERIALIZE_REQUESTS and
-  NBDKIT_THREAD_MODEL_PARALLEL are the same, because we handle
-  requests within each connection synchronously one at a time.  We
-  could (and should) be able to handle them in parallel by having
-  another thread pool for requests.
diff --git a/docs/nbdkit.pod b/docs/nbdkit.pod
index e3043ba..4593391 100644
--- a/docs/nbdkit.pod
+++ b/docs/nbdkit.pod
@@ -9,7 +9,7 @@ nbdkit - A toolkit for creating NBD servers
  nbdkit [-e EXPORTNAME] [--exit-with-parent] [-f]
         [-g GROUP] [-i IPADDR]
         [--newstyle] [--oldstyle] [-P PIDFILE] [-p PORT] [-r]
-        [--run CMD] [-s] [--selinux-label LABEL]
+        [--run CMD] [-s] [--selinux-label LABEL] [-t THREADS]
         [--tls=off|on|require] [--tls-certificates /path/to/certificates]
         [--tls-verify-peer]
         [-U SOCKET] [-u USER] [-v] [-V]
@@ -230,6 +230,16 @@ Unix domain sockets:

  nbdkit --selinux-label system_u:object_r:svirt_t:s0 ...

+=item B<-t> THREADS
+
+=item B<--threads> THREADS
+
+Set the number of threads to be used per connection, which in turn
+controls the number of outstanding requests that can be processed at
+once.  Only matters for plugins with thread_model=parallel (where it
+defaults to 16).  To force serialized behavior (useful if the client
+is not prepared for out-of-order responses), set this to 1.
+
 =item B<--tls=off>

 =item B<--tls=on>
diff --git a/nbdkit.in b/nbdkit.in
index 6be89ec..9c3d625 100644
--- a/nbdkit.in
+++ b/nbdkit.in
@@ -65,7 +65,7 @@ verbose=
 while [ $# -gt 0 ]; do
     case "$1" in
         # Flags that take an argument.  We must not rewrite the argument.
-        -e | --export* | -g | --group | -i | --ip* | -P | --pid* | -p | --port | --run | --selinux-label | --tls | --tls-certificates | -U | --unix | -u | --user)
+        -e | --export* | -g | --group | -i | --ip* | -P | --pid* | -p | --port | --run | --selinux-label | -t | --threads | --tls | --tls-certificates | -U | --unix | -u | --user)
             args[$i]="$1"
             ((++i))
             args[$i]="$2"
diff --git a/plugins/file/file.c b/plugins/file/file.c
index a603be8..ef5da3d 100644
--- a/plugins/file/file.c
+++ b/plugins/file/file.c
@@ -200,7 +200,7 @@ file_close (void *handle)
   free (h);
 }

-#define THREAD_MODEL NBDKIT_THREAD_MODEL_SERIALIZE_REQUESTS
+#define THREAD_MODEL NBDKIT_THREAD_MODEL_PARALLEL

 /* Get the file size. */
 static int64_t
diff --git a/src/connections.c b/src/connections.c
index 5257032..2d184b0 100644
--- a/src/connections.c
+++ b/src/connections.c
@@ -59,6 +59,9 @@
 /* Maximum length of any option data (bytes). */
 #define MAX_OPTION_LENGTH 4096

+/* Default number of parallel requests. */
+#define DEFAULT_PARALLEL_REQUESTS 16
+
 /* Connection structure. */
 struct connection {
   pthread_mutex_t request_lock;
@@ -203,9 +206,11 @@ _handle_single_connection (int sockin, int sockout)
 {
   int r = -1;
   struct connection *conn;
-  int nworkers = 1; /* TODO default to 16 for parallel plugins, with command-line override */
+  int nworkers = threads ? threads : DEFAULT_PARALLEL_REQUESTS;
   pthread_t *workers = NULL;

+  if (!plugin_is_parallel())
+    nworkers = 0;
   conn = new_connection (sockin, sockout, nworkers);
   if (!conn)
     goto done;
@@ -219,10 +224,9 @@ _handle_single_connection (int sockin, int sockout)
   if (negotiate_handshake (conn) == -1)
     goto done;

-  if (nworkers == 1) {
+  if (!nworkers) {
     /* No need for a separate thread. */
     debug ("handshake complete, processing requests serially");
-    nworkers = conn->nworkers = 0;
     while (!quit && get_status (conn) > 0)
       recv_request_send_reply (conn);
   }
diff --git a/src/internal.h b/src/internal.h
index 1fc5d69..b79c12c 100644
--- a/src/internal.h
+++ b/src/internal.h
@@ -103,6 +103,7 @@ extern const char *tls_certificates_dir;
 extern int tls_verify_peer;
 extern char *unixsocket;
 extern int verbose;
+extern int threads;

 extern volatile int quit;

@@ -151,6 +152,7 @@ extern void plugin_lock_connection (void);
 extern void plugin_unlock_connection (void);
 extern void plugin_lock_request (struct connection *conn);
 extern void plugin_unlock_request (struct connection *conn);
+extern bool plugin_is_parallel (void);
 extern int plugin_errno_is_preserved (void);
 extern int plugin_open (struct connection *conn, int readonly);
 extern void plugin_close (struct connection *conn);
diff --git a/src/main.c b/src/main.c
index c9f08ab..cc5e9e3 100644
--- a/src/main.c
+++ b/src/main.c
@@ -84,6 +84,7 @@ int readonly;                   /* -r */
 char *run;                      /* --run */
 int listen_stdin;               /* -s */
 const char *selinux_label;      /* --selinux-label */
+int threads;                    /* -t */
 int tls;                        /* --tls : 0=off 1=on 2=require */
 const char *tls_certificates_dir; /* --tls-certificates */
 int tls_verify_peer;            /* --tls-verify-peer */
@@ -99,7 +100,7 @@ static char *random_fifo = NULL;

 enum { HELP_OPTION = CHAR_MAX + 1 };

-static const char *short_options = "e:fg:i:nop:P:rsu:U:vV";
+static const char *short_options = "e:fg:i:nop:P:rst:u:U:vV";
 static const struct option long_options[] = {
   { "help",       0, NULL, HELP_OPTION },
   { "dump-config",0, NULL, 0 },
@@ -126,6 +127,7 @@ static const struct option long_options[] = {
   { "selinux-label", 1, NULL, 0 },
   { "single",     0, NULL, 's' },
   { "stdin",      0, NULL, 's' },
+  { "threads",    1, NULL, 't' },
   { "tls",        1, NULL, 0 },
   { "tls-certificates", 1, NULL, 0 },
   { "tls-verify-peer", 0, NULL, 0 },
@@ -143,7 +145,7 @@ usage (void)
           "       [-e EXPORTNAME] [--exit-with-parent] [-f]\n"
           "       [-g GROUP] [-i IPADDR]\n"
           "       [--newstyle] [--oldstyle] [-P PIDFILE] [-p PORT] [-r]\n"
-          "       [--run CMD] [-s] [--selinux-label LABEL]\n"
+          "       [--run CMD] [-s] [--selinux-label LABEL] [-t THREADS]\n"
           "       [--tls=off|on|require] [--tls-certificates /path/to/certificates]\n"
           "       [--tls-verify-peer]\n"
           "       [-U SOCKET] [-u USER] [-v] [-V]\n"
@@ -331,6 +333,20 @@ main (int argc, char *argv[])
       listen_stdin = 1;
       break;

+    case 't':
+      {
+        char *end;
+
+        errno = 0;
+        threads = strtoul (optarg, &end, 0);
+        if (errno || *end) {
+          fprintf (stderr, "%s: cannot parse '%s' into threads\n",
+                   program_name, optarg);
+          exit (EXIT_FAILURE);
+        }
+        /* XXX Worth a maximum limit on threads? */
+      }
+
     case 'U':
       if (socket_activation) {
         fprintf (stderr, "%s: cannot use socket activation with -U flag\n",
diff --git a/src/plugins.c b/src/plugins.c
index e8c6b28..47c4fa5 100644
--- a/src/plugins.c
+++ b/src/plugins.c
@@ -360,6 +360,14 @@ plugin_unlock_request (struct connection *conn)
   }
 }

+bool
+plugin_is_parallel (void)
+{
+  assert (dl);
+
+  return plugin._thread_model >= NBDKIT_THREAD_MODEL_PARALLEL;
+}
+
 int
 plugin_errno_is_preserved (void)
 {
-- 
2.13.6


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]