[Libguestfs] Fwd: nbdkit async

Shaun McDowell shaunjmcdowell at gmail.com
Tue Feb 21 23:50:31 UTC 2017


I went ahead and implemented two new hooks on the existing nbdkit_plugin
struct, async_pread and async_pwrite, to get some testable numbers.

The test setup uses an all-in-memory block device (data is copied to and
from a std::vector to simulate reading and writing).
Every read and write op waits 64 ms before completing (simulated device
latency).
These tests run nbd-client on the same host as the NBD server, communicating
over a Unix domain socket.
The device has an ext4 filesystem on it, created with: mkfs.ext4 -b 4096 /dev/nbd0

*Baseline test with unmodified nbdkit*

dd if=/dev/zero of=./mnt/zeros bs=128k count=80 conv=fsync
80+0 records in
80+0 records out
10485760 bytes (10 MB, 10 MiB) copied, 5.40797 s, 1.9 MB/s

echo 1 | sudo tee /proc/sys/vm/drop_caches

time cat mnt/zeros > /dev/null
real    0m5.386s
user    0m0.004s
sys    0m0.000s

Read and write throughput are both around 2 MB/s, which corresponds to a
single 128 KiB read or write every 64 ms.

*With nbdkit modified to use async_pread and async_pwrite and a buffer pool
of 64 buffers (128 KiB each, 8 MiB of buffer memory in total)*

dd if=/dev/zero of=./mnt/zeros bs=128k count=8000 conv=fsync
8000+0 records in
8000+0 records out
1048576000 bytes (1.0 GB, 1000 MiB) copied, 8.7736 s, 120 MB/s

echo 1 | sudo tee /proc/sys/vm/drop_caches

time cat mnt/zeros > /dev/null
real    0m8.153s
user    0m0.000s
sys    0m0.320s

Read and write throughput are now about 120 MB/s (roughly 64x faster) because
64 ops can be processed in parallel. Throughput scaled nearly linearly
without needing 64 threads in nbdkit. Total buffer memory is
8 MiB.

*With async_pread/async_pwrite, a buffer pool of 1024 buffers (128 MiB of
buffer memory) and 2 I/O service threads (3 threads in total, including the
1 nbdkit thread pulling requests off the socket)*

dd if=/dev/zero of=./mnt/zeros bs=128k count=80000 conv=fsync
80000+0 records in
80000+0 records out
10485760000 bytes (10 GB, 9.8 GiB) copied, 5.86029 s, 1.8 GB/s

echo 1 | sudo tee /proc/sys/vm/drop_caches

time cat mnt/zeros > /dev/null
real    0m12.545s
user    0m0.012s
sys    0m2.444s

Sequential read throughput for a single file was capped at around 825 MiB/s,
but reading 2 files in parallel reached 1.6 GB/s, and reading 4 files in
parallel reached 1.9 GB/s.

Below is the patch with the changes made to nbdkit to support this.

diff --git a/include/nbdkit-plugin.h b/include/nbdkit-plugin.h
index 95cba8d..2e88cad 100644
--- a/include/nbdkit-plugin.h
+++ b/include/nbdkit-plugin.h
@@ -38,15 +38,13 @@

 #include <stdarg.h>
 #include <stdint.h>
-
-#ifdef __cplusplus
-extern "C" {
-#endif
+#include <stdbool.h>

 #define NBDKIT_THREAD_MODEL_SERIALIZE_CONNECTIONS     0
 #define NBDKIT_THREAD_MODEL_SERIALIZE_ALL_REQUESTS    1
 #define NBDKIT_THREAD_MODEL_SERIALIZE_REQUESTS        2
 #define NBDKIT_THREAD_MODEL_PARALLEL                  3
+#define NBDKIT_THREAD_MODEL_ASYNC                     4

 #define NBDKIT_API_VERSION                            1

@@ -94,28 +92,35 @@ struct nbdkit_plugin {

   int errno_is_preserved;

+  int (*async_pread) (void *conn, uint64_t reqid, bool flush, void *handle, void *buf, uint32_t count, uint64_t offset);
+  int (*async_pwrite) (void *conn, uint64_t reqid, bool flush, void *handle, const void *buf, uint32_t count, uint64_t offset);
   /* int (*set_exportname) (void *handle, const char *exportname); */
 };

-extern void nbdkit_set_error (int err);
-extern void nbdkit_error (const char *msg, ...)
-  __attribute__((format (printf, 1, 2)));
-extern void nbdkit_verror (const char *msg, va_list args);
-extern void nbdkit_debug (const char *msg, ...)
-  __attribute__((format (printf, 1, 2)));
-extern void nbdkit_vdebug (const char *msg, va_list args);
-
-extern char *nbdkit_absolute_path (const char *path);
-extern int64_t nbdkit_parse_size (const char *str);
-
 #ifdef __cplusplus
-#define NBDKIT_CXX_LANG_C extern "C"
+#define NBDKIT_CXX_LANG_C "C"
 #else
 #define NBDKIT_CXX_LANG_C /* nothing */
 #endif

+extern NBDKIT_CXX_LANG_C void nbdkit_set_error (int err);
+extern NBDKIT_CXX_LANG_C void nbdkit_error (const char *msg, ...)
+  __attribute__((format (printf, 1, 2)));
+extern NBDKIT_CXX_LANG_C void nbdkit_verror (const char *msg, va_list args);
+extern NBDKIT_CXX_LANG_C void nbdkit_debug (const char *msg, ...)
+  __attribute__((format (printf, 1, 2)));
+extern NBDKIT_CXX_LANG_C void nbdkit_vdebug (const char *msg, va_list args);
+
+extern NBDKIT_CXX_LANG_C char *nbdkit_absolute_path (const char *path);
+extern NBDKIT_CXX_LANG_C int64_t nbdkit_parse_size (const char *str);
+
+extern NBDKIT_CXX_LANG_C int nbdkit_async_reply (void *conn, uint64_t reqid);
+extern NBDKIT_CXX_LANG_C int nbdkit_async_reply_read (void *conn, uint64_t reqid, uint32_t count, void *buf);
+extern NBDKIT_CXX_LANG_C int nbdkit_async_reply_error (void *conn, uint64_t reqid);
+
+
 #define NBDKIT_REGISTER_PLUGIN(plugin)                                  \
-  NBDKIT_CXX_LANG_C                                                     \
+  extern NBDKIT_CXX_LANG_C                                              \
   struct nbdkit_plugin *                                                \
   plugin_init (void)                                                    \
   {                                                                     \
@@ -125,8 +130,4 @@ extern int64_t nbdkit_parse_size (const char *str);
     return &(plugin);                                                   \
   }

-#ifdef __cplusplus
-}
-#endif
-
 #endif /* NBDKIT_PLUGIN_H */
diff --git a/src/connections.c b/src/connections.c
index a0d689a..e03a0f6 100644
--- a/src/connections.c
+++ b/src/connections.c
@@ -62,7 +62,8 @@
 static struct connection *new_connection (int sockin, int sockout);
 static void free_connection (struct connection *conn);
 static int negotiate_handshake (struct connection *conn);
-static int recv_request_send_reply (struct connection *conn);
+static int recv_request (struct connection *conn);
+static int send_reply (struct connection *conn, uint64_t handle, uint32_t count, void *buf, uint32_t error);

 static int
 _handle_single_connection (int sockin, int sockout)
@@ -86,7 +87,7 @@ _handle_single_connection (int sockin, int sockout)
    * a thread pool.
    */
   while (!quit) {
-    r = recv_request_send_reply (conn);
+    r = recv_request (conn);
     if (r == -1)
       goto err;
     if (r == 0)
@@ -127,6 +128,7 @@ new_connection (int sockin, int sockout)
   conn->sockin = sockin;
   conn->sockout = sockout;
   pthread_mutex_init (&conn->request_lock, NULL);
+  pthread_mutex_init (&conn->reply_lock, NULL);

   return conn;
 }
@@ -143,6 +145,7 @@ free_connection (struct connection *conn)
     close (conn->sockout);

   pthread_mutex_destroy (&conn->request_lock);
+  pthread_mutex_destroy (&conn->reply_lock);

   if (conn->handle)
     plugin_close (conn);
@@ -626,13 +629,13 @@ get_error (struct connection *conn)
  * On read/write errors, sets *error appropriately and returns 0.
  */
 static int
-_handle_request (struct connection *conn,
+_handle_request (struct connection *conn, uint64_t handle,
                  uint32_t cmd, uint32_t flags, uint64_t offset, uint32_t count,
-                 void *buf,
-                 uint32_t *error)
+                 void *buf)
 {
-  bool flush_after_command;
   int r;
+  uint32_t error = 0;
+  bool flush_after_command;

   /* Flush after command performed? */
   flush_after_command = (flags & NBD_CMD_FLAG_FUA) != 0;
@@ -645,42 +648,54 @@ _handle_request (struct connection *conn,

   switch (cmd) {
   case NBD_CMD_READ:
-    r = plugin_pread (conn, buf, count, offset);
+    if (plugin_can_async_read (conn)) {
+      r = plugin_async_pread (conn, handle, flush_after_command, buf, count, offset);
+      if (r == 0)
+        return 0; // plugin now has responsibility of sending response
+    }
+    else
+      r = plugin_pread (conn, buf, count, offset);
     if (r == -1) {
-      *error = get_error (conn);
-      return 0;
+      error = get_error (conn);
+      return send_reply (conn, handle, 0, NULL, error);
     }
     break;

   case NBD_CMD_WRITE:
-    r = plugin_pwrite (conn, buf, count, offset);
+    if (plugin_can_async_write (conn)) {
+      r = plugin_async_pwrite (conn, handle, flush_after_command, buf, count, offset);
+      if (r == 0)
+        return 0; // plugin now has responsibility of sending response
+    }
+    else
+      r = plugin_pwrite (conn, buf, count, offset);
     if (r == -1) {
-      *error = get_error (conn);
-      return 0;
+      error = get_error (conn);
+      return send_reply (conn, handle, 0, NULL, error);
     }
     break;

   case NBD_CMD_FLUSH:
     r = plugin_flush (conn);
     if (r == -1) {
-      *error = get_error (conn);
-      return 0;
+      error = get_error (conn);
+      return send_reply (conn, handle, 0, NULL, error);
     }
     break;

   case NBD_CMD_TRIM:
     r = plugin_trim (conn, count, offset);
     if (r == -1) {
-      *error = get_error (conn);
-      return 0;
+      error = get_error (conn);
+      return send_reply (conn, handle, 0, NULL, error);
     }
     break;

   case NBD_CMD_WRITE_ZEROES:
     r = plugin_zero (conn, count, offset, !(flags & NBD_CMD_FLAG_NO_HOLE));
     if (r == -1) {
-      *error = get_error (conn);
-      return 0;
+      error = get_error (conn);
+      return send_reply (conn, handle, 0, NULL, error);
     }
     break;

@@ -691,24 +706,28 @@ _handle_request (struct connection *conn,
   if (flush_after_command) {
     r = plugin_flush (conn);
     if (r == -1) {
-      *error = get_error (conn);
-      return 0;
+      error = get_error (conn);
+      return send_reply (conn, handle, 0, NULL, error);
     }
   }

-  return 0;
+  if (cmd == NBD_CMD_READ)
+    r = send_reply (conn, handle, count, buf, error);
+  else
+   r = send_reply (conn, handle, 0, NULL, error);
+
+  return r;
 }

 static int
-handle_request (struct connection *conn,
+handle_request (struct connection *conn, uint64_t handle,
                 uint32_t cmd, uint32_t flags, uint64_t offset, uint32_t count,
-                void *buf,
-                uint32_t *error)
+                void *buf)
 {
   int r;

   plugin_lock_request (conn);
-  r = _handle_request (conn, cmd, flags, offset, count, buf, error);
+  r = _handle_request (conn, handle, cmd, flags, offset, count, buf);
   plugin_unlock_request (conn);

   return r;
@@ -763,11 +782,10 @@ nbd_errno (int error)
 }

 static int
-recv_request_send_reply (struct connection *conn)
+recv_request (struct connection *conn)
 {
   int r;
   struct request request;
-  struct reply reply;
   uint32_t magic, cmd, flags, count, error = 0;
   uint64_t offset;
   CLEANUP_FREE char *buf = NULL;
@@ -808,7 +826,10 @@ recv_request_send_reply (struct connection *conn)
   if (r == 0) {                 /* request not valid */
     if (cmd == NBD_CMD_WRITE)
       skip_over_write_buffer (conn->sockin, count);
-    goto send_reply;
+    r = send_reply (conn, request.handle, 0, NULL, error);
+    if (r == -1)
+      return -1;
+    return 1;
   }

   /* Allocate the data buffer used for either read or write requests. */
@@ -819,7 +840,10 @@ recv_request_send_reply (struct connection *conn)
       error = ENOMEM;
       if (cmd == NBD_CMD_WRITE)
         skip_over_write_buffer (conn->sockin, count);
-      goto send_reply;
+      r = send_reply (conn, request.handle, 0, NULL, error);
+      if (r == -1)
+        return -1;
+      return 1;                 /* ENOMEM reply sent; do not run the request */
     }
   }

@@ -837,14 +860,20 @@ recv_request_send_reply (struct connection *conn)
   }

   /* Perform the request.  Only this part happens inside the request lock. */
-  r = handle_request (conn, cmd, flags, offset, count, buf, &error);
+  r = handle_request (conn, request.handle, cmd, flags, offset, count, buf);
   if (r == -1)
     return -1;

-  /* Send the reply packet. */
- send_reply:
+  return 1;                     /* command processed ok */
+}
+
+static int
+_send_reply (struct connection *conn, uint64_t handle, uint32_t count, void *buf, uint32_t error)
+{
+  int r;
+  struct reply reply;
   reply.magic = htobe32 (NBD_REPLY_MAGIC);
-  reply.handle = request.handle;
+  reply.handle = handle;
   reply.error = htobe32 (nbd_errno (error));

   if (error != 0) {
@@ -862,8 +891,7 @@ recv_request_send_reply (struct connection *conn)
     return -1;
   }

-  /* Send the read data buffer. */
-  if (cmd == NBD_CMD_READ) {
+  if (error == 0 && buf != NULL) { /* Send the read data buffer. */
     r = xwrite (conn->sockout, buf, count);
     if (r == -1) {
       nbdkit_error ("write data: %m");
@@ -871,5 +899,37 @@ recv_request_send_reply (struct connection *conn)
     }
   }

-  return 1;                     /* command processed ok */
+  return 0;
+}
+
+static int
+send_reply (struct connection *conn, uint64_t handle, uint32_t count, void *buf, uint32_t error)
+{
+  int r;
+
+  plugin_lock_reply (conn);
+  r = _send_reply (conn, handle, count, buf, error);
+  plugin_unlock_reply (conn);
+
+  return r;
+}
+
+int
+nbdkit_async_reply (void *conn, uint64_t reqid)
+{
+  return send_reply (conn, reqid, 0, NULL, 0);
+}
+
+int
+nbdkit_async_reply_read (void *conn, uint64_t reqid, uint32_t count, void *buf)
+{
+
+  return send_reply (conn, reqid, count, buf, 0);
+}
+
+int
+nbdkit_async_reply_error (void *conn, uint64_t reqid)
+{
+  uint32_t error = get_error (conn);
+  return send_reply (conn, reqid, 0, NULL, error);
 }
diff --git a/src/internal.h b/src/internal.h
index e73edf1..93d32e9 100644
--- a/src/internal.h
+++ b/src/internal.h
@@ -114,6 +114,7 @@ extern void cleanup_free (void *ptr);
 struct connection {
   int sockin, sockout;
   pthread_mutex_t request_lock;
+  pthread_mutex_t reply_lock;
   void *handle;
   uint64_t exportsize;
   int readonly;
@@ -140,6 +141,8 @@ extern void plugin_lock_connection (void);
 extern void plugin_unlock_connection (void);
 extern void plugin_lock_request (struct connection *conn);
 extern void plugin_unlock_request (struct connection *conn);
+extern void plugin_lock_reply (struct connection *conn);
+extern void plugin_unlock_reply (struct connection *conn);
 extern int plugin_errno_is_preserved (void);
 extern int plugin_open (struct connection *conn, int readonly);
 extern void plugin_close (struct connection *conn);
@@ -148,8 +151,12 @@ extern int plugin_can_write (struct connection *conn);
 extern int plugin_can_flush (struct connection *conn);
 extern int plugin_is_rotational (struct connection *conn);
 extern int plugin_can_trim (struct connection *conn);
+extern int plugin_can_async_read (struct connection *conn);
 extern int plugin_pread (struct connection *conn, void *buf, uint32_t count, uint64_t offset);
+extern int plugin_async_pread (struct connection *conn, uint64_t handle, bool flush, void *buf, uint32_t count, uint64_t offset);
+extern int plugin_can_async_write (struct connection *conn);
 extern int plugin_pwrite (struct connection *conn, void *buf, uint32_t count, uint64_t offset);
+extern int plugin_async_pwrite (struct connection *conn, uint64_t handle, bool flush, void *buf, uint32_t count, uint64_t offset);
 extern int plugin_flush (struct connection *conn);
 extern int plugin_trim (struct connection *conn, uint32_t count, uint64_t offset);
 extern int plugin_zero (struct connection *conn, uint32_t count, uint64_t offset, int may_trim);
diff --git a/src/plugins.c b/src/plugins.c
index eeed8a9..9da30db 100644
--- a/src/plugins.c
+++ b/src/plugins.c
@@ -121,8 +121,9 @@ plugin_register (const char *_filename,
              program_name, filename);
     exit (EXIT_FAILURE);
   }
-  if (plugin.pread == NULL) {
-    fprintf (stderr, "%s: %s: plugin must have a .pread callback\n",
+  if (plugin.pread == NULL &&
+     (plugin._thread_model != NBDKIT_THREAD_MODEL_ASYNC || plugin.async_pread == NULL)) {
+    fprintf (stderr, "%s: %s: plugin must have either a .pread or .async_pread callback\n",
              program_name, filename);
     exit (EXIT_FAILURE);
   }
@@ -231,6 +232,9 @@ plugin_dump_fields (void)
   case NBDKIT_THREAD_MODEL_PARALLEL:
     printf ("parallel");
     break;
+  case NBDKIT_THREAD_MODEL_ASYNC:
+    printf ("async");
+    break;
   default:
     printf ("%d # unknown thread model!", plugin._thread_model);
     break;
@@ -258,6 +262,8 @@ plugin_dump_fields (void)
   HAS (flush);
   HAS (trim);
   HAS (zero);
+  HAS (async_pread);
+  HAS (async_pwrite);
 #undef HAS
 }

@@ -350,6 +356,28 @@ plugin_unlock_request (struct connection *conn)
   }
 }

+void
+plugin_lock_reply (struct connection *conn)
+{
+  assert (dl);
+
+  if (plugin._thread_model >= NBDKIT_THREAD_MODEL_PARALLEL) {
+    debug ("acquire per-connection reply lock");
+    pthread_mutex_lock (&conn->reply_lock);
+  }
+}
+
+void
+plugin_unlock_reply (struct connection *conn)
+{
+  assert (dl);
+
+  if (plugin._thread_model >= NBDKIT_THREAD_MODEL_PARALLEL) {
+    debug ("release per-connection reply lock");
+    pthread_mutex_unlock (&conn->reply_lock);
+  }
+}
+
 int
 plugin_errno_is_preserved (void)
 {
@@ -414,7 +442,8 @@ plugin_can_write (struct connection *conn)
   if (plugin.can_write)
     return plugin.can_write (conn->handle);
   else
-    return plugin.pwrite != NULL;
+    return plugin.pwrite != NULL ||
+           (plugin._thread_model == NBDKIT_THREAD_MODEL_ASYNC && plugin.async_pwrite != NULL);
 }

 int
@@ -460,6 +489,16 @@ plugin_can_trim (struct connection *conn)
 }

 int
+plugin_can_async_read (struct connection *conn)
+{
+  assert (dl);
+  assert (conn->handle);
+
+  return ((plugin._thread_model == NBDKIT_THREAD_MODEL_ASYNC) &&
+          (plugin.async_pread != NULL));
+}
+
+int
 plugin_pread (struct connection *conn,
               void *buf, uint32_t count, uint64_t offset)
 {
@@ -473,6 +512,27 @@ plugin_pread (struct connection *conn,
 }

 int
+plugin_async_pread (struct connection *conn, uint64_t handle, bool flush,
+                    void *buf, uint32_t count, uint64_t offset)
+{
+  assert (dl);
+  assert (conn->handle);
+
+  debug ("async_pread count=%" PRIu32 " offset=%" PRIu64, count, offset);
+
+  return plugin.async_pread (conn, handle, flush, conn->handle, buf, count, offset);
+}
+
+int
+plugin_can_async_write (struct connection *conn)
+{
+  assert (dl);
+  assert (conn->handle);
+
+  return (plugin._thread_model == NBDKIT_THREAD_MODEL_ASYNC) && (plugin.async_pwrite != NULL);
+}
+
+int
 plugin_pwrite (struct connection *conn,
                void *buf, uint32_t count, uint64_t offset)
 {
@@ -490,23 +550,44 @@ plugin_pwrite (struct connection *conn,
 }

 int
+plugin_async_pwrite (struct connection *conn, uint64_t handle, bool flush,
+                     void *buf, uint32_t count, uint64_t offset)
+{
+  assert (dl);
+  assert (conn->handle);
+
+  debug ("async_pwrite count=%" PRIu32 " offset=%" PRIu64, count, offset);
+
+  if (plugin.async_pwrite != NULL)
+    return plugin.async_pwrite (conn, handle, flush, conn->handle, buf, count, offset);
+  else {
+    errno = EROFS;
+    return -1;
+  }
+}
+
+int
 plugin_flush (struct connection *conn)
 {
+  int r;
   assert (dl);
   assert (conn->handle);

   debug ("flush");

   if (plugin.flush != NULL)
-    return plugin.flush (conn->handle);
+    r = plugin.flush (conn->handle);
   else {
     errno = EINVAL;
-    return -1;
+    r = -1;
   }
+
+  return r;
 }

 int
-plugin_trim (struct connection *conn, uint32_t count, uint64_t offset)
+plugin_trim (struct connection *conn,
+             uint32_t count, uint64_t offset)
 {
   assert (dl);
   assert (conn->handle);
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://listman.redhat.com/archives/libguestfs/attachments/20170221/156ded8e/attachment.htm>


More information about the Libguestfs mailing list