[libvirt RFCv11 14/33] virfile: add new API virFileDiskCopyChannel

Claudio Fontana cfontana at suse.de
Tue Jun 7 09:19:17 UTC 2022


Allow interleaved parallel reads and writes on a single file,
using a record size equal to the I/O buffer size (1 MiB).

Signed-off-by: Claudio Fontana <cfontana at suse.de>
---
 src/util/iohelper.c |   3 +
 src/util/virfile.c  | 151 +++++++++++++++++++++++++++++---------------
 src/util/virfile.h  |   2 +
 3 files changed, 106 insertions(+), 50 deletions(-)

diff --git a/src/util/iohelper.c b/src/util/iohelper.c
index 055540c8c4..dcbdda366f 100644
--- a/src/util/iohelper.c
+++ b/src/util/iohelper.c
@@ -85,6 +85,9 @@ main(int argc, char **argv)
     if (fd < 0 || virFileDiskCopy(fd, path, -1, "stdio") < 0)
         goto error;
 
+    if (VIR_CLOSE(fd) < 0)
+        goto error;
+
     return 0;
 
  error:
diff --git a/src/util/virfile.c b/src/util/virfile.c
index 201d7f4e64..f9ae7d94c4 100644
--- a/src/util/virfile.c
+++ b/src/util/virfile.c
@@ -4761,6 +4761,9 @@ struct runIOParams {
     const char *fdinname;
     int fdout;
     const char *fdoutname;
+    int idx;
+    int nchannels;
+    off_t total;
 };
 
 /**
@@ -4779,12 +4782,18 @@ runIOCopy(const struct runIOParams p)
     off_t total = 0;
     size_t buflen = 1024*1024;
     char *buf = virFileDirectBufferNew(&base, buflen);
+    int diskfd = p.isWrite ? p.fdout : p.fdin;
 
     if (!buf) {
         virReportSystemError(errno, _("Failed to allocate aligned memory in function %s"), __FUNCTION__);
         return -5;
     }
-
+    if (p.idx >= 0) {
+        if (lseek(diskfd, p.idx * buflen, SEEK_CUR) < 0) {
+            virReportSystemError(errno, "%s", _("Failed to lseek to file channel offset"));
+            return -6;
+        }
+    }
     while (1) {
         ssize_t got;
 
@@ -4808,7 +4817,12 @@ runIOCopy(const struct runIOParams p)
             break;
 
         total += got;
-
+        if (p.idx >= 0 && !p.isWrite && total > p.total) {
+            /* clamp: never send more than p.total bytes to the socket for this channel */
+            off_t difference = total - p.total;
+            got -= difference;
+            total -= difference;
+        }
         /* handle last write size align in direct case */
         if (got < buflen && p.isDirect && p.isWrite) {
             ssize_t nwritten = virFileDirectWrite(p.fdout, buf, got);
@@ -4816,7 +4830,7 @@ runIOCopy(const struct runIOParams p)
                 virReportSystemError(errno, _("Unable to write %s"), p.fdoutname);
                 return -3;
             }
-            if (!p.isBlockDev) {
+            if (!p.isBlockDev && p.idx < 0) {
                 off_t off = lseek(p.fdout, (off_t)0, SEEK_CUR);
                 if (off < 0) {
                     virReportSystemError(errno, "%s", _("Failed to lseek to get current file offset"));
@@ -4824,6 +4838,7 @@ runIOCopy(const struct runIOParams p)
                 }
                 if (nwritten > got) {
                     off -= nwritten - got;
+                    total -= nwritten - got;
                 }
                 if (ftruncate(p.fdout, off) < 0) {
                     virReportSystemError(errno, _("Unable to truncate %s"), p.fdoutname);
@@ -4838,51 +4853,61 @@ runIOCopy(const struct runIOParams p)
             virReportSystemError(errno, _("Unable to write %s"), p.fdoutname);
             return -3;
         }
+        if (p.idx >= 0) {
+            if (!p.isWrite && total >= p.total) {
+                /* done for this channel */
+                break;
+            }
+            /* move channel cursor to the next record */
+            if (lseek(diskfd, buflen * (p.nchannels - 1), SEEK_CUR) < 0) {
+                virReportSystemError(errno, "%s", _("Failed to lseek to next channel record"));
+                return -7;
+            }
+        }
     }
     return total;
 }
 
 /**
- * virFileDiskCopy: run IO to copy data between storage and a pipe or socket.
- *
- * @disk_fd:     the already open regular file or block device
- * @disk_path:   the pathname corresponding to disk_fd (for error reporting)
- * @remote_fd:   the pipe or socket
- *               Use -1 to auto-choose between STDIN or STDOUT.
- * @remote_path: the pathname corresponding to remote_fd (for error reporting)
- *
- * Note that the direction of the transfer is detected based on the @disk_fd
- * file access mode (man 2 open). Therefore @disk_fd must be opened with
- * O_RDONLY or O_WRONLY. O_RDWR is not supported.
- *
- * virFileDiskCopy always closes the file descriptor disk_fd,
- * and any error during close(2) is reported and considered a failure.
- *
- * Returns: bytes transferred or < 0 on failure.
+ * virFileDiskCopyChannel: like virFileDiskCopy, channel interleaved read/write
+ * ...
+ * @idx:       channel index
+ * @nchannels: total number of channels
  */
 
 off_t
-virFileDiskCopy(int disk_fd, const char *disk_path, int remote_fd, const char *remote_path)
+virFileDiskCopyChannel(int disk_fd, const char *disk_path, int remote_fd, const char *remote_path,
+                       int idx, int nchannels, off_t total)
 {
-    int ret = -1;
-    off_t total = 0;
+    off_t new_total = -1;
     struct stat sb;
     struct runIOParams p;
     int oflags = -1;
 
+    if ((nchannels == 0) ||
+        (nchannels > 0 && idx >= nchannels) ||
+        (nchannels > 0 && idx < 0) ||
+        (nchannels < 0 && idx >= 0)) {
+        virReportSystemError(EINVAL, "%s", _("Invalid channel arguments"));
+        goto out;
+    }
+    p.idx = idx;
+    p.nchannels = nchannels;
+    p.total = total;
+
     oflags = fcntl(disk_fd, F_GETFL);
 
     if (oflags < 0) {
         virReportSystemError(errno,
                              _("unable to determine access mode of %s"),
                              disk_path);
-        goto cleanup;
+        goto out;
     }
     if (fstat(disk_fd, &sb) < 0) {
         virReportSystemError(errno,
                              _("unable to stat file descriptor %d path %s"),
                              disk_fd, disk_path);
-        goto cleanup;
+        goto out;
     }
     p.isBlockDev = S_ISBLK(sb.st_mode);
     p.isDirect = O_DIRECT && (oflags & O_DIRECT);
@@ -4906,53 +4931,79 @@ virFileDiskCopy(int disk_fd, const char *disk_path, int remote_fd, const char *r
     default:
         virReportSystemError(EINVAL, _("Unable to process file with flags %d"),
                              (oflags & O_ACCMODE));
-        goto cleanup;
+        goto out;
     }
     if (!p.isBlockDev && p.isDirect) {
         off_t off = lseek(disk_fd, 0, SEEK_CUR);
         if (off < 0) {
             virReportSystemError(errno, "%s", _("O_DIRECT needs a seekable file"));
-            goto cleanup;
+            goto out;
         }
         if (virFileDirectAlign(off) != off) {
             /* we could write some zeroes, but maybe it is safer to just fail */
             virReportSystemError(EINVAL, "%s", _("O_DIRECT attempted with unaligned file pointer"));
-            goto cleanup;
+            goto out;
         }
     }
-    total = runIOCopy(p);
-    if (total < 0)
-        goto cleanup;
-
-    /* Ensure all data is written */
-    if (virFileDataSync(p.fdout) < 0) {
-        if (errno != EINVAL && errno != EROFS) {
-            /* fdatasync() may fail on some special FDs, e.g. pipes */
-            virReportSystemError(errno, _("unable to fsync %s"), p.fdoutname);
-            goto cleanup;
+    new_total = runIOCopy(p);
+    if (new_total < 0)
+        goto out;
+
+    if (p.idx < 0 && p.isWrite) {
+        /* without channels we can run the fdatasync here */
+        if (virFileDataSync(disk_fd) < 0) {
+            if (errno != EINVAL && errno != EROFS) {
+                virReportSystemError(errno, _("unable to fsyncdata %s"), p.fdoutname);
+                new_total = -1;
+                goto out;
+            }
         }
     }
 
-    ret = 0;
-
- cleanup:
-    if (VIR_CLOSE(disk_fd) < 0 && ret == 0) {
-        virReportSystemError(errno, _("Unable to close %s"), disk_path);
-        ret = -1;
-    }
-    return ret;
+ out:
+    return new_total;
 }
 
 #else /* WIN32 */
 
 off_t
-virFileDiskCopy(int disk_fd G_GNUC_UNUSED,
-                const char *disk_path G_GNUC_UNUSED,
-                int remote_fd G_GNUC_UNUSED,
-                const char *remote_path G_GNUC_UNUSED)
+virFileDiskCopyChannel(int disk_fd G_GNUC_UNUSED,
+                       const char *disk_path G_GNUC_UNUSED,
+                       int remote_fd G_GNUC_UNUSED,
+                       const char *remote_path G_GNUC_UNUSED,
+                       int idx G_GNUC_UNUSED,
+                       int nchannels G_GNUC_UNUSED,
+                       off_t total G_GNUC_UNUSED)
 {
     virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
-                   _("virFileDiskCopy unsupported on this platform"));
+                   _("virFileDiskCopyChannel unsupported on this platform"));
     return -1;
 }
 #endif /* WIN32 */
+
+/**
+ * virFileDiskCopy: run IO to copy data between storage and a pipe or socket.
+ *
+ * @disk_fd:     the already open regular file or block device
+ * @disk_path:   the pathname corresponding to disk_fd (for error reporting)
+ * @remote_fd:   the pipe or socket
+ *               Use -1 to auto-choose between STDIN or STDOUT.
+ * @remote_path: the pathname corresponding to remote_fd (for error reporting)
+ *
+ * Note that the direction of the transfer is detected based on the @disk_fd
+ * file access mode (man 2 open). Therefore @disk_fd must be opened with
+ * O_RDONLY or O_WRONLY. O_RDWR is not supported.
+ *
+ * virFileDiskCopy does not close the file descriptor disk_fd;
+ * the caller must close it and treat any close(2) error as a failure.
+ *
+ * Returns: bytes transferred or < 0 on failure.
+ */
+
+off_t
+virFileDiskCopy(int disk_fd, const char *disk_path,
+                int remote_fd, const char *remote_path)
+{
+    return virFileDiskCopyChannel(disk_fd, disk_path, remote_fd, remote_path,
+                                  -1, -1, 0);
+}
diff --git a/src/util/virfile.h b/src/util/virfile.h
index 844261e0a4..4d75389c84 100644
--- a/src/util/virfile.h
+++ b/src/util/virfile.h
@@ -394,3 +394,5 @@ int virFileSetCOW(const char *path,
                   virTristateBool state);
 
 off_t virFileDiskCopy(int disk_fd, const char *disk_path, int remote_fd, const char *remote_path);
+off_t virFileDiskCopyChannel(int disk_fd, const char *disk_path, int remote_fd, const char *remote_path,
+                             int idx, int nchannels, off_t total);
-- 
2.26.2



More information about the libvir-list mailing list