[Libguestfs] [PATCH libnbd] ublk: Add new nbdublk program

Ming Lei ming.lei at redhat.com
Tue Aug 30 07:12:23 UTC 2022


On Tue, Aug 30, 2022 at 10:32:02AM +0800, Ming Lei wrote:
> Hi Jones,
> 
> On Thu, Aug 25, 2022 at 01:10:55PM +0100, Richard W.M. Jones wrote:
> > This patch adds simple support for a ublk-based NBD client.
> > It is also available here:
> > https://gitlab.com/rwmjones/libnbd/-/tree/nbdublk/ublk
> > 
> > ublk is a way to write Linux block device drivers in userspace:
> 
> Just looked at your nbdublk implementation a bit, basically it is good,
> and one really nice work.
> 
> Also follows two suggestions:
> 
> 1) the io_uring context is multilexed with ublk io command handling, so
> we should avoid to block in both ->handle_io_async() and
> ->handle_event(), otherwise performance may be bad
> 
> 2) in the implementation of nbd worker thread, there are two sleep
> points(wait for incoming io command, and network FD), I'd suggest to use
> poll to wait on any of them
> 
> Recently I are working to add ublksrv io offloading or aio interfaces on this
> sort of case in which io_uring can't be used, which may simplified this
> area, please see the attached patch which applies the above two points against
> your patch. And obvious improvement can be observed on my simple fio test(
> randread, io, 4k bs, libaio) against backend of 'nbdkit file'.
> 
> But these interfaces aren't merged to ublksrv github tree yet, you can find
> them in the aio branch, and demo_event.c is one example wrt. how to use
> them:
> 
>   https://github.com/ming1/ubdsrv/tree/aio
> 
> Actually this interface can be improved further for nbdublk case,
> and the request allocation isn't needed actually for this direct
> offloading. But they are added for covering some IOs not from ublk
> driver, such as meta data, so 'struct ublksrv_aio' is allocated.
> I will try best to finalize them and merge to master branch.
> 
> BTW, IOPS on nbdublk(backend: nbdkit file) still has big gap compared
> with ublk-loop, so I guess in future maybe io_uring should be tried and
> see if big improvement can be observed.
> 

The patch sent in last email may cause io hang on MQ, and follows the fixed
version:


diff --git a/generator/API.ml b/generator/API.ml
index 3e948aa..bdd0fb8 100644
--- a/generator/API.ml
+++ b/generator/API.ml
@@ -2289,6 +2289,26 @@ that eventual action is actually expected - for example, if
 the connection is established but there are no commands in
 flight, using an infinite timeout will permanently block).
 
+This function is mainly useful as an example of how you might
+integrate libnbd with your own main loop, rather than being
+intended as something you would use.";
+    example = Some "examples/aio-connect-read.c";
+  };
+
+  "poll2", {
+    default_call with
+    args = [Int "evt"; Int "timeout" ]; ret = RInt;
+    shortdesc = "poll the handle once with eventfd";
+    longdesc = "\
+This is a simple implementation of L<poll(2)> which is used
+internally by synchronous API calls.  On success, it returns
+C<0> if the C<timeout> (in milliseconds) occurs, or C<1> if
+the poll completed and the state machine progressed. Set
+C<timeout> to C<-1> to block indefinitely (but be careful
+that eventual action is actually expected - for example, if
+the connection is established but there are no commands in
+flight, using an infinite timeout will permanently block).
+
 This function is mainly useful as an example of how you might
 integrate libnbd with your own main loop, rather than being
 intended as something you would use.";
@@ -3153,6 +3173,7 @@ let first_version = [
   "zero", (1, 0);
   "block_status", (1, 0);
   "poll", (1, 0);
+  "poll2", (1, 0);
   "aio_connect", (1, 0);
   "aio_connect_uri", (1, 0);
   "aio_connect_unix", (1, 0);
diff --git a/lib/poll.c b/lib/poll.c
index df01d94..e9d7924 100644
--- a/lib/poll.c
+++ b/lib/poll.c
@@ -27,14 +27,21 @@
 #include "internal.h"
 
 /* A simple main loop implementation using poll(2). */
-int
-nbd_unlocked_poll (struct nbd_handle *h, int timeout)
+static int
+__nbd_unlocked_poll (struct nbd_handle *h, int evt, int timeout)
 {
-  struct pollfd fds[1];
-  int r;
+  struct pollfd fds[2];
+  int r, nr_fds = 1;
 
   /* fd might be negative, and poll will ignore it. */
   fds[0].fd = nbd_unlocked_aio_get_fd (h);
+  if (evt > 0) {
+      fds[1].fd = evt;
+      fds[1].events = POLLIN;
+      fds[1].revents = 0;
+      nr_fds = 2;
+  }
+
   switch (nbd_internal_aio_get_direction (get_next_state (h))) {
   case LIBNBD_AIO_DIRECTION_READ:
     fds[0].events = POLLIN;
@@ -58,7 +65,7 @@ nbd_unlocked_poll (struct nbd_handle *h, int timeout)
    * passed to poll.
    */
   do {
-    r = poll (fds, 1, timeout);
+    r = poll (fds, nr_fds, timeout);
     debug (h, "poll end: r=%d revents=%x", r, fds[0].revents);
   } while (r == -1 && errno == EINTR);
 
@@ -91,3 +98,15 @@ nbd_unlocked_poll (struct nbd_handle *h, int timeout)
 
   return 1;
 }
+
+int
+nbd_unlocked_poll (struct nbd_handle *h, int timeout)
+{
+	return __nbd_unlocked_poll (h, -1, timeout);
+}
+
+int
+nbd_unlocked_poll2 (struct nbd_handle *h, int evt, int timeout)
+{
+	return __nbd_unlocked_poll (h, evt, timeout);
+}
diff --git a/ublk/tgt.c b/ublk/tgt.c
index 4cdd42a..5b478ae 100644
--- a/ublk/tgt.c
+++ b/ublk/tgt.c
@@ -35,6 +35,7 @@
 #endif
 
 #include <ublksrv.h>
+#include <ublksrv_aio.h>
 
 #include <libnbd.h>
 
@@ -46,14 +47,6 @@
 /* Number of seconds to wait for commands to complete when closing the dev. */
 #define RELEASE_TIMEOUT 5
 
-/* List of completed commands. */
-struct completion {
-  struct ublksrv_queue *q;
-  int tag;
-  int res;      /* The normal return value, if the command completes OK. */
-};
-DEFINE_VECTOR_TYPE(completions, struct completion)
-
 /* Thread model:
  *
  * There are two threads per NBD connection.  One thread
@@ -69,23 +62,8 @@ struct thread_info {
   pthread_t io_uring_thread;
   pthread_t nbd_work_thread;
 
-  /* This counts the number of commands in flight.  The condition is
-   * used to allow the operations thread to process commands when
-   * in_flight goes from 0 -> 1.  This is roughly equivalent to
-   * nbd_aio_in_flight, but we need to count it ourselves in order to
-   * use the condition.
-   */
-  _Atomic size_t in_flight;
-  pthread_mutex_t in_flight_mutex;
-  pthread_cond_t in_flight_cond;
-
-  /* Commands have to be completed on the io_uring thread, but they
-   * run on the NBD thread.  So when the NBD command completes we put
-   * the command on this queue and they are passed to the io_uring
-   * thread to call ublksrv_complete_io.
-   */
-  pthread_mutex_t completed_commands_lock;
-  completions completed_commands;
+  struct ublksrv_aio_ctx *aio_ctx;
+  struct ublksrv_aio_list compl;
 };
 DEFINE_VECTOR_TYPE(thread_infos, struct thread_info)
 static thread_infos thread_info;
@@ -95,6 +73,161 @@ static pthread_barrier_t barrier;
 static char jbuf[4096];
 static pthread_mutex_t jbuf_lock = PTHREAD_MUTEX_INITIALIZER;
 
+/* Command completion callback (called on the NBD thread). */
+static int
+command_completed (void *vpdata, int *error)
+{
+  struct ublksrv_aio *req = vpdata;
+  int q_id = ublksrv_aio_qid(req->id);
+  struct ublksrv_aio_ctx *aio_ctx = thread_info.ptr[q_id].aio_ctx;
+  struct ublksrv_queue *q = ublksrv_get_queue(aio_ctx->dev, q_id);
+  struct ublksrv_aio_list *compl = &thread_info.ptr[q_id].compl;
+
+  if (verbose)
+    fprintf (stderr,
+             "%s: command_completed: tag=%d q_id=%zu error=%d\n",
+             "nbdublk", ublksrv_aio_tag(req->id),
+	     ublksrv_aio_qid(req->id), *error);
+
+  /* If the command failed, override the normal result. */
+  if (*error != 0)
+    req->res = *error;
+
+  pthread_spin_lock(&compl->lock);
+  aio_list_add(&compl->list, req);
+  pthread_spin_unlock(&compl->lock);
+
+  return 1;
+}
+
+
+int aio_submitter(struct ublksrv_aio_ctx *ctx,
+		struct ublksrv_aio *req)
+{
+  const struct ublksrv_io_desc *iod = &req->io;
+  const unsigned op = ublksrv_get_op (iod);
+  const unsigned flags = ublksrv_get_flags (iod);
+  const bool fua = flags & UBLK_IO_F_FUA;
+  const bool alloc_zero = flags & UBLK_IO_F_NOUNMAP; /* else punch hole */
+  const size_t q_id = ublksrv_aio_qid(req->id); /* also the NBD handle number */
+  struct nbd_handle *h = nbd.ptr[q_id];
+  uint32_t nbd_flags = 0;
+  int64_t r;
+  nbd_completion_callback cb;
+  bool sync = false;
+
+  if (verbose)
+    fprintf (stderr, "%s: handle_io_async: tag = %d q_id = %zu\n",
+             "nbdublk", ublksrv_aio_tag(req->id), q_id);
+
+  req->res = iod->nr_sectors << 9;
+  cb.callback = command_completed;
+  cb.user_data = req;
+  cb.free = NULL;
+
+  switch (op) {
+  case UBLK_IO_OP_READ:
+    r = nbd_aio_pread (h, (void *) iod->addr, iod->nr_sectors << 9,
+                       iod->start_sector << 9, cb, 0);
+    if (r == -1) {
+      fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
+      return -EINVAL;
+    }
+    break;
+
+  case UBLK_IO_OP_WRITE:
+    if (fua && can_fua)
+      nbd_flags |= LIBNBD_CMD_FLAG_FUA;
+
+    r = nbd_aio_pwrite (h, (const void *) iod->addr, iod->nr_sectors << 9,
+                        iod->start_sector << 9, cb, nbd_flags);
+    if (r == -1) {
+      fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
+      return -EINVAL;
+    }
+    break;
+
+  case UBLK_IO_OP_FLUSH:
+    r = nbd_flush (h, 0);
+    if (r == -1) {
+      fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
+      return -EINVAL;
+    }
+    sync = true;
+    break;
+
+  case UBLK_IO_OP_DISCARD:
+    if (fua && can_fua)
+      nbd_flags |= LIBNBD_CMD_FLAG_FUA;
+
+    r = nbd_trim (h, iod->nr_sectors << 9, iod->start_sector << 9, nbd_flags);
+    if (r == -1) {
+      fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
+      return -EINVAL;
+    }
+    sync = true;
+    break;
+
+  case UBLK_IO_OP_WRITE_ZEROES:
+    if (fua && can_fua)
+      nbd_flags |= LIBNBD_CMD_FLAG_FUA;
+
+    if (alloc_zero)
+      nbd_flags |= LIBNBD_CMD_FLAG_NO_HOLE;
+
+    r = nbd_zero (h, iod->nr_sectors << 9, iod->start_sector << 9, nbd_flags);
+    if (r == -1) {
+      fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
+      return -EINVAL;
+    }
+    sync = true;
+    break;
+
+  default:
+    fprintf (stderr, "%s: unknown operation %u\n", "nbdublk", op);
+    return -ENOTSUP;
+  }
+
+  /* return if this request is completed */
+  if (sync)
+	  return 1;
+  return 0;
+}
+
+static void *
+nbd_work_thread (void *vpinfo)
+{
+  struct thread_info *ti = vpinfo;
+  struct nbd_handle *h = nbd.ptr[ti->i];
+  struct ublksrv_aio_ctx *aio_ctx = thread_info.ptr[ti->i].aio_ctx;
+  struct ublksrv_queue *q = ublksrv_get_queue(aio_ctx->dev, ti->i);
+  struct ublksrv_aio_list *c = &thread_info.ptr[ti->i].compl;
+
+  /* Signal to the main thread that we have initialized. */
+  pthread_barrier_wait (&barrier);
+
+  while (!ublksrv_aio_ctx_dead(aio_ctx)) {
+      struct aio_list compl;
+
+      aio_list_init(&compl);
+      ublksrv_aio_submit_worker(aio_ctx, aio_submitter, &compl);
+
+      pthread_spin_lock(&c->lock);
+      aio_list_splice(&c->list, &compl);
+      pthread_spin_unlock(&c->lock);
+
+      ublksrv_aio_complete_worker(aio_ctx, &compl);
+
+      if (nbd_poll2 (h, aio_ctx->efd, -1) == -1) {
+        fprintf (stderr, "%s\n", nbd_get_error ());
+        exit (EXIT_FAILURE);
+      }
+  }
+
+  /*NOTREACHED*/
+  return NULL;
+}
+
 static void *
 io_uring_thread (void *vpinfo)
 {
@@ -139,37 +272,6 @@ io_uring_thread (void *vpinfo)
   return NULL;
 }
 
-static void *
-nbd_work_thread (void *vpinfo)
-{
-  struct thread_info *thread_info = vpinfo;
-  const size_t i = thread_info->i;
-  struct nbd_handle *h = nbd.ptr[i];
-
-  /* Signal to the main thread that we have initialized. */
-  pthread_barrier_wait (&barrier);
-
-  while (1) {
-    /* Sleep until at least one command is in flight. */
-    pthread_mutex_lock (&thread_info->in_flight_mutex);
-    while (thread_info->in_flight == 0)
-      pthread_cond_wait (&thread_info->in_flight_cond,
-                         &thread_info->in_flight_mutex);
-    pthread_mutex_unlock (&thread_info->in_flight_mutex);
-
-    /* Dispatch work while there are commands in flight. */
-    while (thread_info->in_flight > 0) {
-      if (nbd_poll (h, -1) == -1) {
-        fprintf (stderr, "%s\n", nbd_get_error ());
-        exit (EXIT_FAILURE);
-      }
-    }
-  }
-
-  /*NOTREACHED*/
-  return NULL;
-}
-
 static int
 set_parameters (struct ublksrv_ctrl_dev *ctrl_dev,
                 const struct ublksrv_dev *dev)
@@ -215,6 +317,7 @@ int
 start_daemon (struct ublksrv_ctrl_dev *ctrl_dev)
 {
   const struct ublksrv_ctrl_dev_info *dinfo = &ctrl_dev->dev_info;
+  int dev_id = ctrl_dev->dev_info.dev_id;
   struct ublksrv_dev *dev;
   size_t i;
   int r;
@@ -265,17 +368,16 @@ start_daemon (struct ublksrv_ctrl_dev *ctrl_dev)
     /* Note this cannot fail because of previous reserve. */
     thread_infos_append (&thread_info,
                          (struct thread_info)
-                         { .dev = dev, .i = i, .in_flight = 0 });
+                         { .dev = dev, .i = i,});
+
+    thread_info.ptr[i].aio_ctx = ublksrv_aio_ctx_init(dev, 0);
+    if (!thread_info.ptr[i].aio_ctx) {
+	  fprintf(stderr, "dev %d queue %d call ublk_aio_ctx_init failed\n",
+			  dev_id, i);
+	  return -ENOMEM;
+    }
+    ublksrv_aio_init_list(&thread_info.ptr[i].compl);
 
-    r = pthread_mutex_init (&thread_info.ptr[i].in_flight_mutex, NULL);
-    if (r != 0)
-      goto bad_pthread;
-    r = pthread_cond_init (&thread_info.ptr[i].in_flight_cond, NULL);
-    if (r != 0)
-      goto bad_pthread;
-    r = pthread_mutex_init (&thread_info.ptr[i].completed_commands_lock, NULL);
-    if (r != 0)
-      goto bad_pthread;
     r = pthread_create (&thread_info.ptr[i].io_uring_thread, NULL,
                         io_uring_thread, &thread_info.ptr[i]);
     if (r != 0)
@@ -316,24 +418,10 @@ start_daemon (struct ublksrv_ctrl_dev *ctrl_dev)
   for (i = 0; i < nbd.len; ++i)
     pthread_join (thread_info.ptr[i].io_uring_thread, NULL);
 
-  /* Wait until a timeout while there are NBD commands in flight. */
-  time (&st);
-  while (time (NULL) - st <= RELEASE_TIMEOUT) {
-    for (i = 0; i < nbd.len; ++i) {
-      if (thread_info.ptr[i].in_flight > 0)
-        break;
-    }
-    if (i == nbd.len) /* no commands in flight */
-      break;
-
-    /* Signal to the operations threads to work. */
-    for (i = 0; i < nbd.len; ++i) {
-      pthread_mutex_lock (&thread_info.ptr[i].in_flight_mutex);
-      pthread_cond_signal (&thread_info.ptr[i].in_flight_cond);
-      pthread_mutex_unlock (&thread_info.ptr[i].in_flight_mutex);
-    }
-
-    sleep (1);
+  for (i = 0; i < nbd.len; ++i) {
+    ublksrv_aio_ctx_shutdown(thread_info.ptr[i].aio_ctx);
+    pthread_join (thread_info.ptr[i].nbd_work_thread, NULL);
+    ublksrv_aio_ctx_deinit(thread_info.ptr[i].aio_ctx);
   }
 
   ublksrv_dev_deinit (dev);
@@ -367,176 +455,37 @@ init_tgt (struct ublksrv_dev *dev, int type, int argc, char *argv[])
   return 0;
 }
 
-/* Command completion callback (called on the NBD thread). */
-static int
-command_completed (void *vpdata, int *error)
-{
-  struct completion *completion = vpdata;
-  struct ublksrv_queue *q = completion->q;
-  const size_t i = q->q_id;
-
-  if (verbose)
-    fprintf (stderr,
-             "%s: command_completed: tag=%d q_id=%zu res=%d error=%d\n",
-             "nbdublk", completion->tag, i, completion->res, *error);
-
-  /* If the command failed, override the normal result. */
-  if (*error != 0)
-    completion->res = *error;
-
-  assert (thread_info.ptr[i].in_flight >= 1);
-  thread_info.ptr[i].in_flight--;
-
-  /* Copy the command to the list of completed commands.
-   *
-   * Note *completion is freed by the .free handler that we added to
-   * this completion callback.
-   */
-  pthread_mutex_lock (&thread_info.ptr[i].completed_commands_lock);
-  completions_append (&thread_info.ptr[i].completed_commands, *completion);
-
-  /* Signal io_uring thread that the command has been completed.
-   * It will call us back in a different thread on ->handle_event
-   * and we can finally complete the command(s) there.
-   */
-  ublksrv_queue_send_event (q);
-  pthread_mutex_unlock (&thread_info.ptr[i].completed_commands_lock);
-
-  /* Retire the NBD command. */
-  return 1;
-}
-
 static void
-handle_event (struct ublksrv_queue *q)
+nbd_handle_event (struct ublksrv_queue *q)
 {
-  const size_t i = q->q_id;
-  size_t j;
+  struct ublksrv_aio_ctx *aio_ctx = thread_info.ptr[q->q_id].aio_ctx;
 
   if (verbose)
-    fprintf (stderr, "%s: handle_event: q_id = %d\n", "nbdublk", q->q_id);
+     fprintf (stderr, "%s: handle_event: q_id = %d\n", "nbdublk", q->q_id);
 
-  pthread_mutex_lock (&thread_info.ptr[i].completed_commands_lock);
-
-  for (j = 0; j < thread_info.ptr[i].completed_commands.len; ++j) {
-    struct completion *completion =
-      &thread_info.ptr[i].completed_commands.ptr[j];
-    ublksrv_complete_io (completion->q, completion->tag, completion->res);
-  }
-  completions_reset (&thread_info.ptr[i].completed_commands);
-  ublksrv_queue_handled_event (q);
-
-  pthread_mutex_unlock (&thread_info.ptr[i].completed_commands_lock);
+  ublksrv_aio_handle_event(aio_ctx, q);
 }
 
-/* Start a single command. */
-static int
-handle_io_async (struct ublksrv_queue *q, int tag)
+static int nbd_handle_io_async(struct ublksrv_queue *q, int tag)
 {
-  const struct ublksrv_io_desc *iod = ublksrv_get_iod (q, tag);
-  const unsigned op = ublksrv_get_op (iod);
-  const unsigned flags = ublksrv_get_flags (iod);
-  const bool fua = flags & UBLK_IO_F_FUA;
-  const bool alloc_zero = flags & UBLK_IO_F_NOUNMAP; /* else punch hole */
-  const size_t q_id = q->q_id; /* also the NBD handle number */
-  struct nbd_handle *h = nbd.ptr[q_id];
-  uint32_t nbd_flags = 0;
-  int64_t r;
-  nbd_completion_callback cb;
-  struct completion *completion;
-
-  if (verbose)
-    fprintf (stderr, "%s: handle_io_async: tag = %d q_id = %zu\n",
-             "nbdublk", tag, q_id);
-
-  /* Set up a completion callback and its user data. */
-  completion = malloc (sizeof *completion);
-  if (completion == NULL) abort ();
-  completion->q = q;
-  completion->tag = tag;
-  completion->res = iod->nr_sectors << 9;
-  cb.callback = command_completed;
-  cb.user_data = completion;
-  cb.free = free;
-
-  switch (op) {
-  case UBLK_IO_OP_READ:
-    r = nbd_aio_pread (h, (void *) iod->addr, iod->nr_sectors << 9,
-                       iod->start_sector << 9, cb, 0);
-    if (r == -1) {
-      fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
-      ublksrv_complete_io (q, tag, - (nbd_get_errno () ? : EINVAL));
-      return 0;
-    }
-    break;
-
-  case UBLK_IO_OP_WRITE:
-    if (fua && can_fua)
-      nbd_flags |= LIBNBD_CMD_FLAG_FUA;
-
-    r = nbd_aio_pwrite (h, (const void *) iod->addr, iod->nr_sectors << 9,
-                        iod->start_sector << 9, cb, nbd_flags);
-    if (r == -1) {
-      fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
-      ublksrv_complete_io (q, tag, - (nbd_get_errno () ? : EINVAL));
-      return 0;
-    }
-    break;
-
-  case UBLK_IO_OP_FLUSH:
-    r = nbd_flush (h, 0);
-    if (r == -1) {
-      fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
-      ublksrv_complete_io (q, tag, - (nbd_get_errno () ? : EINVAL));
-      return 0;
-    }
-    break;
-
-  case UBLK_IO_OP_DISCARD:
-    if (fua && can_fua)
-      nbd_flags |= LIBNBD_CMD_FLAG_FUA;
-
-    r = nbd_trim (h, iod->nr_sectors << 9, iod->start_sector << 9, nbd_flags);
-    if (r == -1) {
-      fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
-      ublksrv_complete_io (q, tag, - (nbd_get_errno () ? : EINVAL));
-      return 0;
-    }
-    break;
-
-  case UBLK_IO_OP_WRITE_ZEROES:
-    if (fua && can_fua)
-      nbd_flags |= LIBNBD_CMD_FLAG_FUA;
-
-    if (alloc_zero)
-      nbd_flags |= LIBNBD_CMD_FLAG_NO_HOLE;
-
-    r = nbd_zero (h, iod->nr_sectors << 9, iod->start_sector << 9, nbd_flags);
-    if (r == -1) {
-      fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
-      ublksrv_complete_io (q, tag, - (nbd_get_errno () ? : EINVAL));
-      return 0;
-    }
-    break;
-
-  default:
-    fprintf (stderr, "%s: unknown operation %u\n", "nbdublk", op);
-    ublksrv_complete_io (q, tag, -ENOTSUP);
-    return 0;
-  }
-
-  /* Make sure the corresponding NBD worker sees the command. */
-  pthread_mutex_lock (&thread_info.ptr[q_id].in_flight_mutex);
-  thread_info.ptr[q_id].in_flight++;
-  pthread_cond_signal (&thread_info.ptr[q_id].in_flight_cond);
-  pthread_mutex_unlock (&thread_info.ptr[q_id].in_flight_mutex);
-
-  return 0;
+	struct ublksrv_aio_ctx *aio_ctx = thread_info.ptr[q->q_id].aio_ctx;
+	const struct ublksrv_io_desc *iod = ublksrv_get_iod(q, tag);
+	struct ublksrv_aio *req = ublksrv_aio_alloc_req(aio_ctx, 0);
+
+	req->io = *iod;
+	req->id = ublksrv_aio_pid_tag(q->q_id, tag);
+	if (verbose)
+		fprintf (stderr, "%s %d qid %d tag %d\n", __func__, __LINE__,
+			q->q_id, tag);
+	ublksrv_aio_submit_req(aio_ctx, req);
+
+	return 0;
 }
 
 struct ublksrv_tgt_type tgt_type = {
   .type = UBLKSRV_TGT_TYPE_NBD,
   .name = "nbd",
   .init_tgt = init_tgt,
-  .handle_io_async = handle_io_async,
-  .handle_event = handle_event,
+  .handle_io_async = nbd_handle_io_async,
+  .handle_event = nbd_handle_event,
 };


-- 
Ming


More information about the Libguestfs mailing list