[Libguestfs] [PATCH nbdkit 5/5] vddk: Implement parallel thread model

Richard W.M. Jones rjones at redhat.com
Wed Oct 27 12:21:13 UTC 2021


Since VDDK 6.0 (but not actually implemented until VDDK 6.7) VDDK has
added asynchronous read and write operations.  This commit makes use
of these, allowing us to use the parallel thread model for increased
performance.

In the parallel thread model, nbdkit will be calling us in parallel
from multiple nbdkit threads.  VDDK does not allow multiple threads to
simultaneously call VDDK operations on the same handle.  So we create
a background thread per handle (== connection).

Only the background thread makes VDDK calls[1].  The background thread
handles a mix of synchronous (like extents, flush) and asynchronous
(like read, write) operations, but all from one thread.

Parallel nbdkit threads issue commands to the background thread
associated with each handle, and wait until they are retired.

[1] All VDDK calls except for connecting and disconnecting which for
different reasons are protected by a global lock, so I did not need to
change those.
---
 plugins/vddk/Makefile.am |   1 +
 plugins/vddk/vddk.h      |  32 +++
 plugins/vddk/vddk.c      | 374 +++++++----------------------
 plugins/vddk/worker.c    | 496 +++++++++++++++++++++++++++++++++++++++
 4 files changed, 611 insertions(+), 292 deletions(-)

diff --git a/plugins/vddk/Makefile.am b/plugins/vddk/Makefile.am
index 4f470ff9e..f8382fc91 100644
--- a/plugins/vddk/Makefile.am
+++ b/plugins/vddk/Makefile.am
@@ -49,6 +49,7 @@ nbdkit_vddk_plugin_la_SOURCES = \
 	stats.c \
 	vddk-structs.h \
 	vddk-stubs.h \
+	worker.c \
 	$(top_srcdir)/include/nbdkit-plugin.h \
 	$(NULL)
 
diff --git a/plugins/vddk/vddk.h b/plugins/vddk/vddk.h
index 1400589dd..b42911c30 100644
--- a/plugins/vddk/vddk.h
+++ b/plugins/vddk/vddk.h
@@ -120,6 +120,33 @@ extern int vddk_debug_stats;
     VDDK_CALL_END (VixDiskLib_FreeErrorText, 0);                \
   } while (0)
 
+/* Queue of asynchronous commands sent to the background thread. */
+enum command_type { GET_SIZE, READ, WRITE, FLUSH, CAN_EXTENTS, EXTENTS, STOP };
+struct command {
+  enum command_type type;       /* command */
+  void *ptr;                    /* buffer, extents list, return values */
+  uint32_t count;               /* READ, WRITE, EXTENTS */
+  uint64_t offset;              /* READ, WRITE, EXTENTS */
+  bool req_one;                 /* EXTENTS NBDKIT_FLAG_REQ_ONE */
+  pthread_mutex_t mutex;        /* completion mutex */
+  pthread_cond_t cond;          /* completion condition */
+  bool completed;               /* set to true when completed */
+  bool error;                   /* set to true if there was an error */
+};
+
+DEFINE_VECTOR_TYPE(command_queue, struct command *)
+
+/* The per-connection handle. */
+struct vddk_handle {
+  VixDiskLibConnectParams *params; /* connection parameters */
+  VixDiskLibConnection connection; /* connection */
+  VixDiskLibHandle handle;         /* disk handle */
+  pthread_t thread;                /* background thread for asynch work */
+  command_queue commands;          /* commands for background thread */
+  pthread_mutex_t commands_lock;   /* lock on command queue */
+  pthread_cond_t commands_cond;    /* condition (queue size 0 -> 1) */
+};
+
 /* reexec.c */
 extern bool noreexec;
 extern char *reexeced;
@@ -141,4 +168,9 @@ extern pthread_mutex_t stats_lock;
 #undef OPTIONAL_STUB
 extern void display_stats (void);
 
+/* worker.c */
+extern const char *command_type_string (enum command_type type);
+extern int send_command_and_wait (struct vddk_handle *h, struct command *cmd);
+extern void *vddk_worker_thread (void *handle);
+
 #endif /* NBDKIT_VDDK_H */
diff --git a/plugins/vddk/vddk.c b/plugins/vddk/vddk.c
index 68fd993b2..34aae63e2 100644
--- a/plugins/vddk/vddk.c
+++ b/plugins/vddk/vddk.c
@@ -50,9 +50,6 @@
 #include <nbdkit-plugin.h>
 
 #include "cleanup.h"
-#include "minmax.h"
-#include "rounding.h"
-#include "tvdiff.h"
 #include "vector.h"
 
 #include "vddk.h"
@@ -512,23 +509,18 @@ vddk_dump_plugin (void)
 /* The rules on threads and VDDK are here:
  * https://code.vmware.com/docs/11750/virtual-disk-development-kit-programming-guide/GUID-6BE903E8-DC70-46D9-98E4-E34A2002C2AD.html
  *
- * Before nbdkit 1.22 we used SERIALIZE_ALL_REQUESTS.  Since nbdkit
- * 1.22 we changed this to SERIALIZE_REQUESTS and added a mutex around
- * calls to VixDiskLib_Open and VixDiskLib_Close.  This is not quite
- * within the letter of the rules, but is within the spirit.
+ * Before nbdkit 1.22 we used SERIALIZE_ALL_REQUESTS.  In nbdkit
+ * 1.22-1.28 we changed this to SERIALIZE_REQUESTS and added a mutex
+ * around calls to VixDiskLib_Open and VixDiskLib_Close.  In nbdkit
+ * 1.30 and above we assign a background thread per connection to do
+ * asynch operations and use the PARALLEL model.  We still need the
+ * lock around Open and Close.
  */
-#define THREAD_MODEL NBDKIT_THREAD_MODEL_SERIALIZE_REQUESTS
+#define THREAD_MODEL NBDKIT_THREAD_MODEL_PARALLEL
 
 /* Lock protecting open/close calls - see above. */
 static pthread_mutex_t open_close_lock = PTHREAD_MUTEX_INITIALIZER;
 
-/* The per-connection handle. */
-struct vddk_handle {
-  VixDiskLibConnectParams *params; /* connection parameters */
-  VixDiskLibConnection connection; /* connection */
-  VixDiskLibHandle handle;         /* disk handle */
-};
-
 static inline VixDiskLibConnectParams *
 allocate_connect_params (void)
 {
@@ -569,12 +561,16 @@ vddk_open (int readonly)
   VixError err;
   uint32_t flags;
   const char *transport_mode;
+  int pterr;
 
-  h = malloc (sizeof *h);
+  h = calloc (1, sizeof *h);
   if (h == NULL) {
     nbdkit_error ("malloc: %m");
     return NULL;
   }
+  h->commands = (command_queue) empty_vector;
+  pthread_mutex_init (&h->commands_lock, NULL);
+  pthread_cond_init (&h->commands_cond, NULL);
 
   h->params = allocate_connect_params ();
   if (h->params == NULL) {
@@ -651,6 +647,16 @@ vddk_open (int readonly)
   VDDK_CALL_END (VixDiskLib_GetTransportMode, 0);
   nbdkit_debug ("transport mode: %s", transport_mode);
 
+  /* Start the background thread which actually does the asynchronous
+   * work.
+   */
+  pterr = pthread_create (&h->thread, NULL, vddk_worker_thread, h);
+  if (pterr != 0) {
+    errno = pterr;
+    nbdkit_error ("pthread_create: %m");
+    goto err2;
+  }
+
   return h;
 
  err2:
@@ -660,6 +666,8 @@ vddk_open (int readonly)
  err1:
   free_connect_params (h->params);
  err0:
+  pthread_mutex_destroy (&h->commands_lock);
+  pthread_cond_destroy (&h->commands_cond);
   free (h);
   return NULL;
 }
@@ -670,6 +678,10 @@ vddk_close (void *handle)
 {
   ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&open_close_lock);
   struct vddk_handle *h = handle;
+  struct command stop_cmd = { .type = STOP };
+
+  send_command_and_wait (h, &stop_cmd);
+  pthread_join (h->thread, NULL);
 
   VDDK_CALL_START (VixDiskLib_Close, "handle")
     VixDiskLib_Close (h->handle);
@@ -679,6 +691,9 @@ vddk_close (void *handle)
   VDDK_CALL_END (VixDiskLib_Disconnect, 0);
 
   free_connect_params (h->params);
+  pthread_mutex_destroy (&h->commands_lock);
+  pthread_cond_destroy (&h->commands_cond);
+  command_queue_reset (&h->commands);
   free (h);
 }
 
@@ -687,54 +702,29 @@ static int64_t
 vddk_get_size (void *handle)
 {
   struct vddk_handle *h = handle;
-  VixDiskLibInfo *info;
-  VixError err;
   uint64_t size;
+  struct command get_size_cmd = { .type = GET_SIZE, .ptr = &size };
 
-  VDDK_CALL_START (VixDiskLib_GetInfo, "handle, &info")
-    err = VixDiskLib_GetInfo (h->handle, &info);
-  VDDK_CALL_END (VixDiskLib_GetInfo, 0);
-  if (err != VIX_OK) {
-    VDDK_ERROR (err, "VixDiskLib_GetInfo");
+  if (send_command_and_wait (h, &get_size_cmd) == -1)
     return -1;
-  }
-
-  size = info->capacity * (uint64_t)VIXDISKLIB_SECTOR_SIZE;
-
-  if (vddk_debug_diskinfo) {
-    nbdkit_debug ("disk info: capacity: %" PRIu64 " sectors "
-                  "(%" PRIi64 " bytes)",
-                  info->capacity, size);
-    nbdkit_debug ("disk info: biosGeo: C:%" PRIu32 " H:%" PRIu32 " S:%" PRIu32,
-                  info->biosGeo.cylinders,
-                  info->biosGeo.heads,
-                  info->biosGeo.sectors);
-    nbdkit_debug ("disk info: physGeo: C:%" PRIu32 " H:%" PRIu32 " S:%" PRIu32,
-                  info->physGeo.cylinders,
-                  info->physGeo.heads,
-                  info->physGeo.sectors);
-    nbdkit_debug ("disk info: adapter type: %d",
-                  (int) info->adapterType);
-    nbdkit_debug ("disk info: num links: %d", info->numLinks);
-    nbdkit_debug ("disk info: parent filename hint: %s",
-                  info->parentFileNameHint ? : "NULL");
-    nbdkit_debug ("disk info: uuid: %s",
-                  info->uuid ? : "NULL");
-    if (library_version >= 7) {
-      nbdkit_debug ("disk info: sector size: "
-                    "logical %" PRIu32 " physical %" PRIu32,
-                    info->logicalSectorSize,
-                    info->physicalSectorSize);
-    }
-  }
-
-  VDDK_CALL_START (VixDiskLib_FreeInfo, "info")
-    VixDiskLib_FreeInfo (info);
-  VDDK_CALL_END (VixDiskLib_FreeInfo, 0);
 
   return (int64_t) size;
 }
 
+static int
+vddk_can_fua (void *handle)
+{
+  /* The Flush call was not available in VDDK < 6.0. */
+  return VixDiskLib_Flush != NULL ? NBDKIT_FUA_NATIVE : NBDKIT_FUA_NONE;
+}
+
+static int
+vddk_can_flush (void *handle)
+{
+  /* The Flush call was not available in VDDK < 6.0. */
+  return VixDiskLib_Flush != NULL;
+}
+
 /* Read data from the file.
  *
  * Note that reads have to be aligned to sectors (XXX).
@@ -744,32 +734,14 @@ vddk_pread (void *handle, void *buf, uint32_t count, uint64_t offset,
             uint32_t flags)
 {
   struct vddk_handle *h = handle;
-  VixError err;
+  struct command read_cmd = {
+    .type = READ,
+    .ptr = buf,
+    .count = count,
+    .offset = offset,
+  };
 
-  /* Align to sectors. */
-  if (!IS_ALIGNED (offset, VIXDISKLIB_SECTOR_SIZE)) {
-    nbdkit_error ("%s is not aligned to sectors", "read");
-    return -1;
-  }
-  if (!IS_ALIGNED (count, VIXDISKLIB_SECTOR_SIZE)) {
-    nbdkit_error ("%s is not aligned to sectors", "read");
-    return -1;
-  }
-  offset /= VIXDISKLIB_SECTOR_SIZE;
-  count /= VIXDISKLIB_SECTOR_SIZE;
-
-  VDDK_CALL_START (VixDiskLib_Read,
-                   "handle, %" PRIu64 " sectors, "
-                   "%" PRIu32 " sectors, buffer",
-                   offset, count)
-    err = VixDiskLib_Read (h->handle, offset, count, buf);
-  VDDK_CALL_END (VixDiskLib_Read, count * VIXDISKLIB_SECTOR_SIZE);
-  if (err != VIX_OK) {
-    VDDK_ERROR (err, "VixDiskLib_Read");
-    return -1;
-  }
-
-  return 0;
+  return send_command_and_wait (h, &read_cmd);
 }
 
 static int vddk_flush (void *handle, uint32_t flags);
@@ -782,32 +754,17 @@ static int
 vddk_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset,
              uint32_t flags)
 {
+  struct vddk_handle *h = handle;
   const bool fua = flags & NBDKIT_FLAG_FUA;
-  struct vddk_handle *h = handle;
-  VixError err;
+  struct command write_cmd = {
+    .type = WRITE,
+    .ptr = (void *) buf,
+    .count = count,
+    .offset = offset,
+  };
 
-  /* Align to sectors. */
-  if (!IS_ALIGNED (offset, VIXDISKLIB_SECTOR_SIZE)) {
-    nbdkit_error ("%s is not aligned to sectors", "write");
+  if (send_command_and_wait (h, &write_cmd) == -1)
     return -1;
-  }
-  if (!IS_ALIGNED (count, VIXDISKLIB_SECTOR_SIZE)) {
-    nbdkit_error ("%s is not aligned to sectors", "write");
-    return -1;
-  }
-  offset /= VIXDISKLIB_SECTOR_SIZE;
-  count /= VIXDISKLIB_SECTOR_SIZE;
-
-  VDDK_CALL_START (VixDiskLib_Write,
-                   "handle, %" PRIu64 " sectors, "
-                   "%" PRIu32 " sectors, buffer",
-                   offset, count)
-    err = VixDiskLib_Write (h->handle, offset, count, buf);
-  VDDK_CALL_END (VixDiskLib_Write, count * VIXDISKLIB_SECTOR_SIZE);
-  if (err != VIX_OK) {
-    VDDK_ERROR (err, "VixDiskLib_Write");
-    return -1;
-  }
 
   if (fua) {
     if (vddk_flush (handle, 0) == -1)
@@ -817,126 +774,32 @@ vddk_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset,
   return 0;
 }
 
-static int
-vddk_can_fua (void *handle)
-{
-  /* The Flush call was not available in VDDK < 6.0. */
-  return VixDiskLib_Flush != NULL ? NBDKIT_FUA_NATIVE : NBDKIT_FUA_NONE;
-}
-
-static int
-vddk_can_flush (void *handle)
-{
-  /* The Flush call was not available in VDDK < 6.0. */
-  return VixDiskLib_Flush != NULL;
-}
-
 /* Flush data to the file. */
 static int
 vddk_flush (void *handle, uint32_t flags)
 {
   struct vddk_handle *h = handle;
-  VixError err;
+  struct command flush_cmd = {
+    .type = FLUSH,
+  };
 
-  /* The documentation for Flush is missing, but the comment in the
-   * header file seems to indicate that it waits for WriteAsync
-   * commands to finish.  We don't use WriteAsync, and in any case
-   * there's a new function Wait to wait for those.  However I
-   * verified using strace that in fact Flush does call fsync on the
-   * file so it appears to be the correct call to use here.
-   */
-
-  VDDK_CALL_START (VixDiskLib_Flush, "handle")
-    err = VixDiskLib_Flush (h->handle);
-  VDDK_CALL_END (VixDiskLib_Flush, 0);
-  if (err != VIX_OK) {
-    VDDK_ERROR (err, "VixDiskLib_Flush");
-    return -1;
-  }
-
-  return 0;
+  return send_command_and_wait (h, &flush_cmd);
 }
 
 static int
 vddk_can_extents (void *handle)
 {
   struct vddk_handle *h = handle;
-  VixError err;
-  VixDiskLibBlockList *block_list;
+  int ret;
+  struct command can_extents_cmd = {
+    .type = CAN_EXTENTS,
+    .ptr = &ret,
+  };
 
-  /* This call was added in VDDK 6.7.  In earlier versions the
-   * function pointer will be NULL and we cannot query extents.
-   */
-  if (VixDiskLib_QueryAllocatedBlocks == NULL) {
-    nbdkit_debug ("can_extents: VixDiskLib_QueryAllocatedBlocks == NULL, "
-                  "probably this is VDDK < 6.7");
-    return 0;
-  }
-
-  /* Suppress errors around this call.  See:
-   * https://bugzilla.redhat.com/show_bug.cgi?id=1709211#c7
-   */
-  error_suppression = 1;
-
-  /* However even when the call is available it rarely works well so
-   * the best thing we can do here is to try the call and if it's
-   * non-functional return false.
-   */
-  VDDK_CALL_START (VixDiskLib_QueryAllocatedBlocks,
-                   "handle, 0, %d sectors, %d sectors",
-                   VIXDISKLIB_MIN_CHUNK_SIZE, VIXDISKLIB_MIN_CHUNK_SIZE)
-    err = VixDiskLib_QueryAllocatedBlocks (h->handle,
-                                           0, VIXDISKLIB_MIN_CHUNK_SIZE,
-                                           VIXDISKLIB_MIN_CHUNK_SIZE,
-                                           &block_list);
-  VDDK_CALL_END (VixDiskLib_QueryAllocatedBlocks, 0);
-  error_suppression = 0;
-  if (err == VIX_OK) {
-    VDDK_CALL_START (VixDiskLib_FreeBlockList, "block_list")
-      VixDiskLib_FreeBlockList (block_list);
-    VDDK_CALL_END (VixDiskLib_FreeBlockList, 0);
-  }
-  if (err != VIX_OK) {
-    char *errmsg = VixDiskLib_GetErrorText (err, NULL);
-    nbdkit_debug ("can_extents: VixDiskLib_QueryAllocatedBlocks test failed, "
-                  "extents support will be disabled: "
-                  "original error: %s",
-                  errmsg);
-    VixDiskLib_FreeErrorText (errmsg);
-    return 0;
-  }
-
-  return 1;
-}
-
-static int
-add_extent (struct nbdkit_extents *extents,
-            uint64_t *position, uint64_t next_position, bool is_hole)
-{
-  uint32_t type = 0;
-  const uint64_t length = next_position - *position;
-
-  if (is_hole) {
-    type = NBDKIT_EXTENT_HOLE;
-    /* Images opened as single link might be backed by another file in the
-       chain, so the holes are not guaranteed to be zeroes. */
-    if (!single_link)
-      type |= NBDKIT_EXTENT_ZERO;
-  }
-
-  assert (*position <= next_position);
-  if (*position == next_position)
-    return 0;
-
-  if (vddk_debug_extents)
-    nbdkit_debug ("adding extent type %s at [%" PRIu64 "...%" PRIu64 "]",
-                  is_hole ? "hole" : "allocated data",
-                  *position, next_position-1);
-  if (nbdkit_add_extent (extents, *position, length, type) == -1)
+  if (send_command_and_wait (h, &can_extents_cmd) == -1)
     return -1;
 
-  *position = next_position;
-  return 0;
+  return ret;
 }
 
 static int
@@ -945,88 +808,15 @@ vddk_extents (void *handle, uint32_t count, uint64_t offset, uint32_t flags,
 {
   struct vddk_handle *h = handle;
   bool req_one = flags & NBDKIT_FLAG_REQ_ONE;
-  uint64_t position, end, start_sector;
-
-  position = offset;
-  end = offset + count;
-
-  /* We can only query whole chunks.  Therefore start with the first
-   * chunk before offset.
-   */
-  start_sector =
-    ROUND_DOWN (offset, VIXDISKLIB_MIN_CHUNK_SIZE * VIXDISKLIB_SECTOR_SIZE)
-    / VIXDISKLIB_SECTOR_SIZE;
-  while (start_sector * VIXDISKLIB_SECTOR_SIZE < end) {
-    VixError err;
-    uint32_t i;
-    uint64_t nr_chunks, nr_sectors;
-    VixDiskLibBlockList *block_list;
-
-    assert (IS_ALIGNED (start_sector, VIXDISKLIB_MIN_CHUNK_SIZE));
-
-    nr_chunks =
-      ROUND_UP (end - start_sector * VIXDISKLIB_SECTOR_SIZE,
-                VIXDISKLIB_MIN_CHUNK_SIZE * VIXDISKLIB_SECTOR_SIZE)
-      / (VIXDISKLIB_MIN_CHUNK_SIZE * VIXDISKLIB_SECTOR_SIZE);
-    nr_chunks = MIN (nr_chunks, VIXDISKLIB_MAX_CHUNK_NUMBER);
-    nr_sectors = nr_chunks * VIXDISKLIB_MIN_CHUNK_SIZE;
-
-    VDDK_CALL_START (VixDiskLib_QueryAllocatedBlocks,
-                     "handle, %" PRIu64 " sectors, %" PRIu64 " sectors, "
-                     "%d sectors",
-                     start_sector, nr_sectors, VIXDISKLIB_MIN_CHUNK_SIZE)
-      err = VixDiskLib_QueryAllocatedBlocks (h->handle,
-                                             start_sector, nr_sectors,
-                                             VIXDISKLIB_MIN_CHUNK_SIZE,
-                                             &block_list);
-    VDDK_CALL_END (VixDiskLib_QueryAllocatedBlocks, 0);
-    if (err != VIX_OK) {
-      VDDK_ERROR (err, "VixDiskLib_QueryAllocatedBlocks");
-      return -1;
-    }
-
-    for (i = 0; i < block_list->numBlocks; ++i) {
-      uint64_t blk_offset, blk_length;
-
-      blk_offset = block_list->blocks[i].offset * VIXDISKLIB_SECTOR_SIZE;
-      blk_length = block_list->blocks[i].length * VIXDISKLIB_SECTOR_SIZE;
-
-      /* The query returns allocated blocks.  We must insert holes
-       * between the blocks as necessary.
-       */
-      if ((position < blk_offset &&
-           add_extent (extents, &position, blk_offset, true) == -1) ||
-          (add_extent (extents,
-                       &position, blk_offset + blk_length, false) == -1)) {
-        VDDK_CALL_START (VixDiskLib_FreeBlockList, "block_list")
-          VixDiskLib_FreeBlockList (block_list);
-        VDDK_CALL_END (VixDiskLib_FreeBlockList, 0);
-        return -1;
-      }
-    }
-    VDDK_CALL_START (VixDiskLib_FreeBlockList, "block_list")
-      VixDiskLib_FreeBlockList (block_list);
-    VDDK_CALL_END (VixDiskLib_FreeBlockList, 0);
-
-    /* There's an implicit hole after the returned list of blocks, up
-     * to the end of the QueryAllocatedBlocks request.
-     */
-    if (add_extent (extents,
-                    &position,
-                    (start_sector + nr_sectors) * VIXDISKLIB_SECTOR_SIZE,
-                    true) == -1)
-      return -1;
-
-    start_sector += nr_sectors;
-
-    /* If one extent was requested, as long as we've added an extent
-     * overlapping the original offset we're done.
-     */
-    if (req_one && position > offset)
-      break;
-  }
-
-  return 0;
+  struct command extents_cmd = {
+    .type = EXTENTS,
+    .ptr = extents,
+    .count = count,
+    .offset = offset,
+    .req_one = req_one,
+  };
+
+  return send_command_and_wait (h, &extents_cmd);
 }
 
 static struct nbdkit_plugin plugin = {
diff --git a/plugins/vddk/worker.c b/plugins/vddk/worker.c
new file mode 100644
index 000000000..701f9c322
--- /dev/null
+++ b/plugins/vddk/worker.c
@@ -0,0 +1,496 @@
+/* nbdkit
+ * Copyright (C) 2013-2021 Red Hat Inc.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of Red Hat nor the names of its contributors may be
+ * used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY RED HAT AND CONTRIBUTORS ''AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+ * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL RED HAT OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+ * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
+ * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include <config.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <inttypes.h>
+
+#include <pthread.h>
+
+#define NBDKIT_API_VERSION 2
+#include <nbdkit-plugin.h>
+
+#include "cleanup.h"
+#include "minmax.h"
+#include "rounding.h"
+#include "vector.h"
+
+#include "vddk.h"
+
+const char *
+command_type_string (enum command_type type)
+{
+  switch (type) {
+  case GET_SIZE:    return "get_size";
+  case READ:        return "read";
+  case WRITE:       return "write";
+  case FLUSH:       return "flush";
+  case CAN_EXTENTS: return "can_extents";
+  case EXTENTS:     return "extents";
+  case STOP:        return "stop";
+  default:          abort ();
+  }
+}
+
+/* Send command to the background thread and wait for completion.
+ *
+ * Returns 0 for OK
+ * On error, calls nbdkit_error and returns -1.
+ */
+int
+send_command_and_wait (struct vddk_handle *h, struct command *cmd)
+{
+  int r;
+
+  /* This will be used to signal command completion back to us. */
+  pthread_mutex_init (&cmd->mutex, NULL);
+  pthread_cond_init (&cmd->cond, NULL);
+
+  /* Add the command to the command queue. */
+  {
+    ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&h->commands_lock);
+    r = command_queue_append (&h->commands, cmd);
+    if (r == 0)
+      pthread_cond_signal (&h->commands_cond);
+  }
+
+  /* On error command_queue_append will call nbdkit_error. */
+  if (r == -1)
+    return -1;
+
+  /* Wait for the command to be completed by the background thread. */
+  {
+    ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex);
+    while (!cmd->completed)
+      pthread_cond_wait (&cmd->cond, &cmd->mutex);
+  }
+
+  pthread_mutex_destroy (&cmd->mutex);
+  pthread_cond_destroy (&cmd->cond);
+
+  /* On error the background thread will call nbdkit_error. */
+  return ! cmd->error ? 0 : -1;
+}
+
+/* Add an extent to the list of extents. */
+static int
+add_extent (struct nbdkit_extents *extents,
+            uint64_t *position, uint64_t next_position, bool is_hole)
+{
+  uint32_t type = 0;
+  const uint64_t length = next_position - *position;
+
+  if (is_hole) {
+    type = NBDKIT_EXTENT_HOLE;
+    /* Images opened as single link might be backed by another file in the
+       chain, so the holes are not guaranteed to be zeroes. */
+    if (!single_link)
+      type |= NBDKIT_EXTENT_ZERO;
+  }
+
+  assert (*position <= next_position);
+  if (*position == next_position)
+    return 0;
+
+  if (vddk_debug_extents)
+    nbdkit_debug ("adding extent type %s at [%" PRIu64 "...%" PRIu64 "]",
+                  is_hole ? "hole" : "allocated data",
+                  *position, next_position-1);
+  if (nbdkit_add_extent (extents, *position, length, type) == -1)
+    return -1;
+
+  *position = next_position;
+  return 0;
+}
+
+/* Asynchronous commands are completed when this function is called. */
+static void
+complete_command (void *vp, VixError err)
+{
+  struct command *cmd = vp;
+
+  if (err != VIX_OK) {
+    VDDK_ERROR (err, "asynchronous %s failed", command_type_string (cmd->type));
+    cmd->error = true;
+  }
+
+  ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex);
+  cmd->completed = true;
+  pthread_cond_signal (&cmd->cond);
+}
+
+/* Background worker thread, one per connection, which is where the
+ * VDDK commands are issued.
+ */
+void *
+vddk_worker_thread (void *handle)
+{
+  struct vddk_handle *h = handle;
+  bool stop = false;
+
+  while (!stop) {
+    struct command *cmd;
+    bool async = false;
+    VixError err;
+
+    /* Wait until we are sent at least one command. */
+    {
+      ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&h->commands_lock);
+      while (h->commands.size == 0)
+        pthread_cond_wait (&h->commands_cond, &h->commands_lock);
+      cmd = h->commands.ptr[0];
+      command_queue_remove (&h->commands, 0);
+    }
+
+    cmd->completed = false;
+    cmd->error = false;
+
+    switch (cmd->type) {
+    case STOP:
+      stop = true;
+
+      /* Wait for any asynchronous commands to complete. */
+      VDDK_CALL_START (VixDiskLib_Wait, "handle")
+        err = VixDiskLib_Wait (h->handle);
+      VDDK_CALL_END (VixDiskLib_Wait, 0);
+      if (err != VIX_OK) {
+        VDDK_ERROR (err, "VixDiskLib_Wait");
+        cmd->error = true;
+        break;
+      }
+      break;
+
+    case GET_SIZE: {
+      VixDiskLibInfo *info;
+      uint64_t size;
+
+      VDDK_CALL_START (VixDiskLib_GetInfo, "handle, &info")
+        err = VixDiskLib_GetInfo (h->handle, &info);
+      VDDK_CALL_END (VixDiskLib_GetInfo, 0);
+      if (err != VIX_OK) {
+        VDDK_ERROR (err, "VixDiskLib_GetInfo");
+        cmd->error = true;
+        break;
+      }
+
+      size = info->capacity * (uint64_t)VIXDISKLIB_SECTOR_SIZE;
+      *(uint64_t *)cmd->ptr = size;
+
+      if (vddk_debug_diskinfo) {
+        nbdkit_debug ("disk info: capacity: "
+                      "%" PRIu64 " sectors "
+                      "(%" PRIi64 " bytes)",
+                      info->capacity, size);
+        nbdkit_debug ("disk info: biosGeo: "
+                      "C:%" PRIu32 " H:%" PRIu32 " S:%" PRIu32,
+                      info->biosGeo.cylinders,
+                      info->biosGeo.heads,
+                      info->biosGeo.sectors);
+        nbdkit_debug ("disk info: physGeo: "
+                      "C:%" PRIu32 " H:%" PRIu32 " S:%" PRIu32,
+                      info->physGeo.cylinders,
+                      info->physGeo.heads,
+                      info->physGeo.sectors);
+        nbdkit_debug ("disk info: adapter type: %d",
+                      (int) info->adapterType);
+        nbdkit_debug ("disk info: num links: %d", info->numLinks);
+        nbdkit_debug ("disk info: parent filename hint: %s",
+                      info->parentFileNameHint ? : "NULL");
+        nbdkit_debug ("disk info: uuid: %s",
+                      info->uuid ? : "NULL");
+        if (library_version >= 7) {
+          nbdkit_debug ("disk info: sector size: "
+                        "logical %" PRIu32 " physical %" PRIu32,
+                        info->logicalSectorSize,
+                        info->physicalSectorSize);
+        }
+      }
+
+      VDDK_CALL_START (VixDiskLib_FreeInfo, "info")
+        VixDiskLib_FreeInfo (info);
+      VDDK_CALL_END (VixDiskLib_FreeInfo, 0);
+
+      break;
+    }
+
+    case READ: {
+      uint32_t count = cmd->count;
+      uint64_t offset = cmd->offset;
+      void *buf = cmd->ptr;
+
+      /* Align to sectors. */
+      if (!IS_ALIGNED (offset, VIXDISKLIB_SECTOR_SIZE)) {
+        nbdkit_error ("%s is not aligned to sectors", "read");
+        cmd->error = true;
+        break;
+      }
+      if (!IS_ALIGNED (count, VIXDISKLIB_SECTOR_SIZE)) {
+        nbdkit_error ("%s is not aligned to sectors", "read");
+        cmd->error = true;
+        break;
+      }
+      offset /= VIXDISKLIB_SECTOR_SIZE;
+      count /= VIXDISKLIB_SECTOR_SIZE;
+
+      VDDK_CALL_START (VixDiskLib_ReadAsync,
+                       "handle, %" PRIu64 " sectors, "
+                       "%" PRIu32 " sectors, buffer, callback",
+                       offset, count)
+        err = VixDiskLib_ReadAsync (h->handle, offset, count, buf,
+                                    complete_command, cmd);
+      VDDK_CALL_END (VixDiskLib_ReadAsync, count * VIXDISKLIB_SECTOR_SIZE);
+      if (err != VIX_ASYNC) {
+        VDDK_ERROR (err, "VixDiskLib_ReadAsync");
+        cmd->error = true;
+        break;
+      }
+
+      async = true;             /* Don't retire this command now. */
+      break;
+    }
+
+    case WRITE: {
+      uint32_t count = cmd->count;
+      uint64_t offset = cmd->offset;
+      const void *buf = cmd->ptr;
+
+      /* Align to sectors. */
+      if (!IS_ALIGNED (offset, VIXDISKLIB_SECTOR_SIZE)) {
+        nbdkit_error ("%s is not aligned to sectors", "write");
+        cmd->error = true;
+        break;
+      }
+      if (!IS_ALIGNED (count, VIXDISKLIB_SECTOR_SIZE)) {
+        nbdkit_error ("%s is not aligned to sectors", "write");
+        cmd->error = true;
+        break;
+      }
+      offset /= VIXDISKLIB_SECTOR_SIZE;
+      count /= VIXDISKLIB_SECTOR_SIZE;
+
+      VDDK_CALL_START (VixDiskLib_WriteAsync,
+                       "handle, %" PRIu64 " sectors, "
+                       "%" PRIu32 " sectors, buffer, callback",
+                       offset, count)
+        err = VixDiskLib_WriteAsync (h->handle, offset, count, buf,
+                                     complete_command, cmd);
+      VDDK_CALL_END (VixDiskLib_WriteAsync, count * VIXDISKLIB_SECTOR_SIZE);
+      if (err != VIX_ASYNC) {
+        VDDK_ERROR (err, "VixDiskLib_WriteAsync");
+        cmd->error = true;
+        break;
+      }
+
+      async = true;             /* Don't retire this command now. */
+      break;
+    }
+
+    case FLUSH: {
+      /* The documentation for Flush is missing, but the comment in
+       * the header file seems to indicate that it waits for
+       * WriteAsync commands to finish.  We don't use WriteAsync, and
+       * in any case there's a new function Wait to wait for those.
+       * However I verified using strace that in fact Flush does call
+       * fsync on the file so it appears to be the correct call to use
+       * here.
+       */
+      VDDK_CALL_START (VixDiskLib_Flush, "handle")
+        err = VixDiskLib_Flush (h->handle);
+      VDDK_CALL_END (VixDiskLib_Flush, 0);
+      if (err != VIX_OK) {
+        VDDK_ERROR (err, "VixDiskLib_Flush");
+        cmd->error = true;
+        break;
+      }
+    }
+
+    case CAN_EXTENTS: {
+      VixDiskLibBlockList *block_list;
+
+      /* This call was added in VDDK 6.7.  In earlier versions the
+       * function pointer will be NULL and we cannot query extents.
+       */
+      if (VixDiskLib_QueryAllocatedBlocks == NULL) {
+        nbdkit_debug ("can_extents: VixDiskLib_QueryAllocatedBlocks == NULL, "
+                      "probably this is VDDK < 6.7");
+        *(int *)cmd->ptr = 0;
+        break;
+      }
+
+      /* Suppress errors around this call.  See:
+       * https://bugzilla.redhat.com/show_bug.cgi?id=1709211#c7
+       */
+      error_suppression = 1;
+
+      /* However even when the call is available it rarely works well
+       * so the best thing we can do here is to try the call and if
+       * it's non-functional return false.
+       */
+      VDDK_CALL_START (VixDiskLib_QueryAllocatedBlocks,
+                       "handle, 0, %d sectors, %d sectors",
+                       VIXDISKLIB_MIN_CHUNK_SIZE, VIXDISKLIB_MIN_CHUNK_SIZE)
+        err = VixDiskLib_QueryAllocatedBlocks (h->handle,
+                                               0, VIXDISKLIB_MIN_CHUNK_SIZE,
+                                               VIXDISKLIB_MIN_CHUNK_SIZE,
+                                               &block_list);
+      VDDK_CALL_END (VixDiskLib_QueryAllocatedBlocks, 0);
+      error_suppression = 0;
+      if (err == VIX_OK) {
+        VDDK_CALL_START (VixDiskLib_FreeBlockList, "block_list")
+          VixDiskLib_FreeBlockList (block_list);
+        VDDK_CALL_END (VixDiskLib_FreeBlockList, 0);
+      }
+      if (err != VIX_OK) {
+        char *errmsg = VixDiskLib_GetErrorText (err, NULL);
+        nbdkit_debug ("can_extents: "
+                      "VixDiskLib_QueryAllocatedBlocks test failed, "
+                      "extents support will be disabled: "
+                      "original error: %s",
+                      errmsg);
+        VixDiskLib_FreeErrorText (errmsg);
+        *(int *)cmd->ptr = 0;
+        break;
+      }
+
+      *(int *)cmd->ptr = 1;
+    }
+
+    case EXTENTS: {
+      uint32_t count = cmd->count;
+      uint64_t offset = cmd->offset;
+      bool req_one = cmd->req_one;
+      struct nbdkit_extents *extents = cmd->ptr;
+      uint64_t position, end, start_sector;
+
+      position = offset;
+      end = offset + count;
+
+      /* We can only query whole chunks.  Therefore start with the
+       * first chunk before offset.
+       */
+      start_sector =
+        ROUND_DOWN (offset, VIXDISKLIB_MIN_CHUNK_SIZE * VIXDISKLIB_SECTOR_SIZE)
+        / VIXDISKLIB_SECTOR_SIZE;
+      while (start_sector * VIXDISKLIB_SECTOR_SIZE < end) {
+        uint32_t i;
+        uint64_t nr_chunks, nr_sectors;
+        VixDiskLibBlockList *block_list;
+
+        assert (IS_ALIGNED (start_sector, VIXDISKLIB_MIN_CHUNK_SIZE));
+
+        nr_chunks =
+          ROUND_UP (end - start_sector * VIXDISKLIB_SECTOR_SIZE,
+                    VIXDISKLIB_MIN_CHUNK_SIZE * VIXDISKLIB_SECTOR_SIZE)
+          / (VIXDISKLIB_MIN_CHUNK_SIZE * VIXDISKLIB_SECTOR_SIZE);
+        nr_chunks = MIN (nr_chunks, VIXDISKLIB_MAX_CHUNK_NUMBER);
+        nr_sectors = nr_chunks * VIXDISKLIB_MIN_CHUNK_SIZE;
+
+        VDDK_CALL_START (VixDiskLib_QueryAllocatedBlocks,
+                         "handle, %" PRIu64 " sectors, %" PRIu64 " sectors, "
+                         "%d sectors",
+                         start_sector, nr_sectors, VIXDISKLIB_MIN_CHUNK_SIZE)
+          err = VixDiskLib_QueryAllocatedBlocks (h->handle,
+                                                 start_sector, nr_sectors,
+                                                 VIXDISKLIB_MIN_CHUNK_SIZE,
+                                                 &block_list);
+        VDDK_CALL_END (VixDiskLib_QueryAllocatedBlocks, 0);
+        if (err != VIX_OK) {
+          VDDK_ERROR (err, "VixDiskLib_QueryAllocatedBlocks");
+          cmd->error = true;
+          break;
+        }
+
+        for (i = 0; i < block_list->numBlocks; ++i) {
+          uint64_t blk_offset, blk_length;
+
+          blk_offset = block_list->blocks[i].offset * VIXDISKLIB_SECTOR_SIZE;
+          blk_length = block_list->blocks[i].length * VIXDISKLIB_SECTOR_SIZE;
+
+          /* The query returns allocated blocks.  We must insert holes
+           * between the blocks as necessary.
+           */
+          if ((position < blk_offset &&
+               add_extent (extents, &position, blk_offset, true) == -1) ||
+              (add_extent (extents,
+                           &position, blk_offset + blk_length, false) == -1)) {
+            VDDK_CALL_START (VixDiskLib_FreeBlockList, "block_list")
+              VixDiskLib_FreeBlockList (block_list);
+            VDDK_CALL_END (VixDiskLib_FreeBlockList, 0);
+            cmd->error = true;
+            break;
+          }
+        }
+        VDDK_CALL_START (VixDiskLib_FreeBlockList, "block_list")
+          VixDiskLib_FreeBlockList (block_list);
+        VDDK_CALL_END (VixDiskLib_FreeBlockList, 0);
+
+        /* There's an implicit hole after the returned list of blocks,
+         * up to the end of the QueryAllocatedBlocks request.
+         */
+        if (add_extent (extents,
+                        &position,
+                        (start_sector + nr_sectors) * VIXDISKLIB_SECTOR_SIZE,
+                        true) == -1) {
+          cmd->error = true;
+          break;
+        }
+
+        start_sector += nr_sectors;
+
+        /* If one extent was requested, as long as we've added an extent
+         * overlapping the original offset we're done.
+         */
+        if (req_one && position > offset)
+          break;
+      }
+
+      break;
+    }
+    } /* switch */
+
+    if (!async) {
+      /* For synchronous commands signal the caller thread that the
+       * command has completed.  (Asynchronous commands are completed
+       * in the callback handler).
+       */
+      ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex);
+      cmd->completed = true;
+      pthread_cond_signal (&cmd->cond);
+    }
+  } /* while (!stop) */
+
+  /* Exit the worker thread. */
+  return NULL;
+}
-- 
2.32.0




More information about the Libguestfs mailing list