[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

[Libguestfs] [nbdkit PATCH 2/2] nbd: Use nbdkit aio_*_notify variants



We no longer have to track a linked list of in-flight transactions
that are pending resolution, but rely instead on libnbd 0.1.6+ doing
it on our behalf. Normally, we will get to call nbdplug_register()
prior to the notify callback being reached, but under a heavily-loaded
system, it is conceivable that the libnbd state machine can manage to
fire off our request and receive a server reply all before returning
to the thread waiting on the semaphore, in which case the notify
callback could set the cookie first.  We still have to call
nbd_aio_command_completed to retire the command, but now we can call
it from the context of the thread that made the request rather than
from the central reader thread, and we can check that the retired
command has the same status as expected from the notify callback.

Repeating a setup from commit e897ed70, I'm not seeing any real
difference in performance numbers. But the reduced lines of code, and
one less mutex, make this seem like a win from the maintenance
perspective.

Signed-off-by: Eric Blake <eblake redhat com>
---
 plugins/nbd/nbd.c | 116 ++++++++++++++++------------------------------
 1 file changed, 39 insertions(+), 77 deletions(-)

diff --git a/plugins/nbd/nbd.c b/plugins/nbd/nbd.c
index 5aeab9e8..548839cf 100644
--- a/plugins/nbd/nbd.c
+++ b/plugins/nbd/nbd.c
@@ -61,7 +61,6 @@ struct transaction {
   sem_t sem;
   uint32_t early_err;
   uint32_t err;
-  struct transaction *next;
   struct nbdkit_extents *extents;
 };

@@ -73,9 +72,6 @@ struct handle {
   int fds[2]; /* Pipe for kicking the reader thread */
   bool readonly;
   pthread_t reader;
-
-  pthread_mutex_t trans_lock; /* Covers access to trans list */
-  struct transaction *trans; /* List of pending transactions */
 };

 /* Connect to server via absolute name of Unix socket */
@@ -306,7 +302,6 @@ void *
 nbdplug_reader (void *handle)
 {
   struct handle *h = handle;
-  int r;

   while (!nbd_aio_is_dead (h->nbd) && !nbd_aio_is_closed (h->nbd)) {
     struct pollfd fds[2] = {
@@ -314,7 +309,6 @@ nbdplug_reader (void *handle)
       [1].fd = h->fds[0],
       [1].events = POLLIN,
     };
-    struct transaction *trans, **prev;
     int dir;
     char c;

@@ -343,61 +337,9 @@ nbdplug_reader (void *handle)
         break;
       }
     }
-
-    ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&h->trans_lock);
-    trans = h->trans;
-    prev = &h->trans;
-    while (trans) {
-      r = nbd_aio_command_completed (h->nbd, trans->cookie);
-      if (r == -1) {
-        nbdkit_debug ("transaction %" PRId64 " failed: %s", trans->cookie,
-                      nbd_get_error ());
-        trans->err = nbd_get_errno ();
-        if (!trans->err)
-          trans->err = EIO;
-      }
-      if (r) {
-        nbdkit_debug ("cookie %" PRId64 " completed state machine, status %d",
-                      trans->cookie, trans->err);
-        *prev = trans->next;
-        if (sem_post (&trans->sem)) {
-          nbdkit_error ("failed to post semaphore: %m");
-          abort ();
-        }
-      }
-      else
-        prev = &trans->next;
-      trans = *prev;
-    }
   }

-  /* Clean up any stranded in-flight requests */
   nbdkit_debug ("state machine changed to %s", nbd_connection_state (h->nbd));
-  while (1) {
-    struct transaction *trans;
-
-    {
-      ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&h->trans_lock);
-      trans = h->trans;
-      h->trans = trans ? trans->next : NULL;
-    }
-    if (!trans)
-      break;
-    r = nbd_aio_command_completed (h->nbd, trans->cookie);
-    if (r == -1) {
-      nbdkit_debug ("transaction %" PRId64 " failed: %s", trans->cookie,
-                    nbd_get_error ());
-      trans->err = nbd_get_errno ();
-      if (!trans->err)
-        trans->err = ESHUTDOWN;
-    }
-    else if (!r)
-      trans->err = ESHUTDOWN;
-    if (sem_post (&trans->sem)) {
-      nbdkit_error ("failed to post semaphore: %m");
-      abort ();
-    }
-  }
   nbdkit_debug ("exiting state machine thread");
   return NULL;
 }
@@ -411,6 +353,23 @@ nbdplug_prepare (struct transaction *trans)
     assert (false);
 }

+static int
+nbdplug_notify (void *opaque, int64_t cookie, int *error)
+{
+  struct transaction *trans = opaque;
+
+  nbdkit_debug ("cookie %" PRId64 " completed state machine, status %d",
+                cookie, *error);
+  assert (trans->cookie == 0 || trans->cookie == cookie);
+  trans->cookie = cookie;
+  trans->err = *error;
+  if (sem_post (&trans->sem)) {
+    nbdkit_error ("failed to post semaphore: %m");
+    abort ();
+  }
+  return 0;
+}
+
 /* Register a cookie and kick the I/O thread. */
 static void
 nbdplug_register (struct handle *h, struct transaction *trans, int64_t cookie)
@@ -425,13 +384,10 @@ nbdplug_register (struct handle *h, struct transaction *trans, int64_t cookie)

   nbdkit_debug ("cookie %" PRId64 " started by state machine", cookie);

-  /* While locked, kick the reader thread and add our transaction */
-  ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&h->trans_lock);
   if (write (h->fds[1], &c, 1) != 1 && errno != EAGAIN)
     nbdkit_debug ("failed to kick reader thread: %m");
+  assert (trans->cookie == 0 || trans->cookie == cookie);
   trans->cookie = cookie;
-  trans->next = h->trans;
-  h->trans = trans;
 }

 /* Perform the reply half of a transaction. */
@@ -449,8 +405,13 @@ nbdplug_reply (struct handle *h, struct transaction *trans)
       nbdkit_debug ("failed to wait on semaphore: %m");
       err = EIO;
     }
-    else
+    else {
+      assert (trans->cookie > 0);
+      err = nbd_aio_command_completed (h->nbd, trans->cookie);
+      assert (err != 0);
+      assert (err == 1 ? trans->err == 0 : trans->err == nbd_get_errno ());
       err = trans->err;
+    }
   }
   if (sem_destroy (&trans->sem))
     abort ();
@@ -520,13 +481,8 @@ nbdplug_open_handle (int readonly)
     h->readonly = true;

   /* Spawn a dedicated reader thread */
-  if ((errno = pthread_mutex_init (&h->trans_lock, NULL))) {
-    nbdkit_error ("failed to initialize transaction mutex: %m");
-    goto err;
-  }
   if ((errno = pthread_create (&h->reader, NULL, nbdplug_reader, h))) {
     nbdkit_error ("failed to initialize reader thread: %m");
-    pthread_mutex_destroy (&h->trans_lock);
     goto err;
   }

@@ -562,7 +518,6 @@ nbdplug_close_handle (struct handle *h)
   close (h->fds[0]);
   close (h->fds[1]);
   nbd_close (h->nbd);
-  pthread_mutex_destroy (&h->trans_lock);
   free (h);
 }

@@ -717,7 +672,8 @@ nbdplug_pread (void *handle, void *buf, uint32_t count, uint64_t offset,

   assert (!flags);
   nbdplug_prepare (&s);
-  nbdplug_register (h, &s, nbd_aio_pread (h->nbd, buf, count, offset, 0));
+  nbdplug_register (h, &s, nbd_aio_pread_notify (h->nbd, buf, count, offset,
+                                                 &s, nbdplug_notify, 0));
   return nbdplug_reply (h, &s);
 }

@@ -732,7 +688,8 @@ nbdplug_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset,

   assert (!(flags & ~NBDKIT_FLAG_FUA));
   nbdplug_prepare (&s);
-  nbdplug_register (h, &s, nbd_aio_pwrite (h->nbd, buf, count, offset, f));
+  nbdplug_register (h, &s, nbd_aio_pwrite_notify (h->nbd, buf, count, offset,
+                                                  &s, nbdplug_notify, f));
   return nbdplug_reply (h, &s);
 }

@@ -751,7 +708,8 @@ nbdplug_zero (void *handle, uint32_t count, uint64_t offset, uint32_t flags)
   if (flags & NBDKIT_FLAG_FUA)
     f |= LIBNBD_CMD_FLAG_FUA;
   nbdplug_prepare (&s);
-  nbdplug_register (h, &s, nbd_aio_zero (h->nbd, count, offset, f));
+  nbdplug_register (h, &s, nbd_aio_zero_notify (h->nbd, count, offset,
+                                                &s, nbdplug_notify, f));
   return nbdplug_reply (h, &s);
 }

@@ -765,7 +723,8 @@ nbdplug_trim (void *handle, uint32_t count, uint64_t offset, uint32_t flags)

   assert (!(flags & ~NBDKIT_FLAG_FUA));
   nbdplug_prepare (&s);
-  nbdplug_register (h, &s, nbd_aio_trim (h->nbd, count, offset, f));
+  nbdplug_register (h, &s, nbd_aio_trim_notify (h->nbd, count, offset,
+                                                &s, nbdplug_notify, f));
   return nbdplug_reply (h, &s);
 }

@@ -778,7 +737,8 @@ nbdplug_flush (void *handle, uint32_t flags)

   assert (!flags);
   nbdplug_prepare (&s);
-  nbdplug_register (h, &s, nbd_aio_flush (h->nbd, 0));
+  nbdplug_register (h, &s, nbd_aio_flush_notify (h->nbd,
+                                                 &s, nbdplug_notify, 0));
   return nbdplug_reply (h, &s);
 }

@@ -816,8 +776,9 @@ nbdplug_extents (void *handle, uint32_t count, uint64_t offset,
   assert (!(flags & ~NBDKIT_FLAG_REQ_ONE));
   nbdplug_prepare (&s);
   s.extents = extents;
-  nbdplug_register (h, &s, nbd_aio_block_status (h->nbd, count, offset,
-                                                 &s, nbdplug_extent, f));
+  nbdplug_register (h, &s, nbd_aio_block_status_notify (h->nbd, count, offset,
+                                                        &s, nbdplug_extent,
+                                                        nbdplug_notify, f));
   return nbdplug_reply (h, &s);
 }

@@ -830,7 +791,8 @@ nbdplug_cache (void *handle, uint32_t count, uint64_t offset, uint32_t flags)

   assert (!flags);
   nbdplug_prepare (&s);
-  nbdplug_register (h, &s, nbd_aio_cache (h->nbd, count, offset, 0));
+  nbdplug_register (h, &s, nbd_aio_cache_notify (h->nbd, count, offset,
+                                                 &s, nbdplug_notify, 0));
   return nbdplug_reply (h, &s);
 }

-- 
2.20.1


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]