[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

[Libguestfs] [nbdkit PATCH v2 3/4] nbd: Enable parallel handling



The final step in making the nbd forwarder fully parallel is to
track a list of pending transactions, rather than a single
transaction.  The list is only ever manipulated with the lock
taken, so all points that use the list see a consistent state.

If we get a response that does not have a cookie from our list,
we have no choice but to consider the server broken and declare
the connection dead.  If we get an ESHUTDOWN from the server,
we rely on nbdkit's main handling to eventually shut us down,
as we do not want to interrupt any other pending in-flight
operations.  Once our reader loop quits, we make sure to mop up
any remaining in-flight operations.

Signed-off-by: Eric Blake <eblake redhat com>
---
 plugins/nbd/nbd.c | 95 ++++++++++++++++++++++++++++++++++---------------------
 1 file changed, 59 insertions(+), 36 deletions(-)

diff --git a/plugins/nbd/nbd.c b/plugins/nbd/nbd.c
index a9146a9..a4a9639 100644
--- a/plugins/nbd/nbd.c
+++ b/plugins/nbd/nbd.c
@@ -115,8 +115,7 @@ nbd_config_complete (void)
   "socket=<SOCKNAME>   (required) The Unix socket to connect to.\n" \
   "export=<NAME>                  Export name to connect to (default \"\").\n" \

-/* TODO Allow more parallelism than one request at a time */
-#define THREAD_MODEL NBDKIT_THREAD_MODEL_SERIALIZE_REQUESTS
+#define THREAD_MODEL NBDKIT_THREAD_MODEL_PARALLEL

 /* The per-transaction details */
 struct transaction {
@@ -126,6 +125,7 @@ struct transaction {
   } u;
   void *buf;
   uint32_t count;
+  struct transaction *next;
 };

 /* The per-connection handle */
@@ -140,8 +140,7 @@ struct handle {
   pthread_mutex_t write_lock;

   pthread_mutex_t trans_lock; /* Covers access to all fields below */
-  /* Our choice of THREAD_MODEL means at most one outstanding transaction */
-  struct transaction trans;
+  struct transaction *trans;
   bool dead;
 };

@@ -262,27 +261,36 @@ nbd_request_full (struct handle *h, uint32_t type, uint64_t offset,
   int err;
   struct transaction *trans;

-  nbd_lock (h);
-  if (h->dead) {
-    nbd_unlock (h);
-    return nbd_mark_dead (h);
+  trans = calloc (1, sizeof *trans);
+  if (!trans) {
+    nbdkit_error ("unable to track transaction: %m");
+    /* Still in sync with server, so don't mark connection dead */
+    return -1;
   }
-  trans = &h->trans;
-  nbd_unlock (h);
   if (pipe (trans->u.fds)) {
     nbdkit_error ("unable to create pipe: %m");
     /* Still in sync with server, so don't mark connection dead */
+    free (trans);
     return -1;
   }
   trans->buf = rep_buf;
   trans->count = rep_buf ? count : 0;
-  if (nbd_request_raw (h, type, offset, count, trans->u.cookie,
-                       req_buf) == 0)
+  nbd_lock (h);
+  if (h->dead) {
+    nbd_unlock (h);
+    goto err;
+  }
+  trans->next = h->trans;
+  h->trans = trans;
+  nbd_unlock (h);
+  if (nbd_request_raw (h, type, offset, count, trans->u.cookie, req_buf) == 0)
     return trans->u.fds[0];

+ err:
   err = errno;
   close (trans->u.fds[0]);
   close (trans->u.fds[1]);
+  free (trans);
   errno = err;
   return nbd_mark_dead (h);
 }
@@ -301,21 +309,34 @@ static int
 nbd_reply_raw (struct handle *h, int *fd)
 {
   struct reply rep;
-  struct transaction trans;
+  struct transaction **ptr;
+  struct transaction *trans;

   *fd = -1;
   if (read_full (h->fd, &rep, sizeof rep) < 0)
     return nbd_mark_dead (h);
-  nbd_lock (h);
-  trans = h->trans;
-  nbd_unlock (h);
-  *fd = trans.u.fds[1];
+  if (be32toh (rep.magic) != NBD_REPLY_MAGIC)
+    return nbd_mark_dead (h);
   nbdkit_debug ("received reply for cookie %#" PRIx64, rep.handle);
-  if (be32toh (rep.magic) != NBD_REPLY_MAGIC || rep.handle != trans.u.cookie)
+  nbd_lock (h);
+  ptr = &h->trans;
+  while ((trans = *ptr) != NULL) {
+    if (rep.handle == trans->u.cookie)
+      break;
+    ptr = &trans->next;
+  }
+  if (trans)
+    *ptr = trans->next;
+  nbd_unlock (h);
+  if (!trans) {
+    nbdkit_error ("reply with unexpected cookie %#" PRIx64, rep.handle);
     return nbd_mark_dead (h);
+  }
+
+  *fd = trans->u.fds[1];
   switch (be32toh (rep.error)) {
   case NBD_SUCCESS:
-    if (trans.buf && read_full (h->fd, trans.buf, trans.count) < 0)
+    if (trans->buf && read_full (h->fd, trans->buf, trans->count) < 0)
       return nbd_mark_dead (h);
     return 0;
   case NBD_EPERM:
@@ -333,16 +354,7 @@ nbd_reply_raw (struct handle *h, int *fd)
   case NBD_ENOSPC:
     return ENOSPC;
   case NBD_ESHUTDOWN:
-    /* The server wants us to initiate soft-disconnect.  Because our
-       THREAD_MODEL does not permit interleaved requests, we know that
-       there are no other pending outstanding messages, so we can
-       attempt that immediately.
-
-       TODO: Once we allow interleaved requests, handling
-       soft-disconnect properly will be trickier */
-    nbd_request_raw (h, NBD_CMD_DISC, 0, 0, 0, NULL);
-    errno = ESHUTDOWN;
-    return nbd_mark_dead (h);
+    return ESHUTDOWN;
   }
 }

@@ -352,9 +364,9 @@ nbd_reader (void *handle)
 {
   struct handle *h = handle;
   bool done = false;
+  int r;

   while (!done) {
-    int r;
     int fd;

     r = nbd_reply_raw (h, &fd);
@@ -370,6 +382,23 @@ nbd_reader (void *handle)
     done = h->dead;
     nbd_unlock (h);
   }
+
+  /* Clean up any stranded in-flight requests */
+  done = false;
+  r = ESHUTDOWN;
+  while (!done) {
+    struct transaction *trans;
+
+    nbd_lock (h);
+    trans = h->trans;
+    h->trans = trans->next;
+    nbd_unlock (h);
+    if (write (trans->u.fds[1], &r, sizeof r) != sizeof r) {
+      nbdkit_error ("failed to write pipe: %m");
+      abort ();
+    }
+    close (trans->u.fds[1]);
+  }
   return NULL;
 }

@@ -383,12 +412,6 @@ nbd_reply (struct handle *h, int fd)
     nbdkit_debug ("failed to read pipe: %m");
     err = EIO;
   }
-  nbd_lock (h);
-  /* TODO This check is just for sanity that the reader thread concept
-     works; it won't work once we allow interleaved requests */
-  assert (fd == h->trans.u.fds[0]);
-  h->trans.u.fds[0] = -1;
-  nbd_unlock (h);
   close (fd);
   errno = err;
   return err ? -1 : 0;
-- 
2.13.6


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]