[Libguestfs] [nbdkit PATCH 3/4] nbd: Use libnbd 0.1

Eric Blake eblake at redhat.com
Thu May 30 03:13:37 UTC 2019


This conversion should be feature-compatible with the standalone nbd
code. Note that the use of libnbd makes the binary for this particular
plugin fall under an LGPLv2+ license rather than BSD, but the source
code in nbd.c remains BSD.

A lot of code simply disappears, now that I'm no longer directly
utilizing the NBD protocol files but instead relying on libnbd.
Coordination between threads from nbdkit and a central state machine
thread is gated by a lock around the pending transaction list, using
the same trick as before of using a semaphore to wake up the right
thread after the server's reply, regardless of out-of-order handling
from the server.

Benchmark-wise, using the same setup as in commit e897ed70, I see
an order-of-magnitude slowdown:

Pre-patch, the runs averaged   1.266s, 1.30E+08 bits/s
Post-patch, the runs averaged 11.663s, 1.41E+07 bits/s

This will need further profiling to determine how much is nbdkit's
fault, and how much is libnbd's.  I think that we are probably holding
some locks for too long, resulting in serialized performance.  Also,
the standalone code was able to run the read of command 1 in parallel
with the write of command 2 via separate threads, whereas libnbd's
state machine serializes everything (whether or not the state machine
spreads out the I/O to do writes from the thread calling nbd_aio_FOO
and reads from the reader thread, the two are never run at once).

The trickiest part (for me) was that, since the state machine loop
runs in a separate thread from the one issuing the initial requests,
the loop is often blocked on just POLLIN for the state machine fd.  My
initial attempt grabbed the transaction lock before calling
nbd_aio_FOO to create the cookie, which generally worked because the
server's reply would then wake up the reader.  But that was even
slower (due to a larger lock section), and also hit a problem where a
large NBD_CMD_WRITE would block on writes, which the poll() was not
expecting (best seen when running tests/test-nozero.sh with
LIBNBD_DEBUG=1).  I tried blindly setting POLLOUT as a valid reason to
break the poll regardless of the current state, but that burned a lot
of CPU as it fired even when there was no progress.  So I finally
settled on using a pipe-to-self and poll()ing on two fds at once (the
libnbd state machine and my pipe) to know when the loop needed to
start processing because another thread started a command.  The
pipe-to-self also means the transaction lock no longer needs to be
held while generating a cookie.

Signed-off-by: Eric Blake <eblake at redhat.com>
---
 plugins/nbd/nbd.c       | 1102 +++++++++------------------------------
 plugins/nbd/Makefile.am |    8 +-
 2 files changed, 262 insertions(+), 848 deletions(-)

diff --git a/plugins/nbd/nbd.c b/plugins/nbd/nbd.c
index 4daf8a4..b1e978a 100644
--- a/plugins/nbd/nbd.c
+++ b/plugins/nbd/nbd.c
@@ -49,43 +49,35 @@
 #include <assert.h>
 #include <pthread.h>
 #include <semaphore.h>
+#include <poll.h>
+
+#include <libnbd.h>

 #define NBDKIT_API_VERSION 2

 #include <nbdkit-plugin.h>
-#include "protocol.h"
 #include "byte-swapping.h"
 #include "cleanup.h"

 /* The per-transaction details */
 struct transaction {
-  uint64_t cookie;
+  int64_t cookie; /* From nbd_aio_*; checked via nbd_aio_command_completed */
   sem_t sem;
-  void *buf;
-  uint64_t offset;
-  uint32_t count;
   uint32_t err;
-  struct nbdkit_extents *extents;
   struct transaction *next;
 };

 /* The per-connection handle */
 struct handle {
   /* These fields are read-only once initialized */
-  int fd;
-  int flags;
-  int64_t size;
-  bool structured;
-  bool extents;
+  struct nbd_handle *nbd; /* libnbd connection, from nbd_create */
+  int fd; /* Cache of nbd_aio_get_fd, polled by the reader thread */
+  int fds[2]; /* Self-pipe: reader drains [0], nbdplug_register writes [1] */
+  bool readonly;
   pthread_t reader;

-  /* Prevents concurrent threads from interleaving writes to server */
-  pthread_mutex_t write_lock;
-
-  pthread_mutex_t trans_lock; /* Covers access to all fields below */
-  struct transaction *trans;
-  uint64_t unique;
-  bool dead;
+  pthread_mutex_t trans_lock; /* Covers access to trans list */
+  struct transaction *trans; /* Pending transactions, unlinked by reader */
 };

 /* Connect to server via absolute name of Unix socket */
@@ -218,451 +210,74 @@ nbdplug_config_complete (void)

 #define THREAD_MODEL NBDKIT_THREAD_MODEL_PARALLEL

-/* Read an entire buffer, returning 0 on success or -1 with errno set. */
-static int
-read_full (int fd, void *buf, size_t len)
-{
-  ssize_t r;
-
-  while (len) {
-    r = read (fd, buf, len);
-    if (r < 0) {
-      if (errno == EINTR || errno == EAGAIN)
-        continue;
-      return -1;
-    }
-    if (!r) {
-      /* Unexpected EOF */
-      errno = EBADMSG;
-      return -1;
-    }
-    buf += r;
-    len -= r;
-  }
-  return 0;
-}
-
-/* Write an entire buffer, returning 0 on success or -1 with errno set. */
-static int
-write_full (int fd, const void *buf, size_t len)
-{
-  ssize_t r;
-
-  while (len) {
-    r = write (fd, buf, len);
-    if (r < 0) {
-      if (errno == EINTR || errno == EAGAIN)
-        continue;
-      return -1;
-    }
-    buf += r;
-    len -= r;
-  }
-  return 0;
-}
-
-/* Called during transmission phases when there is no hope of
- * resynchronizing with the server, and all further requests from the
- * client will fail.  Returns -1 for convenience. */
-static int
-nbdplug_mark_dead (struct handle *h)
-{
-  int err = errno;
-
-  ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&h->trans_lock);
-  if (!h->dead) {
-    nbdkit_debug ("permanent failure while talking to server %s: %m",
-                  servname);
-    h->dead = true;
-  }
-  else if (!err)
-    errno = ESHUTDOWN;
-  /* NBD only accepts a limited set of errno values over the wire, and
-     nbdkit converts all other values to EINVAL. If we died due to an
-     errno value that cannot transmit over the wire, translate it to
-     ESHUTDOWN instead.  */
-  if (err == EPIPE || err == EBADMSG)
-    nbdkit_set_error (ESHUTDOWN);
-  return -1;
-}
-
-/* Find and possibly remove the transaction corresponding to cookie
-   from the list. */
-static struct transaction *
-find_trans_by_cookie (struct handle *h, uint64_t cookie, bool remove)
-{
-  struct transaction **ptr;
-  struct transaction *trans;
-
-  ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&h->trans_lock);
-  ptr = &h->trans;
-  while ((trans = *ptr) != NULL) {
-    if (cookie == trans->cookie)
-      break;
-    ptr = &trans->next;
-  }
-  if (trans && remove)
-    *ptr = trans->next;
-  return trans;
-}
-
-/* Send a request, return 0 on success or -1 on write failure. */
-static int
-nbdplug_request_raw (struct handle *h, uint16_t flags, uint16_t type,
-                     uint64_t offset, uint32_t count, uint64_t cookie,
-                     const void *buf)
-{
-  struct request req = {
-    .magic = htobe32 (NBD_REQUEST_MAGIC),
-    .flags = htobe16 (flags),
-    .type = htobe16 (type),
-    .handle = cookie, /* Opaque to server, so endianness doesn't matter */
-    .offset = htobe64 (offset),
-    .count = htobe32 (count),
-  };
-  int r;
-
-  ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&h->write_lock);
-  nbdkit_debug ("sending request type %d (%s), flags %#x, offset %#" PRIx64
-                ", count %#x, cookie %#" PRIx64, type, name_of_nbd_cmd (type),
-                flags, offset, count, cookie);
-  r = write_full (h->fd, &req, sizeof req);
-  if (buf && !r)
-    r = write_full (h->fd, buf, count);
-  return r;
-}
-
-/* Perform the request half of a transaction. On success, return the
-   transaction; on error return NULL. */
-static struct transaction *
-nbdplug_request_full (struct handle *h, uint16_t flags, uint16_t type,
-                      uint64_t offset, uint32_t count, const void *req_buf,
-                      void *rep_buf, struct nbdkit_extents *extents)
-{
-  int err;
-  struct transaction *trans;
-  uint64_t cookie;
-
-  trans = calloc (1, sizeof *trans);
-  if (!trans) {
-    nbdkit_error ("unable to track transaction: %m");
-    /* Still in sync with server, so don't mark connection dead */
-    return NULL;
-  }
-  if (sem_init (&trans->sem, 0, 0)) {
-    nbdkit_error ("unable to create semaphore: %m");
-    /* Still in sync with server, so don't mark connection dead */
-    free (trans);
-    return NULL;
-  }
-  trans->buf = rep_buf;
-  trans->count = rep_buf ? count : 0;
-  trans->offset = offset;
-  trans->extents = extents;
-  {
-    ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&h->trans_lock);
-    if (h->dead)
-      goto err;
-    cookie = trans->cookie = h->unique++;
-    trans->next = h->trans;
-    h->trans = trans;
-  }
-  if (nbdplug_request_raw (h, flags, type, offset, count, cookie, req_buf) == 0)
-    return trans;
-  trans = find_trans_by_cookie (h, cookie, true);
-
- err:
-  err = errno;
-  if (sem_destroy (&trans->sem))
-    abort ();
-  free (trans);
-  nbdplug_mark_dead (h);
-  errno = err;
-  return NULL;
-}
-
-/* Shorthand for nbdplug_request_full when no extra buffers are involved. */
-static struct transaction *
-nbdplug_request (struct handle *h, uint16_t flags, uint16_t type,
-                 uint64_t offset, uint32_t count)
-{
-  return nbdplug_request_full (h, flags, type, offset, count, NULL, NULL, NULL);
-}
-
-/* Read a reply, and look up the corresponding transaction.
-   Return the server's non-negative answer (converted to local errno
-   value) on success, or -1 on read failure. If structured replies
-   were negotiated, trans_out is set to NULL if there are still more replies
-   expected. */
-static int
-nbdplug_reply_raw (struct handle *h, struct transaction **trans_out)
-{
-  union {
-    struct simple_reply simple;
-    struct structured_reply structured;
-  } rep;
-  struct transaction *trans;
-  void *buf = NULL;
-  CLEANUP_FREE char *payload = NULL;
-  uint32_t count;
-  uint32_t id;
-  struct block_descriptor *extents = NULL;
-  size_t nextents = 0;
-  int error = NBD_SUCCESS;
-  bool more = false;
-  uint32_t len = 0; /* 0 except for structured reads */
-  uint64_t offset = 0; /* if len, absolute offset of structured read chunk */
-  bool zero = false; /* if len, whether to read or memset */
-  uint16_t errlen;
-
-  *trans_out = NULL;
-  /* magic and handle overlap between simple and structured replies */
-  if (read_full (h->fd, &rep, sizeof rep.simple))
-    return nbdplug_mark_dead (h);
-  rep.simple.magic = be32toh (rep.simple.magic);
-  switch (rep.simple.magic) {
-  case NBD_SIMPLE_REPLY_MAGIC:
-    nbdkit_debug ("received simple reply for cookie %#" PRIx64 ", status %s",
-                  rep.simple.handle,
-                  name_of_nbd_error (be32toh (rep.simple.error)));
-    error = be32toh (rep.simple.error);
-    break;
-  case NBD_STRUCTURED_REPLY_MAGIC:
-    if (!h->structured) {
-      nbdkit_error ("structured response without negotiation");
-      return nbdplug_mark_dead (h);
-    }
-    if (read_full (h->fd, sizeof rep.simple + (char *) &rep,
-                   sizeof rep - sizeof rep.simple))
-      return nbdplug_mark_dead (h);
-    rep.structured.flags = be16toh (rep.structured.flags);
-    rep.structured.type = be16toh (rep.structured.type);
-    rep.structured.length = be32toh (rep.structured.length);
-    nbdkit_debug ("received structured reply %s for cookie %#" PRIx64
-                  ", payload length %" PRId32,
-                  name_of_nbd_reply_type (rep.structured.type),
-                  rep.structured.handle, rep.structured.length);
-    if (rep.structured.length > 64 * 1024 * 1024) {
-      nbdkit_error ("structured reply length is suspiciously large: %" PRId32,
-                    rep.structured.length);
-      return nbdplug_mark_dead (h);
-    }
-    if (rep.structured.length) {
-      /* Special case for OFFSET_DATA in order to read tail of chunk
-         directly into final buffer later on */
-      len = (rep.structured.type == NBD_REPLY_TYPE_OFFSET_DATA &&
-             rep.structured.length > sizeof offset) ? sizeof offset :
-        rep.structured.length;
-      payload = malloc (len);
-      if (!payload) {
-        nbdkit_error ("reading structured reply payload: %m");
-        return nbdplug_mark_dead (h);
-      }
-      if (read_full (h->fd, payload, len))
-        return nbdplug_mark_dead (h);
-      len = 0;
-    }
-    more = !(rep.structured.flags & NBD_REPLY_FLAG_DONE);
-    switch (rep.structured.type) {
-    case NBD_REPLY_TYPE_NONE:
-      if (rep.structured.length) {
-        nbdkit_error ("NBD_REPLY_TYPE_NONE with invalid payload");
-        return nbdplug_mark_dead (h);
-      }
-      if (more) {
-        nbdkit_error ("NBD_REPLY_TYPE_NONE without done flag");
-        return nbdplug_mark_dead (h);
-      }
-      break;
-    case NBD_REPLY_TYPE_OFFSET_DATA:
-      if (rep.structured.length <= sizeof offset) {
-        nbdkit_error ("structured reply OFFSET_DATA too small");
-        return nbdplug_mark_dead (h);
-      }
-      memcpy (&offset, payload, sizeof offset);
-      offset = be64toh (offset);
-      len = rep.structured.length - sizeof offset;
-      break;
-    case NBD_REPLY_TYPE_OFFSET_HOLE:
-      if (rep.structured.length != sizeof offset + sizeof len) {
-        nbdkit_error ("structured reply OFFSET_HOLE size incorrect");
-        return nbdplug_mark_dead (h);
-      }
-      memcpy (&offset, payload, sizeof offset);
-      offset = be64toh (offset);
-      memcpy (&len, payload, sizeof len);
-      len = be32toh (len);
-      if (!len) {
-        nbdkit_error ("structured reply OFFSET_HOLE length incorrect");
-        return nbdplug_mark_dead (h);
-      }
-      zero = true;
-      break;
-    case NBD_REPLY_TYPE_BLOCK_STATUS:
-      if (!h->extents) {
-        nbdkit_error ("block status response without negotiation");
-        return nbdplug_mark_dead (h);
-      }
-      if (rep.structured.length < sizeof *extents ||
-          rep.structured.length % sizeof *extents != sizeof id) {
-        nbdkit_error ("structured reply OFFSET_HOLE size incorrect");
-        return nbdplug_mark_dead (h);
-      }
-      nextents = rep.structured.length / sizeof *extents;
-      extents = (struct block_descriptor *) &payload[sizeof id];
-      memcpy (&id, payload, sizeof id);
-      id = be32toh (id);
-      nbdkit_debug ("parsing %zu extents for context id %" PRId32,
-                    nextents, id);
-      break;
-    default:
-      if (!NBD_REPLY_TYPE_IS_ERR (rep.structured.type)) {
-        nbdkit_error ("received unexpected structured reply %s",
-                      name_of_nbd_reply_type (rep.structured.type));
-        return nbdplug_mark_dead (h);
-      }
-
-      if (rep.structured.length < sizeof error + sizeof errlen) {
-        nbdkit_error ("structured reply error size incorrect");
-        return nbdplug_mark_dead (h);
-      }
-      memcpy (&errlen, payload + sizeof error, sizeof errlen);
-      errlen = be16toh (errlen);
-      if (errlen > rep.structured.length - sizeof error - sizeof errlen) {
-        nbdkit_error ("structured reply error message size incorrect");
-        return nbdplug_mark_dead (h);
-      }
-      memcpy (&error, payload, sizeof error);
-      error = be32toh (error);
-      if (errlen)
-        nbdkit_debug ("received structured error %s with message: %.*s",
-                      name_of_nbd_error (error), (int) errlen,
-                      payload + sizeof error + sizeof errlen);
-      else
-        nbdkit_debug ("received structured error %s without message",
-                      name_of_nbd_error (error));
-    }
-    break;
-
-  default:
-    nbdkit_error ("received unexpected magic in reply: %#" PRIx32,
-                  rep.simple.magic);
-    return nbdplug_mark_dead (h);
-  }
-
-  trans = find_trans_by_cookie (h, rep.simple.handle, !more);
-  if (!trans) {
-    nbdkit_error ("reply with unexpected cookie %#" PRIx64, rep.simple.handle);
-    return nbdplug_mark_dead (h);
-  }
-
-  buf = trans->buf;
-  count = trans->count;
-  if (nextents) {
-    if (!trans->extents) {
-      nbdkit_error ("block status response to a non-status command");
-      return nbdplug_mark_dead (h);
-    }
-    offset = trans->offset;
-    for (size_t i = 0; i < nextents; i++) {
-      /* We rely on the fact that NBDKIT_EXTENT_* match NBD_STATE_* */
-      if (nbdkit_add_extent (trans->extents, offset,
-                             be32toh (extents[i].length),
-                             be32toh (extents[i].status_flags)) == -1) {
-        error = errno;
-        break;
-      }
-      offset += be32toh (extents[i].length);
-    }
-  }
-  if (buf && h->structured && rep.simple.magic == NBD_SIMPLE_REPLY_MAGIC) {
-    nbdkit_error ("simple read reply when structured was expected");
-    return nbdplug_mark_dead (h);
-  }
-  if (len) {
-    if (!buf) {
-      nbdkit_error ("structured read response to a non-read command");
-      return nbdplug_mark_dead (h);
-    }
-    if (offset < trans->offset || offset > INT64_MAX ||
-        offset + len > trans->offset + count) {
-      nbdkit_error ("structured read reply with unexpected offset/length");
-      return nbdplug_mark_dead (h);
-    }
-    buf = (char *) buf + offset - trans->offset;
-    if (zero) {
-      memset (buf, 0, len);
-      buf = NULL;
-    }
-    else
-      count = len;
-  }
-
-  /* Thanks to structured replies, we must preserve an error in any
-     earlier chunk for replay during the final chunk. */
-  if (!more) {
-    *trans_out = trans;
-    if (!error)
-      error = trans->err;
-  }
-  else if (error && !trans->err)
-    trans->err = error;
-
-  /* Convert from wire value to local errno, and perform any final read */
-  switch (error) {
-  case NBD_SUCCESS:
-    if (buf && read_full (h->fd, buf, count))
-      return nbdplug_mark_dead (h);
-    return 0;
-  case NBD_EPERM:
-    return EPERM;
-  case NBD_EIO:
-    return EIO;
-  case NBD_ENOMEM:
-    return ENOMEM;
-  default:
-    nbdkit_debug ("unexpected error %d, squashing to EINVAL", error);
-    /* fallthrough */
-  case NBD_EINVAL:
-    return EINVAL;
-  case NBD_ENOSPC:
-    return ENOSPC;
-  case NBD_EOVERFLOW:
-    return EOVERFLOW;
-  case NBD_ESHUTDOWN:
-    return ESHUTDOWN;
-  }
-}
-
 /* Reader loop. */
 void *
 nbdplug_reader (void *handle)
 {
   struct handle *h = handle;
-  bool done = false;
   int r;

-  while (!done) {
-    struct transaction *trans;
-
-    r = nbdplug_reply_raw (h, &trans);
-    if (r >= 0) {
-      if (!trans)
-        nbdkit_debug ("partial reply handled, waiting for final reply");
-      else {
-        trans->err = r;
+  while (!nbd_aio_is_dead (h->nbd) && !nbd_aio_is_closed (h->nbd)) {
+    struct pollfd fds[2] = {
+      [0].fd = h->fd,
+      [1].fd = h->fds[0],
+      [1].events = POLLIN,
+    };
+    struct transaction *trans, **prev;
+    int dir;
+    char c;
+
+    dir = nbd_aio_get_direction (h->nbd);
+    nbdkit_debug ("polling, dir=%d", dir);
+    if (dir & LIBNBD_AIO_DIRECTION_READ)
+      fds[0].events |= POLLIN;
+    if (dir & LIBNBD_AIO_DIRECTION_WRITE)
+      fds[0].events |= POLLOUT;
+    if (poll (fds, 2, -1) == -1) {
+      nbdkit_error ("poll: %m");
+      break;
+    }
+
+    if (dir & LIBNBD_AIO_DIRECTION_READ && fds[0].revents & POLLIN)
+      nbd_aio_notify_read (h->nbd);
+    else if (dir & LIBNBD_AIO_DIRECTION_WRITE && fds[0].revents & POLLOUT)
+      nbd_aio_notify_write (h->nbd);
+
+    /* Check if we were kicked because a command was started */
+    if (fds[1].revents & POLLIN && read (h->fds[0], &c, 1) != 1) {
+      nbdkit_error ("failed to read pipe: %m");
+      break;
+    }
+
+    ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&h->trans_lock);
+    trans = h->trans;
+    prev = &h->trans;
+    while (trans) {
+      r = nbd_aio_command_completed (h->nbd, trans->cookie);
+      if (r == -1) {
+        nbdkit_debug ("transaction %" PRId64 " failed: %s", trans->cookie,
+                      nbd_get_error ());
+        trans->err = nbd_get_errno ();
+        if (!trans->err)
+          trans->err = EIO;
+      }
+      if (r) {
+        nbdkit_debug ("cookie %" PRId64 " completed state machine, status %d",
+                      trans->cookie, trans->err);
+        *prev = trans->next;
         if (sem_post (&trans->sem)) {
           nbdkit_error ("failed to post semaphore: %m");
           abort ();
         }
       }
+      else
+        prev = &trans->next;
+      trans = *prev;
     }
-    ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&h->trans_lock);
-    done = h->dead;
   }

   /* Clean up any stranded in-flight requests */
-  r = ESHUTDOWN;
+  nbdkit_debug ("state machine changed to %s", nbd_connection_state (h->nbd));
   while (1) {
     struct transaction *trans;

@@ -673,15 +288,63 @@ nbdplug_reader (void *handle)
     }
     if (!trans)
       break;
-    trans->err = r;
+    r = nbd_aio_command_completed (h->nbd, trans->cookie);
+    if (r == -1) {
+      nbdkit_debug ("transaction %" PRId64 " failed: %s", trans->cookie,
+                    nbd_get_error ());
+      trans->err = nbd_get_errno ();
+      if (!trans->err)
+        trans->err = ESHUTDOWN;
+    }
+    else if (!r)
+      trans->err = ESHUTDOWN;
     if (sem_post (&trans->sem)) {
       nbdkit_error ("failed to post semaphore: %m");
       abort ();
     }
   }
+  nbdkit_debug ("exiting state machine thread");
   return NULL;
 }

+/* Track @cookie from nbd_aio_*; return the transaction, or NULL on error. */
+static struct transaction *
+nbdplug_register (struct handle *h, int64_t cookie)
+{
+  struct transaction *trans;
+  char c = 0; /* Any byte will do; it only wakes the reader */
+  /* A cookie of -1 means the nbd_aio_* call itself failed */
+  if (cookie == -1) {
+    nbdkit_error ("command failed: %s", nbd_get_error ());
+    errno = nbd_get_errno ();
+    return NULL;
+  }
+  /* NOTE(review): failures below leave this in-flight cookie untracked */
+  nbdkit_debug ("cookie %" PRId64 " started by state machine", cookie);
+  trans = calloc (1, sizeof *trans);
+  if (!trans) {
+    nbdkit_error ("unable to track transaction: %m");
+    return NULL;
+  }
+
+  /* Hold the lock so the reader we kick rescans only after trans is linked */
+  ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&h->trans_lock);
+  if (write (h->fds[1], &c, 1) != 1) {
+    nbdkit_error ("write to pipe: %m");
+    free (trans);
+    return NULL;
+  }
+  if (sem_init (&trans->sem, 0, 0)) {
+    nbdkit_error ("unable to create semaphore: %m");
+    free (trans);
+    return NULL;
+  }
+  trans->cookie = cookie;
+  trans->next = h->trans;
+  h->trans = trans;
+  return trans;
+}
+
 /* Perform the reply half of a transaction. */
 static int
 nbdplug_reply (struct handle *h, struct transaction *trans)
@@ -708,400 +371,60 @@ nbdplug_reply (struct handle *h, struct transaction *trans)
   return err ? -1 : 0;
 }

-/* Receive response to @option into @reply, and consume any
-   payload. If @payload is non-NULL, caller must free *payload. Return
-   0 on success, or -1 if communication to server is no longer
-   possible. */
-static int
-nbdplug_newstyle_recv_option_reply (struct handle *h, uint32_t option,
-                                    struct fixed_new_option_reply *reply,
-                                    void **payload)
-{
-  CLEANUP_FREE char *buffer = NULL;
-
-  if (payload)
-    *payload = NULL;
-  if (read_full (h->fd, reply, sizeof *reply)) {
-    nbdkit_error ("unable to read option reply: %m");
-    return -1;
-  }
-  reply->magic = be64toh (reply->magic);
-  reply->option = be32toh (reply->option);
-  reply->reply = be32toh (reply->reply);
-  reply->replylen = be32toh (reply->replylen);
-  if (reply->magic != NBD_REP_MAGIC || reply->option != option) {
-    nbdkit_error ("unexpected option reply");
-    return -1;
-  }
-  if (reply->replylen) {
-    if (reply->reply == NBD_REP_ACK) {
-      nbdkit_error ("NBD_REP_ACK should not have replylen %" PRId32,
-                    reply->replylen);
-      return -1;
-    }
-    if (reply->replylen > 16 * 1024 * 1024) {
-      nbdkit_error ("option reply length is suspiciously large: %" PRId32,
-                    reply->replylen);
-      return -1;
-    }
-    /* buffer is a string for NBD_REP_ERR_*; adding a NUL terminator
-       makes that string easier to use, without hurting other reply
-       types where buffer is not a string */
-    buffer = malloc (reply->replylen + 1);
-    if (!buffer) {
-      nbdkit_error ("malloc: %m");
-      return -1;
-    }
-    if (read_full (h->fd, buffer, reply->replylen)) {
-      nbdkit_error ("unable to read option reply payload: %m");
-      return -1;
-    }
-    buffer[reply->replylen] = '\0';
-    if (!payload)
-      nbdkit_debug ("ignoring option reply payload");
-    else {
-      *payload = buffer;
-      buffer = NULL;
-    }
-  }
-  return 0;
-}
-
-/* Attempt to negotiate structured reads, block status, and NBD_OPT_GO.
-   Return 1 if haggling completed, 0 if haggling failed but
-   NBD_OPT_EXPORT_NAME is still viable, or -1 on inability to connect. */
-static int
-nbdplug_newstyle_haggle (struct handle *h)
-{
-  const char *const query = "base:allocation";
-  struct new_option opt;
-  uint32_t exportnamelen = htobe32 (strlen (export));
-  uint32_t nrqueries = htobe32 (1);
-  uint32_t querylen = htobe32 (strlen (query));
-  /* For now, we make no NBD_INFO_* requests, relying on the server to
-     send its defaults. TODO: nbdkit should let plugins report block
-     sizes, at which point we should request NBD_INFO_BLOCK_SIZE and
-     obey any sizes set by server. */
-  uint16_t nrinfos = htobe16 (0);
-  struct fixed_new_option_reply reply;
-
-  nbdkit_debug ("trying NBD_OPT_STRUCTURED_REPLY");
-  opt.version = htobe64 (NEW_VERSION);
-  opt.option = htobe32 (NBD_OPT_STRUCTURED_REPLY);
-  opt.optlen = htobe32 (0);
-  if (write_full (h->fd, &opt, sizeof opt)) {
-    nbdkit_error ("unable to request NBD_OPT_STRUCTURED_REPLY: %m");
-    return -1;
-  }
-  if (nbdplug_newstyle_recv_option_reply (h, NBD_OPT_STRUCTURED_REPLY, &reply,
-                                          NULL) < 0)
-    return -1;
-  if (reply.reply == NBD_REP_ACK) {
-    nbdkit_debug ("structured replies enabled, trying NBD_OPT_SET_META_CONTEXT");
-    h->structured = true;
-
-    opt.version = htobe64 (NEW_VERSION);
-    opt.option = htobe32 (NBD_OPT_SET_META_CONTEXT);
-    opt.optlen = htobe32 (sizeof exportnamelen + strlen (export) +
-                          sizeof nrqueries + sizeof querylen + strlen (query));
-    if (write_full (h->fd, &opt, sizeof opt) ||
-        write_full (h->fd, &exportnamelen, sizeof exportnamelen) ||
-        write_full (h->fd, export, strlen (export)) ||
-        write_full (h->fd, &nrqueries, sizeof nrqueries) ||
-        write_full (h->fd, &querylen, sizeof querylen) ||
-        write_full (h->fd, query, strlen (query))) {
-      nbdkit_error ("unable to request NBD_OPT_SET_META_CONTEXT: %m");
-      return -1;
-    }
-    if (nbdplug_newstyle_recv_option_reply (h, NBD_OPT_SET_META_CONTEXT, &reply,
-                                            NULL) < 0)
-      return -1;
-    if (reply.reply == NBD_REP_META_CONTEXT) {
-      /* Cheat: we asked for exactly one context. We could double
-         check that the server is replying with exactly the
-         "base:allocation" context, and then remember the id it tells
-         us to later confirm that responses to NBD_CMD_BLOCK_STATUS
-         match up; but in the absence of multiple contexts, it's
-         easier to just assume the server is compliant, and will reuse
-         the same id, without bothering to check further. */
-      nbdkit_debug ("extents enabled");
-      h->extents = true;
-      if (nbdplug_newstyle_recv_option_reply (h, NBD_OPT_SET_META_CONTEXT,
-                                              &reply, NULL) < 0)
-        return -1;
-    }
-    if (reply.reply != NBD_REP_ACK) {
-      if (h->extents) {
-        nbdkit_error ("unexpected response to set meta context");
-        return -1;
-      }
-      nbdkit_debug ("ignoring meta context response %s",
-                    name_of_nbd_rep (reply.reply));
-    }
-  }
-  else {
-    nbdkit_debug ("structured replies disabled");
-  }
-
-  /* Try NBD_OPT_GO */
-  nbdkit_debug ("trying NBD_OPT_GO");
-  opt.version = htobe64 (NEW_VERSION);
-  opt.option = htobe32 (NBD_OPT_GO);
-  opt.optlen = htobe32 (sizeof exportnamelen + strlen (export) +
-                        sizeof nrinfos);
-  if (write_full (h->fd, &opt, sizeof opt) ||
-      write_full (h->fd, &exportnamelen, sizeof exportnamelen) ||
-      write_full (h->fd, export, strlen (export)) ||
-      write_full (h->fd, &nrinfos, sizeof nrinfos)) {
-    nbdkit_error ("unable to request NBD_OPT_GO: %m");
-    return -1;
-  }
-  while (1) {
-    CLEANUP_FREE void *buffer;
-    struct fixed_new_option_reply_info_export *reply_export;
-    uint16_t info;
-
-    if (nbdplug_newstyle_recv_option_reply (h, NBD_OPT_GO, &reply, &buffer) < 0)
-      return -1;
-    switch (reply.reply) {
-    case NBD_REP_INFO:
-      /* Parse payload, but ignore all except NBD_INFO_EXPORT */
-      if (reply.replylen < 2) {
-        nbdkit_error ("NBD_REP_INFO reply too short");
-        return -1;
-      }
-      memcpy (&info, buffer, sizeof info);
-      info = be16toh (info);
-      switch (info) {
-      case NBD_INFO_EXPORT:
-        if (reply.replylen != sizeof *reply_export) {
-          nbdkit_error ("NBD_INFO_EXPORT reply wrong size");
-          return -1;
-        }
-        reply_export = buffer;
-        h->size = be64toh (reply_export->exportsize);
-        h->flags = be16toh (reply_export->eflags);
-        break;
-      default:
-        nbdkit_debug ("ignoring server info %d", info);
-      }
-      break;
-    case NBD_REP_ACK:
-      /* End of replies, valid if server already sent NBD_INFO_EXPORT,
-         observable since h->flags must contain NBD_FLAG_HAS_FLAGS */
-      assert (!buffer);
-      if (!h->flags) {
-        nbdkit_error ("server omitted NBD_INFO_EXPORT reply to NBD_OPT_GO");
-        return -1;
-      }
-      nbdkit_debug ("NBD_OPT_GO complete");
-      return 1;
-    case NBD_REP_ERR_UNSUP:
-      /* Special case this failure to fall back to NBD_OPT_EXPORT_NAME */
-      nbdkit_debug ("server lacks NBD_OPT_GO support");
-      return 0;
-    default:
-      /* Unexpected. Either the server sent a legitimate error or an
-         unexpected reply, but either way, we can't connect. */
-      if (NBD_REP_IS_ERR (reply.reply))
-        if (reply.replylen)
-          nbdkit_error ("server rejected NBD_OPT_GO with %s: %s",
-                        name_of_nbd_rep (reply.reply), (char *) buffer);
-        else
-          nbdkit_error ("server rejected NBD_OPT_GO with %s",
-                        name_of_nbd_rep (reply.reply));
-      else
-        nbdkit_error ("server used unexpected reply %s to NBD_OPT_GO",
-                      name_of_nbd_rep (reply.reply));
-      return -1;
-    }
-  }
-}
-
-/* Connect to a Unix socket, returning the fd on success */
-static int
-nbdplug_connect_unix (void)
-{
-  struct sockaddr_un sock = { .sun_family = AF_UNIX };
-  int fd;
-
-  nbdkit_debug ("connecting to Unix socket name=%s", sockname);
-  fd = socket (AF_UNIX, SOCK_STREAM, 0);
-  if (fd < 0) {
-    nbdkit_error ("socket: %m");
-    return -1;
-  }
-
-  /* We already validated length during nbdplug_config_complete */
-  assert (strlen (sockname) <= sizeof sock.sun_path);
-  memcpy (sock.sun_path, sockname, strlen (sockname));
-  if (connect (fd, (const struct sockaddr *) &sock, sizeof sock) < 0) {
-    nbdkit_error ("connect: %m");
-    return -1;
-  }
-  return fd;
-}
-
-/* Connect to a TCP socket, returning the fd on success */
-static int
-nbdplug_connect_tcp (void)
-{
-  struct addrinfo hints = { .ai_family = AF_UNSPEC,
-                            .ai_socktype = SOCK_STREAM, };
-  struct addrinfo *result, *rp;
-  int r;
-  const int optval = 1;
-  int fd;
-
-  nbdkit_debug ("connecting to TCP socket host=%s port=%s", hostname, port);
-  r = getaddrinfo (hostname, port, &hints, &result);
-  if (r != 0) {
-    nbdkit_error ("getaddrinfo: %s", gai_strerror (r));
-    return -1;
-  }
-
-  for (rp = result; rp; rp = rp->ai_next) {
-    fd = socket (rp->ai_family, rp->ai_socktype, rp->ai_protocol);
-    if (fd == -1)
-      continue;
-    if (connect (fd, rp->ai_addr, rp->ai_addrlen) != -1)
-      break;
-    close (fd);
-  }
-  freeaddrinfo (result);
-  if (rp == NULL) {
-    nbdkit_error ("connect: %m");
-    close (fd);
-    return -1;
-  }
-
-  if (setsockopt (fd, IPPROTO_TCP, TCP_NODELAY, &optval,
-                  sizeof (int)) == -1) {
-    nbdkit_error ("cannot set TCP_NODELAY option: %m");
-    close (fd);
-    return -1;
-  }
-  return fd;
-}
-
 /* Create the shared or per-connection handle. */
 static struct handle *
 nbdplug_open_handle (int readonly)
 {
   struct handle *h;
-  struct old_handshake old;
-  uint64_t version;
+  int r;

   h = calloc (1, sizeof *h);
   if (h == NULL) {
     nbdkit_error ("malloc: %m");
     return NULL;
   }
+  if (pipe (h->fds)) {
+    nbdkit_error ("pipe: %m");
+    free (h);
+    return NULL;
+  }

  retry:
+  h->fd = -1;
+  h->nbd = nbd_create ();
+  if (!h->nbd)
+    goto err;
+  if (nbd_set_export_name (h->nbd, export) == -1)
+    goto err;
+  if (nbd_request_meta_context (h->nbd, "base:allocation") == -1)
+    goto err;
   if (sockname)
-    h->fd = nbdplug_connect_unix ();
+    r = nbd_connect_unix (h->nbd, sockname);
   else
-    h->fd = nbdplug_connect_tcp ();
-  if (h->fd == -1) {
+    r = nbd_connect_tcp (h->nbd, hostname, port);
+  if (r == -1) {
     if (retry--) {
+      nbdkit_debug ("connect failed; will try again: %s", nbd_get_error ());
       sleep (1);
+      nbd_close (h->nbd);
       goto retry;
     }
     goto err;
   }
-
-  /* old and new handshake share same meaning of first 16 bytes */
-  if (read_full (h->fd, &old, offsetof (struct old_handshake, exportsize))) {
-    nbdkit_error ("unable to read magic: %m");
-    goto err;
-  }
-  if (strncmp (old.nbdmagic, "NBDMAGIC", sizeof old.nbdmagic)) {
-    nbdkit_error ("wrong magic, %s is not an NBD server", servname);
+  h->fd = nbd_aio_get_fd (h->nbd);
+  if (h->fd == -1)
     goto err;
-  }
-  version = be64toh (old.version);
-  if (version == OLD_VERSION) {
-    nbdkit_debug ("trying oldstyle connection");
-    if (read_full (h->fd,
-                   (char *) &old + offsetof (struct old_handshake, exportsize),
-                   sizeof old - offsetof (struct old_handshake, exportsize))) {
-      nbdkit_error ("unable to read old handshake: %m");
-      goto err;
-    }
-    h->size = be64toh (old.exportsize);
-    h->flags = be16toh (old.eflags);
-  }
-  else if (version == NEW_VERSION) {
-    uint16_t gflags;
-    uint32_t cflags;
-    struct new_option opt;
-    struct new_handshake_finish finish;
-    size_t expect;

-    nbdkit_debug ("trying newstyle connection");
-    if (read_full (h->fd, &gflags, sizeof gflags)) {
-      nbdkit_error ("unable to read global flags: %m");
-      goto err;
-    }
-    gflags = be16toh (gflags);
-    cflags = htobe32 (gflags & (NBD_FLAG_FIXED_NEWSTYLE | NBD_FLAG_NO_ZEROES));
-    if (write_full (h->fd, &cflags, sizeof cflags)) {
-      nbdkit_error ("unable to return global flags: %m");
-      goto err;
-    }
-
-    /* Prefer NBD_OPT_GO if possible */
-    if (gflags & NBD_FLAG_FIXED_NEWSTYLE) {
-      int rc = nbdplug_newstyle_haggle (h);
-      if (rc < 0)
-        goto err;
-      if (!rc)
-        goto export_name;
-    }
-    else {
-    export_name:
-      /* Option haggling untried or failed, use older NBD_OPT_EXPORT_NAME */
-      nbdkit_debug ("trying NBD_OPT_EXPORT_NAME");
-      opt.version = htobe64 (NEW_VERSION);
-      opt.option = htobe32 (NBD_OPT_EXPORT_NAME);
-      opt.optlen = htobe32 (strlen (export));
-      if (write_full (h->fd, &opt, sizeof opt) ||
-          write_full (h->fd, export, strlen (export))) {
-        nbdkit_error ("unable to request export '%s': %m", export);
-        goto err;
-      }
-      expect = sizeof finish;
-      if (gflags & NBD_FLAG_NO_ZEROES)
-        expect -= sizeof finish.zeroes;
-      if (read_full (h->fd, &finish, expect)) {
-        nbdkit_error ("unable to read new handshake: %m");
-        goto err;
-      }
-      h->size = be64toh (finish.exportsize);
-      h->flags = be16toh (finish.eflags);
-    }
-  }
-  else {
-    nbdkit_error ("unexpected version %#" PRIx64, version);
-    goto err;
-  }
   if (readonly)
-    h->flags |= NBD_FLAG_READ_ONLY;
+    h->readonly = true;

   /* Spawn a dedicated reader thread */
-  if ((errno = pthread_mutex_init (&h->write_lock, NULL))) {
-    nbdkit_error ("failed to initialize write mutex: %m");
-    goto err;
-  }
   if ((errno = pthread_mutex_init (&h->trans_lock, NULL))) {
     nbdkit_error ("failed to initialize transaction mutex: %m");
-    pthread_mutex_destroy (&h->write_lock);
     goto err;
   }
   if ((errno = pthread_create (&h->reader, NULL, nbdplug_reader, h))) {
     nbdkit_error ("failed to initialize reader thread: %m");
-    pthread_mutex_destroy (&h->write_lock);
     pthread_mutex_destroy (&h->trans_lock);
     goto err;
   }
@@ -1109,8 +432,11 @@ nbdplug_open_handle (int readonly)
   return h;

  err:
-  if (h->fd >= 0)
-    close (h->fd);
+  close (h->fds[0]);
+  close (h->fds[1]);
+  nbdkit_error ("failure while creating nbd handle: %s", nbd_get_error ());
+  if (h->nbd)
+    nbd_close (h->nbd);
   free (h);
   return NULL;
 }
@@ -1128,14 +454,13 @@ nbdplug_open (int readonly)
 static void
 nbdplug_close_handle (struct handle *h)
 {
-  if (!h->dead) {
-    nbdplug_request_raw (h, 0, NBD_CMD_DISC, 0, 0, 0, NULL);
-    shutdown (h->fd, SHUT_WR);
-  }
+  if (nbd_shutdown (h->nbd) == -1)
+    nbdkit_debug ("failed to clean up handle: %s", nbd_get_error ());
   if ((errno = pthread_join (h->reader, NULL)))
     nbdkit_debug ("failed to join reader thread: %m");
-  close (h->fd);
-  pthread_mutex_destroy (&h->write_lock);
+  close (h->fds[0]);
+  close (h->fds[1]);
+  nbd_close (h->nbd);
   pthread_mutex_destroy (&h->trans_lock);
   free (h);
 }
@@ -1150,87 +475,137 @@ nbdplug_close (void *handle)
     nbdplug_close_handle (h);
 }

+
+
 /* Get the file size. */
 static int64_t
 nbdplug_get_size (void *handle)
 {
   struct handle *h = handle;
+  int64_t size = nbd_get_size (h->nbd);

-  return h->size;
+  if (size == -1) {
+    nbdkit_error ("failure to get size: %s", nbd_get_error ());
+    return -1;
+  }
+  return size;
 }

 static int
 nbdplug_can_write (void *handle)
 {
   struct handle *h = handle;
+  int i = nbd_read_only (h->nbd);

-  return !(h->flags & NBD_FLAG_READ_ONLY);
+  if (i == -1) {
+    nbdkit_error ("failure to check readonly flag: %s", nbd_get_error ());
+    return -1;
+  }
+  return !(i || h->readonly);
 }

 static int
 nbdplug_can_flush (void *handle)
 {
   struct handle *h = handle;
+  int i = nbd_can_flush (h->nbd);

-  return h->flags & NBD_FLAG_SEND_FLUSH;
+  if (i == -1) {
+    nbdkit_error ("failure to check flush flag: %s", nbd_get_error ());
+    return -1;
+  }
+  return i;
 }

 static int
 nbdplug_is_rotational (void *handle)
 {
   struct handle *h = handle;
+  int i = nbd_is_rotational (h->nbd);

-  return h->flags & NBD_FLAG_ROTATIONAL;
+  if (i == -1) {
+    nbdkit_error ("failure to check rotational flag: %s", nbd_get_error ());
+    return -1;
+  }
+  return i;
 }

 static int
 nbdplug_can_trim (void *handle)
 {
   struct handle *h = handle;
+  int i = nbd_can_trim (h->nbd);

-  return h->flags & NBD_FLAG_SEND_TRIM;
+  if (i == -1) {
+    nbdkit_error ("failure to check trim flag: %s", nbd_get_error ());
+    return -1;
+  }
+  return i;
 }

 static int
 nbdplug_can_zero (void *handle)
 {
   struct handle *h = handle;
+  int i = nbd_can_zero (h->nbd);

-  return h->flags & NBD_FLAG_SEND_WRITE_ZEROES;
+  if (i == -1) {
+    nbdkit_error ("failure to check zero flag: %s", nbd_get_error ());
+    return -1;
+  }
+  return i;
 }

 static int
 nbdplug_can_fua (void *handle)
 {
   struct handle *h = handle;
+  int i = nbd_can_fua (h->nbd);

-  return h->flags & NBD_FLAG_SEND_FUA ? NBDKIT_FUA_NATIVE : NBDKIT_FUA_NONE;
+  if (i == -1) {
+    nbdkit_error ("failure to check fua flag: %s", nbd_get_error ());
+    return -1;
+  }
+  return i ? NBDKIT_FUA_NATIVE : NBDKIT_FUA_NONE;
 }

 static int
 nbdplug_can_multi_conn (void *handle)
 {
   struct handle *h = handle;
+  int i = nbd_can_multi_conn (h->nbd);

-  return h->flags & NBD_FLAG_CAN_MULTI_CONN;
+  if (i == -1) {
+    nbdkit_error ("failure to check multi-conn flag: %s", nbd_get_error ());
+    return -1;
+  }
+  return i;
 }

 static int
 nbdplug_can_cache (void *handle)
 {
   struct handle *h = handle;
+  int i = nbd_can_cache (h->nbd);

-  if (h->flags & NBD_FLAG_SEND_CACHE)
-    return NBDKIT_CACHE_NATIVE;
-  return NBDKIT_CACHE_NONE;
+  if (i == -1) {
+    nbdkit_error ("failure to check cache flag: %s", nbd_get_error ());
+    return -1;
+  }
+  return i ? NBDKIT_CACHE_NATIVE : NBDKIT_CACHE_NONE;
 }

 static int
 nbdplug_can_extents (void *handle)
 {
   struct handle *h = handle;
+  int i = nbd_can_meta_context (h->nbd, "base:allocation");

-  return h->extents;
+  if (i == -1) {
+    nbdkit_error ("failure to check extents ability: %s", nbd_get_error ());
+    return -1;
+  }
+  return i;
 }

 /* Read data from the file. */
@@ -1242,7 +617,8 @@ nbdplug_pread (void *handle, void *buf, uint32_t count, uint64_t offset,
   struct transaction *s;

   assert (!flags);
-  s = nbdplug_request_full (h, 0, NBD_CMD_READ, offset, count, NULL, buf, NULL);
+  /* XXX API changes in libnbd 0.1.2: */
+  s = nbdplug_register (h, nbd_aio_pread (h->nbd, buf, count, offset /* , 0 */));
   return nbdplug_reply (h, s);
 }

@@ -1253,10 +629,10 @@ nbdplug_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset,
 {
   struct handle *h = handle;
   struct transaction *s;
+  uint32_t f = flags & NBDKIT_FLAG_FUA ? LIBNBD_CMD_FLAG_FUA : 0;

   assert (!(flags & ~NBDKIT_FLAG_FUA));
-  s = nbdplug_request_full (h, flags & NBDKIT_FLAG_FUA ? NBD_CMD_FLAG_FUA : 0,
-                            NBD_CMD_WRITE, offset, count, buf, NULL, NULL);
+  s = nbdplug_register (h, nbd_aio_pwrite (h->nbd, buf, count, offset, f));
   return nbdplug_reply (h, s);
 }

@@ -1269,13 +645,12 @@ nbdplug_zero (void *handle, uint32_t count, uint64_t offset, uint32_t flags)
   uint32_t f = 0;

   assert (!(flags & ~(NBDKIT_FLAG_FUA | NBDKIT_FLAG_MAY_TRIM)));
-  assert (h->flags & NBD_FLAG_SEND_WRITE_ZEROES);

   if (!(flags & NBDKIT_FLAG_MAY_TRIM))
-    f |= NBD_CMD_FLAG_NO_HOLE;
+    f |= LIBNBD_CMD_FLAG_NO_HOLE;
   if (flags & NBDKIT_FLAG_FUA)
-    f |= NBD_CMD_FLAG_FUA;
-  s = nbdplug_request (h, f, NBD_CMD_WRITE_ZEROES, offset, count);
+    f |= LIBNBD_CMD_FLAG_FUA;
+  s = nbdplug_register (h, nbd_aio_zero (h->nbd, count, offset, f));
   return nbdplug_reply (h, s);
 }

@@ -1285,10 +660,10 @@ nbdplug_trim (void *handle, uint32_t count, uint64_t offset, uint32_t flags)
 {
   struct handle *h = handle;
   struct transaction *s;
+  uint32_t f = flags & NBDKIT_FLAG_FUA ? LIBNBD_CMD_FLAG_FUA : 0;

   assert (!(flags & ~NBDKIT_FLAG_FUA));
-  s = nbdplug_request (h, flags & NBDKIT_FLAG_FUA ? NBD_CMD_FLAG_FUA : 0,
-                       NBD_CMD_TRIM, offset, count);
+  s = nbdplug_register (h, nbd_aio_trim (h->nbd, count, offset, f));
   return nbdplug_reply (h, s);
 }

@@ -1300,10 +675,36 @@ nbdplug_flush (void *handle, uint32_t flags)
   struct transaction *s;

   assert (!flags);
-  s = nbdplug_request (h, 0, NBD_CMD_FLUSH, 0, 0);
+  /* XXX API changes in libnbd 0.1.2: */
+  s = nbdplug_register (h, nbd_aio_flush (h->nbd /* , f */));
   return nbdplug_reply (h, s);
 }

+struct nbdplug_extent_data
+{
+  struct nbdkit_extents *extents;  /* receives the reported extents */
+  int err;                         /* errno of first failed add */
+};
+/* Block-status callback: entries holds (length, flags) pairs. */
+static void
+nbdplug_extent (void *opaque, const char *metacontext, uint64_t offset,
+                uint32_t *entries, size_t nr_entries)
+{
+  struct nbdplug_extent_data *data = opaque;
+
+  assert (strcmp (metacontext, "base:allocation") == 0);
+  assert (nr_entries % 2 == 0);
+  while (nr_entries && !data->err) {  /* stop at first failure */
+    /* We rely on the fact that NBDKIT_EXTENT_* match NBD_STATE_* */
+    if (nbdkit_add_extent (data->extents, offset,
+                           entries[0], entries[1]) == -1)
+      data->err = errno;
+    offset += entries[0];
+    entries += 2;
+    nr_entries -= 2;
+  }
+}
+
 /* Read extents of the file. */
 static int
 nbdplug_extents (void *handle, uint32_t count, uint64_t offset,
@@ -1311,12 +712,20 @@ nbdplug_extents (void *handle, uint32_t count, uint64_t offset,
 {
   struct handle *h = handle;
   struct transaction *s;
-  uint32_t f = flags & NBDKIT_FLAG_REQ_ONE ? NBD_CMD_FLAG_REQ_ONE : 0;
+  uint32_t f = flags & NBDKIT_FLAG_REQ_ONE ? LIBNBD_CMD_FLAG_REQ_ONE : 0;
+  struct nbdplug_extent_data data = { extents, 0 };
+  int r;

-  assert (!(flags & ~NBDKIT_FLAG_REQ_ONE) && h->extents);
-  s = nbdplug_request_full (h, f, NBD_CMD_BLOCK_STATUS, offset, count, NULL,
-                            NULL, extents);
-  return nbdplug_reply (h, s);
+  assert (!(flags & ~NBDKIT_FLAG_REQ_ONE));
+  /* XXX API changes in libnbd 0.1.2: */
+  s = nbdplug_register (h, nbd_aio_block_status (h->nbd, count, offset, f,
+                                                 &data, nbdplug_extent /* , f */));
+  r = nbdplug_reply (h, s);
+  if (r == 0 && data.err) {
+    errno = data.err;
+    r = -1;
+  }
+  return r;
 }

 /* Cache a portion of the file. */
@@ -1327,7 +736,8 @@ nbdplug_cache (void *handle, uint32_t count, uint64_t offset, uint32_t flags)
   struct transaction *s;

   assert (!flags);
-  s = nbdplug_request (h, 0, NBD_CMD_CACHE, offset, count);
+  /* XXX API changes in libnbd 0.1.2: */
+  s = nbdplug_register (h, nbd_aio_cache (h->nbd, count, offset /* , f */));
   return nbdplug_reply (h, s);
 }

diff --git a/plugins/nbd/Makefile.am b/plugins/nbd/Makefile.am
index bfc2a83..a854a44 100644
--- a/plugins/nbd/Makefile.am
+++ b/plugins/nbd/Makefile.am
@@ -41,7 +41,6 @@ nbdkit_nbd_plugin_la_SOURCES = \
 nbdkit_nbd_plugin_la_CPPFLAGS = \
 	-I$(top_srcdir)/include \
 	-I$(top_srcdir)/common/include \
-	-I$(top_srcdir)/common/protocol \
 	-I$(top_srcdir)/common/utils \
 	-I$(top_srcdir)/server
 nbdkit_nbd_plugin_la_CFLAGS = \
@@ -50,7 +49,6 @@ nbdkit_nbd_plugin_la_LDFLAGS = \
 	-module -avoid-version -shared \
 	-Wl,--version-script=$(top_srcdir)/plugins/plugins.syms
 nbdkit_nbd_plugin_la_LIBADD = \
-	$(top_builddir)/common/protocol/libprotocol.la \
 	$(top_builddir)/common/utils/libutils.la

 # TODO: drop standalone version, which is locked at nbdkit 1.13.4 behavior,
@@ -62,9 +60,15 @@ nbdkit_nbd_plugin_la_CFLAGS += \
 	$(LIBNBD_CFLAGS)
 nbdkit_nbd_plugin_la_LIBADD += \
 	$(LIBNBD_LIBS)
+
 else !HAVE_LIBNBD
 nbdkit_nbd_plugin_la_SOURCES += \
 	nbd-standalone.c
+nbdkit_nbd_plugin_la_CPPFLAGS += \
+	-I$(top_srcdir)/common/protocol
+nbdkit_nbd_plugin_la_LIBADD += \
+	$(top_builddir)/common/protocol/libprotocol.la
+
 endif !HAVE_LIBNBD

 if HAVE_POD
-- 
2.20.1




More information about the Libguestfs mailing list