[Libguestfs] [PATCH nbdkit] streaming plugin: Add support for a sliding window
Richard W.M. Jones
rjones at redhat.com
Wed Oct 15 09:11:15 UTC 2014
[Sorry, can't use git-send-email at the moment ...]
These patches implement a sliding window for the streaming plugin[1]
in nbdkit.
I would like to be able to stream a filesystem from tools such as
'virt-make-fs'[2]. This is a fairly frequently requested feature.
Unfortunately:
(a) The patches make the code significantly more complex and therefore
more likely to have bugs.
(b) They are not practically useful. 'parted' likes to write to the
beginning and end of a disk, even when creating a simple MBR, and of
course 'mkfs' scribbles the group headers across the whole disk when
creating a filesystem.
A simple window approach is obviously not sufficient. A better
approach might be something like a sparse, size-limited map recording
writes at any point in the disk. But that has the problem that you
don't know when you can commit a write to the stream -- some heuristic
would have to be used.
I'm posting them to the mailing list for the record and in case anyone
has any better ideas.
Rich.
[1] http://rwmj.wordpress.com/2014/10/14/streaming-nbd-server/#content
[2] http://libguestfs.org/virt-make-fs.1.html
--
Richard Jones, Virtualization Group, Red Hat http://people.redhat.com/~rjones
Read my programming and virtualization blog: http://rwmj.wordpress.com
Fedora Windows cross-compiler. Compile Windows programs, test, and
build Windows installers. Over 100 libraries supported.
http://fedoraproject.org/wiki/MinGW
-------------- next part --------------
From be039f70da0c3ece9075724bf5ff29a45038dce5 Mon Sep 17 00:00:00 2001
From: "Richard W.M. Jones" <rjones at redhat.com>
Date: Tue, 14 Oct 2014 16:38:50 +0200
Subject: [PATCH 1/2] streaming: Implement sliding window and add window=SIZE
parameter.
---
plugins/streaming/nbdkit-streaming-plugin.pod | 17 +-
plugins/streaming/streaming.c | 290 +++++++++++++++++++++-----
2 files changed, 256 insertions(+), 51 deletions(-)
diff --git a/plugins/streaming/nbdkit-streaming-plugin.pod b/plugins/streaming/nbdkit-streaming-plugin.pod
index a21ed4f..635af69 100644
--- a/plugins/streaming/nbdkit-streaming-plugin.pod
+++ b/plugins/streaming/nbdkit-streaming-plugin.pod
@@ -6,7 +6,7 @@ nbdkit-streaming-plugin - nbdkit streaming plugin
=head1 SYNOPSIS
- nbdkit streaming pipe=FILENAME [size=SIZE]
+ nbdkit streaming pipe=FILENAME [size=SIZE] [window=SIZE]
=head1 DESCRIPTION
@@ -50,12 +50,23 @@ Whether you need to specify this parameter depends on the client.
Some clients don't check the size and just write/stream, others do
checks or calculations based on the apparent size.
+=item B<window=SIZE>
+
+Specify a sliding window of data, allowing limited seeking backwards
+and reads. You can use any size specifier permitted by
+C<nbdkit_parse_size>, eg. C<window=1M>.
+
+Note that this is disabled (set to 0) by default, since enabling it
+causes writes to be delayed until the client moves the window forward
+or until nbdkit exits.
+
=back
=head1 TO DO
-This plugin would be much nicer if it supported the concept of a
-"window" of data, allowing limited reverse seeks and reads.
+Separate read and write windows would make more sense, allowing a
+large read window and a small write window. The smaller (or zero)
+write window would mean that writes are not delayed.
=head1 SEE ALSO
diff --git a/plugins/streaming/streaming.c b/plugins/streaming/streaming.c
index f58fa46..2d08803 100644
--- a/plugins/streaming/streaming.c
+++ b/plugins/streaming/streaming.c
@@ -41,20 +41,28 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
+#include <assert.h>
#include <nbdkit-plugin.h>
+#define min(a,b) ((a)<(b)?(a):(b))
+
static char *filename = NULL;
static int fd = -1;
/* In theory INT64_MAX, but it breaks qemu's NBD driver. */
static int64_t size = INT64_MAX/2;
-/* Flag if we have entered the unrecoverable error state because of
- * a seek backwards.
+/* Flag if we have entered the unrecoverable error state because of a
+ * seek backwards beyond the window.
*/
static int errorstate = 0;
+/* Window. */
+static int64_t window_max_size = 0; /* window= parameter */
+static int64_t window_size = 0; /* current size */
+static char *window = NULL;
+
/* Highest byte (+1) that has been written in the data stream. */
static uint64_t highestwrite = 0;
@@ -73,6 +81,11 @@ streaming_config (const char *key, const char *value)
if (size == -1)
return -1;
}
+ else if (strcmp (key, "window") == 0) {
+ window_max_size = nbdkit_parse_size (value);
+ if (window_max_size == -1)
+ return -1;
+ }
else {
nbdkit_error ("unknown parameter '%s'", key);
return -1;
@@ -110,18 +123,10 @@ streaming_config_complete (void)
return 0;
}
-/* nbdkit is shutting down. */
-static void
-streaming_unload (void)
-{
- if (fd >= 0)
- close (fd);
- free (filename);
-}
-
#define streaming_config_help \
"pipe=<FILENAME> (required) The filename to serve.\n" \
- "size=<SIZE> (optional) Stream size."
+ "size=<SIZE> (optional) Stream size.\n" \
+ "window=<SIZE> (optional) Window size."
/* Create the per-connection handle. */
static void *
@@ -160,13 +165,66 @@ streaming_get_size (void *handle)
return size;
}
+static int
+xwrite (int fd, const char *buf, size_t n)
+{
+ ssize_t r;
+
+ while (n > 0) {
+ r = write (fd, buf, n);
+ if (r == -1) {
+ nbdkit_error ("write: %m");
+ return -1;
+ }
+ buf += r;
+ n -= r;
+ }
+ return 0;
+}
+
+static int
+xwrite_zeroes (int fd, size_t n)
+{
+ ssize_t r;
+ char buf[4096];
+
+ memset (buf, 0, sizeof buf);
+
+ while (n > 0) {
+ r = write (fd, buf, min (n, sizeof buf));
+ if (r == -1) {
+ nbdkit_error ("write: %m");
+ return -1;
+ }
+ n -= r;
+ }
+ return 0;
+}
+
+/*
+This diagram should help when trying to understand the pread and
+pwrite calls below.
+
+Note that we recursively split read and write calls to make the cases
+tractable.
+
+ |<------- window_max_size ------->|
+ |<---- window_size ----->|
+ +------------------------+------------------------+--------+----------
+ ^ ^ ^ ^
+ 0 windowstart highestwrite maxwindow
+
+ */
+
/* Write data to the stream. */
static int
streaming_pwrite (void *handle, const void *buf,
uint32_t count, uint64_t offset)
{
- size_t n;
- ssize_t r;
+ int r;
+ uint64_t windowstart;
+ uint64_t maxwindow;
+ int64_t delta;
if (errorstate) {
nbdkit_error ("unrecoverable error state");
@@ -174,63 +232,199 @@ streaming_pwrite (void *handle, const void *buf,
return -1;
}
- if (offset < highestwrite) {
- nbdkit_error ("client tried to seek backwards and write: the streaming plugin does not currently support this");
+ /* This just makes the recursive case easier to reason about. */
+ if (count == 0)
+ return 0;
+
+ windowstart = highestwrite - window_size;
+
+ if (offset < windowstart) {
+ nbdkit_error ("client seeked backwards > window size: you must increase the window size");
errorstate = 1;
errno = EIO;
return -1;
}
- /* Need to write some zeroes. */
- if (offset > highestwrite) {
- int64_t size = offset - highestwrite;
- char buf[4096];
-
- memset (buf, 0, sizeof buf);
-
- while (size > 0) {
- n = size > sizeof buf ? sizeof buf : size;
- r = write (fd, buf, n);
- if (r == -1) {
- nbdkit_error ("write: %m");
- errorstate = 1;
- return -1;
- }
- highestwrite += r;
- size -= r;
- }
+ /* Split writes across highestwrite and maxwindow boundaries.
+ * Splitting here means we do not have to deal with writes across
+ * the boundary in the code below.
+ */
+ if (offset < highestwrite && offset + count > highestwrite) {
+ uint64_t size = highestwrite - offset;
+
+ r = streaming_pwrite (handle, buf, size, offset);
+ if (r == -1)
+ return -1;
+ buf += size;
+ offset += size;
+ count -= size;
+ return streaming_pwrite (handle, buf, count, offset);
+ }
+
+ maxwindow = windowstart + window_max_size;
+
+ if (offset < maxwindow && offset + count > maxwindow) {
+ uint64_t size = maxwindow - offset;
+
+ r = streaming_pwrite (handle, buf, size, offset);
+ if (r == -1)
+ return -1;
+ buf += size;
+ offset += size;
+ count -= size;
+ return streaming_pwrite (handle, buf, count, offset);
+ }
+
+ /* Handle a write entirely within the current window. */
+ if (offset < highestwrite) {
+ uint64_t windowoffset = window_size - (highestwrite - offset);
+ memcpy (&window[windowoffset], buf, count);
+ return 0;
}
- /* Write the data. */
- while (count > 0) {
- r = write (fd, buf, count);
- if (r == -1) {
- nbdkit_error ("write: %m");
- errorstate = 1;
+ /* A write after highestwrite but not larger than maxwindow causes
+ * the window to be extended but not moved.
+ */
+ if (offset < maxwindow) {
+ uint64_t new_highestwrite = offset + count;
+ uint64_t new_size = new_highestwrite - windowstart;
+ char *new_window;
+
+ new_window = realloc (window, new_size);
+ if (new_window == NULL) {
+ nbdkit_error ("realloc: %m");
return -1;
}
- buf += r;
- highestwrite += r;
- count -= r;
+ window = new_window;
+ /* Make sure the extended window is zeroes to start with. */
+ memset (&window[window_size], 0, new_size - window_size);
+ highestwrite = new_highestwrite;
+ /* Copy the buffer to the new window. */
+ memcpy (&window[offset - windowstart], buf, count);
+ return 0;
+ }
+
+ /* Split writes after maxwindow at highestwrite + window_max_size. */
+ if (offset < highestwrite + window_max_size &&
+ offset + count > highestwrite + window_max_size) {
+ uint64_t size = highestwrite + window_max_size - offset;
+
+ r = streaming_pwrite (handle, buf, size, offset);
+ if (r == -1)
+ return -1;
+ buf += size;
+ offset += size;
+ count -= size;
+ return streaming_pwrite (handle, buf, count, offset);
+ }
+
+ /* Any write here is going to cause the window to move. Delta is
+ * the amount by which the window will move (NB: might be greater
+ * than the window size).
+ */
+ delta = offset + count - highestwrite;
+
+ if (delta <= window_size) {
+ /* Write out the oldest part of the window. */
+ if (xwrite (fd, window, delta) == -1)
+ return -1;
+
+ /* Move the data in the window down. */
+ memmove (window, window + delta, window_size - delta);
+
+ /* Copy the buffer to the new window. */
+ memcpy (window + window_size - count, buf, count);
+ highestwrite += delta;
+ return 0;
}
- return 0;
+ /* The window will move by more than a single window size. Write out
+ * the whole of the old window, then write zeroes, then continue the
+ * write.
+ */
+ if (xwrite (fd, window, window_size) == -1)
+ return -1;
+ memset (window, 0, window_size);
+
+ if (xwrite_zeroes (fd, delta - window_size) == -1)
+ return -1;
+
+ highestwrite += delta - window_size;
+
+ return streaming_pwrite (handle, buf, count, offset);
}
/* Read data back from the stream. */
static int
streaming_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
{
+ uint64_t windowstart;
+ int r;
+
if (errorstate) {
nbdkit_error ("unrecoverable error state");
errno = EIO;
return -1;
}
- nbdkit_error ("client tried to read: the streaming plugin does not currently support this");
- errorstate = 1;
- errno = EIO;
- return -1;
+ /* This just makes the recursive case easier to reason about. */
+ if (count == 0)
+ return 0;
+
+ windowstart = highestwrite - window_size;
+
+ if (offset < windowstart) {
+ nbdkit_error ("client seeked backwards > window size: you must increase the window size");
+ errorstate = 1;
+ errno = EIO;
+ return -1;
+ }
+
+ /* Split reads across highestwrite boundary. Splitting here means
+ * we do not have to deal with reads across the boundary in the code
+ * below.
+ */
+ if (offset < highestwrite && offset + count > highestwrite) {
+ uint64_t size = highestwrite - offset;
+
+ r = streaming_pread (handle, buf, size, offset);
+ if (r == -1)
+ return -1;
+ buf += size;
+ offset += size;
+ count -= size;
+ return streaming_pread (handle, buf, count, offset);
+ }
+
+ /* Handle a read entirely within the window by simply reading the
+ * window contents.
+ */
+ if (offset < highestwrite) {
+ uint64_t windowoffset = window_size - (highestwrite - offset);
+ memcpy (buf, &window[windowoffset], count);
+ return 0;
+ }
+
+ /* Else any read ahead of the current highest write is returned as
+ * all zeroes.
+ */
+ memset (buf, 0, count);
+ return 0;
+}
+
+/* nbdkit is shutting down - the rest of the window should be written out. */
+static void
+streaming_unload (void)
+{
+ if (fd >= 0) {
+ /* XXX impossible to report an error to the client here */
+ xwrite (fd, window, window_size);
+
+ close (fd);
+ }
+
+ free (window);
+ free (filename);
}
static struct nbdkit_plugin plugin = {
--
2.0.4
-------------- next part --------------
From 0c4ffccc7258dcff94cc40abcf470a3c5ad788c3 Mon Sep 17 00:00:00 2001
From: "Richard W.M. Jones" <rjones at redhat.com>
Date: Tue, 14 Oct 2014 14:31:08 +0200
Subject: [PATCH 2/2] tests: Enable streaming test.
---
plugins/streaming/streaming.c | 13 +++++++++++--
tests/Makefile.am | 17 +++++------------
tests/test-streaming.c | 16 +++++++---------
3 files changed, 23 insertions(+), 23 deletions(-)
diff --git a/plugins/streaming/streaming.c b/plugins/streaming/streaming.c
index 2d08803..da4de63 100644
--- a/plugins/streaming/streaming.c
+++ b/plugins/streaming/streaming.c
@@ -35,6 +35,8 @@
#include <stdio.h>
#include <stdlib.h>
+#include <stdint.h>
+#include <inttypes.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
@@ -232,6 +234,10 @@ streaming_pwrite (void *handle, const void *buf,
return -1;
}
+ nbdkit_debug ("pwrite: offset=%" PRIi64 " count=%" PRIu32
+ " highestwrite=%" PRIu64,
+ offset, count, highestwrite);
+
/* This just makes the recursive case easier to reason about. */
if (count == 0)
return 0;
@@ -239,7 +245,8 @@ streaming_pwrite (void *handle, const void *buf,
windowstart = highestwrite - window_size;
if (offset < windowstart) {
- nbdkit_error ("client seeked backwards > window size: you must increase the window size");
+ nbdkit_error ("pwrite: client backwards seek > window size: you must increase the window size (highestwrite=%" PRIu64 ", window_size=%" PRIi64 ")",
+ highestwrite, window_size);
errorstate = 1;
errno = EIO;
return -1;
@@ -298,6 +305,7 @@ streaming_pwrite (void *handle, const void *buf,
window = new_window;
/* Make sure the extended window is zeroes to start with. */
memset (&window[window_size], 0, new_size - window_size);
+ window_size = new_size;
highestwrite = new_highestwrite;
/* Copy the buffer to the new window. */
memcpy (&window[offset - windowstart], buf, count);
@@ -374,7 +382,8 @@ streaming_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
windowstart = highestwrite - window_size;
if (offset < windowstart) {
- nbdkit_error ("client seeked backwards > window size: you must increase the window size");
+ nbdkit_error ("pread: client backwards seek > window size: you must increase the window size (highestwrite=%" PRIu64 ", window_size=%" PRIi64 ")",
+ highestwrite, window_size);
errorstate = 1;
errno = EIO;
return -1;
diff --git a/tests/Makefile.am b/tests/Makefile.am
index a50e26b..cccd45b 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -172,18 +172,11 @@ test_python_LDADD = libtest.la $(LIBGUESTFS_LIBS)
endif
# streaming plugin test.
+check_PROGRAMS += test-streaming
+TESTS += test-streaming
-# This is disabled at the moment because the libguestfs appliance
-# kernel tries to read from the device (eg to read the partition
-# table) and the current streaming plugin cannot handle this.
-# Implementing a sliding window in the plugin would fix this. (XXX)
-EXTRA_DIST += test-streaming.c
-
-#check_PROGRAMS += test-streaming
-#TESTS += test-streaming
-#
-#test_streaming_SOURCES = test-streaming.c test.h
-#test_streaming_CFLAGS = $(WARNINGS_CFLAGS) $(LIBGUESTFS_CFLAGS)
-#test_streaming_LDADD = libtest.la $(LIBGUESTFS_LIBS)
+test_streaming_SOURCES = test-streaming.c test.h
+test_streaming_CFLAGS = $(WARNINGS_CFLAGS) $(LIBGUESTFS_CFLAGS)
+test_streaming_LDADD = libtest.la $(LIBGUESTFS_LIBS)
endif
diff --git a/tests/test-streaming.c b/tests/test-streaming.c
index 1631c19..4610fb9 100644
--- a/tests/test-streaming.c
+++ b/tests/test-streaming.c
@@ -48,7 +48,7 @@
#include "test.h"
-static char data[4096];
+static char data[1024];
int
main (int argc, char *argv[])
@@ -69,6 +69,8 @@ main (int argc, char *argv[])
if (test_start_nbdkit (NBDKIT_PLUGIN ("streaming"),
"pipe=streaming.fifo",
+ "size=128k",
+ "window=128k",
NULL) == -1)
exit (EXIT_FAILURE);
@@ -121,14 +123,10 @@ main (int argc, char *argv[])
exit (EXIT_FAILURE);
/* Write linearly to the virtual disk. */
- for (i = 0; i < 10; ++i) {
- memset (data, i+1, sizeof data);
-
- /* Note that we deliberately skip forwards, in order to
- * exercise seeking code in the streaming plugin.
- */
+ memset (data, 1, sizeof data);
+ for (i = 0; i < 32; ++i) {
guestfs_pwrite_device (g, "/dev/sda", data, sizeof data,
- (2 * i) * sizeof data);
+ i * sizeof data);
}
if (guestfs_shutdown (g) == -1)
@@ -148,7 +146,7 @@ main (int argc, char *argv[])
}
md5[32] = '\0';
- if (strcmp (md5, "0123456789abcdef0123456789abcdef") != 0) {
+ if (strcmp (md5, "51ae9fa5fb90e9d51c4f1b4260285c99") != 0) {
fprintf (stderr, "unexpected hash: %s\n", md5);
exit (EXIT_FAILURE);
}
--
2.0.4
More information about the Libguestfs
mailing list