[libvirt RFCv11 14/33] virfile: add new API virFileDiskCopyChannel
Claudio Fontana
cfontana at suse.de
Tue Jun 7 10:10:23 UTC 2022
On 6/7/22 11:19, Claudio Fontana wrote:
> Allow interleaved parallel writes to a single file,
> using a record size equal to the I/O buffer size (1MB).
>
> Signed-off-by: Claudio Fontana <cfontana at suse.de>
> ---
> src/util/iohelper.c | 3 +
> src/util/virfile.c | 151 +++++++++++++++++++++++++++++---------------
> src/util/virfile.h | 2 +
> 3 files changed, 106 insertions(+), 50 deletions(-)
>
> diff --git a/src/util/iohelper.c b/src/util/iohelper.c
> index 055540c8c4..dcbdda366f 100644
> --- a/src/util/iohelper.c
> +++ b/src/util/iohelper.c
> @@ -85,6 +85,9 @@ main(int argc, char **argv)
> if (fd < 0 || virFileDiskCopy(fd, path, -1, "stdio") < 0)
> goto error;
>
> + if (VIR_CLOSE(fd) < 0)
> + goto error;
> +
> return 0;
>
> error:
> diff --git a/src/util/virfile.c b/src/util/virfile.c
> index 201d7f4e64..f9ae7d94c4 100644
> --- a/src/util/virfile.c
> +++ b/src/util/virfile.c
> @@ -4761,6 +4761,9 @@ struct runIOParams {
> const char *fdinname;
> int fdout;
> const char *fdoutname;
> + int idx;
> + int nchannels;
> + off_t total;
> };
>
> /**
> @@ -4779,12 +4782,18 @@ runIOCopy(const struct runIOParams p)
> off_t total = 0;
> size_t buflen = 1024*1024;
> char *buf = virFileDirectBufferNew(&base, buflen);
> + int diskfd = p.isWrite ? p.fdout : p.fdin;
>
> if (!buf) {
> virReportSystemError(errno, _("Failed to allocate aligned memory in function %s"), __FUNCTION__);
> return -5;
> }
> -
> + if (p.idx >= 0) {
> + if (lseek(diskfd, p.idx * buflen, SEEK_CUR) < 0) {
> + virReportSystemError(errno, "%s", _("Failed to lseek to file channel offset"));
> + return -6;
> + }
> + }
> while (1) {
> ssize_t got;
>
> @@ -4808,7 +4817,12 @@ runIOCopy(const struct runIOParams p)
> break;
>
> total += got;
> -
> + if (p.idx >= 0 && !p.isWrite && total > p.total) {
> + /* do not write to socket too much for this channel, according to CLIA */
> + off_t difference = total - p.total;
> + got -= difference;
> + total -= difference;
> + }
> /* handle last write size align in direct case */
> if (got < buflen && p.isDirect && p.isWrite) {
> ssize_t nwritten = virFileDirectWrite(p.fdout, buf, got);
> @@ -4816,7 +4830,7 @@ runIOCopy(const struct runIOParams p)
> virReportSystemError(errno, _("Unable to write %s"), p.fdoutname);
> return -3;
> }
> - if (!p.isBlockDev) {
> + if (!p.isBlockDev && p.idx < 0) {
> off_t off = lseek(p.fdout, (off_t)0, SEEK_CUR);
> if (off < 0) {
> virReportSystemError(errno, "%s", _("Failed to lseek to get current file offset"));
> @@ -4824,6 +4838,7 @@ runIOCopy(const struct runIOParams p)
> }
> if (nwritten > got) {
> off -= nwritten - got;
> + total -= nwritten - got;
> }
> if (ftruncate(p.fdout, off) < 0) {
> virReportSystemError(errno, _("Unable to truncate %s"), p.fdoutname);
> @@ -4838,51 +4853,61 @@ runIOCopy(const struct runIOParams p)
> virReportSystemError(errno, _("Unable to write %s"), p.fdoutname);
> return -3;
> }
> + if (p.idx >= 0) {
> + if (!p.isWrite && total >= p.total) {
> + /* done for this channel */
> + break;
> + }
> + /* move channel cursor to the next record */
> + if (lseek(diskfd, buflen * (p.nchannels - 1), SEEK_CUR) < 0) {
> + virReportSystemError(errno, "%s", _("Failed to lseek to next channel record"));
> + return -7;
> + }
> + }
> }
> return total;
> }
>
> /**
> - * virFileDiskCopy: run IO to copy data between storage and a pipe or socket.
> - *
> - * @disk_fd: the already open regular file or block device
> - * @disk_path: the pathname corresponding to disk_fd (for error reporting)
> - * @remote_fd: the pipe or socket
> - * Use -1 to auto-choose between STDIN or STDOUT.
> - * @remote_path: the pathname corresponding to remote_fd (for error reporting)
> - *
> - * Note that the direction of the transfer is detected based on the @disk_fd
> - * file access mode (man 2 open). Therefore @disk_fd must be opened with
> - * O_RDONLY or O_WRONLY. O_RDWR is not supported.
> - *
> - * virFileDiskCopy always closes the file descriptor disk_fd,
> - * and any error during close(2) is reported and considered a failure.
> - *
> - * Returns: bytes transferred or < 0 on failure.
> + * virFileDiskCopyChannel: like virFileDiskCopy, channel interleaved read/write
> + * ...
> + * @idx: channel index
> + * @nchannels: total number of channels
> */
>
> off_t
> -virFileDiskCopy(int disk_fd, const char *disk_path, int remote_fd, const char *remote_path)
> +virFileDiskCopyChannel(int disk_fd, const char *disk_path, int remote_fd, const char *remote_path,
> + int idx, int nchannels, off_t total)
> {
> - int ret = -1;
> - off_t total = 0;
> + off_t new_total = -1;
> struct stat sb;
> struct runIOParams p;
> int oflags = -1;
>
> + if ((nchannels == 0) ||
> + (nchannels > 0 && idx >= nchannels) ||
> + (nchannels > 0 && idx < 0) ||
> + (nchannels < 0 && idx >= 0)) {
> + virReportSystemError(EINVAL, "%s", _("Invalid channel arguments"));
> + goto out;
> + }
> + p.idx = idx;
> + p.nchannels = nchannels;
> + p.total = total;
> +
> oflags = fcntl(disk_fd, F_GETFL);
>
> if (oflags < 0) {
> virReportSystemError(errno,
> _("unable to determine access mode of %s"),
> disk_path);
> - goto cleanup;
> + goto out;
> }
> if (fstat(disk_fd, &sb) < 0) {
> virReportSystemError(errno,
> _("unable to stat file descriptor %d path %s"),
> disk_fd, disk_path);
> - goto cleanup;
> + goto out;
> }
> p.isBlockDev = S_ISBLK(sb.st_mode);
> p.isDirect = O_DIRECT && (oflags & O_DIRECT);
> @@ -4906,53 +4931,79 @@ virFileDiskCopy(int disk_fd, const char *disk_path, int remote_fd, const char *r
> default:
> virReportSystemError(EINVAL, _("Unable to process file with flags %d"),
> (oflags & O_ACCMODE));
> - goto cleanup;
> + goto out;
> }
> if (!p.isBlockDev && p.isDirect) {
> off_t off = lseek(disk_fd, 0, SEEK_CUR);
> if (off < 0) {
> virReportSystemError(errno, "%s", _("O_DIRECT needs a seekable file"));
> - goto cleanup;
> + goto out;
> }
> if (virFileDirectAlign(off) != off) {
> /* we could write some zeroes, but maybe it is safer to just fail */
> virReportSystemError(EINVAL, "%s", _("O_DIRECT attempted with unaligned file pointer"));
> - goto cleanup;
> + goto out;
> }
> }
> - total = runIOCopy(p);
> - if (total < 0)
> - goto cleanup;
> -
> - /* Ensure all data is written */
> - if (virFileDataSync(p.fdout) < 0) {
> - if (errno != EINVAL && errno != EROFS) {
> - /* fdatasync() may fail on some special FDs, e.g. pipes */
> - virReportSystemError(errno, _("unable to fsync %s"), p.fdoutname);
> - goto cleanup;
> + new_total = runIOCopy(p);
> + if (new_total < 0)
> + goto out;
> +
> + if (p.idx < 0 && p.isWrite) {
> + /* without channels we can run the fdatasync here */
> + if (virFileDataSync(disk_fd) < 0) {
> + if (errno != EINVAL && errno != EROFS) {
> + virReportSystemError(errno, _("unable to fsyncdata %s"), p.fdoutname);
> + new_total = -1;
> + goto out;
> + }
> }
> }
>
> - ret = 0;
> -
> - cleanup:
> - if (VIR_CLOSE(disk_fd) < 0 && ret == 0) {
> - virReportSystemError(errno, _("Unable to close %s"), disk_path);
> - ret = -1;
> - }
> - return ret;
> + out:
> + return new_total;
> }
>
> #else /* WIN32 */
>
> off_t
> -virFileDiskCopy(int disk_fd G_GNUC_UNUSED,
> - const char *disk_path G_GNUC_UNUSED,
> - int remote_fd G_GNUC_UNUSED,
> - const char *remote_path G_GNUC_UNUSED)
> +virFileDiskCopyChannel(int disk_fd G_GNUC_UNUSED,
> + const char *disk_path G_GNUC_UNUSED,
> + int remote_fd G_GNUC_UNUSED,
> + const char *remote_path G_GNUC_UNUSED,
> + int idx G_GNUC_UNUSED,
> + int nchannels G_GNUC_UNUSED,
> + off_t total G_GNUC_UNUSED)
> {
> virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
> - _("virFileDiskCopy unsupported on this platform"));
> + _("virFileDiskCopyChannel unsupported on this platform"));
> return -1;
> }
> #endif /* WIN32 */
> +
> +/**
> + * virFileDiskCopy: run IO to copy data between storage and a pipe or socket.
> + *
> + * @disk_fd: the already open regular file or block device
> + * @disk_path: the pathname corresponding to disk_fd (for error reporting)
> + * @remote_fd: the pipe or socket
> + * Use -1 to auto-choose between STDIN or STDOUT.
> + * @remote_path: the pathname corresponding to remote_fd (for error reporting)
> + *
> + * Note that the direction of the transfer is detected based on the @disk_fd
> + * file access mode (man 2 open). Therefore @disk_fd must be opened with
> + * O_RDONLY or O_WRONLY. O_RDWR is not supported.
> + *
> + * virFileDiskCopy always closes the file descriptor disk_fd,
> + * and any error during close(2) is reported and considered a failure.
This is no longer true: with this patch virFileDiskCopy does not close disk_fd anymore, so the close now needs to be done by the caller, outside of virFileDiskCopy.
> + *
> + * Returns: bytes transferred or < 0 on failure.
> + */
> +
> +off_t
> +virFileDiskCopy(int disk_fd, const char *disk_path,
> + int remote_fd, const char *remote_path)
> +{
> + return virFileDiskCopyChannel(disk_fd, disk_path, remote_fd, remote_path,
> + -1, -1, 0);
> +}
> diff --git a/src/util/virfile.h b/src/util/virfile.h
> index 844261e0a4..4d75389c84 100644
> --- a/src/util/virfile.h
> +++ b/src/util/virfile.h
> @@ -394,3 +394,5 @@ int virFileSetCOW(const char *path,
> virTristateBool state);
>
> off_t virFileDiskCopy(int disk_fd, const char *disk_path, int remote_fd, const char *remote_path);
> +off_t virFileDiskCopyChannel(int disk_fd, const char *disk_path, int remote_fd, const char *remote_path,
> + int idx, int nchannels, off_t total);
More information about the libvir-list
mailing list