[Libguestfs] [PATCH libnbd] examples: Include an example of integrating with the glib main loop.

Eric Blake eblake at redhat.com
Wed Jul 17 03:24:24 UTC 2019


On 7/15/19 1:38 PM, Richard W.M. Jones wrote:
> ---
>  .gitignore                |   1 +
>  README                    |   2 +
>  configure.ac              |   9 +
>  examples/Makefile.am      |  22 ++
>  examples/glib-main-loop.c | 501 ++++++++++++++++++++++++++++++++++++++
>  5 files changed, 535 insertions(+)
> 

Rough review (since I've never used a glib main loop before)...

> +static gboolean
> +prepare (GSource *sp, gint *timeout_)
> +{
> +  struct NBDSource *source = (struct NBDSource *) sp;
> +
> +  /* When the NBD handle moves out of the created state (which means
> +   * that it first has a socket associated with it) we must initialize
> +   * and register the pollfd.
> +   */
> +  if (!source->poll_registered && !nbd_aio_is_created (source->nbd)) {
> +    int fd;
> +
> +    if (source->connecting_callback) {
> +      DEBUG (source, "calling connecting_callback");
> +      source->connecting_callback (source);
> +    }

If I understand nbd_aio_connect_tcp properly, there are cases where we
end up trying several options presented from getaddrinfo() with a
possible reset back to START, at least until we get an fd that sticks.
Does this need to take into account that an fd might change while still
trying to do the initial connect?

> +
> +    fd = nbd_aio_get_fd (source->nbd);
> +    assert (fd >= 0);
> +
> +    source->pollfd.fd = fd;
> +    source->pollfd.events = events_from_nbd (source->nbd);
> +    g_source_add_poll ((GSource *)source, &source->pollfd);

https://developer.gnome.org/glib/stable/glib-The-Main-Event-Loop.html#g-source-add-poll
states that g_source_add_unix_fd is preferred these days; do you have a
minimum glib version in mind for targeting?


> +static gboolean
> +dispatch (GSource *sp,
> +          GSourceFunc callback,
> +          gpointer user_data)
> +{
> +  struct NBDSource *source = (struct NBDSource *) sp;
> +
> +  DEBUG (source, "dispatch: revents = 0x%x%s%s",
> +         source->pollfd.revents,
> +         source->pollfd.revents & G_IO_IN ? " G_IO_IN" : "",
> +         source->pollfd.revents & G_IO_OUT ? " G_IO_OUT" : "");
> +
> +  if ((source->pollfd.revents & G_IO_IN) != 0)
> +    nbd_aio_notify_read (source->nbd);
> +  else if ((source->pollfd.revents & G_IO_OUT) != 0)
> +    nbd_aio_notify_write (source->nbd);

No error checking?

> +
> +  return TRUE;

https://developer.gnome.org/glib/stable/glib-The-Main-Event-Loop.html#GSourceFuncs
says dispatch() should return G_SOURCE_CONTINUE (which I assume is an
alias of TRUE?).  Should this ever return G_SOURCE_REMOVE (perhaps when
we detect that the connection has moved to CLOSED)?

> +}
> +
> +static void
> +finalize (GSource *sp)
> +{
> +  struct NBDSource *source = (struct NBDSource *) sp;
> +
> +  DEBUG (source, "finalize");
> +
> +  nbd_close (source->nbd);
> +}
> +
> +GSourceFuncs nbd_source_funcs = {
> +  .prepare = prepare,
> +  .check = check,
> +  .dispatch = dispatch,
> +  .finalize = finalize,
> +};
> +
> +/* Create a libnbd GSource from a libnbd handle.
> + *
> + * Note that the return value is also a ‘GSource *’, you just have to
> + * cast the return value if you need a GSource pointer.
> + */
> +static struct NBDSource *
> +create_libnbd_gsource (struct nbd_handle *nbd)
> +{
> +  struct NBDSource *source;
> +
> +  source =
> +    (struct NBDSource *) g_source_new (&nbd_source_funcs, sizeof *source);
> +  source->nbd = nbd;
> +  source->debug = nbd_get_debug (nbd);
> +  source->poll_registered = false;
> +
> +  return source;
> +}
> +
> +/*----------------------------------------------------------------------*/
> +
> +/* The rest of this file is an example showing how to use the GSource
> + * defined above to control two nbdkit subprocesses, copying from one
> + * to the other in parallel.
> + */
> +
> +/* Source and destination nbdkit instances. */
> +static struct NBDSource *gssrc, *gsdest;
> +
> +#define SIZE (1024*1024*1024)
> +
> +static const char *src_args[] = {
> +  "nbdkit", "-s", "-r", "pattern", "size=1G", NULL
> +};
> +
> +static const char *dest_args[] = {
> +  "nbdkit", "-s", "memory", "size=1G", NULL
> +};

Do you want --exit-with-parent?

> +
> +/* The list of buffers waiting to be written.  Note that the source
> + * server can answer requests out of order so these buffers may not be
> + * sorted by offset.
> + */
> +#define MAX_BUFFERS 16
> +#define BUFFER_SIZE 65536
> +
> +enum buffer_state {
> +  BUFFER_READING,
> +  BUFFER_READ_COMPLETED,
> +  BUFFER_WRITING,
> +};
> +
> +struct buffer {
> +  uint64_t offset;
> +  int64_t cookie;
> +  enum buffer_state state;
> +  char *data;
> +};
> +
> +static struct buffer buffers[MAX_BUFFERS];
> +static size_t nr_buffers;
> +
> +static bool finished, reader_paused;
> +
> +static GMainLoop *loop;
> +
> +static void connected (struct NBDSource *source);
> +static gboolean read_data (gpointer user_data);
> +static int finished_read (void *vp, int64_t cookie, int *error);
> +static gboolean write_data (gpointer user_data);
> +static int finished_write (void *vp, int64_t cookie, int *error);
> +
> +int
> +main (int argc, char *argv[])
> +{
> +  struct nbd_handle *src, *dest;
> +  GMainContext *loopctx = NULL;
> +
> +  /* Create the main loop. */
> +  loop = g_main_loop_new (loopctx, FALSE);
> +
> +  /* Create the two NBD handles and nbdkit instances. */
> +  src = nbd_create ();
> +  if (!src) {
> +    fprintf (stderr, "%s\n", nbd_get_error ());
> +    exit (EXIT_FAILURE);
> +  }
> +  dest = nbd_create ();
> +  if (!dest) {
> +    fprintf (stderr, "%s\n", nbd_get_error ());
> +    exit (EXIT_FAILURE);
> +  }
> +
> +  /* Create the GSource main loop sources from each handle. */
> +  gssrc = create_libnbd_gsource (src);
> +  gsdest = create_libnbd_gsource (dest);
> +  loopctx = g_main_loop_get_context (loop);
> +  g_source_attach ((GSource *) gssrc, loopctx);
> +  g_source_attach ((GSource *) gsdest, loopctx);
> +
> +  /* Make sure we get called back when each handle connects. */
> +  gssrc->connected_callback = connected;
> +  gsdest->connected_callback = connected;
> +
> +  /* Asynchronously start each handle connecting. */
> +  if (nbd_aio_connect_command (src, (char **) src_args) == -1) {
> +    fprintf (stderr, "%s\n", nbd_get_error ());
> +    exit (EXIT_FAILURE);
> +  }
> +  if (nbd_aio_connect_command (dest, (char **) dest_args) == -1) {
> +    fprintf (stderr, "%s\n", nbd_get_error ());
> +    exit (EXIT_FAILURE);
> +  }
> +
> +  /* Run the main loop until quit. */
> +  g_main_loop_run (loop);
> +  exit (EXIT_SUCCESS);

I guess the condition for causing the main loop to quit is later on in
one of the callbacks?

> +}
> +
> +/* This is called back when either handle becomes connected.  By
> + * counting the number of times this happens (there are two handles)
> + * we can tell when both handles have finished connecting.
> + */
> +static void
> +connected (struct NBDSource *source)
> +{
> +  static int count = 0;
> +
> +  count++;
> +  if (count == 2) {
> +    DEBUG (source, "both handles are connected");
> +
> +    /* Now that both handles are connected, we can begin copying.
> +     * Register an idle handler that will repeatedly read from the
> +     * source.
> +     */
> +    g_idle_add (read_data, NULL);
> +  }
> +}
> +
> +/* This idle callback reads data from the source nbdkit until the ring
> + * is full.
> + */
> +static gboolean
> +read_data (gpointer user_data)
> +{
> +  static uint64_t posn = 0;
> +  const size_t i = nr_buffers;
> +
> +  if (gssrc == NULL)
> +    return FALSE;
> +
> +  /* Finished reading from the source nbdkit? */
> +  if (posn >= SIZE) {
> +    DEBUG (gssrc, "read_data: finished reading from source");
> +    finished = true;
> +    return FALSE;
> +  }
> +
> +  /* If too many read requests are in flight, return FALSE so this
> +   * idle callback is unregistered.  It will be registered by the
> +   * write callback when nr_buffers decreases.
> +   */
> +  if (nr_buffers >= MAX_BUFFERS) {
> +    DEBUG (gssrc, "read_data: buffer full, pausing reads from source");
> +    reader_paused = true;
> +    return FALSE;
> +  }
> +
> +  /* Begin reading into the new buffer. */
> +  assert (buffers[i].data == NULL);
> +  buffers[i].data = g_new (char, BUFFER_SIZE);
> +  buffers[i].state = BUFFER_READING;
> +  buffers[i].offset = posn;
> +  nr_buffers++;
> +  posn += BUFFER_SIZE;
> +
> +  buffers[i].cookie =
> +    nbd_aio_pread_callback (gssrc->nbd, buffers[i].data,
> +                            BUFFER_SIZE, buffers[i].offset,
> +                            NULL, finished_read, 0);

Of course, if you like my parameter reordering patch for 0.1.8, this
gets a tweak.

> +  if (buffers[i].cookie == -1) {
> +    fprintf (stderr, "%s\n", nbd_get_error ());
> +    exit (EXIT_FAILURE);
> +  }
> +
> +  return TRUE;
> +}
> +
> +/* This callback is called from libnbd when any read command finishes. */
> +static int
> +finished_read (void *vp, int64_t cookie, int *error)
> +{
> +  size_t i;
> +
> +  if (gssrc == NULL)
> +    return 0;
> +
> +  DEBUG (gssrc, "finished_read: read completed");
> +
> +  /* Find the corresponding buffer and mark it as completed. */
> +  for (i = 0; i < nr_buffers; ++i) {
> +    if (buffers[i].cookie == cookie)
> +      goto found;
> +  }
> +  /* This should never happen. */
> +  abort ();
> +
> + found:
> +  buffers[i].state = BUFFER_READ_COMPLETED;

You asked on IRC if we should change the semantics of the *_callback
variants to auto-retire the command (right now, the commands are not
retired until nbd_aio_command_completed, but you can't call that from
this callback because of the nbd handle lock - and calling it shouldn't
reveal any more information than what you already have access to here).
That may be a good idea to play with.

> +
> +  /* Create a writer idle handler. */
> +  g_idle_add (write_data, NULL);
> +
> +  return 0;
> +}
> +
> +/* This idle callback schedules a write. */
> +static gboolean
> +write_data (gpointer user_data)
> +{
> +  size_t i;
> +
> +  if (gsdest == NULL)
> +    return FALSE;
> +
> +  /* Find the first read-completed buffer and schedule it to be
> +   * written.
> +   */
> +  for (i = 0; i < nr_buffers; ++i) {
> +    if (buffers[i].state == BUFFER_READ_COMPLETED)
> +      goto found;
> +  }
> +  goto out;
> +
> + found:
> +  buffers[i].cookie =
> +    nbd_aio_pwrite_callback (gsdest->nbd, buffers[i].data,
> +                             BUFFER_SIZE, buffers[i].offset,
> +                             NULL, finished_write, 0);
> +  if (buffers[i].cookie == -1) {
> +    fprintf (stderr, "%s\n", nbd_get_error ());
> +    exit (EXIT_FAILURE);
> +  }
> +  buffers[i].state = BUFFER_WRITING;
> +
> + out:
> +  /* We always unregister this idle handler because the read side
> +   * creates a new idle handler for every buffer that has to be
> +   * written.
> +   */
> +  return FALSE;
> +}
> +
> +/* This callback is called from libnbd when any write command finishes. */
> +static int
> +finished_write (void *vp, int64_t cookie, int *error)
> +{
> +  size_t i;
> +
> +  if (gsdest == NULL)
> +    return 0;
> +
> +  DEBUG (gsdest, "finished_write: write completed");
> +
> +  /* Find the corresponding buffer and free it. */
> +  for (i = 0; i < nr_buffers; ++i) {
> +    if (buffers[i].cookie == cookie)
> +      goto found;
> +  }
> +  /* This should never happen. */
> +  abort ();
> +
> + found:
> +  g_free (buffers[i].data);
> +  memmove (&buffers[i], &buffers[i+1],
> +           sizeof (struct buffer) * (nr_buffers-(i+1)));
> +  nr_buffers--;
> +  buffers[nr_buffers].data = NULL;
> +
> +  /* If the number of buffers was MAX_BUFFERS and has now gone down to
> +   * MAX_BUFFERS-1 then we need to restart the read handler.
> +   */
> +  if (nr_buffers == MAX_BUFFERS-1 && reader_paused) {
> +    DEBUG (gsdest, "finished_write: restarting reader");
> +    g_idle_add (read_data, NULL);
> +    reader_paused = false;
> +  }
> +
> +  /* If the reader has finished and there are no more buffers then we
> +   * have done.
> +   */
> +  if (finished && nr_buffers == 0) {
> +    DEBUG (gsdest, "finished_write: all finished");
> +    g_source_remove (g_source_get_id ((GSource *) gssrc));
> +    g_source_unref ((GSource *) gssrc);
> +    gssrc = NULL;
> +    g_source_remove (g_source_get_id ((GSource *) gsdest));
> +    g_source_unref ((GSource *) gsdest);
> +    gsdest = NULL;
> +    g_main_loop_quit (loop);
> +  }
> +
> +  return 0;
> +}
> 

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.           +1-919-301-3226
Virtualization:  qemu.org | libvirt.org

-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 488 bytes
Desc: OpenPGP digital signature
URL: <http://listman.redhat.com/archives/libguestfs/attachments/20190716/8aa08ff5/attachment.sig>


More information about the Libguestfs mailing list