[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

[Libguestfs] [PATCH libnbd] copy: Refactor ‘struct rw’.



Make this a (fairly) abstract structure.  At least hide the subtype
fields from the main program.  This change is pure refactoring and
doesn’t change the semantics.
---
 copy/file-ops.c             | 164 +++++++++++++++++--
 copy/main.c                 | 315 ++++++++----------------------------
 copy/multi-thread-copying.c | 104 ++++++------
 copy/nbd-ops.c              | 248 +++++++++++++++++++++++++---
 copy/nbdcopy.h              |  53 +++---
 copy/null-ops.c             |  50 +++++-
 copy/pipe-ops.c             |  64 +++++++-
 copy/synch-copying.c        |  30 ++--
 8 files changed, 649 insertions(+), 379 deletions(-)

diff --git a/copy/file-ops.c b/copy/file-ops.c
index 2a239d0..81b08ce 100644
--- a/copy/file-ops.c
+++ b/copy/file-ops.c
@@ -36,35 +36,159 @@
 #include "isaligned.h"
 #include "nbdcopy.h"
 
+static struct rw_ops file_ops;
+
+struct rw_file {                /* struct rw subtype used by file_ops. */
+  struct rw rw;                 /* Generic part; first so casts work. */
+  int fd;                       /* Open descriptor given to file_create. */
+  struct stat stat;             /* Copy of the stat given to file_create. */
+  bool seek_hole_supported;     /* Result of probing SEEK_HOLE on fd. */
+  int sector_size;              /* Block devices only: 4096 or BLKSSZGET. */
+};
+
+/* Report whether lseek (fd, 0, SEEK_HOLE) succeeds on this build. */
+static bool
+seek_hole_supported (int fd)
+{
+#ifdef SEEK_HOLE
+  return lseek (fd, 0, SEEK_HOLE) >= 0;
+#else
+  return false;
+#endif
+}
+
+/* Create the struct rw for an already-open regular file or block
+ * device.  'stat' is copied and 'fd' is kept; file_close closes the
+ * fd and frees the result.  Exits on error, aborts on other types.
+ */
+struct rw *
+file_create (const char *name, const struct stat *stat, int fd)
+{
+  struct rw_file *rwf = calloc (1, sizeof *rwf);
+  if (rwf == NULL) { perror ("calloc"); exit (EXIT_FAILURE); }
+
+  rwf->rw.ops = &file_ops;
+  rwf->rw.name = name;
+  rwf->stat = *stat;
+  rwf->fd = fd;
+
+  if (S_ISBLK (stat->st_mode)) {
+    /* Block device: the size is the offset of the end of the device. */
+    rwf->rw.size = lseek (fd, 0, SEEK_END);
+    if (rwf->rw.size == -1 || lseek (fd, 0, SEEK_SET) == -1) {
+      perror ("lseek");
+      exit (EXIT_FAILURE);
+    }
+    rwf->seek_hole_supported = seek_hole_supported (fd);
+    rwf->sector_size = 4096;    /* Default when BLKSSZGET is unavailable. */
+#ifdef BLKSSZGET
+    if (ioctl (fd, BLKSSZGET, &rwf->sector_size))
+      fprintf (stderr, "warning: cannot get sector size: %s: %m\n", name);
+#endif
+  }
+  else if (S_ISREG (stat->st_mode)) {
+    /* Regular file. */
+    rwf->rw.size = stat->st_size;
+    rwf->seek_hole_supported = seek_hole_supported (fd);
+  }
+  else
+    abort ();
+
+  return (struct rw *) rwf;
+}
+
 static void
 file_close (struct rw *rw)
 {
-  if (close (rw->u.local.fd) == -1) {
+  struct rw_file *rwf = (struct rw_file *)rw;
+
+  if (close (rwf->fd) == -1) {
     fprintf (stderr, "%s: close: %m\n", rw->name);
     exit (EXIT_FAILURE);
   }
+  free (rw);
+}
+
+static void
+file_truncate (struct rw *rw, int64_t size)
+{
+  struct rw_file *file = (struct rw_file *) rw;
+  const bool is_regular = S_ISREG (file->stat.st_mode);
+
+  /* Only regular files are resized; anything else is left alone.
+   * The old length of a regular destination is irrelevant, so first
+   * cut it to zero (leaving it empty and sparse) and then extend it
+   * to the requested size.
+   */
+  if (is_regular) {
+    if (ftruncate (file->fd, 0) == -1 ||
+        ftruncate (file->fd, size) == -1) {
+      perror ("truncate");
+      exit (EXIT_FAILURE);
+    }
+    file->rw.size = size;
+
+    /* A freshly truncated file can be assumed to be all zeroes. */
+    destination_is_zero = true;
+  }
+}
 
 static void
 file_flush (struct rw *rw)
 {
-  if ((S_ISREG (rw->u.local.stat.st_mode) ||
-       S_ISBLK (rw->u.local.stat.st_mode)) &&
-      fsync (rw->u.local.fd) == -1) {
+  struct rw_file *rwf = (struct rw_file *)rw;
+
+  if ((S_ISREG (rwf->stat.st_mode) ||
+       S_ISBLK (rwf->stat.st_mode)) &&
+      fsync (rwf->fd) == -1) {
     perror (rw->name);
     exit (EXIT_FAILURE);
   }
 }
 
+static bool
+file_is_read_only (struct rw *rw)
+{
+  /* Always report writable.  Permissions are hard, and main only uses
+   * this as an early check; if the file cannot be written the copy fails.
+   */
+  return false;
+}
+
+static bool
+file_can_extents (struct rw *rw)
+{
+#ifndef SEEK_HOLE
+  return false;                 /* Built without SEEK_HOLE. */
+#else
+  return true;                  /* SEEK_HOLE is available at build time. */
+#endif
+}
+
+static bool
+file_can_multi_conn (struct rw *rw)
+{
+  return true;                  /* Files never force connections to 1. */
+}
+
+static void
+file_start_multi_conn (struct rw *rw)
+{
+  /* Nothing to open: reads and writes all go through the one file
+   * descriptor, so no extra per-connection state is needed for files.
+   */
+}
+
 static size_t
 file_synch_read (struct rw *rw,
                  void *data, size_t len, uint64_t offset)
 {
+  struct rw_file *rwf = (struct rw_file *)rw;
   size_t n = 0;
   ssize_t r;
 
   while (len > 0) {
-    r = pread (rw->u.local.fd, data, len, offset);
+    r = pread (rwf->fd, data, len, offset);
     if (r == -1) {
       perror (rw->name);
       exit (EXIT_FAILURE);
@@ -85,10 +209,11 @@ static void
 file_synch_write (struct rw *rw,
                   const void *data, size_t len, uint64_t offset)
 {
+  struct rw_file *rwf = (struct rw_file *)rw;
   ssize_t r;
 
   while (len > 0) {
-    r = pwrite (rw->u.local.fd, data, len, offset);
+    r = pwrite (rwf->fd, data, len, offset);
     if (r == -1) {
       perror (rw->name);
       exit (EXIT_FAILURE);
@@ -103,7 +228,8 @@ static bool
 file_synch_trim (struct rw *rw, uint64_t offset, uint64_t count)
 {
 #ifdef FALLOC_FL_PUNCH_HOLE
-  int fd = rw->u.local.fd;
+  struct rw_file *rwf = (struct rw_file *)rw;
+  int fd = rwf->fd;
   int r;
 
   r = fallocate (fd, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE,
@@ -121,9 +247,11 @@ file_synch_trim (struct rw *rw, uint64_t offset, uint64_t count)
 static bool
 file_synch_zero (struct rw *rw, uint64_t offset, uint64_t count)
 {
-  if (S_ISREG (rw->u.local.stat.st_mode)) {
+  struct rw_file *rwf = (struct rw_file *)rw;
+
+  if (S_ISREG (rwf->stat.st_mode)) {
 #ifdef FALLOC_FL_ZERO_RANGE
-    int fd = rw->u.local.fd;
+    int fd = rwf->fd;
     int r;
 
     r = fallocate (fd, FALLOC_FL_ZERO_RANGE, offset, count);
@@ -134,10 +262,10 @@ file_synch_zero (struct rw *rw, uint64_t offset, uint64_t count)
     return true;
 #endif
   }
-  else if (S_ISBLK (rw->u.local.stat.st_mode) &&
-           IS_ALIGNED (offset | count, rw->u.local.sector_size)) {
+  else if (S_ISBLK (rwf->stat.st_mode) &&
+           IS_ALIGNED (offset | count, rwf->sector_size)) {
 #ifdef BLKZEROOUT
-    int fd = rw->u.local.fd;
+    int fd = rwf->fd;
     int r;
     uint64_t range[2] = {offset, count};
 
@@ -223,11 +351,12 @@ file_get_extents (struct rw *rw, uintptr_t index,
   ret->size = 0;
 
 #ifdef SEEK_HOLE
+  struct rw_file *rwf = (struct rw_file *)rw;
   static pthread_mutex_t lseek_lock = PTHREAD_MUTEX_INITIALIZER;
 
-  if (rw->u.local.seek_hole_supported) {
+  if (rwf->seek_hole_supported) {
     uint64_t end = offset + count;
-    int fd = rw->u.local.fd;
+    int fd = rwf->fd;
     off_t pos;
     struct extent e;
     size_t last;
@@ -302,9 +431,14 @@ file_get_extents (struct rw *rw, uintptr_t index,
   default_get_extents (rw, index, offset, count, ret);
 }
 
-struct rw_ops file_ops = {
+static struct rw_ops file_ops = {
   .ops_name = "file_ops",
   .close = file_close,
+  .is_read_only = file_is_read_only,
+  .can_extents = file_can_extents,
+  .can_multi_conn = file_can_multi_conn,
+  .start_multi_conn = file_start_multi_conn,
+  .truncate = file_truncate,
   .flush = file_flush,
   .synch_read = file_synch_read,
   .synch_write = file_synch_write,
diff --git a/copy/main.c b/copy/main.c
index 68a6030..6fdc6fd 100644
--- a/copy/main.c
+++ b/copy/main.c
@@ -53,19 +53,11 @@ int progress_fd = -1;           /* --progress=FD */
 unsigned sparse_size = 4096;    /* --sparse */
 bool synchronous;               /* --synchronous flag */
 unsigned threads;               /* --threads */
-struct rw src, dst;             /* The source and destination. */
+struct rw *src, *dst;           /* The source and destination. */
 bool verbose;                   /* --verbose flag */
 
 static bool is_nbd_uri (const char *s);
-static bool seek_hole_supported (int fd);
-static void open_null (struct rw *rw);
-static int open_local (const char *prog,
-                       const char *filename, bool writing, struct rw *rw);
-static void open_nbd_uri (const char *prog,
-                          const char *uri, bool writing, struct rw *rw);
-static void open_nbd_subprocess (const char *prog,
-                                 const char **argv, size_t argc,
-                                 bool writing, struct rw *rw);
+static struct rw *open_local (const char *filename, bool writing);
 static void print_rw (struct rw *rw, const char *prefix, FILE *fp);
 
 static void __attribute__((noreturn))
@@ -242,18 +234,18 @@ main (int argc, char *argv[])
 
   found1:
     connections = 1;            /* multi-conn not supported */
-    src.name = argv[optind+1];
-    open_nbd_subprocess (argv[0],
-                         (const char **) &argv[optind+1], i-optind-1,
-                         false, &src);
+    src =
+      nbd_rw_create_subprocess ((const char **) &argv[optind+1], i-optind-1,
+                                false);
     optind = i+1;
   }
   else {                        /* Source is not [...]. */
-    src.name = argv[optind++];
-    if (! is_nbd_uri (src.name))
-      src.u.local.fd = open_local (argv[0], src.name, false, &src);
+    const char *src_name = argv[optind++];
+
+    if (! is_nbd_uri (src_name))
+      src = open_local (src_name, false);
     else
-      open_nbd_uri (argv[0], src.name, false, &src);
+      src = nbd_rw_create_uri (src_name, src_name, false);
   }
 
   if (optind >= argc)
@@ -267,48 +259,46 @@ main (int argc, char *argv[])
 
   found2:
     connections = 1;            /* multi-conn not supported */
-    dst.name = argv[optind+1];
-    open_nbd_subprocess (argv[0],
-                         (const char **) &argv[optind+1], i-optind-1,
-                         true, &dst);
+    dst =
+      nbd_rw_create_subprocess ((const char **) &argv[optind+1], i-optind-1,
+                                true);
     optind = i+1;
   }
   else {                        /* Destination is not [...] */
-    dst.name = argv[optind++];
-    if (strcmp (dst.name, "null:") == 0)
-      open_null (&dst);
-    else if (! is_nbd_uri (dst.name))
-      dst.u.local.fd = open_local (argv[0], dst.name, true /* writing */, &dst);
-    else {
-      open_nbd_uri (argv[0], dst.name, true, &dst);
+    const char *dst_name = argv[optind++];
 
-      /* Obviously this is not going to work if the server is
-       * advertising read-only, so fail early with a nice error message.
-       */
-      if (nbd_is_read_only (dst.u.nbd.handles.ptr[0])) {
-        fprintf (stderr, "%s: %s: "
-                 "this NBD server is read-only, cannot write to it\n",
-                 argv[0], dst.name);
-        exit (EXIT_FAILURE);
-      }
-    }
+    if (strcmp (dst_name, "null:") == 0)
+      dst = null_create (dst_name);
+    else if (! is_nbd_uri (dst_name))
+      dst = open_local (dst_name, true /* writing */);
+    else
+      dst = nbd_rw_create_uri (dst_name, dst_name, true);
   }
 
   /* There must be no extra parameters. */
   if (optind != argc)
     usage (stderr, EXIT_FAILURE);
 
-  /* Check we've set the fields of src and dst. */
-  assert (src.ops);
-  assert (src.name);
-  assert (dst.ops);
-  assert (dst.name);
+  /* Check we've created src and dst and set the expected fields. */
+  assert (src != NULL);
+  assert (dst != NULL);
+  assert (src->ops != NULL);
+  assert (src->name != NULL);
+  assert (dst->ops != NULL);
+  assert (dst->name != NULL);
+
+  /* Obviously this is not going to work if the destination is
+   * read-only, so fail early with a nice error message.
+   */
+  if (dst->ops->is_read_only (dst)) {
+    fprintf (stderr, "%s: %s: "
+             "the destination is read-only, cannot write to it\n",
+             argv[0], dst->name);
+    exit (EXIT_FAILURE);
+  }
 
   /* If multi-conn is not supported, force connections to 1. */
-  if ((src.ops == &nbd_ops &&
-       ! nbd_can_multi_conn (src.u.nbd.handles.ptr[0])) ||
-      (dst.ops == &nbd_ops &&
-       ! nbd_can_multi_conn (dst.u.nbd.handles.ptr[0])))
+  if (! src->ops->can_multi_conn (src) || ! dst->ops->can_multi_conn (dst))
     connections = 1;
 
   /* Calculate the number of threads from the number of connections. */
@@ -335,44 +325,17 @@ main (int argc, char *argv[])
   if (threads < connections)
     connections = threads;
 
-  /* Calculate the source and destination sizes.  We set these to -1
-   * if the size is not known (because it's a stream).  Note that for
-   * local types, open_local set something in *.size already.
+  /* Truncate the destination to the same size as the source.  Only
+   * has an effect on regular files.
    */
-  if (src.ops == &nbd_ops) {
-    src.size = nbd_get_size (src.u.nbd.handles.ptr[0]);
-    if (src.size == -1) {
-      fprintf (stderr, "%s: %s: %s\n", argv[0], src.name, nbd_get_error ());
-      exit (EXIT_FAILURE);
-    }
-  }
-  if (dst.ops != &nbd_ops && S_ISREG (dst.u.local.stat.st_mode)) {
-    /* If the destination is an ordinary file then the original file
-     * size doesn't matter.  Truncate it to the source size.  But
-     * truncate it to zero first so the file is completely empty and
-     * sparse.
-     */
-    dst.size = src.size;
-    if (ftruncate (dst.u.local.fd, 0) == -1 ||
-        ftruncate (dst.u.local.fd, dst.size) == -1) {
-      perror ("truncate");
-      exit (EXIT_FAILURE);
-    }
-    destination_is_zero = true;
-  }
-  else if (dst.ops == &nbd_ops) {
-    dst.size = nbd_get_size (dst.u.nbd.handles.ptr[0]);
-    if (dst.size == -1) {
-      fprintf (stderr, "%s: %s: %s\n", argv[0], dst.name, nbd_get_error ());
-      exit (EXIT_FAILURE);
-    }
-  }
+  if (dst->ops->truncate)
+    dst->ops->truncate (dst, src->size);
 
   /* Check if the source is bigger than the destination, since that
    * would truncate (ie. lose) data.  Copying from smaller to larger
    * is OK.
    */
-  if (src.size >= 0 && dst.size >= 0 && src.size > dst.size) {
+  if (src->size >= 0 && dst->size >= 0 && src->size > dst->size) {
     fprintf (stderr,
              "nbdcopy: error: destination size is smaller than source size\n");
     exit (EXIT_FAILURE);
@@ -383,37 +346,29 @@ main (int argc, char *argv[])
    * settings.
    */
   if (verbose) {
-    print_rw (&src, "nbdcopy: src", stderr);
-    print_rw (&dst, "nbdcopy: dst", stderr);
+    print_rw (src, "nbdcopy: src", stderr);
+    print_rw (dst, "nbdcopy: dst", stderr);
     fprintf (stderr, "nbdcopy: connections=%u requests=%u threads=%u "
              "synchronous=%s\n",
              connections, max_requests, threads,
              synchronous ? "true" : "false");
   }
 
-  /* If #connections > 1 then multi-conn is enabled at both ends and
-   * we need to open further connections.
+  /* If multi-conn is enabled on either side, then at this point we
+   * need to ask the backend to open the extra connections.
    */
   if (connections > 1) {
     assert (threads == connections);
-
-    if (src.ops == &nbd_ops) {
-      for (i = 1; i < connections; ++i)
-        open_nbd_uri (argv[0], src.name, false, &src);
-      assert (src.u.nbd.handles.size == connections);
-    }
-    if (dst.ops == &nbd_ops) {
-      for (i = 1; i < connections; ++i)
-        open_nbd_uri (argv[0], dst.name, true, &dst);
-      assert (dst.u.nbd.handles.size == connections);
-    }
+    if (src->ops->can_multi_conn (src))
+      src->ops->start_multi_conn (src);
+    if (dst->ops->can_multi_conn (dst))
+      dst->ops->start_multi_conn (dst);
   }
 
   /* If the source is NBD and we couldn't negotiate meta
    * base:allocation then turn off extents.
    */
-  if (src.ops == &nbd_ops &&
-      !nbd_can_meta_context (src.u.nbd.handles.ptr[0], "base:allocation"))
+  if (! src->ops->can_extents (src))
     extents = false;
 
   /* Always set the progress bar to 0% at the start of the copy. */
@@ -429,12 +384,12 @@ main (int argc, char *argv[])
   progress_bar (1, 1);
 
   /* Shut down the source side. */
-  src.ops->close (&src);
+  src->ops->close (src);
 
   /* Shut down the destination side. */
   if (flush)
-    dst.ops->flush (&dst);
-  dst.ops->close (&dst);
+    dst->ops->flush (dst);
+  dst->ops->close (dst);
 
   exit (EXIT_SUCCESS);
 }
@@ -452,33 +407,25 @@ is_nbd_uri (const char *s)
     strncmp (s, "nbds+vsock:", 11) == 0;
 }
 
-/* Open null: (destination only). */
-static void
-open_null (struct rw *rw)
-{
-  rw->ops = &null_ops;
-  rw->size = INT64_MAX;
-}
-
 /* Open a local (non-NBD) file, ie. a file, device, or "-" for stdio.
- * Returns the open file descriptor which the caller must close.
+ * Returns the struct rw * which the caller must close.
  *
  * “writing” is true if this is the destination parameter.
  * “rw->u.local.stat” and “rw->size” return the file stat and size,
  * but size can be returned as -1 if we don't know the size (if it's a
  * pipe or stdio).
  */
-static int
-open_local (const char *prog,
-            const char *filename, bool writing, struct rw *rw)
+static struct rw *
+open_local (const char *filename, bool writing)
 {
   int flags, fd;
+  struct stat stat;
 
   if (strcmp (filename, "-") == 0) {
     synchronous = true;
     fd = writing ? STDOUT_FILENO : STDIN_FILENO;
     if (writing && isatty (fd)) {
-      fprintf (stderr, "%s: refusing to write to tty\n", prog);
+      fprintf (stderr, "%s: refusing to write to tty\n", "nbdcopy");
       exit (EXIT_FAILURE);
     }
   }
@@ -502,146 +449,17 @@ open_local (const char *prog,
     }
   }
 
-  if (fstat (fd, &rw->u.local.stat) == -1) {
+  if (fstat (fd, &stat) == -1) {
     perror (filename);
     exit (EXIT_FAILURE);
   }
-  if (S_ISBLK (rw->u.local.stat.st_mode)) {
-    /* Block device. */
-    rw->ops = &file_ops;
-    rw->size = lseek (fd, 0, SEEK_END);
-    if (rw->size == -1) {
-      perror ("lseek");
-      exit (EXIT_FAILURE);
-    }
-    if (lseek (fd, 0, SEEK_SET) == -1) {
-      perror ("lseek");
-      exit (EXIT_FAILURE);
-    }
-    rw->u.local.seek_hole_supported = seek_hole_supported (fd);
-    rw->u.local.sector_size = 4096;
-#ifdef BLKSSZGET
-    if (ioctl (fd, BLKSSZGET, &rw->u.local.sector_size))
-      fprintf (stderr, "warning: cannot get sector size: %s: %m", rw->name);
-#endif
-  }
-  else if (S_ISREG (rw->u.local.stat.st_mode)) {
-    /* Regular file. */
-    rw->ops = &file_ops;
-    rw->size = rw->u.local.stat.st_size;
-    rw->u.local.seek_hole_supported = seek_hole_supported (fd);
-  }
+  if (S_ISBLK (stat.st_mode) || S_ISREG (stat.st_mode))
+    return file_create (filename, &stat, fd);
   else {
-    /* Probably stdin/stdout, a pipe or a socket.  Set size == -1
-     * which means don't know, and force synchronous mode.
-     */
-    synchronous = true;
-    rw->ops = &pipe_ops;
-    rw->size = -1;
-    rw->u.local.seek_hole_supported = false;
+    /* Probably stdin/stdout, a pipe or a socket. */
+    synchronous = true;        /* Force synchronous mode for pipes. */
+    return pipe_create (filename, fd);
   }
-
-  return fd;
-}
-
-static bool
-seek_hole_supported (int fd)
-{
-#ifndef SEEK_HOLE
-  return false;
-#else
-  off_t r = lseek (fd, 0, SEEK_HOLE);
-  return r >= 0;
-#endif
-}
-
-static void
-open_nbd_uri (const char *prog,
-              const char *uri, bool writing, struct rw *rw)
-{
-  struct nbd_handle *nbd;
-
-  rw->ops = &nbd_ops;
-  nbd = nbd_create ();
-  if (nbd == NULL) {
-    fprintf (stderr, "%s: %s\n", prog, nbd_get_error ());
-    exit (EXIT_FAILURE);
-  }
-  nbd_set_debug (nbd, verbose);
-  nbd_set_uri_allow_local_file (nbd, true); /* Allow ?tls-psk-file. */
-  if (extents && !writing &&
-      nbd_add_meta_context (nbd, "base:allocation") == -1) {
-    fprintf (stderr, "%s: %s\n", prog, nbd_get_error ());
-    exit (EXIT_FAILURE);
-  }
-
-  if (handles_append (&rw->u.nbd.handles, nbd) == -1) {
-    perror ("realloc");
-    exit (EXIT_FAILURE);
-  }
-
-  if (nbd_connect_uri (nbd, uri) == -1) {
-    fprintf (stderr, "%s: %s: %s\n", prog, uri, nbd_get_error ());
-    exit (EXIT_FAILURE);
-  }
-
-  /* Cache these.  We assume with multi-conn that each handle will act
-   * the same way.
-   */
-  rw->u.nbd.can_trim = nbd_can_trim (nbd) > 0;
-  rw->u.nbd.can_zero = nbd_can_zero (nbd) > 0;
-}
-
-DEFINE_VECTOR_TYPE (const_string_vector, const char *);
-
-static void
-open_nbd_subprocess (const char *prog,
-                     const char **argv, size_t argc,
-                     bool writing, struct rw *rw)
-{
-  struct nbd_handle *nbd;
-  const_string_vector copy = empty_vector;
-  size_t i;
-
-  rw->ops = &nbd_ops;
-  nbd = nbd_create ();
-  if (nbd == NULL) {
-    fprintf (stderr, "%s: %s\n", prog, nbd_get_error ());
-    exit (EXIT_FAILURE);
-  }
-  nbd_set_debug (nbd, verbose);
-  if (extents && !writing &&
-      nbd_add_meta_context (nbd, "base:allocation") == -1) {
-    fprintf (stderr, "%s: %s\n", prog, nbd_get_error ());
-    exit (EXIT_FAILURE);
-  }
-
-  if (handles_append (&rw->u.nbd.handles, nbd) == -1) {
-  memory_error:
-    perror ("realloc");
-    exit (EXIT_FAILURE);
-  }
-
-  /* We have to copy the args so we can null-terminate them. */
-  for (i = 0; i < argc; ++i) {
-    if (const_string_vector_append (&copy, argv[i]) == -1)
-      goto memory_error;
-  }
-  if (const_string_vector_append (&copy, NULL) == -1)
-    goto memory_error;
-
-  if (nbd_connect_systemd_socket_activation (nbd, (char **) copy.ptr) == -1) {
-    fprintf (stderr, "%s: %s: %s\n", prog, argv[0], nbd_get_error ());
-    exit (EXIT_FAILURE);
-  }
-
-  /* Cache these.  We assume with multi-conn that each handle will act
-   * the same way.
-   */
-  rw->u.nbd.can_trim = nbd_can_trim (nbd) > 0;
-  rw->u.nbd.can_zero = nbd_can_zero (nbd) > 0;
-
-  free (copy.ptr);
 }
 
 /* Print an rw struct, used in --verbose mode. */
@@ -650,7 +468,6 @@ print_rw (struct rw *rw, const char *prefix, FILE *fp)
 {
   fprintf (fp, "%s: %s \"%s\"\n", prefix, rw->ops->ops_name, rw->name);
   fprintf (fp, "%s: size=%" PRIi64 "\n", prefix, rw->size);
-  /* Could print other stuff here, but that's enough for debugging. */
 }
 
 /* Default implementation of rw->ops->get_extents for backends which
diff --git a/copy/multi-thread-copying.c b/copy/multi-thread-copying.c
index 98b4056..4f57054 100644
--- a/copy/multi-thread-copying.c
+++ b/copy/multi-thread-copying.c
@@ -50,13 +50,13 @@ get_next_offset (uint64_t *offset, uint64_t *count)
   bool r = false;               /* returning false means no more work */
 
   pthread_mutex_lock (&lock);
-  if (next_offset < src.size) {
+  if (next_offset < src->size) {
     *offset = next_offset;
 
     /* Work out how large this range is.  The last range may be
      * smaller than THREAD_WORK_SIZE.
      */
-    *count = src.size - *offset;
+    *count = src->size - *offset;
     if (*count > THREAD_WORK_SIZE)
       *count = THREAD_WORK_SIZE;
 
@@ -69,7 +69,7 @@ get_next_offset (uint64_t *offset, uint64_t *count)
      * are called from threads and not necessarily in monotonic order
      * so the progress bar would move erratically.
      */
-    progress_bar (*offset, dst.size);
+    progress_bar (*offset, dst->size);
   }
   pthread_mutex_unlock (&lock);
   return r;
@@ -89,11 +89,13 @@ multi_thread_copying (void)
    */
   assert (threads > 0);
   assert (threads == connections);
+/*
   if (src.ops == &nbd_ops)
     assert (src.u.nbd.handles.size == connections);
   if (dst.ops == &nbd_ops)
     assert (dst.u.nbd.handles.size == connections);
-  assert (src.size != -1);
+*/
+  assert (src->size != -1);
 
   workers = malloc (sizeof (pthread_t) * threads);
   if (workers == NULL) {
@@ -147,9 +149,9 @@ worker_thread (void *indexp)
 
     assert (0 < count && count <= THREAD_WORK_SIZE);
     if (extents)
-      src.ops->get_extents (&src, index, offset, count, &exts);
+      src->ops->get_extents (src, index, offset, count, &exts);
     else
-      default_get_extents (&src, index, offset, count, &exts);
+      default_get_extents (src, index, offset, count, &exts);
 
     for (i = 0; i < exts.size; ++i) {
       struct command *command;
@@ -208,11 +210,11 @@ worker_thread (void *indexp)
           wait_for_request_slots (index);
 
           /* Begin the asynch read operation. */
-          src.ops->asynch_read (&src, command,
-                                (nbd_completion_callback) {
-                                  .callback = finished_read,
-                                  .user_data = command,
-                                });
+          src->ops->asynch_read (src, command,
+                                 (nbd_completion_callback) {
+                                   .callback = finished_read,
+                                   .user_data = command,
+                                 });
 
           exts.ptr[i].offset += len;
           exts.ptr[i].length -= len;
@@ -254,7 +256,7 @@ wait_for_request_slots (uintptr_t index)
 static unsigned
 in_flight (uintptr_t index)
 {
-  return src.ops->in_flight (&src, index) + dst.ops->in_flight (&dst, index);
+  return src->ops->in_flight (src, index) + dst->ops->in_flight (dst, index);
 }
 
 /* Poll (optional) NBD src and NBD dst, moving the state machine(s)
@@ -271,7 +273,7 @@ poll_both_ends (uintptr_t index)
   /* Note: if polling is not supported, this function will
    * set fd == -1 which poll ignores.
    */
-  src.ops->get_polling_fd (&src, index, &fds[0].fd, &direction);
+  src->ops->get_polling_fd (src, index, &fds[0].fd, &direction);
   if (fds[0].fd >= 0) {
     switch (direction) {
     case LIBNBD_AIO_DIRECTION_READ:
@@ -286,7 +288,7 @@ poll_both_ends (uintptr_t index)
     }
   }
 
-  dst.ops->get_polling_fd (&dst, index, &fds[1].fd, &direction);
+  dst->ops->get_polling_fd (dst, index, &fds[1].fd, &direction);
   if (fds[1].fd >= 0) {
     switch (direction) {
     case LIBNBD_AIO_DIRECTION_READ:
@@ -311,24 +313,24 @@ poll_both_ends (uintptr_t index)
 
   if (fds[0].fd >= 0) {
     if ((fds[0].revents & (POLLIN | POLLHUP)) != 0)
-      src.ops->asynch_notify_read (&src, index);
+      src->ops->asynch_notify_read (src, index);
     else if ((fds[0].revents & POLLOUT) != 0)
-      src.ops->asynch_notify_write (&src, index);
+      src->ops->asynch_notify_write (src, index);
     else if ((fds[0].revents & (POLLERR | POLLNVAL)) != 0) {
       errno = ENOTCONN;
-      perror (src.name);
+      perror (src->name);
       exit (EXIT_FAILURE);
     }
   }
 
   if (fds[1].fd >= 0) {
     if ((fds[1].revents & (POLLIN | POLLHUP)) != 0)
-      dst.ops->asynch_notify_read (&dst, index);
+      dst->ops->asynch_notify_read (dst, index);
     else if ((fds[1].revents & POLLOUT) != 0)
-      dst.ops->asynch_notify_write (&dst, index);
+      dst->ops->asynch_notify_write (dst, index);
     else if ((fds[1].revents & (POLLERR | POLLNVAL)) != 0) {
       errno = ENOTCONN;
-      perror (dst.name);
+      perror (dst->name);
       exit (EXIT_FAILURE);
     }
   }
@@ -377,11 +379,11 @@ finished_read (void *vp, int *error)
     /* If sparseness detection (see below) is turned off then we write
      * the whole command.
      */
-    dst.ops->asynch_write (&dst, command,
-                           (nbd_completion_callback) {
-                             .callback = free_command,
-                             .user_data = command,
-                           });
+    dst->ops->asynch_write (dst, command,
+                            (nbd_completion_callback) {
+                              .callback = free_command,
+                              .user_data = command,
+                            });
   }
   else {                               /* Sparseness detection. */
     const uint64_t start = command->offset;
@@ -408,11 +410,11 @@ finished_read (void *vp, int *error)
             newcommand = copy_subcommand (command,
                                           last_offset, i - last_offset,
                                           false);
-            dst.ops->asynch_write (&dst, newcommand,
-                                   (nbd_completion_callback) {
-                                     .callback = free_command,
-                                     .user_data = newcommand,
-                                   });
+            dst->ops->asynch_write (dst, newcommand,
+                                    (nbd_completion_callback) {
+                                      .callback = free_command,
+                                      .user_data = newcommand,
+                                    });
           }
           /* Start the new hole. */
           last_offset = i;
@@ -445,11 +447,11 @@ finished_read (void *vp, int *error)
         newcommand = copy_subcommand (command,
                                       last_offset, i - last_offset,
                                       false);
-        dst.ops->asynch_write (&dst, newcommand,
-                               (nbd_completion_callback) {
-                                 .callback = free_command,
-                                 .user_data = newcommand,
-                               });
+        dst->ops->asynch_write (dst, newcommand,
+                                (nbd_completion_callback) {
+                                  .callback = free_command,
+                                  .user_data = newcommand,
+                                });
       }
       else {
         newcommand = copy_subcommand (command,
@@ -462,11 +464,11 @@ finished_read (void *vp, int *error)
     /* There may be an unaligned tail, so write that. */
     if (end - i > 0) {
       newcommand = copy_subcommand (command, i, end - i, false);
-      dst.ops->asynch_write (&dst, newcommand,
-                             (nbd_completion_callback) {
-                               .callback = free_command,
-                               .user_data = newcommand,
-                             });
+      dst->ops->asynch_write (dst, newcommand,
+                              (nbd_completion_callback) {
+                                .callback = free_command,
+                                .user_data = newcommand,
+                              });
     }
 
     /* Free the original command since it has been split into
@@ -503,20 +505,20 @@ fill_dst_range_with_zeroes (struct command *command)
 
   if (!allocated) {
     /* Try trimming. */
-    if (dst.ops->asynch_trim (&dst, command,
-                              (nbd_completion_callback) {
-                                .callback = free_command,
-                                .user_data = command,
-                              }))
+    if (dst->ops->asynch_trim (dst, command,
+                               (nbd_completion_callback) {
+                                 .callback = free_command,
+                                 .user_data = command,
+                               }))
       return;
   }
 
   /* Try efficient zeroing. */
-  if (dst.ops->asynch_zero (&dst, command,
-                            (nbd_completion_callback) {
-                              .callback = free_command,
-                              .user_data = command,
-                            }))
+  if (dst->ops->asynch_zero (dst, command,
+                             (nbd_completion_callback) {
+                               .callback = free_command,
+                               .user_data = command,
+                             }))
     return;
 
   /* Fall back to loop writing zeroes.  This is going to be slow
@@ -533,7 +535,7 @@ fill_dst_range_with_zeroes (struct command *command)
     if (len > MAX_REQUEST_SIZE)
       len = MAX_REQUEST_SIZE;
 
-    dst.ops->synch_write (&dst, data, len, command->offset);
+    dst->ops->synch_write (dst, data, len, command->offset);
     command->slice.len -= len;
     command->offset += len;
   }
diff --git a/copy/nbd-ops.c b/copy/nbd-ops.c
index 7b48cbc..24970c2 100644
--- a/copy/nbd-ops.c
+++ b/copy/nbd-ops.c
@@ -27,45 +27,221 @@
 
 #include "nbdcopy.h"
 
+static struct rw_ops nbd_ops;
+
+DEFINE_VECTOR_TYPE (handles, struct nbd_handle *)
+DEFINE_VECTOR_TYPE (const_string_vector, const char *);
+
+struct rw_nbd {
+  struct rw rw;
+
+  /* Because of multi-conn we have to remember enough state in this
+   * handle in order to be able to open another connection with the
+   * same parameters after nbd_rw_create* has been called once.
+   */
+  enum { CREATE_URI, CREATE_SUBPROCESS } create_t;
+  const char *uri;              /* For CREATE_URI */
+  const_string_vector argv;     /* For CREATE_SUBPROCESS */
+  bool writing;
+
+  handles handles;              /* One handle per connection. */
+  bool can_trim, can_zero;      /* Cached nbd_can_trim, nbd_can_zero. */
+};
+
+static void
+open_one_nbd_handle (struct rw_nbd *rwn)
+{
+  struct nbd_handle *nbd;
+
+  nbd = nbd_create ();
+  if (nbd == NULL) {
+    fprintf (stderr, "%s: %s\n", "nbdcopy", nbd_get_error ());
+    exit (EXIT_FAILURE);
+  }
+
+  nbd_set_debug (nbd, verbose);
+
+  if (extents && !rwn->writing &&
+      nbd_add_meta_context (nbd, "base:allocation") == -1) {
+    fprintf (stderr, "%s: %s\n", "nbdcopy", nbd_get_error ());
+    exit (EXIT_FAILURE);
+  }
+
+  switch (rwn->create_t) {
+  case CREATE_URI:
+    nbd_set_uri_allow_local_file (nbd, true); /* Allow ?tls-psk-file. */
+
+    if (nbd_connect_uri (nbd, rwn->uri) == -1) {
+      fprintf (stderr, "%s: %s: %s\n", "nbdcopy", rwn->uri, nbd_get_error ());
+      exit (EXIT_FAILURE);
+    }
+    break;
+
+  case CREATE_SUBPROCESS:
+    if (nbd_connect_systemd_socket_activation (nbd,
+                                               (char **) rwn->argv.ptr)
+        == -1) {
+      fprintf (stderr, "%s: %s: %s\n", "nbdcopy", rwn->argv.ptr[0],
+               nbd_get_error ());
+      exit (EXIT_FAILURE);
+    }
+  }
+
+  /* Cache these.  We assume with multi-conn that each handle will act
+   * the same way.
+   */
+  if (rwn->handles.size == 0) {
+    rwn->can_trim = nbd_can_trim (nbd) > 0;
+    rwn->can_zero = nbd_can_zero (nbd) > 0;
+    rwn->rw.size = nbd_get_size (nbd);
+    if (rwn->rw.size == -1) {
+      fprintf (stderr, "%s: %s: %s\n", "nbdcopy", rwn->rw.name,
+               nbd_get_error ());
+      exit (EXIT_FAILURE);
+    }
+  }
+
+  if (handles_append (&rwn->handles, nbd) == -1) {
+    perror ("realloc");
+    exit (EXIT_FAILURE);
+  }
+}
+
+struct rw *
+nbd_rw_create_uri (const char *name, const char *uri, bool writing)
+{
+  struct rw_nbd *rwn = calloc (1, sizeof *rwn);
+  if (!rwn) { perror ("calloc"); exit (EXIT_FAILURE); }
+
+  /* Record the parameters first: open_one_nbd_handle reads them. */
+  rwn->create_t = CREATE_URI;
+  rwn->uri = uri;
+  rwn->writing = writing;
+  rwn->rw.name = name;
+  rwn->rw.ops = &nbd_ops;
+  open_one_nbd_handle (rwn);
+
+  return &rwn->rw;
+}
+
+struct rw *
+nbd_rw_create_subprocess (const char **argv, size_t argc, bool writing)
+{
+  size_t i;
+  struct rw_nbd *rwn = calloc (1, sizeof *rwn);
+  if (!rwn) { perror ("calloc"); exit (EXIT_FAILURE); }
+
+  rwn->rw.ops = &nbd_ops;
+  rwn->rw.name = argv[0];
+  rwn->create_t = CREATE_SUBPROCESS;
+  rwn->writing = writing;
+
+  /* Copy the arguments into our own vector.  The extra final
+   * iteration (i == argc) appends the terminating NULL.
+   */
+  for (i = 0; i <= argc; ++i) {
+    const char *arg = i < argc ? argv[i] : NULL;
+    if (const_string_vector_append (&rwn->argv, arg) == -1) {
+      perror ("realloc");
+      exit (EXIT_FAILURE);
+    }
+  }
+
+  open_one_nbd_handle (rwn);
+
+  return &rwn->rw;
+}
+
 static void
 nbd_ops_close (struct rw *rw)
 {
+  struct rw_nbd *rwn = (struct rw_nbd *) rw;
   size_t i;
 
-  for (i = 0; i < rw->u.nbd.handles.size; ++i) {
-    if (nbd_shutdown (rw->u.nbd.handles.ptr[i], 0) == -1) {
+  for (i = 0; i < rwn->handles.size; ++i) {
+    if (nbd_shutdown (rwn->handles.ptr[i], 0) == -1) {
       fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
       exit (EXIT_FAILURE);
     }
-    nbd_close (rw->u.nbd.handles.ptr[i]);
+    nbd_close (rwn->handles.ptr[i]);
   }
 
-  handles_reset (&rw->u.nbd.handles);
+  handles_reset (&rwn->handles);
+  const_string_vector_reset (&rwn->argv);
+  free (rw);
 }
 
 static void
 nbd_ops_flush (struct rw *rw)
 {
+  struct rw_nbd *rwn = (struct rw_nbd *) rw;
   size_t i;
 
-  for (i = 0; i < rw->u.nbd.handles.size; ++i) {
-    if (nbd_flush (rw->u.nbd.handles.ptr[i], 0) == -1) {
+  for (i = 0; i < rwn->handles.size; ++i) {
+    if (nbd_flush (rwn->handles.ptr[i], 0) == -1) {
       fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
       exit (EXIT_FAILURE);
     }
   }
 }
 
+static bool
+nbd_ops_is_read_only (struct rw *rw)
+{
+  struct rw_nbd *rwn = (struct rw_nbd *) rw;
+
+  /* Only the first handle is asked; any non-zero answer counts as true. */
+  if (rwn->handles.size == 0)
+    return false;
+  return nbd_is_read_only (rwn->handles.ptr[0]) != 0;
+}
+
+static bool
+nbd_ops_can_extents (struct rw *rw)
+{
+  struct rw_nbd *rwn = (struct rw_nbd *) rw;
+
+  /* -1 (error) must not convert to true, so test for > 0 explicitly. */
+  if (rwn->handles.size == 0)
+    return false;
+  return nbd_can_meta_context (rwn->handles.ptr[0], "base:allocation") > 0;
+}
+
+static bool
+nbd_ops_can_multi_conn (struct rw *rw)
+{
+  struct rw_nbd *rwn = (struct rw_nbd *) rw;
+
+  /* -1 (error) must not convert to true, so test for > 0 explicitly. */
+  if (rwn->handles.size == 0)
+    return false;
+  return nbd_can_multi_conn (rwn->handles.ptr[0]) > 0;
+}
+
+static void
+nbd_ops_start_multi_conn (struct rw *rw)
+{
+  struct rw_nbd *rwn = (struct rw_nbd *) rw;
+  size_t i;
+
+  for (i = 1; i < connections; ++i)
+    open_one_nbd_handle (rwn);
+
+  assert (rwn->handles.size == connections);
+}
+
 static size_t
 nbd_ops_synch_read (struct rw *rw,
                 void *data, size_t len, uint64_t offset)
 {
+  struct rw_nbd *rwn = (struct rw_nbd *) rw;
+
   if (len > rw->size - offset)
     len = rw->size - offset;
   if (len == 0)
     return 0;
 
-  if (nbd_pread (rw->u.nbd.handles.ptr[0], data, len, offset, 0) == -1) {
+  if (nbd_pread (rwn->handles.ptr[0], data, len, offset, 0) == -1) {
     fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
     exit (EXIT_FAILURE);
   }
@@ -77,7 +253,9 @@ static void
 nbd_ops_synch_write (struct rw *rw,
                  const void *data, size_t len, uint64_t offset)
 {
-  if (nbd_pwrite (rw->u.nbd.handles.ptr[0], data, len, offset, 0) == -1) {
+  struct rw_nbd *rwn = (struct rw_nbd *) rw;
+
+  if (nbd_pwrite (rwn->handles.ptr[0], data, len, offset, 0) == -1) {
     fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
     exit (EXIT_FAILURE);
   }
@@ -86,10 +264,12 @@ nbd_ops_synch_write (struct rw *rw,
 static bool
 nbd_ops_synch_trim (struct rw *rw, uint64_t offset, uint64_t count)
 {
-  if (!rw->u.nbd.can_trim)
+  struct rw_nbd *rwn = (struct rw_nbd *) rw;
+
+  if (!rwn->can_trim)
     return false;
 
-  if (nbd_trim (rw->u.nbd.handles.ptr[0], count, offset, 0) == -1) {
+  if (nbd_trim (rwn->handles.ptr[0], count, offset, 0) == -1) {
     fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
     exit (EXIT_FAILURE);
   }
@@ -99,10 +279,12 @@ nbd_ops_synch_trim (struct rw *rw, uint64_t offset, uint64_t count)
 static bool
 nbd_ops_synch_zero (struct rw *rw, uint64_t offset, uint64_t count)
 {
-  if (!rw->u.nbd.can_zero)
+  struct rw_nbd *rwn = (struct rw_nbd *) rw;
+
+  if (!rwn->can_zero)
     return false;
 
-  if (nbd_zero (rw->u.nbd.handles.ptr[0],
+  if (nbd_zero (rwn->handles.ptr[0],
                 count, offset, LIBNBD_CMD_FLAG_NO_HOLE) == -1) {
     fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
     exit (EXIT_FAILURE);
@@ -115,7 +297,9 @@ nbd_ops_asynch_read (struct rw *rw,
                      struct command *command,
                      nbd_completion_callback cb)
 {
-  if (nbd_aio_pread (rw->u.nbd.handles.ptr[command->index],
+  struct rw_nbd *rwn = (struct rw_nbd *) rw;
+
+  if (nbd_aio_pread (rwn->handles.ptr[command->index],
                      slice_ptr (command->slice),
                      command->slice.len, command->offset,
                      cb, 0) == -1) {
@@ -129,7 +313,9 @@ nbd_ops_asynch_write (struct rw *rw,
                       struct command *command,
                       nbd_completion_callback cb)
 {
-  if (nbd_aio_pwrite (rw->u.nbd.handles.ptr[command->index],
+  struct rw_nbd *rwn = (struct rw_nbd *) rw;
+
+  if (nbd_aio_pwrite (rwn->handles.ptr[command->index],
                       slice_ptr (command->slice),
                       command->slice.len, command->offset,
                       cb, 0) == -1) {
@@ -142,12 +328,14 @@ static bool
 nbd_ops_asynch_trim (struct rw *rw, struct command *command,
                      nbd_completion_callback cb)
 {
-  if (!rw->u.nbd.can_trim)
+  struct rw_nbd *rwn = (struct rw_nbd *) rw;
+
+  if (!rwn->can_trim)
     return false;
 
   assert (command->slice.len <= UINT32_MAX);
 
-  if (nbd_aio_trim (rw->u.nbd.handles.ptr[command->index],
+  if (nbd_aio_trim (rwn->handles.ptr[command->index],
                     command->slice.len, command->offset,
                     cb, 0) == -1) {
     fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
@@ -160,12 +348,14 @@ static bool
 nbd_ops_asynch_zero (struct rw *rw, struct command *command,
                      nbd_completion_callback cb)
 {
-  if (!rw->u.nbd.can_zero)
+  struct rw_nbd *rwn = (struct rw_nbd *) rw;
+
+  if (!rwn->can_zero)
     return false;
 
   assert (command->slice.len <= UINT32_MAX);
 
-  if (nbd_aio_zero (rw->u.nbd.handles.ptr[command->index],
+  if (nbd_aio_zero (rwn->handles.ptr[command->index],
                     command->slice.len, command->offset,
                     cb, LIBNBD_CMD_FLAG_NO_HOLE) == -1) {
     fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
@@ -212,19 +402,22 @@ add_extent (void *vp, const char *metacontext,
 static unsigned
 nbd_ops_in_flight (struct rw *rw, uintptr_t index)
 {
+  struct rw_nbd *rwn = (struct rw_nbd *) rw;
+
   /* Since the commands are auto-retired in the callbacks we don't
    * need to count "done" commands.
    */
-  return nbd_aio_in_flight (rw->u.nbd.handles.ptr[index]);
+  return nbd_aio_in_flight (rwn->handles.ptr[index]);
 }
 
 static void
 nbd_ops_get_polling_fd (struct rw *rw, uintptr_t index,
                         int *fd, int *direction)
 {
+  struct rw_nbd *rwn = (struct rw_nbd *) rw;
   struct nbd_handle *nbd;
 
-  nbd = rw->u.nbd.handles.ptr[index];
+  nbd = rwn->handles.ptr[index];
 
   *fd = nbd_aio_get_fd (nbd);
   if (*fd == -1) {
@@ -240,7 +433,8 @@ nbd_ops_get_polling_fd (struct rw *rw, uintptr_t index,
 static void
 nbd_ops_asynch_notify_read (struct rw *rw, uintptr_t index)
 {
-  if (nbd_aio_notify_read (rw->u.nbd.handles.ptr[index]) == -1) {
+  struct rw_nbd *rwn = (struct rw_nbd *) rw;
+  if (nbd_aio_notify_read (rwn->handles.ptr[index]) == -1) {
     fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
     exit (EXIT_FAILURE);
   }
@@ -249,7 +443,8 @@ nbd_ops_asynch_notify_read (struct rw *rw, uintptr_t index)
 static void
 nbd_ops_asynch_notify_write (struct rw *rw, uintptr_t index)
 {
-  if (nbd_aio_notify_write (rw->u.nbd.handles.ptr[index]) == -1) {
+  struct rw_nbd *rwn = (struct rw_nbd *) rw;
+  if (nbd_aio_notify_write (rwn->handles.ptr[index]) == -1) {
     fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
     exit (EXIT_FAILURE);
   }
@@ -266,10 +461,11 @@ nbd_ops_get_extents (struct rw *rw, uintptr_t index,
                      uint64_t offset, uint64_t count,
                      extent_list *ret)
 {
+  struct rw_nbd *rwn = (struct rw_nbd *) rw;
   extent_list exts = empty_vector;
   struct nbd_handle *nbd;
 
-  nbd = rw->u.nbd.handles.ptr[index];
+  nbd = rwn->handles.ptr[index];
 
   ret->size = 0;
 
@@ -331,9 +527,13 @@ nbd_ops_get_extents (struct rw *rw, uintptr_t index,
   free (exts.ptr);
 }
 
-struct rw_ops nbd_ops = {
+static struct rw_ops nbd_ops = {
   .ops_name = "nbd_ops",
   .close = nbd_ops_close,
+  .is_read_only = nbd_ops_is_read_only,
+  .can_extents = nbd_ops_can_extents,
+  .can_multi_conn = nbd_ops_can_multi_conn,
+  .start_multi_conn = nbd_ops_start_multi_conn,
   .flush = nbd_ops_flush,
   .synch_read = nbd_ops_synch_read,
   .synch_write = nbd_ops_synch_write,
diff --git a/copy/nbdcopy.h b/copy/nbdcopy.h
index 69fac2a..94fbdeb 100644
--- a/copy/nbdcopy.h
+++ b/copy/nbdcopy.h
@@ -36,8 +36,6 @@
  */
 #define THREAD_WORK_SIZE (128 * 1024 * 1024)
 
-DEFINE_VECTOR_TYPE (handles, struct nbd_handle *)
-
 /* Abstracts the input (src) and output (dst) parameters on the
  * command line.
  */
@@ -45,21 +43,20 @@ struct rw {
   struct rw_ops *ops;           /* Operations. */
   const char *name;             /* Printable name, for error messages etc. */
   int64_t size;                 /* May be -1 for streams. */
-  union {
-    struct {                    /* For files and pipes. */
-      int fd;
-      struct stat stat;
-      bool seek_hole_supported;
-      int sector_size;
-    } local;
-    struct {
-      handles handles;          /* For NBD, one handle per connection. */
-      bool can_trim, can_zero;  /* Cached nbd_can_trim, nbd_can_zero. */
-    } nbd;
-  } u;
+  /* Followed by private data for the particular subtype. */
 };
 
-extern struct rw src, dst;
+extern struct rw *src, *dst;
+
+/* Create subtypes. */
+extern struct rw *file_create (const char *name,
+                               const struct stat *stat, int fd);
+extern struct rw *nbd_rw_create_uri (const char *name,
+                                     const char *uri, bool writing);
+extern struct rw *nbd_rw_create_subprocess (const char **argv, size_t argc,
+                                            bool writing);
+extern struct rw *null_create (const char *name);
+extern struct rw *pipe_create (const char *name, int fd);
 
 /* Underlying data buffers. */
 struct buffer {
@@ -117,6 +114,28 @@ struct rw_ops {
   /* Close the connection and free up associated resources. */
   void (*close) (struct rw *rw);
 
+  /* Return true if this is a read-only connection. */
+  bool (*is_read_only) (struct rw *rw);
+
+  /* For source only, does it support reading extents? */
+  bool (*can_extents) (struct rw *rw);
+
+  /* Return true if the connection can do multi-conn.  This is true
+   * for files, false for streams, and passed through for NBD.
+   */
+  bool (*can_multi_conn) (struct rw *rw);
+
+  /* For multi-conn capable backends, before copying we must call this
+   * to begin multi-conn.  For NBD this means opening the additional
+   * connections.
+   */
+  void (*start_multi_conn) (struct rw *rw);
+
+  /* Truncate, only called on output files.  This callback can be NULL
+   * for types that don't support this.
+   */
+  void (*truncate) (struct rw *rw, int64_t size);
+
   /* Flush pending writes to permanent storage. */
   void (*flush) (struct rw *rw);
 
@@ -188,10 +207,6 @@ struct rw_ops {
                        uint64_t offset, uint64_t count,
                        extent_list *ret);
 };
-extern struct rw_ops file_ops;
-extern struct rw_ops nbd_ops;
-extern struct rw_ops pipe_ops;
-extern struct rw_ops null_ops;
 
 extern void default_get_extents (struct rw *rw, uintptr_t index,
                                  uint64_t offset, uint64_t count,
diff --git a/copy/null-ops.c b/copy/null-ops.c
index b2ca66f..3262fb5 100644
--- a/copy/null-ops.c
+++ b/copy/null-ops.c
@@ -30,10 +30,28 @@
  * and fast zeroing.
  */
 
+static struct rw_ops null_ops;
+
+struct rw_null {
+  struct rw rw;
+};
+
+struct rw *
+null_create (const char *name)
+{
+  struct rw_null *rwnull = calloc (1, sizeof *rwnull);
+  if (!rwnull) { perror ("calloc"); exit (EXIT_FAILURE); }
+
+  rwnull->rw.size = INT64_MAX;  /* Largest size a struct rw can hold. */
+  rwnull->rw.name = name;       /* Pointer is stored, not copied. */
+  rwnull->rw.ops = &null_ops;
+  return &rwnull->rw;
+}
+
 static void
 null_close (struct rw *rw)
 {
-  /* nothing */
+  free (rw);
 }
 
 static void
@@ -42,6 +60,30 @@ null_flush (struct rw *rw)
   /* nothing */
 }
 
+static bool
+null_is_read_only (struct rw *rw)
+{
+  return false;
+}
+
+static bool
+null_can_extents (struct rw *rw)
+{
+  return false;
+}
+
+static bool
+null_can_multi_conn (struct rw *rw)
+{
+  return true;
+}
+
+static void
+null_start_multi_conn (struct rw *rw)
+{
+  /* nothing */
+}
+
 static size_t
 null_synch_read (struct rw *rw,
                  void *data, size_t len, uint64_t offset)
@@ -126,9 +168,13 @@ null_get_extents (struct rw *rw, uintptr_t index,
   abort ();
 }
 
-struct rw_ops null_ops = {
+static struct rw_ops null_ops = {
   .ops_name = "null_ops",
   .close = null_close,
+  .is_read_only = null_is_read_only,
+  .can_extents = null_can_extents,
+  .can_multi_conn = null_can_multi_conn,
+  .start_multi_conn = null_start_multi_conn,
   .flush = null_flush,
   .synch_read = null_synch_read,
   .synch_write = null_synch_write,
diff --git a/copy/pipe-ops.c b/copy/pipe-ops.c
index d127dad..286e6c0 100644
--- a/copy/pipe-ops.c
+++ b/copy/pipe-ops.c
@@ -26,10 +26,33 @@
 
 #include "nbdcopy.h"
 
+static struct rw_ops pipe_ops;
+
+struct rw_pipe {
+  struct rw rw;
+  int fd;
+};
+
+struct rw *
+pipe_create (const char *name, int fd)
+{
+  struct rw_pipe *rwp = calloc (1, sizeof *rwp);
+  if (!rwp) { perror ("calloc"); exit (EXIT_FAILURE); }
+
+  rwp->fd = fd;
+  rwp->rw.ops = &pipe_ops;
+  rwp->rw.name = name;
+  /* A size of -1 means the length is not known. */
+  rwp->rw.size = -1;
+  return &rwp->rw;
+}
+
 static void
 pipe_close (struct rw *rw)
 {
-  if (close (rw->u.local.fd) == -1) {
+  struct rw_pipe *rwp = (struct rw_pipe *) rw;
+
+  if (close (rwp->fd) == -1) {
     fprintf (stderr, "%s: close: %m\n", rw->name);
     exit (EXIT_FAILURE);
   }
@@ -43,13 +66,39 @@ pipe_flush (struct rw *rw)
    */
 }
 
+static bool
+pipe_is_read_only (struct rw *rw)
+{
+  return false;
+}
+
+static bool
+pipe_can_extents (struct rw *rw)
+{
+  return false;
+}
+
+static bool
+pipe_can_multi_conn (struct rw *rw)
+{
+  return false;
+}
+
+static void
+pipe_start_multi_conn (struct rw *rw)
+{
+  /* Should never be called. */
+  abort ();
+}
+
 static size_t
 pipe_synch_read (struct rw *rw,
                  void *data, size_t len, uint64_t offset)
 {
+  struct rw_pipe *rwp = (struct rw_pipe *) rw;
   ssize_t r;
 
-  r = read (rw->u.local.fd, data, len);
+  r = read (rwp->fd, data, len);
   if (r == -1) {
     perror (rw->name);
     exit (EXIT_FAILURE);
@@ -61,10 +110,11 @@ static void
 pipe_synch_write (struct rw *rw,
                   const void *data, size_t len, uint64_t offset)
 {
+  struct rw_pipe *rwp = (struct rw_pipe *) rw;
   ssize_t r;
 
   while (len > 0) {
-    r = write (rw->u.local.fd, data, len);
+    r = write (rwp->fd, data, len);
     if (r == -1) {
       perror (rw->name);
       exit (EXIT_FAILURE);
@@ -109,10 +159,16 @@ pipe_in_flight (struct rw *rw, uintptr_t index)
   return 0;
 }
 
-struct rw_ops pipe_ops = {
+static struct rw_ops pipe_ops = {
   .ops_name = "pipe_ops",
 
   .close = pipe_close,
+
+  .is_read_only = pipe_is_read_only,
+  .can_extents = pipe_can_extents,
+  .can_multi_conn = pipe_can_multi_conn,
+  .start_multi_conn = pipe_start_multi_conn,
+
   .flush = pipe_flush,
 
   .synch_read = pipe_synch_read,
diff --git a/copy/synch-copying.c b/copy/synch-copying.c
index 2712c10..985f005 100644
--- a/copy/synch-copying.c
+++ b/copy/synch-copying.c
@@ -38,13 +38,13 @@ synch_copying (void)
   /* If the source size is unknown then we copy data and cannot use
    * extent information.
    */
-  if (src.size == -1) {
+  if (src->size == -1) {
     size_t r;
 
-    while ((r = src.ops->synch_read (&src, buf, sizeof buf, offset)) > 0) {
-      dst.ops->synch_write (&dst, buf, r, offset);
+    while ((r = src->ops->synch_read (src, buf, sizeof buf, offset)) > 0) {
+      dst->ops->synch_write (dst, buf, r, offset);
       offset += r;
-      progress_bar (offset, src.size);
+      progress_bar (offset, src->size);
     }
   }
 
@@ -52,47 +52,47 @@ synch_copying (void)
    * blocks and use extent information to optimize the case.
    */
   else {
-    while (offset < src.size) {
+    while (offset < src->size) {
       extent_list exts = empty_vector;
-      uint64_t count = src.size - offset;
+      uint64_t count = src->size - offset;
       size_t i, r;
 
       if (count > sizeof buf)
         count = sizeof buf;
 
       if (extents)
-        src.ops->get_extents (&src, 0, offset, count, &exts);
+        src->ops->get_extents (src, 0, offset, count, &exts);
       else
-        default_get_extents (&src, 0, offset, count, &exts);
+        default_get_extents (src, 0, offset, count, &exts);
 
       for (i = 0; i < exts.size; ++i) {
         assert (exts.ptr[i].length <= count);
 
         if (exts.ptr[i].zero) {
-          if (!dst.ops->synch_trim (&dst, offset, exts.ptr[i].length) &&
-              !dst.ops->synch_zero (&dst, offset, exts.ptr[i].length)) {
+          if (!dst->ops->synch_trim (dst, offset, exts.ptr[i].length) &&
+              !dst->ops->synch_zero (dst, offset, exts.ptr[i].length)) {
             /* If neither trimming nor efficient zeroing are possible,
              * write zeroes the hard way.
              */
             memset (buf, 0, exts.ptr[i].length);
-            dst.ops->synch_write (&dst, buf, exts.ptr[i].length, offset);
+            dst->ops->synch_write (dst, buf, exts.ptr[i].length, offset);
           }
           offset += exts.ptr[i].length;
         }
         else /* data */ {
-          r = src.ops->synch_read (&src, buf, exts.ptr[i].length, offset);
+          r = src->ops->synch_read (src, buf, exts.ptr[i].length, offset);
 
           /* These cases should never happen unless the file is
            * truncated underneath us.
            */
           if (r == 0 || r < exts.ptr[i].length) {
-            fprintf (stderr, "%s: unexpected end of file\n", src.name);
+            fprintf (stderr, "%s: unexpected end of file\n", src->name);
             exit (EXIT_FAILURE);
           }
 
-          dst.ops->synch_write (&dst, buf, r, offset);
+          dst->ops->synch_write (dst, buf, r, offset);
           offset += r;
-          progress_bar (offset, src.size);
+          progress_bar (offset, src->size);
         }
       }
 
-- 
2.29.0.rc2


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]