[Libguestfs] [PATCH libnbd 5/8] copy: Introduce worker struct

Nir Soffer nsoffer at redhat.com
Sun Feb 20 12:14:00 UTC 2022


I want to keep more info per worker, and a worker struct is the natural
way to do this. This also allows cleaning up the ops-* interface, which
accepted a uintptr_t index even though the index is never a pointer. I
think uintptr_t was used only because the index was passed to the worker
thread by casting it to the void * argument of pthread_create().

The worker struct is used only by the multi-thread-copying module, but
in a future patch I want to keep the worker pointer in the command, to
allow commands to update the worker state when they finish.

Signed-off-by: Nir Soffer <nsoffer at redhat.com>
---
 copy/file-ops.c             |  4 +--
 copy/main.c                 |  6 ++---
 copy/multi-thread-copying.c | 49 +++++++++++++++++++------------------
 copy/nbd-ops.c              | 10 ++++----
 copy/nbdcopy.h              | 24 +++++++++++-------
 copy/null-ops.c             |  4 +--
 copy/pipe-ops.c             |  2 +-
 7 files changed, 53 insertions(+), 46 deletions(-)

diff --git a/copy/file-ops.c b/copy/file-ops.c
index aaf04ade..ab378754 100644
--- a/copy/file-ops.c
+++ b/copy/file-ops.c
@@ -614,27 +614,27 @@ file_asynch_zero (struct rw *rw, struct command *command,
 {
   int dummy = 0;
 
   if (!file_synch_zero (rw, command->offset, command->slice.len, allocate))
     return false;
   cb.callback (cb.user_data, &dummy);
   return true;
 }
 
 static unsigned
-file_in_flight (struct rw *rw, uintptr_t index)
+file_in_flight (struct rw *rw, size_t index)
 {
   return 0;
 }
 
 static void
-file_get_extents (struct rw *rw, uintptr_t index,
+file_get_extents (struct rw *rw, size_t index,
                   uint64_t offset, uint64_t count,
                   extent_list *ret)
 {
   ret->len = 0;
 
 #ifdef SEEK_HOLE
   struct rw_file *rwf = (struct rw_file *)rw;
   static pthread_mutex_t lseek_lock = PTHREAD_MUTEX_INITIALIZER;
 
   if (rwf->seek_hole_supported) {
diff --git a/copy/main.c b/copy/main.c
index 67788b5d..390de1eb 100644
--- a/copy/main.c
+++ b/copy/main.c
@@ -513,44 +513,44 @@ print_rw (struct rw *rw, const char *prefix, FILE *fp)
 
   fprintf (fp, "%s: %s \"%s\"\n", prefix, rw->ops->ops_name, rw->name);
   fprintf (fp, "%s: size=%" PRIi64 " (%s)\n",
            prefix, rw->size, human_size (buf, rw->size, NULL));
 }
 
 /* Default implementation of rw->ops->get_extents for backends which
  * don't/can't support extents.  Also used for the --no-extents case.
  */
 void
-default_get_extents (struct rw *rw, uintptr_t index,
+default_get_extents (struct rw *rw, size_t index,
                      uint64_t offset, uint64_t count,
                      extent_list *ret)
 {
   struct extent e;
 
   ret->len = 0;
 
   e.offset = offset;
   e.length = count;
   e.zero = false;
   if (extent_list_append (ret, e) == -1) {
     perror ("realloc");
     exit (EXIT_FAILURE);
   }
 }
 
 /* Implementations of get_polling_fd and asynch_notify_* for backends
  * which don't support polling.
  */
 void
-get_polling_fd_not_supported (struct rw *rw, uintptr_t index,
+get_polling_fd_not_supported (struct rw *rw, size_t index,
                               int *fd_rtn, int *direction_rtn)
 {
   /* Not an error, this causes poll to ignore the fd. */
   *fd_rtn = -1;
   *direction_rtn = LIBNBD_AIO_DIRECTION_READ;
 }
 
 void
-asynch_notify_read_write_not_supported (struct rw *rw, uintptr_t index)
+asynch_notify_read_write_not_supported (struct rw *rw, size_t index)
 {
   /* nothing */
 }
diff --git a/copy/multi-thread-copying.c b/copy/multi-thread-copying.c
index aa6a9f41..a1a8d09c 100644
--- a/copy/multi-thread-copying.c
+++ b/copy/multi-thread-copying.c
@@ -70,184 +70,185 @@ get_next_offset (uint64_t *offset, uint64_t *count)
      * the commands.  We might move this into a callback, but those
      * are called from threads and not necessarily in monotonic order
      * so the progress bar would move erratically.
      */
     progress_bar (*offset, src->size);
   }
   pthread_mutex_unlock (&lock);
   return r;
 }
 
-static void *worker_thread (void *ip);
+static void *worker_thread (void *wp);
 
 void
 multi_thread_copying (void)
 {
-  pthread_t *workers;
+  struct worker *workers;
   size_t i;
   int err;
 
   /* Some invariants that should be true if the main program called us
    * correctly.
    */
   assert (threads > 0);
   assert (threads == connections);
 /*
   if (src.ops == &nbd_ops)
     assert (src.u.nbd.handles.size == connections);
   if (dst.ops == &nbd_ops)
     assert (dst.u.nbd.handles.size == connections);
 */
   assert (src->size != -1);
 
-  workers = malloc (sizeof (pthread_t) * threads);
+  workers = calloc (threads, sizeof *workers);
   if (workers == NULL) {
-    perror ("malloc");
+    perror ("calloc");
     exit (EXIT_FAILURE);
   }
 
   /* Start the worker threads. */
   for (i = 0; i < threads; ++i) {
-    err = pthread_create (&workers[i], NULL, worker_thread,
-                          (void *)(uintptr_t)i);
+    workers[i].index = i;
+    err = pthread_create (&workers[i].thread, NULL, worker_thread,
+                          &workers[i]);
     if (err != 0) {
       errno = err;
       perror ("pthread_create");
       exit (EXIT_FAILURE);
     }
   }
 
   /* Wait until all worker threads exit. */
   for (i = 0; i < threads; ++i) {
-    err = pthread_join (workers[i], NULL);
+    err = pthread_join (workers[i].thread, NULL);
     if (err != 0) {
       errno = err;
       perror ("pthread_join");
       exit (EXIT_FAILURE);
     }
   }
 
   free (workers);
 }
 
-static void wait_for_request_slots (uintptr_t index);
-static unsigned in_flight (uintptr_t index);
-static void poll_both_ends (uintptr_t index);
+static void wait_for_request_slots (size_t index);
+static unsigned in_flight (size_t index);
+static void poll_both_ends (size_t index);
 static int finished_read (void *vp, int *error);
 static int finished_command (void *vp, int *error);
 static void free_command (struct command *command);
 static void fill_dst_range_with_zeroes (struct command *command);
 static struct command *create_command (uint64_t offset, size_t len, bool zero,
-                                       uintptr_t index);
+                                       size_t index);
 
 /* There are 'threads' worker threads, each copying work ranges from
  * src to dst until there are no more work ranges.
  */
 static void *
-worker_thread (void *indexp)
+worker_thread (void *wp)
 {
-  uintptr_t index = (uintptr_t) indexp;
+  struct worker *w = wp;
   uint64_t offset, count;
   extent_list exts = empty_vector;
 
   while (get_next_offset (&offset, &count)) {
     size_t i;
 
     assert (0 < count && count <= THREAD_WORK_SIZE);
     if (extents)
-      src->ops->get_extents (src, index, offset, count, &exts);
+      src->ops->get_extents (src, w->index, offset, count, &exts);
     else
-      default_get_extents (src, index, offset, count, &exts);
+      default_get_extents (src, w->index, offset, count, &exts);
 
     for (i = 0; i < exts.len; ++i) {
       struct command *command;
       size_t len;
 
       if (exts.ptr[i].zero) {
         /* The source is zero so we can proceed directly to skipping,
          * fast zeroing, or writing zeroes at the destination.
          */
         command = create_command (exts.ptr[i].offset, exts.ptr[i].length,
-                                  true, index);
+                                  true, w->index);
         fill_dst_range_with_zeroes (command);
       }
 
       else /* data */ {
         /* As the extent might be larger than permitted for a single
          * command, we may have to split this into multiple read
          * requests.
          */
         while (exts.ptr[i].length > 0) {
           len = exts.ptr[i].length;
           if (len > request_size)
             len = request_size;
 
           command = create_command (exts.ptr[i].offset, len,
-                                    false, index);
+                                    false, w->index);
 
-          wait_for_request_slots (index);
+          wait_for_request_slots (w->index);
 
           /* Begin the asynch read operation. */
           src->ops->asynch_read (src, command,
                                  (nbd_completion_callback) {
                                    .callback = finished_read,
                                    .user_data = command,
                                  });
 
           exts.ptr[i].offset += len;
           exts.ptr[i].length -= len;
         }
       }
 
       offset += count;
       count = 0;
     } /* for extents */
   }
 
   /* Wait for in flight NBD requests to finish. */
-  while (in_flight (index) > 0)
-    poll_both_ends (index);
+  while (in_flight (w->index) > 0)
+    poll_both_ends (w->index);
 
   free (exts.ptr);
   return NULL;
 }
 
 /* If the number of requests in flight exceeds the limit, poll
  * waiting for at least one request to finish.  This enforces
  * the user --requests option.
  *
  * NB: Unfortunately it's not possible to call this from a callback,
  * since it will deadlock trying to grab the libnbd handle lock.  This
  * means that although the worker thread calls this and enforces the
  * limit, when we split up requests into subrequests (eg. doing
  * sparseness detection) we will probably exceed the user request
  * limit. XXX
  */
 static void
-wait_for_request_slots (uintptr_t index)
+wait_for_request_slots (size_t index)
 {
   while (in_flight (index) >= max_requests)
     poll_both_ends (index);
 }
 
 /* Count the number of asynchronous commands in flight. */
 static unsigned
-in_flight (uintptr_t index)
+in_flight (size_t index)
 {
   return src->ops->in_flight (src, index) + dst->ops->in_flight (dst, index);
 }
 
 /* Poll (optional) NBD src and NBD dst, moving the state machine(s)
  * along.  This is a lightly modified nbd_poll.
  */
 static void
-poll_both_ends (uintptr_t index)
+poll_both_ends (size_t index)
 {
   struct pollfd fds[2];
   int r, direction;
 
   memset (fds, 0, sizeof fds);
 
   /* Note: if polling is not supported, this function will
    * set fd == -1 which poll ignores.
    */
   src->ops->get_polling_fd (src, index, &fds[0].fd, &direction);
@@ -331,21 +332,21 @@ create_buffer (size_t len)
     exit (EXIT_FAILURE);
   }
 
   buffer->refs = 1;
 
   return buffer;
 }
 
 /* Create a new command for read or zero. */
 static struct command *
-create_command (uint64_t offset, size_t len, bool zero, uintptr_t index)
+create_command (uint64_t offset, size_t len, bool zero, size_t index)
 {
   struct command *command;
 
   command = calloc (1, sizeof *command);
   if (command == NULL) {
     perror ("calloc");
     exit (EXIT_FAILURE);
   }
 
   command->offset = offset;
diff --git a/copy/nbd-ops.c b/copy/nbd-ops.c
index 10551d3a..dca86e88 100644
--- a/copy/nbd-ops.c
+++ b/copy/nbd-ops.c
@@ -377,32 +377,32 @@ add_extent (void *vp, const char *metacontext,
       exit (EXIT_FAILURE);
     }
 
     offset += entries[i];
   }
 
   return 0;
 }
 
 static unsigned
-nbd_ops_in_flight (struct rw *rw, uintptr_t index)
+nbd_ops_in_flight (struct rw *rw, size_t index)
 {
   struct rw_nbd *rwn = (struct rw_nbd *) rw;
 
   /* Since the commands are auto-retired in the callbacks we don't
    * need to count "done" commands.
    */
   return nbd_aio_in_flight (rwn->handles.ptr[index]);
 }
 
 static void
-nbd_ops_get_polling_fd (struct rw *rw, uintptr_t index,
+nbd_ops_get_polling_fd (struct rw *rw, size_t index,
                         int *fd, int *direction)
 {
   struct rw_nbd *rwn = (struct rw_nbd *) rw;
   struct nbd_handle *nbd;
 
   nbd = rwn->handles.ptr[index];
 
   *fd = nbd_aio_get_fd (nbd);
   if (*fd == -1)
     goto error;
@@ -412,47 +412,47 @@ nbd_ops_get_polling_fd (struct rw *rw, uintptr_t index,
     goto error;
 
   return;
 
 error:
   fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
   exit (EXIT_FAILURE);
 }
 
 static void
-nbd_ops_asynch_notify_read (struct rw *rw, uintptr_t index)
+nbd_ops_asynch_notify_read (struct rw *rw, size_t index)
 {
   struct rw_nbd *rwn = (struct rw_nbd *) rw;
   if (nbd_aio_notify_read (rwn->handles.ptr[index]) == -1) {
     fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
     exit (EXIT_FAILURE);
   }
 }
 
 static void
-nbd_ops_asynch_notify_write (struct rw *rw, uintptr_t index)
+nbd_ops_asynch_notify_write (struct rw *rw, size_t index)
 {
   struct rw_nbd *rwn = (struct rw_nbd *) rw;
   if (nbd_aio_notify_write (rwn->handles.ptr[index]) == -1) {
     fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
     exit (EXIT_FAILURE);
   }
 }
 
 /* This is done synchronously, but that's fine because commands from
  * the previous work range in flight continue to run, it's difficult
  * to (sanely) start new work until we have the full list of extents,
  * and in almost every case the remote NBD server can answer our
  * request for extents in a single round trip.
  */
 static void
-nbd_ops_get_extents (struct rw *rw, uintptr_t index,
+nbd_ops_get_extents (struct rw *rw, size_t index,
                      uint64_t offset, uint64_t count,
                      extent_list *ret)
 {
   struct rw_nbd *rwn = (struct rw_nbd *) rw;
   extent_list exts = empty_vector;
   struct nbd_handle *nbd;
 
   nbd = rwn->handles.ptr[index];
 
   ret->len = 0;
diff --git a/copy/nbdcopy.h b/copy/nbdcopy.h
index c070f8d7..4fe8bee6 100644
--- a/copy/nbdcopy.h
+++ b/copy/nbdcopy.h
@@ -71,36 +71,42 @@ struct buffer {
 struct slice {
   size_t len;                   /* Length of slice. */
   size_t base;                  /* Start of slice relative to buffer. */
   struct buffer *buffer;        /* Underlying allocation (may be shared
                                  * or NULL).
                                  */
 };
 
 #define slice_ptr(slice) ((slice).buffer->data + (slice).base)
 
+/* Worker state used by multi-threaded copying. */
+struct worker {
+  pthread_t thread;
+  size_t index;
+};
+
 /* Commands for asynchronous operations in flight.
  *
  * We don't store the command type (read/write/zero/etc) because it is
  * implicit in the function being called and because commands
  * naturally change from read -> write/zero/etc as they progress.
  *
  * slice.buffer may be NULL for commands (like zero) that have no
  * associated data.
  *
  * A separate set of commands, slices and buffers is maintained per
  * thread so no locking is necessary.
  */
 struct command {
   uint64_t offset;              /* Offset relative to start of disk. */
   struct slice slice;           /* Data slice. */
-  uintptr_t index;              /* Thread number. */
+  size_t index;                 /* Thread number. */
 };
 
 /* List of extents for rw->ops->get_extents. */
 struct extent {
   uint64_t offset;
   uint64_t length;
   bool zero;
 };
 DEFINE_VECTOR_TYPE(extent_list, struct extent);
 
@@ -173,51 +179,51 @@ struct rw_ops {
                         struct command *command,
                         nbd_completion_callback cb);
 
   /* Asynchronously zero.  command->slice.buffer is not used.  If not possible,
    * returns false.  'cb' must be called only if returning true.
    */
   bool (*asynch_zero) (struct rw *rw, struct command *command,
                        nbd_completion_callback cb, bool allocate);
 
   /* Number of asynchronous commands in flight for a particular thread. */
-  unsigned (*in_flight) (struct rw *rw, uintptr_t index);
+  unsigned (*in_flight) (struct rw *rw, size_t index);
 
   /* Get polling file descriptor and direction, and notify read/write.
    * For sources which cannot be polled (such as files and pipes)
    * get_polling_fd returns fd == -1 (NOT an error), and the
    * asynch_notify_* functions are no-ops.
    */
-  void (*get_polling_fd) (struct rw *rw, uintptr_t index,
+  void (*get_polling_fd) (struct rw *rw, size_t index,
                           int *fd_rtn, int *direction_rtn);
-  void (*asynch_notify_read) (struct rw *rw, uintptr_t index);
-  void (*asynch_notify_write) (struct rw *rw, uintptr_t index);
+  void (*asynch_notify_read) (struct rw *rw, size_t index);
+  void (*asynch_notify_write) (struct rw *rw, size_t index);
 
   /* Read base:allocation extents metadata for a region of the source.
    * For local files the same information is read from the kernel.
    *
    * Note that qemu-img fetches extents for the entire disk up front,
    * and we want to avoid doing that because it had very negative
    * behaviour for certain sources (ie. VDDK).
    */
-  void (*get_extents) (struct rw *rw, uintptr_t index,
+  void (*get_extents) (struct rw *rw, size_t index,
                        uint64_t offset, uint64_t count,
                        extent_list *ret);
 };
 
-extern void default_get_extents (struct rw *rw, uintptr_t index,
+extern void default_get_extents (struct rw *rw, size_t index,
                                  uint64_t offset, uint64_t count,
                                  extent_list *ret);
-extern void get_polling_fd_not_supported (struct rw *rw, uintptr_t index,
+extern void get_polling_fd_not_supported (struct rw *rw, size_t index,
                                           int *fd_rtn, int *direction_rtn);
 extern void asynch_notify_read_write_not_supported (struct rw *rw,
-                                                    uintptr_t index);
+                                                    size_t index);
 
 extern bool allocated;
 extern unsigned connections;
 extern bool destination_is_zero;
 extern bool extents;
 extern bool flush;
 extern unsigned max_requests;
 extern bool progress;
 extern int progress_fd;
 extern unsigned request_size;
diff --git a/copy/null-ops.c b/copy/null-ops.c
index 5f1fda50..1218a623 100644
--- a/copy/null-ops.c
+++ b/copy/null-ops.c
@@ -126,27 +126,27 @@ static bool
 null_asynch_zero (struct rw *rw, struct command *command,
                   nbd_completion_callback cb, bool allocate)
 {
   int dummy = 0;
 
   cb.callback (cb.user_data, &dummy);
   return true;
 }
 
 static unsigned
-null_in_flight (struct rw *rw, uintptr_t index)
+null_in_flight (struct rw *rw, size_t index)
 {
   return 0;
 }
 
 static void
-null_get_extents (struct rw *rw, uintptr_t index,
+null_get_extents (struct rw *rw, size_t index,
                   uint64_t offset, uint64_t count,
                   extent_list *ret)
 {
   abort ();
 }
 
 static struct rw_ops null_ops = {
   .ops_name = "null_ops",
   .close = null_close,
   .is_read_only = null_is_read_only,
diff --git a/copy/pipe-ops.c b/copy/pipe-ops.c
index f9b8599a..3c8b6c2b 100644
--- a/copy/pipe-ops.c
+++ b/copy/pipe-ops.c
@@ -147,21 +147,21 @@ pipe_asynch_write (struct rw *rw,
 }
 
 static bool
 pipe_asynch_zero (struct rw *rw, struct command *command,
                        nbd_completion_callback cb, bool allocate)
 {
   return false; /* not supported by pipes */
 }
 
 static unsigned
-pipe_in_flight (struct rw *rw, uintptr_t index)
+pipe_in_flight (struct rw *rw, size_t index)
 {
   return 0;
 }
 
 static struct rw_ops pipe_ops = {
   .ops_name = "pipe_ops",
 
   .close = pipe_close,
 
   .is_read_only = pipe_is_read_only,
-- 
2.35.1




More information about the Libguestfs mailing list