[Libguestfs] [libnbd PATCH 1/6] api: Add nbd_aio_in_flight

Eric Blake eblake at redhat.com
Sat Jun 29 13:28:24 UTC 2019


Some clients need to know when it is safe to issue NBD_CMD_DISC, or to
decide whether calling poll(POLLIN) will block indefinitely because
the server isn't expected to respond.  Make this easier to learn by
tracking the count of commands that have been issued but not yet
retired, whether they are still queued to send, awaiting the server's
response, or already answered but not yet passed to
nbd_aio_command_completed.

Update tests/aio-parallel* and examples/batched-read-write to use
libnbd's own in-flight counter instead of reimplementing it in each
client.

Note that h->in_flight is only ever updated while the handle lock is
held; we may later want to make it atomic so that nbd_aio_in_flight
can read it without taking the lock.
---
 examples/batched-read-write.c | 17 +++++---------
 generator/generator           | 42 ++++++++++++++++++++++++++---------
 lib/aio.c                     |  9 ++++++++
 lib/internal.h                |  4 +++-
 lib/rw.c                      |  6 +++++
 tests/aio-parallel-load.c     | 29 +++++++++++++-----------
 tests/aio-parallel.c          | 15 +++++--------
 7 files changed, 77 insertions(+), 45 deletions(-)

diff --git a/examples/batched-read-write.c b/examples/batched-read-write.c
index 90dfe86..194ad1c 100644
--- a/examples/batched-read-write.c
+++ b/examples/batched-read-write.c
@@ -48,26 +48,22 @@ try_deadlock (void *arg)
   struct pollfd fds[1];
   size_t i;
   int64_t handles[2], done;
-  size_t in_flight;        /* counts number of requests in flight */
   int dir, r;

   /* Issue commands. */
-  in_flight = 0;
   handles[0] = nbd_aio_pread (nbd, in, packetsize, 0, 0);
   if (handles[0] == -1) {
     fprintf (stderr, "%s\n", nbd_get_error ());
     goto error;
   }
-  in_flight++;
   handles[1] = nbd_aio_pwrite (nbd, out, packetsize, packetsize, 0);
   if (handles[1] == -1) {
     fprintf (stderr, "%s\n", nbd_get_error ());
     goto error;
   }
-  in_flight++;

   /* Now wait for commands to retire, or for deadlock to occur */
-  while (in_flight > 0) {
+  while (nbd_aio_in_flight (nbd) > 0) {
     if (nbd_aio_is_dead (nbd) || nbd_aio_is_closed (nbd)) {
       fprintf (stderr, "connection is dead or closed\n");
       goto error;
@@ -96,23 +92,20 @@ try_deadlock (void *arg)

     /* If a command is ready to retire, retire it. */
     while ((done = nbd_aio_peek_command_completed (nbd)) > 0) {
-      for (i = 0; i < in_flight; ++i) {
+      for (i = 0; i < sizeof handles / sizeof handles[0]; ++i) {
         if (handles[i] == done) {
           r = nbd_aio_command_completed (nbd, handles[i]);
           if (r == -1) {
             fprintf (stderr, "%s\n", nbd_get_error ());
             goto error;
           }
-          assert (r);
-          memmove (&handles[i], &handles[i+1],
-                   sizeof (handles[0]) * (in_flight - i - 1));
-          break;
+          assert (r == 1);
+          handles[i] = 0;
         }
       }
-      assert (i < in_flight);
-      in_flight--;
     }
   }
+  assert (nbd_aio_in_flight (nbd) == 0);

   printf ("finished OK\n");

diff --git a/generator/generator b/generator/generator
index 9192988..5cc3e80 100755
--- a/generator/generator
+++ b/generator/generator
@@ -1749,6 +1749,12 @@ not a normal command because NBD servers are not obliged
 to send a reply.  Instead you should wait for
 C<nbd_aio_is_closed> to become true on the connection.

+Although libnbd does not prevent you from issuing this command while
+still waiting on the replies to previous commands, the NBD protocol
+recommends that you wait until there are no other commands in flight
+(see C<nbd_aio_in_flight>), to give the server a better chance at a
+clean shutdown.
+
 The C<flags> parameter must be C<0> for now (it exists for future NBD
 protocol extensions).  There is no direct synchronous counterpart;
 however, C<nbd_shutdown> will call this function if appropriate.";
@@ -1867,16 +1873,16 @@ you would set C<events = POLLIN>.  If C<revents> returns C<POLLIN>
 or C<POLLHUP> you would then call C<nbd_aio_notify_read>.

 Note that once libnbd reaches C<nbd_aio_is_ready>, this direction is
-returned even before a command is issued via C<nbd_aio_pwrite> and
-friends. In a single-threaded use of libnbd, it is not worth polling
-until after issuing a command, as otherwise the server will never wake
-up the poll. In a multi-threaded scenario, you can have one thread
-begin a polling loop prior to any commands, but any other thread that
-issues a command will need a way to kick the polling thread out of
-poll in case issuing the command changes the needed polling
-direction. Possible ways to do this include polling for activity on a
-pipe-to-self, or using L<pthread_kill(3)> to send a signal that is
-masked except during L<ppoll(2)>.
+returned even when there are no commands in flight (see
+C<nbd_aio_in_flight>). In a single-threaded use of libnbd, it is not
+worth polling until after issuing a command, as otherwise the server
+will never wake up the poll. In a multi-threaded scenario, you can
+have one thread begin a polling loop prior to any commands, but any
+other thread that issues a command will need a way to kick the
+polling thread out of poll in case issuing the command changes the
+needed polling direction. Possible ways to do this include polling
+for activity on a pipe-to-self, or using L<pthread_kill(3)> to send
+a signal that is masked except during L<ppoll(2)>.

 =item C<LIBNBD_AIO_DIRECTION_WRITE> = 2

@@ -2012,6 +2018,22 @@ C<nbd_aio_command_completed> to actually retire the command and learn
 whether the command was successful.";
   };

+  "aio_in_flight", {
+    default_call with
+    args = []; ret = RInt;
+    permitted_states = [ Connected; Closed; Dead ];
+    (* XXX is_locked = false ? *)
+    shortdesc = "check how many aio commands are still in flight";
+    longdesc = "\
+Return the number of aio commands that have been issued but not yet
+retired by C<nbd_aio_command_completed>, whether still awaiting a
+response from the server or already answered.  If this returns a
+non-zero value when requesting a disconnect from the server (see
+C<nbd_aio_disconnect> and C<nbd_shutdown>), libnbd does not wait for
+those commands to complete; if the server strands commands while
+shutting down, C<nbd_aio_command_completed> cannot report their status.";
+  };
+
   "connection_state", {
     default_call with
     args = []; ret = RConstString;
diff --git a/lib/aio.c b/lib/aio.c
index c68a059..b29378b 100644
--- a/lib/aio.c
+++ b/lib/aio.c
@@ -23,6 +23,7 @@
 #include <stdbool.h>
 #include <errno.h>
 #include <inttypes.h>
+#include <assert.h>

 #include "internal.h"

@@ -84,6 +85,8 @@ nbd_unlocked_aio_command_completed (struct nbd_handle *h,
     prev_cmd->next = cmd->next;
   else
     h->cmds_done = cmd->next;
+  h->in_flight--;
+  assert (h->in_flight >= 0);

   free (cmd);

@@ -110,3 +113,9 @@ nbd_unlocked_aio_peek_command_completed (struct nbd_handle *h)
   set_error (EINVAL, "no commands are in flight");
   return -1;
 }
+
+int
+nbd_unlocked_aio_in_flight (struct nbd_handle *h)
+{
+  return h->in_flight;
+}
diff --git a/lib/internal.h b/lib/internal.h
index 5aa9f22..15f4b64 100644
--- a/lib/internal.h
+++ b/lib/internal.h
@@ -186,9 +186,11 @@ struct nbd_handle {
    * to be issued.  The second list contains commands which have been
    * issued and waiting for replies.  The third list contains commands
    * which we have received replies, waiting for the main program to
-   * acknowledge them.
+   * acknowledge them.  in_flight counts the commands on all three
+   * lists, i.e. every command the main program has not yet retired.
    */
   struct command_in_flight *cmds_to_issue, *cmds_in_flight, *cmds_done;
+  int in_flight;
   /* Current command during a REPLY cycle */
   struct command_in_flight *reply_cmd;

diff --git a/lib/rw.c b/lib/rw.c
index 6b57f11..53cd521 100644
--- a/lib/rw.c
+++ b/lib/rw.c
@@ -25,6 +25,7 @@
 #include <inttypes.h>
 #include <errno.h>
 #include <assert.h>
+#include <limits.h>

 #include "internal.h"

@@ -167,6 +168,10 @@ nbd_internal_command_common (struct nbd_handle *h,
       set_error (EINVAL, "cannot request more commands after NBD_CMD_DISC");
       return -1;
   }
+  if (h->in_flight == INT_MAX) {
+      set_error (ENOMEM, "too many commands already in flight");
+      return -1;
+  }

   switch (type) {
     /* Commands which send or receive data are limited to MAX_REQUEST_SIZE. */
@@ -236,6 +241,7 @@ nbd_internal_command_common (struct nbd_handle *h,
       return -1;
   }

+  h->in_flight++;
   return cmd->handle;
 }

diff --git a/tests/aio-parallel-load.c b/tests/aio-parallel-load.c
index 7922acd..a68c714 100644
--- a/tests/aio-parallel-load.c
+++ b/tests/aio-parallel-load.c
@@ -189,7 +189,6 @@ start_thread (void *arg)
   size_t i;
   uint64_t offset, handle;
   uint64_t handles[MAX_IN_FLIGHT];
-  size_t in_flight;        /* counts number of requests in flight */
   int dir, r, cmd;
   time_t t;
   bool expired = false;
@@ -231,8 +230,8 @@ start_thread (void *arg)
   assert (nbd_read_only (nbd) == 0);

   /* Issue commands. */
-  in_flight = 0;
-  while (!expired || in_flight > 0) {
+  assert (nbd_aio_in_flight (nbd) == 0);
+  while (!expired || nbd_aio_in_flight (nbd) > 0) {
     if (nbd_aio_is_dead (nbd) || nbd_aio_is_closed (nbd)) {
       fprintf (stderr, "thread %zu: connection is dead or closed\n",
                status->i);
@@ -243,12 +242,12 @@ start_thread (void *arg)
     time (&t);
     if (t > status->end_time) {
       expired = true;
-      if (!in_flight)
+      if (nbd_aio_in_flight (nbd) <= 0)
         break;
     }

     /* If we can issue another request, do so. */
-    while (!expired && in_flight < MAX_IN_FLIGHT) {
+    while (!expired && nbd_aio_in_flight (nbd) < MAX_IN_FLIGHT) {
       offset = rand () % (EXPORTSIZE - buf_size);
       cmd = rand () & 1;
       if (cmd == 0) {
@@ -263,10 +262,14 @@ start_thread (void *arg)
         fprintf (stderr, "%s\n", nbd_get_error ());
         goto error;
       }
-      handles[in_flight] = handle;
-      in_flight++;
-      if (in_flight > status->most_in_flight)
-        status->most_in_flight = in_flight;
+      for (i = 0; i < MAX_IN_FLIGHT; i++) {
+        if (handles[i] == 0) {
+          handles[i] = handle;
+          break;
+        }
+      }
+      if (nbd_aio_in_flight (nbd)  > status->most_in_flight)
+        status->most_in_flight = nbd_aio_in_flight (nbd);
     }

     fds[0].fd = nbd_aio_get_fd (nbd);
@@ -291,16 +294,16 @@ start_thread (void *arg)
       nbd_aio_notify_write (nbd);

     /* If a command is ready to retire, retire it. */
-    for (i = 0; i < in_flight; ++i) {
+    for (i = 0; i < MAX_IN_FLIGHT; ++i) {
+      if (handles[i] == 0)
+        continue;
       r = nbd_aio_command_completed (nbd, handles[i]);
       if (r == -1) {
         fprintf (stderr, "%s\n", nbd_get_error ());
         goto error;
       }
       if (r) {
-        memmove (&handles[i], &handles[i+1],
-                 sizeof (handles[0]) * (in_flight - i - 1));
-        in_flight--;
+        handles[i] = 0;
         status->requests++;
       }
     }
diff --git a/tests/aio-parallel.c b/tests/aio-parallel.c
index a9b0fd9..b5d126a 100644
--- a/tests/aio-parallel.c
+++ b/tests/aio-parallel.c
@@ -198,7 +198,6 @@ start_thread (void *arg)
   size_t i;
   int64_t offset, handle;
   char *buf;
-  size_t in_flight;        /* counts number of requests in flight */
   int dir, r, cmd;
   time_t t;
   bool expired = false;
@@ -238,8 +237,8 @@ start_thread (void *arg)
   assert (nbd_read_only (nbd) == 0);

   /* Issue commands. */
-  in_flight = 0;
-  while (!expired || in_flight > 0) {
+  assert (nbd_aio_in_flight (nbd) == 0);
+  while (!expired || nbd_aio_in_flight (nbd) > 0) {
     if (nbd_aio_is_dead (nbd) || nbd_aio_is_closed (nbd)) {
       fprintf (stderr, "thread %zu: connection is dead or closed\n",
                status->i);
@@ -250,12 +249,12 @@ start_thread (void *arg)
     time (&t);
     if (t > status->end_time) {
       expired = true;
-      if (!in_flight)
+      if (nbd_aio_in_flight (nbd) <= 0)
         break;
     }

     /* If we can issue another request, do so. */
-    while (!expired && in_flight < MAX_IN_FLIGHT) {
+    while (!expired && nbd_aio_in_flight (nbd) < MAX_IN_FLIGHT) {
       /* Find a free command slot. */
       for (i = 0; i < MAX_IN_FLIGHT; ++i)
         if (commands[status->i][i].offset == -1)
@@ -282,9 +281,8 @@ start_thread (void *arg)
       commands[status->i][i].offset = offset;
       commands[status->i][i].handle = handle;
       commands[status->i][i].cmd = cmd;
-      in_flight++;
-      if (in_flight > status->most_in_flight)
-        status->most_in_flight = in_flight;
+      if (nbd_aio_in_flight (nbd) > status->most_in_flight)
+        status->most_in_flight = nbd_aio_in_flight (nbd);
     }

     fds[0].fd = nbd_aio_get_fd (nbd);
@@ -329,7 +327,6 @@ start_thread (void *arg)
           }

           commands[status->i][i].offset = -1;
-          in_flight--;
           status->requests++;
         }
       }
-- 
2.20.1




More information about the Libguestfs mailing list