[Libguestfs] [nbdkit PATCH] nbd: Rewrite thread passing to use semaphore rather than pipe

Richard W.M. Jones rjones at redhat.com
Sat May 25 18:15:53 UTC 2019


On Sat, May 25, 2019 at 01:10:15PM -0500, Eric Blake wrote:
> I ran some local testing on my Fedora 29 system via:
> $ ./nbdkit memory 1m
> $ for i in `seq 10`; do
>     ./nbdkit -U - --filter=stats nbd \
>       statsappend=true statsfile=$file hostname=localhost port=10809 \
>       --run '~/libnbd/examples/threaded-reads-and-writes $unixsocket'
>   done
> 
> Pre-patch, the runs averaged  1.266s, 1.30E+08 bits/s
> Post-patch, the runs averaged 1.154s, 1.42E+08 bits/s
> 
> This is roughly a 9% performance gain, all because semaphores are
> lighter weight than pipes for inter-thread locking.  The refactoring
> will also make it a bit easier to incorporate the use of libnbd.
>
> Signed-off-by: Eric Blake <eblake at redhat.com>
> ---
> 
> Sending this in case someone wants to review my work, but I was happy
> enough with the benchmark numbers to go ahead and push this one.

Yup, seems fine.  I was concerned this call might not have been
available in FreeBSD, but it is, so that's OK.

Thanks,

Rich.

>  plugins/nbd/nbd.c | 134 +++++++++++++++++++++++-----------------------
>  1 file changed, 67 insertions(+), 67 deletions(-)
> 
> diff --git a/plugins/nbd/nbd.c b/plugins/nbd/nbd.c
> index 9039e1b..b2f3446 100644
> --- a/plugins/nbd/nbd.c
> +++ b/plugins/nbd/nbd.c
> @@ -48,6 +48,7 @@
>  #include <sys/un.h>
>  #include <assert.h>
>  #include <pthread.h>
> +#include <semaphore.h>
> 
>  #define NBDKIT_API_VERSION 2
> 
> @@ -155,10 +156,8 @@ nbd_config_complete (void)
> 
>  /* The per-transaction details */
>  struct transaction {
> -  union {
> -    uint64_t cookie;
> -    int fds[2];
> -  } u;
> +  uint64_t cookie;
> +  sem_t sem;
>    void *buf;
>    uint64_t offset;
>    uint32_t count;
> @@ -182,6 +181,7 @@ struct handle {
> 
>    pthread_mutex_t trans_lock; /* Covers access to all fields below */
>    struct transaction *trans;
> +  uint64_t unique;
>    bool dead;
>  };
> 
> @@ -264,7 +264,7 @@ find_trans_by_cookie (struct handle *h, uint64_t cookie, bool remove)
>    ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&h->trans_lock);
>    ptr = &h->trans;
>    while ((trans = *ptr) != NULL) {
> -    if (cookie == trans->u.cookie)
> +    if (cookie == trans->cookie)
>        break;
>      ptr = &trans->next;
>    }
> @@ -300,28 +300,27 @@ nbd_request_raw (struct handle *h, uint16_t flags, uint16_t type,
>  }
> 
>  /* Perform the request half of a transaction. On success, return the
> -   non-negative fd for reading the reply; on error return -1. */
> -static int
> +   transaction; on error return NULL. */
> +static struct transaction *
>  nbd_request_full (struct handle *h, uint16_t flags, uint16_t type,
>                    uint64_t offset, uint32_t count, const void *req_buf,
>                    void *rep_buf, struct nbdkit_extents *extents)
>  {
>    int err;
>    struct transaction *trans;
> -  int fd;
>    uint64_t cookie;
> 
>    trans = calloc (1, sizeof *trans);
>    if (!trans) {
>      nbdkit_error ("unable to track transaction: %m");
>      /* Still in sync with server, so don't mark connection dead */
> -    return -1;
> +    return NULL;
>    }
> -  if (pipe (trans->u.fds)) {
> -    nbdkit_error ("unable to create pipe: %m");
> +  if (sem_init (&trans->sem, 0, 0)) {
> +    nbdkit_error ("unable to create semaphore: %m");
>      /* Still in sync with server, so don't mark connection dead */
>      free (trans);
> -    return -1;
> +    return NULL;
>    }
>    trans->buf = rep_buf;
>    trans->count = rep_buf ? count : 0;
> @@ -331,43 +330,39 @@ nbd_request_full (struct handle *h, uint16_t flags, uint16_t type,
>      ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&h->trans_lock);
>      if (h->dead)
>        goto err;
> +    cookie = trans->cookie = h->unique++;
>      trans->next = h->trans;
>      h->trans = trans;
> -    fd = trans->u.fds[0];
> -    cookie = trans->u.cookie;
>    }
>    if (nbd_request_raw (h, flags, type, offset, count, cookie, req_buf) == 0)
> -    return fd;
> +    return trans;
>    trans = find_trans_by_cookie (h, cookie, true);
> 
>   err:
>    err = errno;
> -  if (trans) {
> -    close (trans->u.fds[0]);
> -    close (trans->u.fds[1]);
> -    free (trans);
> -  }
> -  else
> -    close (fd);
> +  if (sem_destroy (&trans->sem))
> +    abort ();
> +  free (trans);
> +  nbd_mark_dead (h);
>    errno = err;
> -  return nbd_mark_dead (h);
> +  return NULL;
>  }
> 
>  /* Shorthand for nbd_request_full when no extra buffers are involved. */
> -static int
> +static struct transaction *
>  nbd_request (struct handle *h, uint16_t flags, uint16_t type, uint64_t offset,
>               uint32_t count)
>  {
>    return nbd_request_full (h, flags, type, offset, count, NULL, NULL, NULL);
>  }
> 
> -/* Read a reply, and look up the fd corresponding to the transaction.
> +/* Read a reply, and look up the corresponding transaction.
>     Return the server's non-negative answer (converted to local errno
>     value) on success, or -1 on read failure. If structured replies
> -   were negotiated, fd is set to -1 if there are still more replies
> +   were negotiated, trans_out is set to NULL if there are still more replies
>     expected. */
>  static int
> -nbd_reply_raw (struct handle *h, int *fd)
> +nbd_reply_raw (struct handle *h, struct transaction **trans_out)
>  {
>    union {
>      struct simple_reply simple;
> @@ -387,7 +382,7 @@ nbd_reply_raw (struct handle *h, int *fd)
>    bool zero = false; /* if len, whether to read or memset */
>    uint16_t errlen;
> 
> -  *fd = -1;
> +  *trans_out = NULL;
>    /* magic and handle overlap between simple and structured replies */
>    if (read_full (h->fd, &rep, sizeof rep.simple))
>      return nbd_mark_dead (h);
> @@ -573,10 +568,9 @@ nbd_reply_raw (struct handle *h, int *fd)
>    /* Thanks to structured replies, we must preserve an error in any
>       earlier chunk for replay during the final chunk. */
>    if (!more) {
> -    *fd = trans->u.fds[1];
> +    *trans_out = trans;
>      if (!error)
>        error = trans->err;
> -    free (trans);
>    }
>    else if (error && !trans->err)
>      trans->err = error;
> @@ -616,19 +610,20 @@ nbd_reader (void *handle)
>    int r;
> 
>    while (!done) {
> -    int fd;
> +    struct transaction *trans;
> 
> -    r = nbd_reply_raw (h, &fd);
> +    r = nbd_reply_raw (h, &trans);
>      if (r >= 0) {
> -      if (fd < 0)
> +      if (!trans)
>          nbdkit_debug ("partial reply handled, waiting for final reply");
> -      else if (write (fd, &r, sizeof r) != sizeof r) {
> -        nbdkit_error ("failed to write pipe: %m");
> -        abort ();
> +      else {
> +        trans->err = r;
> +        if (sem_post (&trans->sem)) {
> +          nbdkit_error ("failed to post semaphore: %m");
> +          abort ();
> +        }
>        }
>      }
> -    if (fd >= 0)
> -      close (fd);
>      ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&h->trans_lock);
>      done = h->dead;
>    }
> @@ -645,27 +640,32 @@ nbd_reader (void *handle)
>      }
>      if (!trans)
>        break;
> -    if (write (trans->u.fds[1], &r, sizeof r) != sizeof r) {
> -      nbdkit_error ("failed to write pipe: %m");
> +    trans->err = r;
> +    if (sem_post (&trans->sem)) {
> +      nbdkit_error ("failed to post semaphore: %m");
>        abort ();
>      }
> -    close (trans->u.fds[1]);
> -    free (trans);
>    }
>    return NULL;
>  }
> 
>  /* Perform the reply half of a transaction. */
>  static int
> -nbd_reply (struct handle *h, int fd)
> +nbd_reply (struct handle *h, struct transaction *trans)
>  {
>    int err;
> 
> -  if (read (fd, &err, sizeof err) != sizeof err) {
> -    nbdkit_debug ("failed to read pipe: %m");
> +  while ((err = sem_wait (&trans->sem)) == -1 && errno == EINTR)
> +    /* try again */;
> +  if (err) {
> +    nbdkit_debug ("failed to wait on semaphore: %m");
>      err = EIO;
>    }
> -  close (fd);
> +  else
> +    err = trans->err;
> +  if (sem_destroy (&trans->sem))
> +    abort ();
> +  free (trans);
>    errno = err;
>    return err ? -1 : 0;
>  }
> @@ -1175,11 +1175,11 @@ nbd_pread (void *handle, void *buf, uint32_t count, uint64_t offset,
>             uint32_t flags)
>  {
>    struct handle *h = handle;
> -  int c;
> +  struct transaction *s;
> 
>    assert (!flags);
> -  c = nbd_request_full (h, 0, NBD_CMD_READ, offset, count, NULL, buf, NULL);
> -  return c < 0 ? c : nbd_reply (h, c);
> +  s = nbd_request_full (h, 0, NBD_CMD_READ, offset, count, NULL, buf, NULL);
> +  return s ? nbd_reply (h, s) : -1;
>  }
> 
>  /* Write data to the file. */
> @@ -1188,12 +1188,12 @@ nbd_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset,
>              uint32_t flags)
>  {
>    struct handle *h = handle;
> -  int c;
> +  struct transaction *s;
> 
>    assert (!(flags & ~NBDKIT_FLAG_FUA));
> -  c = nbd_request_full (h, flags & NBDKIT_FLAG_FUA ? NBD_CMD_FLAG_FUA : 0,
> +  s = nbd_request_full (h, flags & NBDKIT_FLAG_FUA ? NBD_CMD_FLAG_FUA : 0,
>                          NBD_CMD_WRITE, offset, count, buf, NULL, NULL);
> -  return c < 0 ? c : nbd_reply (h, c);
> +  return s ? nbd_reply (h, s) : -1;
>  }
> 
>  /* Write zeroes to the file. */
> @@ -1201,7 +1201,7 @@ static int
>  nbd_zero (void *handle, uint32_t count, uint64_t offset, uint32_t flags)
>  {
>    struct handle *h = handle;
> -  int c;
> +  struct transaction *s;
>    int f = 0;
> 
>    assert (!(flags & ~(NBDKIT_FLAG_FUA | NBDKIT_FLAG_MAY_TRIM)));
> @@ -1211,8 +1211,8 @@ nbd_zero (void *handle, uint32_t count, uint64_t offset, uint32_t flags)
>      f |= NBD_CMD_FLAG_NO_HOLE;
>    if (flags & NBDKIT_FLAG_FUA)
>      f |= NBD_CMD_FLAG_FUA;
> -  c = nbd_request (h, f, NBD_CMD_WRITE_ZEROES, offset, count);
> -  return c < 0 ? c : nbd_reply (h, c);
> +  s = nbd_request (h, f, NBD_CMD_WRITE_ZEROES, offset, count);
> +  return s ? nbd_reply (h, s) : -1;
>  }
> 
>  /* Trim a portion of the file. */
> @@ -1220,12 +1220,12 @@ static int
>  nbd_trim (void *handle, uint32_t count, uint64_t offset, uint32_t flags)
>  {
>    struct handle *h = handle;
> -  int c;
> +  struct transaction *s;
> 
>    assert (!(flags & ~NBDKIT_FLAG_FUA));
> -  c = nbd_request (h, flags & NBDKIT_FLAG_FUA ? NBD_CMD_FLAG_FUA : 0,
> +  s = nbd_request (h, flags & NBDKIT_FLAG_FUA ? NBD_CMD_FLAG_FUA : 0,
>                     NBD_CMD_TRIM, offset, count);
> -  return c < 0 ? c : nbd_reply (h, c);
> +  return s ? nbd_reply (h, s) : -1;
>  }
> 
>  /* Flush the file to disk. */
> @@ -1233,11 +1233,11 @@ static int
>  nbd_flush (void *handle, uint32_t flags)
>  {
>    struct handle *h = handle;
> -  int c;
> +  struct transaction *s;
> 
>    assert (!flags);
> -  c = nbd_request (h, 0, NBD_CMD_FLUSH, 0, 0);
> -  return c < 0 ? c : nbd_reply (h, c);
> +  s = nbd_request (h, 0, NBD_CMD_FLUSH, 0, 0);
> +  return s ? nbd_reply (h, s) : -1;
>  }
> 
>  /* Read extents of the file. */
> @@ -1246,13 +1246,13 @@ nbd_extents (void *handle, uint32_t count, uint64_t offset,
>               uint32_t flags, struct nbdkit_extents *extents)
>  {
>    struct handle *h = handle;
> -  int c;
> +  struct transaction *s;
> 
>    assert (!(flags & ~NBDKIT_FLAG_REQ_ONE) && h->extents);
> -  c = nbd_request_full (h, flags & NBDKIT_FLAG_REQ_ONE ? NBD_CMD_FLAG_REQ_ONE : 0,
> +  s = nbd_request_full (h, flags & NBDKIT_FLAG_REQ_ONE ? NBD_CMD_FLAG_REQ_ONE : 0,
>                          NBD_CMD_BLOCK_STATUS, offset, count, NULL, NULL,
>                          extents);
> -  return c < 0 ? c : nbd_reply (h, c);
> +  return s ? nbd_reply (h, s) : -1;
>  }
> 
>  /* Cache a portion of the file. */
> @@ -1260,11 +1260,11 @@ static int
>  nbd_cache (void *handle, uint32_t count, uint64_t offset, uint32_t flags)
>  {
>    struct handle *h = handle;
> -  int c;
> +  struct transaction *s;
> 
>    assert (!flags);
> -  c = nbd_request (h, 0, NBD_CMD_CACHE, offset, count);
> -  return c < 0 ? c : nbd_reply (h, c);
> +  s = nbd_request (h, 0, NBD_CMD_CACHE, offset, count);
> +  return s ? nbd_reply (h, s) : -1;
>  }
> 
>  static struct nbdkit_plugin plugin = {
> -- 
> 2.20.1
> 
> _______________________________________________
> Libguestfs mailing list
> Libguestfs at redhat.com
> https://www.redhat.com/mailman/listinfo/libguestfs

-- 
Richard Jones, Virtualization Group, Red Hat http://people.redhat.com/~rjones
Read my programming and virtualization blog: http://rwmj.wordpress.com
virt-p2v converts physical machines to virtual machines.  Boot with a
live CD or over the network (PXE) and turn machines into KVM guests.
http://libguestfs.org/virt-v2v




More information about the Libguestfs mailing list