[Libguestfs] [nbdkit PATCH v2 5/8] connections: Set up thread pool for handling client requests

Eric Blake eblake at redhat.com
Mon Nov 20 19:38:06 UTC 2017


Finish plumbing up everything we will need to process multiple
client requests in parallel after the handshake is complete. Since
the connection status is now shared by every thread serving that
connection, and properly protected by a mutex, each thread will quit
after finishing its current request once any of them notices EOF or
nbdkit detects a signal.

For ease of review, the framework for configuring threads is
done separately from the low-level work of utilizing the threads,
so this patch sees no behavior change (because we hard-code
nworkers to 1, which takes the serial path and resets
conn->nworkers to 0). It is a one-line hack to test that
a larger nworkers still behaves the same even for a non-parallel
plugin; in fact, such a hack was how I found and squashed several
thread-safety bugs in the previous patches, exposed by running
test-socket-activation in a loop.

Signed-off-by: Eric Blake <eblake at redhat.com>
---
 src/connections.c | 91 +++++++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 82 insertions(+), 9 deletions(-)

diff --git a/src/connections.c b/src/connections.c
index 9d95e7f..41371fb 100644
--- a/src/connections.c
+++ b/src/connections.c
@@ -68,7 +68,7 @@ struct connection {
   int status; /* 1 for more I/O with client, 0 for shutdown, -1 on error */
   void *handle;
   void *crypto_session;
-  int nworkers; /* TODO set up a thread pool for parallel workers */
+  int nworkers;

   uint64_t exportsize;
   int readonly;
@@ -83,7 +83,8 @@ struct connection {
   connection_close_function close;
 };

-static struct connection *new_connection (int sockin, int sockout);
+static struct connection *new_connection (int sockin, int sockout,
+                                          int nworkers);
 static void free_connection (struct connection *conn);
 static int negotiate_handshake (struct connection *conn);
 static int recv_request_send_reply (struct connection *conn);
@@ -175,12 +176,39 @@ set_status (struct connection *conn, int value)
   return value;
 }

+struct worker_data {
+  struct connection *conn;
+  char *name;
+};
+
+static void *
+connection_worker (void *data)
+{
+  struct worker_data *worker = data;
+  struct connection *conn = worker->conn;
+  char *name = worker->name;
+
+  debug ("starting worker thread %s", name);
+  threadlocal_new_server_thread ();
+  threadlocal_set_name (name);
+  free (worker);
+
+  while (!quit && get_status (conn) > 0)
+    recv_request_send_reply (conn);
+  debug ("exiting worker thread %s", threadlocal_get_name ());
+  free (name);
+  return NULL;
+}
+
 static int
 _handle_single_connection (int sockin, int sockout)
 {
   int r = -1;
-  struct connection *conn = new_connection (sockin, sockout);
+  struct connection *conn;
+  int nworkers = 1; /* TODO default to 16 for parallel plugins, with command-line override */
+  pthread_t *workers = NULL;

+  conn = new_connection (sockin, sockout, nworkers);
   if (!conn)
     goto done;

@@ -193,11 +221,55 @@ _handle_single_connection (int sockin, int sockout)
   if (negotiate_handshake (conn) == -1)
     goto done;

-  /* Process requests.  XXX Allow these to be dispatched in parallel using
-   * a thread pool.
-   */
-  while (!quit && get_status (conn) > 0)
-    recv_request_send_reply (conn);
+  if (nworkers <= 1) {
+    /* No need for a separate thread. */
+    debug ("handshake complete, processing requests serially");
+    conn->nworkers = 0;
+    while (!quit && get_status (conn) > 0)
+      recv_request_send_reply (conn);
+  }
+  else {
+    /* Create thread pool to process requests. */
+    debug ("handshake complete, processing requests with %d threads",
+           nworkers);
+    workers = calloc (nworkers, sizeof *workers);
+    if (!workers) {
+      perror ("malloc");
+      goto done;
+    }
+
+    for (nworkers = 0; nworkers < conn->nworkers; nworkers++) {
+      struct worker_data *worker = malloc (sizeof *worker);
+      int err;
+
+      if (!worker) {
+        perror ("malloc");
+        set_status (conn, -1);
+        goto wait;
+      }
+      if (asprintf (&worker->name, "%s.%d", plugin_name (), nworkers) < 0) {
+        perror ("asprintf");
+        set_status (conn, -1);
+        free (worker);
+        goto wait;
+      }
+      worker->conn = conn;
+      err = pthread_create (&workers[nworkers], NULL, connection_worker,
+                            worker);
+      if (err) {
+        errno = err;
+        perror ("pthread_create");
+        set_status (conn, -1);
+        free (worker);
+        goto wait;
+      }
+    }
+
+  wait:
+    while (nworkers)
+      pthread_join (workers[--nworkers], NULL);
+    free (workers);
+  }

   r = get_status (conn);
  done:
@@ -218,7 +290,7 @@ handle_single_connection (int sockin, int sockout)
 }

 static struct connection *
-new_connection (int sockin, int sockout)
+new_connection (int sockin, int sockout, int nworkers)
 {
   struct connection *conn;

@@ -229,6 +301,7 @@ new_connection (int sockin, int sockout)
   }

   conn->status = 1;
+  conn->nworkers = nworkers;
   conn->sockin = sockin;
   conn->sockout = sockout;
   pthread_mutex_init (&conn->request_lock, NULL);
-- 
2.13.6




More information about the Libguestfs mailing list