[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

[Libguestfs] [PATCH libnbd 2/3] fuse: Issue commands in parallel on each thread



With this patch things are a little faster (compare to previous
commit):

   READ: bw=276MiB/s (290MB/s), 69.1MiB/s-69.3MiB/s (72.5MB/s-72.6MB/s), io=4096MiB (4295MB), run=14781-14814msec
---
 fuse/operations.c | 158 ++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 145 insertions(+), 13 deletions(-)

diff --git a/fuse/operations.c b/fuse/operations.c
index 4da701e..a8e6f81 100644
--- a/fuse/operations.c
+++ b/fuse/operations.c
@@ -35,10 +35,12 @@
 #include <limits.h>
 #include <fcntl.h>
 #include <unistd.h>
+#include <poll.h>
 #include <errno.h>
 #include <assert.h>
 #include <sys/types.h>
 #include <sys/stat.h>
+#include <pthread.h>
 
 #include <libnbd.h>
 
@@ -47,6 +49,11 @@
 
 #define MAX_REQUEST_SIZE (32 * 1024 * 1024)
 
+/* Number of seconds to wait for commands to complete when closing the
+ * file.
+ */
+#define RELEASE_TIMEOUT 5
+
 /* Wraps calls to libnbd functions and automatically checks for error,
  * returning errors in the format required by FUSE.  It also prints
  * out the full error message on stderr, so that we don't lose it.
@@ -133,11 +140,106 @@ nbdfuse_open (const char *path, struct fuse_file_info *fi)
   return 0;
 }
 
+/* Because we're called on multiple threads and are using a single nbd
+ * handle, we want to issue multiple commands in parallel.  We
+ * therefore cannot use the synchronous APIs (nbd_pread etc) single
+ * those lock the handle while they are waiting for the response.
+ *
+ * Instead we start the command using an AIO call (eg. nbd_aio_pread),
+ * and wait for it to complete by calling this function which does not
+ * hold the handle lock.  Thus commands in other threads can run in
+ * parallel.
+ *
+ * Note that we are intentionally calling poll(2) on the same file
+ * descriptor from multiple threads.  This means it's likely that
+ * another threads will see events related to our command and do
+ * processing for us.  This is (mostly) OK, but it means that the
+ * handle direction will change unexpectedly, so we need to be
+ * prepared for that.  The alternative is a more complex and slower
+ * design involving a separate polling thread.
+ */
+static int poll_socket (struct pollfd *, int timeout);
+
+static int
+wait_for_completion (int64_t cookie)
+{
+  static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
+  int r;
+  unsigned dir;
+  struct pollfd fds[1];
+
+  /* nbd_aio_command_completed returns:
+   *  0 => command still in flight, we must wait in the loop
+   *  1 => completed successfully
+   * -1 => error
+   */
+  while ((r = nbd_aio_command_completed (nbd, cookie)) == 0) {
+    /* Don't poll forever here since other threads may finish our
+     * command for us.
+     */
+    if (poll_socket (fds, 100) == -1)
+      return -1;
+
+    /* Direction may have changed in another thread, so check it
+     * again.  We also have to check the socket revents again.
+     * Protect the whole lot with a global lock.
+     */
+    pthread_mutex_lock (&lock);
+    r = poll_socket (fds, 0);
+    if (r >= 0) {
+      dir = nbd_aio_get_direction (nbd);
+      r = 0;
+      if ((dir & LIBNBD_AIO_DIRECTION_READ) != 0 &&
+          (fds[0].revents & POLLIN) != 0)
+        r = nbd_aio_notify_read (nbd);
+      else if ((dir & LIBNBD_AIO_DIRECTION_WRITE) != 0 &&
+               (fds[0].revents & POLLOUT) != 0)
+        r = nbd_aio_notify_write (nbd);
+      else if ((fds[0].revents & (POLLHUP | POLLERR | POLLNVAL)) != 0) {
+        fprintf (stderr, "nbdfuse: server closed socket unexpectedly\n");
+        r = -1;
+      }
+    }
+    pthread_mutex_unlock (&lock);
+    if (r == -1)
+      return -1;
+  }
+
+  return r;
+}
+
+static int
+poll_socket (struct pollfd *fds, int timeout)
+{
+  int r;
+  unsigned dir;
+
+  fds[0].fd = nbd_aio_get_fd (nbd);
+  if (fds[0].fd == -1)
+    return -1;
+  fds[0].events = fds[0].revents = 0;
+  dir = nbd_aio_get_direction (nbd);
+  if ((dir & LIBNBD_AIO_DIRECTION_READ) != 0)
+    fds[0].events |= POLLIN;
+  if ((dir & LIBNBD_AIO_DIRECTION_WRITE) != 0)
+    fds[0].events |= POLLOUT;
+
+  r = poll (fds, 1, timeout);
+  if (r == -1) {
+    perror ("nbdfuse: poll");
+    return -1;
+  }
+
+  return r;
+}
+
 static int
 nbdfuse_read (const char *path, char *buf,
               size_t count, off_t offset,
               struct fuse_file_info *fi)
 {
+  int64_t cookie;
+
   if (!file_mode && (path[0] != '/' || strcmp (path+1, filename) != 0))
     return -ENOENT;
 
@@ -150,7 +252,9 @@ nbdfuse_read (const char *path, char *buf,
   if (offset + count > size)
     count = size - offset;
 
-  CHECK_NBD_ERROR (nbd_pread (nbd, buf, count, offset, 0));
+  CHECK_NBD_ERROR (cookie = nbd_aio_pread (nbd, buf, count, offset,
+                                           NBD_NULL_COMPLETION, 0));
+  CHECK_NBD_ERROR (wait_for_completion (cookie));
 
   return (int) count;
 }
@@ -160,6 +264,8 @@ nbdfuse_write (const char *path, const char *buf,
                size_t count, off_t offset,
                struct fuse_file_info *fi)
 {
+  int64_t cookie;
+
   /* Probably shouldn't happen because of nbdfuse_open check. */
   if (readonly)
     return -EACCES;
@@ -176,7 +282,9 @@ nbdfuse_write (const char *path, const char *buf,
   if (offset + count > size)
     count = size - offset;
 
-  CHECK_NBD_ERROR (nbd_pwrite (nbd, buf, count, offset, 0));
+  CHECK_NBD_ERROR (cookie = nbd_aio_pwrite (nbd, buf, count, offset,
+                                            NBD_NULL_COMPLETION, 0));
+  CHECK_NBD_ERROR (wait_for_completion (cookie));
 
   return (int) count;
 }
@@ -184,28 +292,44 @@ nbdfuse_write (const char *path, const char *buf,
 static int
 nbdfuse_fsync (const char *path, int datasync, struct fuse_file_info *fi)
 {
+  int64_t cookie;
+
   if (readonly)
     return 0;
 
   /* If the server doesn't support flush then the operation is
    * silently ignored.
    */
-  if (nbd_can_flush (nbd))
-    CHECK_NBD_ERROR (nbd_flush (nbd, 0));
+  if (nbd_can_flush (nbd)) {
+    CHECK_NBD_ERROR (cookie = nbd_aio_flush (nbd, NBD_NULL_COMPLETION, 0));
+    CHECK_NBD_ERROR (wait_for_completion (cookie));
+  }
 
   return 0;
 }
 
-/* This is called on the last close of a file.  We do a flush here to
- * be on the safe side, but it's not strictly necessary.
- */
+/* This is called on the last close of a file. */
 static int
 nbdfuse_release (const char *path, struct fuse_file_info *fi)
 {
-  if (readonly)
-    return 0;
+  time_t st;
 
-  return nbdfuse_fsync (path, 0, fi);
+  /* We do a synchronous flush here to be on the safe side, but it's
+   * not strictly necessary.
+   */
+  if (!readonly && nbd_can_flush (nbd))
+    CHECK_NBD_ERROR (nbd_flush (nbd, 0));
+
+  /* Wait until there are no more commands in flight or until a
+   * timeout is reached.
+   */
+  time (&st);
+  while (nbd_aio_in_flight (nbd) > 0 &&
+         time (NULL) - st <= RELEASE_TIMEOUT &&
+         nbd_poll (nbd, 1000) >= 0)
+    ;
+
+  return 0;
 }
 
 /* Punch a hole or write zeros. */
@@ -213,6 +337,8 @@ static int
 nbdfuse_fallocate (const char *path, int mode, off_t offset, off_t len,
                    struct fuse_file_info *fi)
 {
+  int64_t cookie;
+
   if (readonly)
     return -EACCES;
 
@@ -220,7 +346,9 @@ nbdfuse_fallocate (const char *path, int mode, off_t offset, off_t len,
     if (!nbd_can_trim (nbd))
       return -EOPNOTSUPP;       /* Trim not supported. */
     else {
-      CHECK_NBD_ERROR (nbd_trim (nbd, len, offset, 0));
+      CHECK_NBD_ERROR (cookie = nbd_aio_trim (nbd, len, offset,
+                                              NBD_NULL_COMPLETION, 0));
+      CHECK_NBD_ERROR (wait_for_completion (cookie));
       return 0;
     }
   }
@@ -236,13 +364,17 @@ nbdfuse_fallocate (const char *path, int mode, off_t offset, off_t len,
 
       while (len > 0) {
         off_t n = MIN (len, sizeof zerobuf);
-        CHECK_NBD_ERROR (nbd_pwrite (nbd, zerobuf, n, offset, 0));
+        CHECK_NBD_ERROR (cookie = nbd_aio_pwrite (nbd, zerobuf, n, offset,
+                                                  NBD_NULL_COMPLETION,0));
+        CHECK_NBD_ERROR (wait_for_completion (cookie));
         len -= n;
       }
       return 0;
     }
     else {
-      CHECK_NBD_ERROR (nbd_zero (nbd, len, offset, 0));
+      CHECK_NBD_ERROR (cookie = nbd_aio_zero (nbd, len, offset,
+                                              NBD_NULL_COMPLETION,0));
+      CHECK_NBD_ERROR (wait_for_completion (cookie));
       return 0;
     }
   }
-- 
2.31.1


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]