[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

Re: [Libguestfs] Fwd: nbdkit async



I went ahead and implemented 2 new hooks on the existing nbdkit_plugin struct for async_pread and async_pwrite to get some testable numbers.

The test is set up with an all in memory block device (data being copied to and from a std vector as simulated reading and writing).
Every read and write op has a 64ms wait before completing (simulated device latency).
These tests are using nbd-client local to the nbd-server with a unix domain socket for communication
The device has an ext4 filesystem on it mkfs.ext4 -b 4096 /dev/nbd0

Baseline test with unmodified nbdkit

dd if=/dev/zero of=./mnt/zeros bs=128k count=80 conv=fsync
80+0 records in
80+0 records out
10485760 bytes (10 MB, 10 MiB) copied, 5.40797 s, 1.9 MB/s

echo 1 | sudo tee /proc/sys/vm/drop_caches

time cat mnt/zeros > /dev/null
real    0m5.386s
user    0m0.004s
sys    0m0.000s

Read and Write performance are both around 2MB/s which is exactly a single 128k read or write every 64ms.

With nbdkit modified to use async_pread and async_pwrite and a buffer pool of 64 buffers (each 128k for a total of 8MB buffer memory)

dd if=/dev/zero of=./mnt/zeros bs=128k count=8000 conv=fsync
8000+0 records in
8000+0 records out
1048576000 bytes (1.0 GB, 1000 MiB) copied, 8.7736 s, 120 MB/s

echo 1 | sudo tee /proc/sys/vm/drop_caches

time cat mnt/zeros > /dev/null
real    0m8.153s
user    0m0.000s
sys    0m0.320s

Read and Write performance are now 120MB/s (about 64x faster) because we can process 64 ops in parallel. Our throughput scaled nearly linearly without needing 64 threads in the nbdkit. Total memory for the buffers is 8MB.

With async_pread/async_pwrite and a buffer pool of 1024 buffers (128MB buffer memory) and 2 io service threads (3 threads total, 1 thread in nbdkit pulling requests off the socket)

dd if=/dev/zero of=./mnt/zeros bs=128k count=80000 conv=fsync
80000+0 records in
80000+0 records out
10485760000 bytes (10 GB, 9.8 GiB) copied, 5.86029 s, 1.8 GB/s

echo 1 | sudo tee /proc/sys/vm/drop_caches

time cat mnt/zeros > /dev/null
real    0m12.545s
user    0m0.012s
sys    0m2.444s

Read performance was capped at around 825 MiB/s for 1 file sequential read but when performing 2 files in parallel the read throughput was 1.6GB/s and for 4 files in parallel 1.9GB/s.

Below is the patch for changes made to nbdkit to support this.

diff --git a/include/nbdkit-plugin.h b/include/nbdkit-plugin.h
index 95cba8d..2e88cad 100644
--- a/include/nbdkit-plugin.h
+++ b/include/nbdkit-plugin.h
@@ -38,15 +38,13 @@
 
 #include <stdarg.h>
 #include <stdint.h>
-
-#ifdef __cplusplus
-extern "C" {
-#endif
+#include <stdbool.h>
 
 #define NBDKIT_THREAD_MODEL_SERIALIZE_CONNECTIONS     0
 #define NBDKIT_THREAD_MODEL_SERIALIZE_ALL_REQUESTS    1
 #define NBDKIT_THREAD_MODEL_SERIALIZE_REQUESTS        2
 #define NBDKIT_THREAD_MODEL_PARALLEL                  3
+#define NBDKIT_THREAD_MODEL_ASYNC                     4
 
 #define NBDKIT_API_VERSION                            1
 
@@ -94,28 +92,35 @@ struct nbdkit_plugin {
 
   int errno_is_preserved;
 
+  int (*async_pread) (void *conn, uint64_t reqid, bool flush, void *handle, void *buf, uint32_t count, uint64_t offset);
+  int (*async_pwrite) (void *conn, uint64_t reqid, bool flush, void *handle, const void *buf, uint32_t count, uint64_t offset);
   /* int (*set_exportname) (void *handle, const char *exportname); */
 };
 
-extern void nbdkit_set_error (int err);
-extern void nbdkit_error (const char *msg, ...)
-  __attribute__((format (printf, 1, 2)));
-extern void nbdkit_verror (const char *msg, va_list args);
-extern void nbdkit_debug (const char *msg, ...)
-  __attribute__((format (printf, 1, 2)));
-extern void nbdkit_vdebug (const char *msg, va_list args);
-
-extern char *nbdkit_absolute_path (const char *path);
-extern int64_t nbdkit_parse_size (const char *str);
-
 #ifdef __cplusplus
-#define NBDKIT_CXX_LANG_C extern "C"
+#define NBDKIT_CXX_LANG_C "C"
 #else
 #define NBDKIT_CXX_LANG_C /* nothing */
 #endif
 
+extern NBDKIT_CXX_LANG_C void nbdkit_set_error (int err);
+extern NBDKIT_CXX_LANG_C void nbdkit_error (const char *msg, ...)
+  __attribute__((format (printf, 1, 2)));
+extern NBDKIT_CXX_LANG_C void nbdkit_verror (const char *msg, va_list args);
+extern NBDKIT_CXX_LANG_C void nbdkit_debug (const char *msg, ...)
+  __attribute__((format (printf, 1, 2)));
+extern NBDKIT_CXX_LANG_C void nbdkit_vdebug (const char *msg, va_list args);
+
+extern NBDKIT_CXX_LANG_C char *nbdkit_absolute_path (const char *path);
+extern NBDKIT_CXX_LANG_C int64_t nbdkit_parse_size (const char *str);
+
+extern NBDKIT_CXX_LANG_C int nbdkit_async_reply (void *conn, uint64_t reqid);
+extern NBDKIT_CXX_LANG_C int nbdkit_async_reply_read (void *conn, uint64_t reqid, uint32_t count, void *buf);
+extern NBDKIT_CXX_LANG_C int nbdkit_async_reply_error (void *conn, uint64_t reqid);
+
+
 #define NBDKIT_REGISTER_PLUGIN(plugin)                                  \
-  NBDKIT_CXX_LANG_C                                                     \
+  extern NBDKIT_CXX_LANG_C                                              \
   struct nbdkit_plugin *                                                \
   plugin_init (void)                                                    \
   {                                                                     \
@@ -125,8 +130,4 @@ extern int64_t nbdkit_parse_size (const char *str);
     return &(plugin);                                                   \
   }
 
-#ifdef __cplusplus
-}
-#endif
-
 #endif /* NBDKIT_PLUGIN_H */
diff --git a/src/connections.c b/src/connections.c
index a0d689a..e03a0f6 100644
--- a/src/connections.c
+++ b/src/connections.c
@@ -62,7 +62,8 @@
 static struct connection *new_connection (int sockin, int sockout);
 static void free_connection (struct connection *conn);
 static int negotiate_handshake (struct connection *conn);
-static int recv_request_send_reply (struct connection *conn);
+static int recv_request (struct connection *conn);
+static int send_reply (struct connection *conn, uint64_t handle, uint32_t count, void *buf, uint32_t error);
 
 static int
 _handle_single_connection (int sockin, int sockout)
@@ -86,7 +87,7 @@ _handle_single_connection (int sockin, int sockout)
    * a thread pool.
    */
   while (!quit) {
-    r = recv_request_send_reply (conn);
+    r = recv_request (conn);
     if (r == -1)
       goto err;
     if (r == 0)
@@ -127,6 +128,7 @@ new_connection (int sockin, int sockout)
   conn->sockin = sockin;
   conn->sockout = sockout;
   pthread_mutex_init (&conn->request_lock, NULL);
+  pthread_mutex_init (&conn->reply_lock, NULL);
 
   return conn;
 }
@@ -143,6 +145,7 @@ free_connection (struct connection *conn)
     close (conn->sockout);
 
   pthread_mutex_destroy (&conn->request_lock);
+  pthread_mutex_destroy (&conn->reply_lock);
 
   if (conn->handle)
     plugin_close (conn);
@@ -626,13 +629,13 @@ get_error (struct connection *conn)
  * On read/write errors, sets *error appropriately and returns 0.
  */
 static int
-_handle_request (struct connection *conn,
+_handle_request (struct connection *conn, uint64_t handle,
                  uint32_t cmd, uint32_t flags, uint64_t offset, uint32_t count,
-                 void *buf,
-                 uint32_t *error)
+                 void *buf)
 {
-  bool flush_after_command;
   int r;
+  uint32_t error = 0;
+  bool flush_after_command;
 
   /* Flush after command performed? */
   flush_after_command = (flags & NBD_CMD_FLAG_FUA) != 0;
@@ -645,42 +648,54 @@ _handle_request (struct connection *conn,
 
   switch (cmd) {
   case NBD_CMD_READ:
-    r = plugin_pread (conn, buf, count, offset);
+    if (plugin_can_async_read (conn)) {
+      r = plugin_async_pread (conn, handle, flush_after_command, buf, count, offset);
+      if (r == 0)
+        return 0; // plugin now has responsibility of sending response
+    }
+    else
+      r = plugin_pread (conn, buf, count, offset);
     if (r == -1) {
-      *error = get_error (conn);
-      return 0;
+      error = get_error (conn);
+      return send_reply (conn, handle, 0, NULL, error);
     }
     break;
 
   case NBD_CMD_WRITE:
-    r = plugin_pwrite (conn, buf, count, offset);
+    if (plugin_can_async_write (conn)) {
+      r = plugin_async_pwrite (conn, handle, flush_after_command, buf, count, offset);
+      if (r == 0)
+        return 0; // plugin now has responsibility of sending response
+    }
+    else
+      r = plugin_pwrite (conn, buf, count, offset);
     if (r == -1) {
-      *error = get_error (conn);
-      return 0;
+      error = get_error (conn);
+      return send_reply (conn, handle, 0, NULL, error);
     }
     break;
 
   case NBD_CMD_FLUSH:
     r = plugin_flush (conn);
     if (r == -1) {
-      *error = get_error (conn);
-      return 0;
+      error = get_error (conn);
+      return send_reply (conn, handle, 0, NULL, error);
     }
     break;
 
   case NBD_CMD_TRIM:
     r = plugin_trim (conn, count, offset);
     if (r == -1) {
-      *error = get_error (conn);
-      return 0;
+      error = get_error (conn);
+      return send_reply (conn, handle, 0, NULL, error);
     }
     break;
 
   case NBD_CMD_WRITE_ZEROES:
     r = plugin_zero (conn, count, offset, !(flags & NBD_CMD_FLAG_NO_HOLE));
     if (r == -1) {
-      *error = get_error (conn);
-      return 0;
+      error = get_error (conn);
+      return send_reply (conn, handle, 0, NULL, error);
     }
     break;
 
@@ -691,24 +706,28 @@ _handle_request (struct connection *conn,
   if (flush_after_command) {
     r = plugin_flush (conn);
     if (r == -1) {
-      *error = get_error (conn);
-      return 0;
+      error = get_error (conn);
+      return send_reply (conn, handle, 0, NULL, error);
     }
   }
 
-  return 0;
+  if (cmd == NBD_CMD_READ)
+    r = send_reply (conn, handle, count, buf, error);
+  else
+   r = send_reply (conn, handle, 0, NULL, error);
+
+  return r;
 }
 
 static int
-handle_request (struct connection *conn,
+handle_request (struct connection *conn, uint64_t handle,
                 uint32_t cmd, uint32_t flags, uint64_t offset, uint32_t count,
-                void *buf,
-                uint32_t *error)
+                void *buf)
 {
   int r;
 
   plugin_lock_request (conn);
-  r = _handle_request (conn, cmd, flags, offset, count, buf, error);
+  r = _handle_request (conn, handle, cmd, flags, offset, count, buf);
   plugin_unlock_request (conn);
 
   return r;
@@ -763,11 +782,10 @@ nbd_errno (int error)
 }
 
 static int
-recv_request_send_reply (struct connection *conn)
+recv_request (struct connection *conn)
 {
   int r;
   struct request request;
-  struct reply reply;
   uint32_t magic, cmd, flags, count, error = 0;
   uint64_t offset;
   CLEANUP_FREE char *buf = NULL;
@@ -808,7 +826,10 @@ recv_request_send_reply (struct connection *conn)
   if (r == 0) {                 /* request not valid */
     if (cmd == NBD_CMD_WRITE)
       skip_over_write_buffer (conn->sockin, count);
-    goto send_reply;
+    r = send_reply (conn, request.handle, 0, NULL, error);
+    if (r == -1)
+      return -1;
+    return 1;
   }
 
   /* Allocate the data buffer used for either read or write requests. */
@@ -819,7 +840,9 @@ recv_request_send_reply (struct connection *conn)
       error = ENOMEM;
       if (cmd == NBD_CMD_WRITE)
         skip_over_write_buffer (conn->sockin, count);
-      goto send_reply;
+      r = send_reply (conn, request.handle, 0, NULL, error);
+      if (r == -1)
+        return -1;
     }
   }
 
@@ -837,14 +860,20 @@ recv_request_send_reply (struct connection *conn)
   }
 
   /* Perform the request.  Only this part happens inside the request lock. */
-  r = handle_request (conn, cmd, flags, offset, count, buf, &error);
+  r = handle_request (conn, request.handle, cmd, flags, offset, count, buf);
   if (r == -1)
     return -1;
 
-  /* Send the reply packet. */
- send_reply:
+  return 1;                     /* command processed ok */
+}
+
+static int
+_send_reply (struct connection *conn, uint64_t handle, uint32_t count, void *buf, uint32_t error)
+{
+  int r;
+  struct reply reply;
   reply.magic = htobe32 (NBD_REPLY_MAGIC);
-  reply.handle = request.handle;
+  reply.handle = handle;
   reply.error = htobe32 (nbd_errno (error));
 
   if (error != 0) {
@@ -862,8 +891,7 @@ recv_request_send_reply (struct connection *conn)
     return -1;
   }
 
-  /* Send the read data buffer. */
-  if (cmd == NBD_CMD_READ) {
+  if (error == 0 && buf != NULL) { /* Send the read data buffer. */
     r = xwrite (conn->sockout, buf, count);
     if (r == -1) {
       nbdkit_error ("write data: %m");
@@ -871,5 +899,37 @@ recv_request_send_reply (struct connection *conn)
     }
   }
 
-  return 1;                     /* command processed ok */
+  return 0;
+}
+
+static int
+send_reply (struct connection *conn, uint64_t handle, uint32_t count, void *buf, uint32_t error)
+{
+  int r;
+
+  plugin_lock_reply (conn);
+  r = _send_reply (conn, handle, count, buf, error);
+  plugin_unlock_reply (conn);
+
+  return r;
+}
+
+int
+nbdkit_async_reply (void *conn, uint64_t reqid)
+{
+  return send_reply (conn, reqid, 0, NULL, 0);
+}
+
+int
+nbdkit_async_reply_read (void *conn, uint64_t reqid, uint32_t count, void *buf)
+{
+
+  return send_reply (conn, reqid, count, buf, 0);
+}
+
+int
+nbdkit_async_reply_error (void *conn, uint64_t reqid)
+{
+  uint32_t error = get_error (conn);
+  return send_reply (conn, reqid, 0, NULL, error);
 }
diff --git a/src/internal.h b/src/internal.h
index e73edf1..93d32e9 100644
--- a/src/internal.h
+++ b/src/internal.h
@@ -114,6 +114,7 @@ extern void cleanup_free (void *ptr);
 struct connection {
   int sockin, sockout;
   pthread_mutex_t request_lock;
+  pthread_mutex_t reply_lock;
   void *handle;
   uint64_t exportsize;
   int readonly;
@@ -140,6 +141,8 @@ extern void plugin_lock_connection (void);
 extern void plugin_unlock_connection (void);
 extern void plugin_lock_request (struct connection *conn);
 extern void plugin_unlock_request (struct connection *conn);
+extern void plugin_lock_reply (struct connection *conn);
+extern void plugin_unlock_reply (struct connection *conn);
 extern int plugin_errno_is_preserved (void);
 extern int plugin_open (struct connection *conn, int readonly);
 extern void plugin_close (struct connection *conn);
@@ -148,8 +151,12 @@ extern int plugin_can_write (struct connection *conn);
 extern int plugin_can_flush (struct connection *conn);
 extern int plugin_is_rotational (struct connection *conn);
 extern int plugin_can_trim (struct connection *conn);
+extern int plugin_can_async_read (struct connection *conn);
 extern int plugin_pread (struct connection *conn, void *buf, uint32_t count, uint64_t offset);
+extern int plugin_async_pread (struct connection *conn, uint64_t handle, bool flush, void *buf, uint32_t count, uint64_t offset);
+extern int plugin_can_async_write (struct connection *conn);
 extern int plugin_pwrite (struct connection *conn, void *buf, uint32_t count, uint64_t offset);
+extern int plugin_async_pwrite (struct connection *conn, uint64_t handle, bool flush, void *buf, uint32_t count, uint64_t offset);
 extern int plugin_flush (struct connection *conn);
 extern int plugin_trim (struct connection *conn, uint32_t count, uint64_t offset);
 extern int plugin_zero (struct connection *conn, uint32_t count, uint64_t offset, int may_trim);
diff --git a/src/plugins.c b/src/plugins.c
index eeed8a9..9da30db 100644
--- a/src/plugins.c
+++ b/src/plugins.c
@@ -121,8 +121,9 @@ plugin_register (const char *_filename,
              program_name, filename);
     exit (EXIT_FAILURE);
   }
-  if (plugin.pread == NULL) {
-    fprintf (stderr, "%s: %s: plugin must have a .pread callback\n",
+  if (plugin.pread == NULL &&
+     (plugin._thread_model != NBDKIT_THREAD_MODEL_ASYNC || plugin.async_pread == NULL)) {
+    fprintf (stderr, "%s: %s: plugin must have either a .pread or .async_pread callback\n",
              program_name, filename);
     exit (EXIT_FAILURE);
   }
@@ -231,6 +232,9 @@ plugin_dump_fields (void)
   case NBDKIT_THREAD_MODEL_PARALLEL:
     printf ("parallel");
     break;
+  case NBDKIT_THREAD_MODEL_ASYNC:
+    printf ("async");
+    break;
   default:
     printf ("%d # unknown thread model!", plugin._thread_model);
     break;
@@ -258,6 +262,8 @@ plugin_dump_fields (void)
   HAS (flush);
   HAS (trim);
   HAS (zero);
+  HAS (async_pread);
+  HAS (async_pwrite);
 #undef HAS
 }
 
@@ -350,6 +356,28 @@ plugin_unlock_request (struct connection *conn)
   }
 }
 
+void
+plugin_lock_reply (struct connection *conn)
+{
+  assert (dl);
+
+  if (plugin._thread_model >= NBDKIT_THREAD_MODEL_PARALLEL) {
+    debug ("acquire per-connection reply lock");
+    pthread_mutex_lock (&conn->reply_lock);
+  }
+}
+
+void
+plugin_unlock_reply (struct connection *conn)
+{
+  assert (dl);
+
+  if (plugin._thread_model >= NBDKIT_THREAD_MODEL_PARALLEL) {
+    debug ("release per-connection reply lock");
+    pthread_mutex_unlock (&conn->reply_lock);
+  }
+}
+
 int
 plugin_errno_is_preserved (void)
 {
@@ -414,7 +442,8 @@ plugin_can_write (struct connection *conn)
   if (plugin.can_write)
     return plugin.can_write (conn->handle);
   else
-    return plugin.pwrite != NULL;
+    return plugin.pwrite != NULL ||
+           (plugin._thread_model == NBDKIT_THREAD_MODEL_ASYNC && plugin.async_pwrite != NULL);
 }
 
 int
@@ -460,6 +489,16 @@ plugin_can_trim (struct connection *conn)
 }
 
 int
+plugin_can_async_read (struct connection *conn)
+{
+  assert (dl);
+  assert (conn->handle);
+
+  return ((plugin._thread_model == NBDKIT_THREAD_MODEL_ASYNC) &&
+          (plugin.async_pread != NULL));
+}
+
+int
 plugin_pread (struct connection *conn,
               void *buf, uint32_t count, uint64_t offset)
 {
@@ -473,6 +512,27 @@ plugin_pread (struct connection *conn,
 }
 
 int
+plugin_async_pread (struct connection *conn, uint64_t handle, bool flush,
+                    void *buf, uint32_t count, uint64_t offset)
+{
+  assert (dl);
+  assert (conn->handle);
+
+  debug ("async_pread count=%" PRIu32 " offset=%" PRIu64, count, offset);
+
+  return plugin.async_pread (conn, handle, flush, conn->handle, buf, count, offset);
+}
+
+int
+plugin_can_async_write (struct connection *conn)
+{
+  assert (dl);
+  assert (conn->handle);
+
+  return (plugin._thread_model == NBDKIT_THREAD_MODEL_ASYNC) && (plugin.async_pwrite != NULL);
+}
+
+int
 plugin_pwrite (struct connection *conn,
                void *buf, uint32_t count, uint64_t offset)
 {
@@ -490,23 +550,44 @@ plugin_pwrite (struct connection *conn,
 }
 
 int
+plugin_async_pwrite (struct connection *conn, uint64_t handle, bool flush,
+                     void *buf, uint32_t count, uint64_t offset)
+{
+  assert (dl);
+  assert (conn->handle);
+
+  debug ("async_pwrite count=%" PRIu32 " offset=%" PRIu64, count, offset);
+
+  if (plugin.async_pwrite != NULL)
+    return plugin.async_pwrite (conn, handle, flush, conn->handle, buf, count, offset);
+  else {
+    errno = EROFS;
+    return -1;
+  }
+}
+
+int
 plugin_flush (struct connection *conn)
 {
+  int r;
   assert (dl);
   assert (conn->handle);
 
   debug ("flush");
 
   if (plugin.flush != NULL)
-    return plugin.flush (conn->handle);
+    r = plugin.flush (conn->handle);
   else {
     errno = EINVAL;
-    return -1;
+    r = -1;
   }
+
+  return r;
 }
 
 int
-plugin_trim (struct connection *conn, uint32_t count, uint64_t offset)
+plugin_trim (struct connection *conn,
+             uint32_t count, uint64_t offset)
 {
   assert (dl);
   assert (conn->handle);

[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]