[Libguestfs] [nbdkit PATCH] nbd: Rewrite thread passing to use semaphore rather than pipe

Eric Blake eblake at redhat.com
Sat May 25 18:10:15 UTC 2019


I ran some local testing on my Fedora 29 system via:
$ ./nbdkit memory 1m
$ for i in `seq 10`; do
    ./nbdkit -U - --filter=stats nbd \
      statsappend=true statsfile=$file hostname=localhost port=10809 \
      --run '~/libnbd/examples/threaded-reads-and-writes $unixsocket'
  done

Pre-patch, the runs averaged  1.266s, 1.30E+08 bits/s
Post-patch, the runs averaged 1.154s, 1.42E+08 bits/s

This is roughly a 9% performance gain, all because semaphores are
lighter weight than pipes for inter-thread locking.  The refactoring
will also make it a bit easier to incorporate the use of libnbd.

Signed-off-by: Eric Blake <eblake at redhat.com>
---

Sending this in case someone wants to review my work, but I was happy
enough with the benchmark numbers to go ahead and push this one.

 plugins/nbd/nbd.c | 134 +++++++++++++++++++++++-----------------------
 1 file changed, 67 insertions(+), 67 deletions(-)

diff --git a/plugins/nbd/nbd.c b/plugins/nbd/nbd.c
index 9039e1b..b2f3446 100644
--- a/plugins/nbd/nbd.c
+++ b/plugins/nbd/nbd.c
@@ -48,6 +48,7 @@
 #include <sys/un.h>
 #include <assert.h>
 #include <pthread.h>
+#include <semaphore.h>

 #define NBDKIT_API_VERSION 2

@@ -155,10 +156,8 @@ nbd_config_complete (void)

 /* The per-transaction details */
 struct transaction {
-  union {
-    uint64_t cookie;
-    int fds[2];
-  } u;
+  uint64_t cookie;
+  sem_t sem;
   void *buf;
   uint64_t offset;
   uint32_t count;
@@ -182,6 +181,7 @@ struct handle {

   pthread_mutex_t trans_lock; /* Covers access to all fields below */
   struct transaction *trans;
+  uint64_t unique;
   bool dead;
 };

@@ -264,7 +264,7 @@ find_trans_by_cookie (struct handle *h, uint64_t cookie, bool remove)
   ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&h->trans_lock);
   ptr = &h->trans;
   while ((trans = *ptr) != NULL) {
-    if (cookie == trans->u.cookie)
+    if (cookie == trans->cookie)
       break;
     ptr = &trans->next;
   }
@@ -300,28 +300,27 @@ nbd_request_raw (struct handle *h, uint16_t flags, uint16_t type,
 }

 /* Perform the request half of a transaction. On success, return the
-   non-negative fd for reading the reply; on error return -1. */
-static int
+   transaction; on error return NULL. */
+static struct transaction *
 nbd_request_full (struct handle *h, uint16_t flags, uint16_t type,
                   uint64_t offset, uint32_t count, const void *req_buf,
                   void *rep_buf, struct nbdkit_extents *extents)
 {
   int err;
   struct transaction *trans;
-  int fd;
   uint64_t cookie;

   trans = calloc (1, sizeof *trans);
   if (!trans) {
     nbdkit_error ("unable to track transaction: %m");
     /* Still in sync with server, so don't mark connection dead */
-    return -1;
+    return NULL;
   }
-  if (pipe (trans->u.fds)) {
-    nbdkit_error ("unable to create pipe: %m");
+  if (sem_init (&trans->sem, 0, 0)) {
+    nbdkit_error ("unable to create semaphore: %m");
     /* Still in sync with server, so don't mark connection dead */
     free (trans);
-    return -1;
+    return NULL;
   }
   trans->buf = rep_buf;
   trans->count = rep_buf ? count : 0;
@@ -331,43 +330,39 @@ nbd_request_full (struct handle *h, uint16_t flags, uint16_t type,
     ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&h->trans_lock);
     if (h->dead)
       goto err;
+    cookie = trans->cookie = h->unique++;
     trans->next = h->trans;
     h->trans = trans;
-    fd = trans->u.fds[0];
-    cookie = trans->u.cookie;
   }
   if (nbd_request_raw (h, flags, type, offset, count, cookie, req_buf) == 0)
-    return fd;
+    return trans;
   trans = find_trans_by_cookie (h, cookie, true);

  err:
   err = errno;
-  if (trans) {
-    close (trans->u.fds[0]);
-    close (trans->u.fds[1]);
-    free (trans);
-  }
-  else
-    close (fd);
+  if (sem_destroy (&trans->sem))
+    abort ();
+  free (trans);
+  nbd_mark_dead (h);
   errno = err;
-  return nbd_mark_dead (h);
+  return NULL;
 }

 /* Shorthand for nbd_request_full when no extra buffers are involved. */
-static int
+static struct transaction *
 nbd_request (struct handle *h, uint16_t flags, uint16_t type, uint64_t offset,
              uint32_t count)
 {
   return nbd_request_full (h, flags, type, offset, count, NULL, NULL, NULL);
 }

-/* Read a reply, and look up the fd corresponding to the transaction.
+/* Read a reply, and look up the corresponding transaction.
    Return the server's non-negative answer (converted to local errno
    value) on success, or -1 on read failure. If structured replies
-   were negotiated, fd is set to -1 if there are still more replies
+   were negotiated, trans_out is set to NULL if there are still more replies
    expected. */
 static int
-nbd_reply_raw (struct handle *h, int *fd)
+nbd_reply_raw (struct handle *h, struct transaction **trans_out)
 {
   union {
     struct simple_reply simple;
@@ -387,7 +382,7 @@ nbd_reply_raw (struct handle *h, int *fd)
   bool zero = false; /* if len, whether to read or memset */
   uint16_t errlen;

-  *fd = -1;
+  *trans_out = NULL;
   /* magic and handle overlap between simple and structured replies */
   if (read_full (h->fd, &rep, sizeof rep.simple))
     return nbd_mark_dead (h);
@@ -573,10 +568,9 @@ nbd_reply_raw (struct handle *h, int *fd)
   /* Thanks to structured replies, we must preserve an error in any
      earlier chunk for replay during the final chunk. */
   if (!more) {
-    *fd = trans->u.fds[1];
+    *trans_out = trans;
     if (!error)
       error = trans->err;
-    free (trans);
   }
   else if (error && !trans->err)
     trans->err = error;
@@ -616,19 +610,20 @@ nbd_reader (void *handle)
   int r;

   while (!done) {
-    int fd;
+    struct transaction *trans;

-    r = nbd_reply_raw (h, &fd);
+    r = nbd_reply_raw (h, &trans);
     if (r >= 0) {
-      if (fd < 0)
+      if (!trans)
         nbdkit_debug ("partial reply handled, waiting for final reply");
-      else if (write (fd, &r, sizeof r) != sizeof r) {
-        nbdkit_error ("failed to write pipe: %m");
-        abort ();
+      else {
+        trans->err = r;
+        if (sem_post (&trans->sem)) {
+          nbdkit_error ("failed to post semaphore: %m");
+          abort ();
+        }
       }
     }
-    if (fd >= 0)
-      close (fd);
     ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&h->trans_lock);
     done = h->dead;
   }
@@ -645,27 +640,32 @@ nbd_reader (void *handle)
     }
     if (!trans)
       break;
-    if (write (trans->u.fds[1], &r, sizeof r) != sizeof r) {
-      nbdkit_error ("failed to write pipe: %m");
+    trans->err = r;
+    if (sem_post (&trans->sem)) {
+      nbdkit_error ("failed to post semaphore: %m");
       abort ();
     }
-    close (trans->u.fds[1]);
-    free (trans);
   }
   return NULL;
 }

 /* Perform the reply half of a transaction. */
 static int
-nbd_reply (struct handle *h, int fd)
+nbd_reply (struct handle *h, struct transaction *trans)
 {
   int err;

-  if (read (fd, &err, sizeof err) != sizeof err) {
-    nbdkit_debug ("failed to read pipe: %m");
+  while ((err = sem_wait (&trans->sem)) == -1 && errno == EINTR)
+    /* try again */;
+  if (err) {
+    nbdkit_debug ("failed to wait on semaphore: %m");
     err = EIO;
   }
-  close (fd);
+  else
+    err = trans->err;
+  if (sem_destroy (&trans->sem))
+    abort ();
+  free (trans);
   errno = err;
   return err ? -1 : 0;
 }
@@ -1175,11 +1175,11 @@ nbd_pread (void *handle, void *buf, uint32_t count, uint64_t offset,
            uint32_t flags)
 {
   struct handle *h = handle;
-  int c;
+  struct transaction *s;

   assert (!flags);
-  c = nbd_request_full (h, 0, NBD_CMD_READ, offset, count, NULL, buf, NULL);
-  return c < 0 ? c : nbd_reply (h, c);
+  s = nbd_request_full (h, 0, NBD_CMD_READ, offset, count, NULL, buf, NULL);
+  return s ? nbd_reply (h, s) : -1;
 }

 /* Write data to the file. */
@@ -1188,12 +1188,12 @@ nbd_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset,
             uint32_t flags)
 {
   struct handle *h = handle;
-  int c;
+  struct transaction *s;

   assert (!(flags & ~NBDKIT_FLAG_FUA));
-  c = nbd_request_full (h, flags & NBDKIT_FLAG_FUA ? NBD_CMD_FLAG_FUA : 0,
+  s = nbd_request_full (h, flags & NBDKIT_FLAG_FUA ? NBD_CMD_FLAG_FUA : 0,
                         NBD_CMD_WRITE, offset, count, buf, NULL, NULL);
-  return c < 0 ? c : nbd_reply (h, c);
+  return s ? nbd_reply (h, s) : -1;
 }

 /* Write zeroes to the file. */
@@ -1201,7 +1201,7 @@ static int
 nbd_zero (void *handle, uint32_t count, uint64_t offset, uint32_t flags)
 {
   struct handle *h = handle;
-  int c;
+  struct transaction *s;
   int f = 0;

   assert (!(flags & ~(NBDKIT_FLAG_FUA | NBDKIT_FLAG_MAY_TRIM)));
@@ -1211,8 +1211,8 @@ nbd_zero (void *handle, uint32_t count, uint64_t offset, uint32_t flags)
     f |= NBD_CMD_FLAG_NO_HOLE;
   if (flags & NBDKIT_FLAG_FUA)
     f |= NBD_CMD_FLAG_FUA;
-  c = nbd_request (h, f, NBD_CMD_WRITE_ZEROES, offset, count);
-  return c < 0 ? c : nbd_reply (h, c);
+  s = nbd_request (h, f, NBD_CMD_WRITE_ZEROES, offset, count);
+  return s ? nbd_reply (h, s) : -1;
 }

 /* Trim a portion of the file. */
@@ -1220,12 +1220,12 @@ static int
 nbd_trim (void *handle, uint32_t count, uint64_t offset, uint32_t flags)
 {
   struct handle *h = handle;
-  int c;
+  struct transaction *s;

   assert (!(flags & ~NBDKIT_FLAG_FUA));
-  c = nbd_request (h, flags & NBDKIT_FLAG_FUA ? NBD_CMD_FLAG_FUA : 0,
+  s = nbd_request (h, flags & NBDKIT_FLAG_FUA ? NBD_CMD_FLAG_FUA : 0,
                    NBD_CMD_TRIM, offset, count);
-  return c < 0 ? c : nbd_reply (h, c);
+  return s ? nbd_reply (h, s) : -1;
 }

 /* Flush the file to disk. */
@@ -1233,11 +1233,11 @@ static int
 nbd_flush (void *handle, uint32_t flags)
 {
   struct handle *h = handle;
-  int c;
+  struct transaction *s;

   assert (!flags);
-  c = nbd_request (h, 0, NBD_CMD_FLUSH, 0, 0);
-  return c < 0 ? c : nbd_reply (h, c);
+  s = nbd_request (h, 0, NBD_CMD_FLUSH, 0, 0);
+  return s ? nbd_reply (h, s) : -1;
 }

 /* Read extents of the file. */
@@ -1246,13 +1246,13 @@ nbd_extents (void *handle, uint32_t count, uint64_t offset,
              uint32_t flags, struct nbdkit_extents *extents)
 {
   struct handle *h = handle;
-  int c;
+  struct transaction *s;

   assert (!(flags & ~NBDKIT_FLAG_REQ_ONE) && h->extents);
-  c = nbd_request_full (h, flags & NBDKIT_FLAG_REQ_ONE ? NBD_CMD_FLAG_REQ_ONE : 0,
+  s = nbd_request_full (h, flags & NBDKIT_FLAG_REQ_ONE ? NBD_CMD_FLAG_REQ_ONE : 0,
                         NBD_CMD_BLOCK_STATUS, offset, count, NULL, NULL,
                         extents);
-  return c < 0 ? c : nbd_reply (h, c);
+  return s ? nbd_reply (h, s) : -1;
 }

 /* Cache a portion of the file. */
@@ -1260,11 +1260,11 @@ static int
 nbd_cache (void *handle, uint32_t count, uint64_t offset, uint32_t flags)
 {
   struct handle *h = handle;
-  int c;
+  struct transaction *s;

   assert (!flags);
-  c = nbd_request (h, 0, NBD_CMD_CACHE, offset, count);
-  return c < 0 ? c : nbd_reply (h, c);
+  s = nbd_request (h, 0, NBD_CMD_CACHE, offset, count);
+  return s ? nbd_reply (h, s) : -1;
 }

 static struct nbdkit_plugin plugin = {
-- 
2.20.1




More information about the Libguestfs mailing list