[Libguestfs] [PATCH libnbd] examples: Include an example of integrating with the glib main loop.

Richard W.M. Jones rjones at redhat.com
Mon Jul 15 18:38:10 UTC 2019


---
 .gitignore                |   1 +
 README                    |   2 +
 configure.ac              |   9 +
 examples/Makefile.am      |  22 ++
 examples/glib-main-loop.c | 501 ++++++++++++++++++++++++++++++++++++++
 5 files changed, 535 insertions(+)

diff --git a/.gitignore b/.gitignore
index edbf941..79b95b5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -41,6 +41,7 @@ Makefile.in
 /docs/libnbd-api.3
 /docs/libnbd-api.pod
 /examples/batched-read-write
+/examples/glib-main-loop
 /examples/threaded-reads-and-writes
 /examples/simple-fetch-first-sector
 /examples/simple-reads-and-writes
diff --git a/README b/README
index 3045312..443bb99 100644
--- a/README
+++ b/README
@@ -72,6 +72,8 @@ Optional:
 
  * qemu, qemu-io, qemu-img for interoperability testing.
 
+ * glib2 for examples that interoperate with the glib main loop.
+
  * OCaml and ocamlfind are both needed to generate the OCaml bindings.
 
  * Perl Pod::Man and Pod::Simple to generate the documentation.
diff --git a/configure.ac b/configure.ac
index 9c706c9..ccb19b5 100644
--- a/configure.ac
+++ b/configure.ac
@@ -153,6 +153,15 @@ AM_CONDITIONAL([HAVE_NBD_SERVER], [test "x$NBD_SERVER" != "x"])
 AC_CHECK_PROG([QEMU_NBD], [qemu-nbd], [qemu-nbd])
 AM_CONDITIONAL([HAVE_QEMU_NBD], [test "x$QEMU_NBD" != "x"])
 
+dnl glib2, needed only for examples that interoperate with the glib main loop.
+PKG_CHECK_MODULES([GLIB], [glib-2.0], [
+    AC_SUBST([GLIB_CFLAGS])
+    AC_SUBST([GLIB_LIBS])
+],[
+    AC_MSG_WARN([glib2 not found, some examples will not be compiled])
+])
+AM_CONDITIONAL([HAVE_GLIB], [test "x$GLIB_LIBS" != "x"])
+
 dnl Check we have enough to run podwrapper.
 AC_CHECK_PROG([PERL],[perl],[perl],[no])
 AS_IF([test "x$PERL" != "xno"],[
diff --git a/examples/Makefile.am b/examples/Makefile.am
index 7560855..de3f090 100644
--- a/examples/Makefile.am
+++ b/examples/Makefile.am
@@ -27,6 +27,11 @@ noinst_PROGRAMS = \
 	strict-structured-reads \
 	$(NULL)
 
+if HAVE_GLIB
+noinst_PROGRAMS += \
+	glib-main-loop
+endif
+
 simple_fetch_first_sector_SOURCES = \
 	simple-fetch-first-sector.c \
 	$(NULL)
@@ -93,3 +98,20 @@ batched_read_write_CFLAGS = \
 batched_read_write_LDADD = \
 	$(top_builddir)/lib/libnbd.la \
 	$(NULL)
+
+if HAVE_GLIB
+glib_main_loop_SOURCES = \
+	glib-main-loop.c \
+	$(NULL)
+glib_main_loop_CPPFLAGS = \
+	-I$(top_srcdir)/include \
+	$(NULL)
+glib_main_loop_CFLAGS = \
+	$(WARNINGS_CFLAGS) \
+	$(GLIB_CFLAGS) \
+	$(NULL)
+glib_main_loop_LDADD = \
+	$(top_builddir)/lib/libnbd.la \
+	$(GLIB_LIBS) \
+	$(NULL)
+endif
diff --git a/examples/glib-main-loop.c b/examples/glib-main-loop.c
new file mode 100644
index 0000000..dea4666
--- /dev/null
+++ b/examples/glib-main-loop.c
@@ -0,0 +1,501 @@
+/* This example shows you how to make libnbd interoperate with the
+ * glib main loop.  For more information about glib main loop see:
+ *
+ * https://developer.gnome.org/glib/stable/glib-The-Main-Event-Loop.html
+ *
+ * To run it, simply do:
+ *
+ *   ./examples/glib-main-loop
+ *
+ * For debugging, do:
+ *
+ *   LIBNBD_DEBUG=1 ./examples/glib-main-loop
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <assert.h>
+
+#include <libnbd.h>
+
+#include <glib.h>
+
+struct NBDSource;  /* Forward declaration for the callback typedefs. */
+typedef void (*connecting_callback_t) (struct NBDSource *); /* left created */
+typedef void (*connected_callback_t) (struct NBDSource *);  /* became ready */
+
+/* This is the derived GSource type; one instance wraps one libnbd handle. */
+struct NBDSource {
+  /* The base type.  MUST be first so the struct can be cast to GSource. */
+  GSource source;
+
+  /* The underlying libnbd handle.  finalize() closes it with nbd_close. */
+  struct nbd_handle *nbd;
+  bool debug;                   /* cached nbd_get_debug() of the handle */
+
+  /* The poll file descriptor, only valid when poll_registered == true. */
+  bool poll_registered;         /* set by prepare() after g_source_add_poll */
+  GPollFD pollfd;
+
+  /* You can optionally register callbacks to be called when the
+   * handle changes state; prepare() invokes them:
+   *
+   * connecting_callback is called once, when the handle is first
+   * seen to have left the created state.
+   *
+   * connected_callback is called once, when the handle is first
+   * seen to be in the ready (connected) state.
+   */
+  connecting_callback_t connecting_callback;
+  connected_callback_t connected_callback;
+  bool called_connected_callback;  /* ensures connected_callback runs once */
+
+  /* Arbitrary pointer for use by caller. */
+  gpointer user_data;
+};
+
+/* Print a "glib: debug: "-prefixed line to stderr if source->debug is set. */
+#define DEBUG(source, fs, ...)                                          \
+  do {                                                                  \
+    if ((source)->debug)                                                \
+      fprintf (stderr, "glib: debug: " fs "\n", ## __VA_ARGS__);        \
+  } while (0)
+
+/* These are the GSource functions for libnbd handles. */
+/* Map the direction libnbd wants to wait for onto glib poll event bits. */
+static inline int
+events_from_nbd (struct nbd_handle *nbd)
+{
+  const int wanted = nbd_aio_get_direction (nbd);
+  int events = 0;
+
+  events |= (wanted & LIBNBD_AIO_DIRECTION_READ) ? G_IO_IN : 0;
+  events |= (wanted & LIBNBD_AIO_DIRECTION_WRITE) ? G_IO_OUT : 0;
+
+  return events;
+}
+
+static gboolean
+prepare (GSource *sp, gint *timeout_)
+{
+  struct NBDSource *source = (struct NBDSource *) sp;
+
+  /* GSource 'prepare' callback.  When the NBD handle moves out of the
+   * created state (which means that it first has a socket associated
+   * with it) we must initialize and register the pollfd.
+   */
+  if (!source->poll_registered && !nbd_aio_is_created (source->nbd)) {
+    int fd;
+
+    if (source->connecting_callback) {
+      DEBUG (source, "calling connecting_callback");
+      source->connecting_callback (source);
+    }
+
+    fd = nbd_aio_get_fd (source->nbd);
+    assert (fd >= 0);
+
+    source->pollfd.fd = fd;
+    source->pollfd.events = events_from_nbd (source->nbd);
+    g_source_add_poll ((GSource *)source, &source->pollfd);
+
+    DEBUG (source, "registered pollfd");
+
+    source->poll_registered = true;     /* this block runs only once */
+  }
+
+  if (!source->poll_registered)
+    return FALSE;               /* handle still in the created state */
+
+  if (source->connected_callback &&
+      !source->called_connected_callback &&
+      nbd_aio_is_ready (source->nbd)) {
+    DEBUG (source, "calling connected_callback");
+    source->connected_callback (source);
+    source->called_connected_callback = true;   /* never call it again */
+  }
+
+  source->pollfd.events = events_from_nbd (source->nbd);
+  *timeout_ = -1;               /* no timeout: wait on the fd only */
+
+  DEBUG (source, "prepare: events = 0x%x%s%s",
+         source->pollfd.events,
+         source->pollfd.events & G_IO_IN ? " G_IO_IN" : "",
+         source->pollfd.events & G_IO_OUT ? " G_IO_OUT" : "");
+
+  return FALSE;                 /* readiness is decided in check() */
+}
+
+/* GSource 'check' callback: TRUE when a polled event matches what libnbd
+ * is currently waiting for, so that dispatch() should run.
+ */
+static gboolean
+check (GSource *sp)
+{
+  struct NBDSource *source = (struct NBDSource *) sp;
+  bool can_read, can_write;
+  int wanted, revents;
+
+  if (!source->poll_registered)
+    return FALSE;
+
+  wanted = nbd_aio_get_direction (source->nbd);
+  revents = source->pollfd.revents;
+
+  DEBUG (source, "check: direction = 0x%x%s%s, revents = 0x%x%s%s",
+         wanted,
+         wanted & LIBNBD_AIO_DIRECTION_READ ? " READ" : "",
+         wanted & LIBNBD_AIO_DIRECTION_WRITE ? " WRITE" : "",
+         revents,
+         revents & G_IO_IN ? " G_IO_IN" : "",
+         revents & G_IO_OUT ? " G_IO_OUT" : "");
+
+  can_read = (revents & G_IO_IN) && (wanted & LIBNBD_AIO_DIRECTION_READ);
+  can_write = (revents & G_IO_OUT) && (wanted & LIBNBD_AIO_DIRECTION_WRITE);
+  return can_read || can_write;
+}
+
+static gboolean
+dispatch (GSource *sp,
+          GSourceFunc callback,         /* not used */
+          gpointer user_data)           /* not used */
+{
+  struct NBDSource *source = (struct NBDSource *) sp;
+
+  DEBUG (source, "dispatch: revents = 0x%x%s%s",
+         source->pollfd.revents,
+         source->pollfd.revents & G_IO_IN ? " G_IO_IN" : "",
+         source->pollfd.revents & G_IO_OUT ? " G_IO_OUT" : "");
+
+  if ((source->pollfd.revents & G_IO_IN) != 0)
+    nbd_aio_notify_read (source->nbd);  /* NOTE(review): result unchecked */
+  else if ((source->pollfd.revents & G_IO_OUT) != 0)  /* only if no G_IO_IN */
+    nbd_aio_notify_write (source->nbd); /* NOTE(review): result unchecked */
+
+  return TRUE;                  /* TRUE keeps this source attached */
+}
+
+static void
+finalize (GSource *sp)
+{
+  struct NBDSource *source = (struct NBDSource *) sp;
+
+  DEBUG (source, "finalize");
+
+  nbd_close (source->nbd);      /* the source owns the libnbd handle */
+}
+
+GSourceFuncs nbd_source_funcs = {   /* vtable passed to g_source_new */
+  .prepare = prepare,               /* registers pollfd, refreshes events */
+  .check = check,                   /* decides whether to dispatch */
+  .dispatch = dispatch,             /* calls nbd_aio_notify_read/write */
+  .finalize = finalize,             /* closes the libnbd handle */
+};
+
+/* Wrap a libnbd handle in a new, not yet attached, NBDSource.
+ *
+ * The result can be cast to 'GSource *' wherever glib needs one,
+ * because the GSource is the first member of struct NBDSource.
+ */
+static struct NBDSource *
+create_libnbd_gsource (struct nbd_handle *nbd)
+{
+  GSource *base = g_source_new (&nbd_source_funcs, sizeof (struct NBDSource));
+  struct NBDSource *self = (struct NBDSource *) base;
+
+  /* The remaining fields are left as g_source_new initialised them. */
+  self->poll_registered = false;
+  self->debug = nbd_get_debug (nbd);
+  self->nbd = nbd;
+
+  return self;
+}
+
+/*----------------------------------------------------------------------*/
+
+/* The rest of this file is an example showing how to use the GSource
+ * defined above to control two nbdkit subprocesses, copying from one
+ * to the other in parallel.
+ */
+
+/* Source and destination nbdkit instances. */
+static struct NBDSource *gssrc, *gsdest;
+
+#define SIZE (1024*1024*1024)   /* bytes to copy; matches size=1G below */
+
+static const char *src_args[] = {
+  "nbdkit", "-s", "-r", "pattern", "size=1G", NULL
+};
+
+static const char *dest_args[] = {
+  "nbdkit", "-s", "memory", "size=1G", NULL
+};
+
+/* The list of buffers in flight: being read, read and waiting to be
+ * written, or being written.  Note that the source server can answer
+ * requests out of order so these buffers may not be sorted by offset.
+ */
+#define MAX_BUFFERS 16
+#define BUFFER_SIZE 65536
+
+enum buffer_state {
+  BUFFER_READING,               /* nbd_aio_pread_callback issued */
+  BUFFER_READ_COMPLETED,        /* read done, waiting for write_data */
+  BUFFER_WRITING,               /* nbd_aio_pwrite_callback issued */
+};
+
+struct buffer {
+  uint64_t offset;              /* byte offset used for read and write */
+  int64_t cookie;               /* cookie of the read, then of the write */
+  enum buffer_state state;
+  char *data;                   /* BUFFER_SIZE bytes; NULL in unused slots */
+};
+
+static struct buffer buffers[MAX_BUFFERS];
+static size_t nr_buffers;       /* slots [0, nr_buffers) are in use */
+
+static bool finished, reader_paused;
+
+static GMainLoop *loop;
+
+static void connected (struct NBDSource *source);
+static gboolean read_data (gpointer user_data);
+static int finished_read (void *vp, int64_t cookie, int *error);
+static gboolean write_data (gpointer user_data);
+static int finished_write (void *vp, int64_t cookie, int *error);
+
+int
+main (int argc, char *argv[])
+{
+  struct nbd_handle *src, *dest;
+  GMainContext *loopctx = NULL;
+
+  /* Create the main loop. */
+  loop = g_main_loop_new (loopctx, FALSE);
+
+  /* Create the two NBD handles (nbdkit is started further down). */
+  src = nbd_create ();
+  if (!src) {
+    fprintf (stderr, "%s\n", nbd_get_error ());
+    exit (EXIT_FAILURE);
+  }
+  dest = nbd_create ();
+  if (!dest) {
+    fprintf (stderr, "%s\n", nbd_get_error ());
+    exit (EXIT_FAILURE);
+  }
+
+  /* Wrap each handle in a GSource and attach it to the main loop. */
+  gssrc = create_libnbd_gsource (src);
+  gsdest = create_libnbd_gsource (dest);
+  loopctx = g_main_loop_get_context (loop);
+  g_source_attach ((GSource *) gssrc, loopctx);
+  g_source_attach ((GSource *) gsdest, loopctx);
+
+  /* Make sure we get called back when each handle connects. */
+  gssrc->connected_callback = connected;
+  gsdest->connected_callback = connected;
+
+  /* Asynchronously start each handle connecting to its nbdkit command. */
+  if (nbd_aio_connect_command (src, (char **) src_args) == -1) {
+    fprintf (stderr, "%s\n", nbd_get_error ());
+    exit (EXIT_FAILURE);
+  }
+  if (nbd_aio_connect_command (dest, (char **) dest_args) == -1) {
+    fprintf (stderr, "%s\n", nbd_get_error ());
+    exit (EXIT_FAILURE);
+  }
+
+  /* Run the main loop until finished_write calls g_main_loop_quit. */
+  g_main_loop_run (loop);
+  exit (EXIT_SUCCESS);
+}
+
+/* Connected callback shared by both handles.  It runs once per
+ * handle, so the second invocation means that source and destination
+ * are both ready and copying can start.
+ */
+static void
+connected (struct NBDSource *source)
+{
+  static int nr_connected = 0;
+
+  nr_connected++;
+  if (nr_connected != 2)
+    return;
+
+  DEBUG (source, "both handles are connected");
+
+  /* Kick off the copy: read_data runs as an idle handler and keeps
+   * issuing reads from the source until it pauses or finishes.
+   */
+  g_idle_add (read_data, NULL);
+}
+
+/* This idle callback issues one read from the source nbdkit per call,
+ * until the whole disk has been requested or all buffers are in use.
+ */
+static gboolean
+read_data (gpointer user_data)
+{
+  static uint64_t posn = 0;     /* next offset to request */
+  const size_t i = nr_buffers;  /* first unused slot, if there is one */
+
+  if (gssrc == NULL)
+    return FALSE;
+
+  /* Finished reading from the source nbdkit? */
+  if (posn >= SIZE) {
+    DEBUG (gssrc, "read_data: finished reading from source");
+    finished = true;
+    return FALSE;
+  }
+
+  /* If too many read requests are in flight, return FALSE so this
+   * idle callback is unregistered.  It will be re-registered by
+   * finished_write when nr_buffers decreases.
+   */
+  if (nr_buffers >= MAX_BUFFERS) {
+    DEBUG (gssrc, "read_data: buffer full, pausing reads from source");
+    reader_paused = true;
+    return FALSE;
+  }
+
+  /* Begin reading into a new buffer appended at the end of the list. */
+  assert (buffers[i].data == NULL);
+  buffers[i].data = g_new (char, BUFFER_SIZE);
+  buffers[i].state = BUFFER_READING;
+  buffers[i].offset = posn;
+  nr_buffers++;
+  posn += BUFFER_SIZE;
+
+  buffers[i].cookie =
+    nbd_aio_pread_callback (gssrc->nbd, buffers[i].data,
+                            BUFFER_SIZE, buffers[i].offset,
+                            NULL, finished_read, 0);
+  if (buffers[i].cookie == -1) {
+    fprintf (stderr, "%s\n", nbd_get_error ());
+    exit (EXIT_FAILURE);
+  }
+
+  return TRUE;                  /* stay registered to issue the next read */
+}
+
+/* Called by libnbd when a read issued by read_data finishes.  vp and
+ * *error are not examined; the function always returns 0. */
+static int
+finished_read (void *vp, int64_t cookie, int *error)
+{
+  size_t i;
+
+  if (gssrc == NULL)
+    return 0;
+
+  DEBUG (gssrc, "finished_read: read completed");
+
+  /* Find the buffer being read and mark it as completed.  The state
+   * must match too, since cookies are only unique per handle.
+   */
+  for (i = 0; i < nr_buffers; ++i) {
+    if (buffers[i].state == BUFFER_READING && buffers[i].cookie == cookie)
+      goto found;
+  }
+  abort ();                     /* This should never happen. */
+
+ found:
+  buffers[i].state = BUFFER_READ_COMPLETED;
+  g_idle_add (write_data, NULL);  /* one writer idle handler per buffer */
+
+  return 0;
+}
+
+/* Idle callback: start writing one read-completed buffer, if any, to
+ * the destination nbdkit.
+ * It always returns FALSE, which unregisters this idle handler;
+ * finished_read adds a fresh handler for every buffer that becomes
+ * ready to be written.
+ */
+static gboolean
+write_data (gpointer user_data)
+{
+  struct buffer *buf = NULL;
+  size_t k;
+
+  if (gsdest == NULL)
+    return FALSE;
+
+  /* Pick the first buffer whose read has completed. */
+  for (k = 0; k < nr_buffers; ++k) {
+    if (buffers[k].state == BUFFER_READ_COMPLETED) {
+      buf = &buffers[k];
+      break;
+    }
+  }
+  if (buf == NULL)
+    return FALSE;
+
+  buf->cookie = nbd_aio_pwrite_callback (gsdest->nbd, buf->data,
+                                         BUFFER_SIZE, buf->offset,
+                                         NULL, finished_write, 0);
+  if (buf->cookie == -1) {
+    fprintf (stderr, "%s\n", nbd_get_error ());
+    exit (EXIT_FAILURE);
+  }
+  buf->state = BUFFER_WRITING;
+
+  return FALSE;
+}
+
+/* Called by libnbd when a write issued by write_data finishes.  vp and
+ * *error are not examined; the function always returns 0. */
+static int
+finished_write (void *vp, int64_t cookie, int *error)
+{
+  size_t i;
+
+  if (gsdest == NULL)
+    return 0;
+
+  DEBUG (gsdest, "finished_write: write completed");
+
+  /* Find the buffer being written and free it.  The state must match
+   * too, since cookies are only unique per handle.
+   */
+  for (i = 0; i < nr_buffers; ++i) {
+    if (buffers[i].state == BUFFER_WRITING && buffers[i].cookie == cookie)
+      goto found;
+  }
+  abort ();                     /* This should never happen. */
+
+ found:
+  g_free (buffers[i].data);
+  memmove (&buffers[i], &buffers[i+1],
+           sizeof (struct buffer) * (nr_buffers-(i+1)));
+  nr_buffers--;
+  buffers[nr_buffers].data = NULL;
+
+  /* If the number of buffers was MAX_BUFFERS and has now gone down to
+   * MAX_BUFFERS-1 then we need to restart the read handler.
+   */
+  if (nr_buffers == MAX_BUFFERS-1 && reader_paused) {
+    DEBUG (gsdest, "finished_write: restarting reader");
+    g_idle_add (read_data, NULL);
+    reader_paused = false;
+  }
+
+  /* Reader finished and no buffers left: the copy is complete. */
+  if (finished && nr_buffers == 0) {
+    DEBUG (gsdest, "finished_write: all finished");
+    g_source_remove (g_source_get_id ((GSource *) gssrc));
+    g_source_unref ((GSource *) gssrc);
+    gssrc = NULL;
+    g_source_remove (g_source_get_id ((GSource *) gsdest));
+    g_source_unref ((GSource *) gsdest);
+    gsdest = NULL;
+    g_main_loop_quit (loop);
+  }
+
+  return 0;
+}
-- 
2.22.0




More information about the Libguestfs mailing list