[Libguestfs] [PATCH libnbd discussion only 4/5] api: Implement concurrent writer.

Eric Blake eblake at redhat.com
Mon Jun 3 18:27:47 UTC 2019


On 6/3/19 10:29 AM, Richard W.M. Jones wrote:
> ---
>  docs/libnbd.pod     | 73 +++++++++++++++++++++++++++++++++++++++++++++
>  generator/generator | 52 +++++++++++++++++++++++++++-----
>  lib/handle.c        | 32 ++++++++++++++++++++
>  lib/internal.h      |  7 +++++
>  lib/socket.c        | 22 +++++++++++---
>  podwrapper.pl.in    |  3 +-
>  6 files changed, 177 insertions(+), 12 deletions(-)
> 
> diff --git a/docs/libnbd.pod b/docs/libnbd.pod
> index 7cbb9cd..ab74be3 100644
> --- a/docs/libnbd.pod
> +++ b/docs/libnbd.pod
> @@ -385,6 +385,79 @@ If you are issuing multiple in-flight requests (see above) and
>  limiting the number, then the limit should be applied to each
>  individual NBD connection.
>  
> +=head2 Concurrent writer thread
> +
> +To achieve the maximum possible performance from libnbd and NBD
> +servers, as well as the above techniques you must also use a
> +concurrent writer thread.  This feature allows requests to be issued
> +on the NBD socket at the same time that replies are being read from
> +the socket.  In other words L<send(2)> and L<recv(2)> calls will be
> +running at the same time on the same socket.

maybe add 'from different threads'

> +
> +There is a full example using a concurrent writer available at
> +L<https://github.com/libguestfs/libnbd/blob/master/examples/concurrent-writer.c>
> +
> +To implement this, you change your ordinary AIO code in four ways:
> +
> +=over 4
> +
> +=item 1. Call nbd_set_concurrent_writer
> +
> + struct writer_data {
> +   struct nbd_handle *nbd;
> +   /* other data here as required */
> + } data;
> + 
> + nbd_set_concurrent_writer (nbd, &data, writer);
> +
> +This function can be called on the handle at any time, either after
> +the handle is created or after the connection and handshaking has
> +completed.
> +
> +=item 2. Implement non-blocking writer callback
> +
> +C<writer> is a I<non-blocking> callback which enqueues the buffer into
> +a ring or similar FIFO structure:
> +
> + struct ring_item {
> +   struct writer_data *data;
> +   const void *buf;
> +   size_t len;
> + };
> + 
> + void writer (void *data, const void *buf, size_t len)

No return value. Comments below at [1]

> + {
> +   struct ring_item item;
> + 
> +   /* add (data, buf, len) to a shared ring */
> +   item.data = data;
> +   item.buf = malloc (len);
> +   memcpy (item.buf, buf, len);
> +   item.len = len;
> +   ring_add (&item);
> + 
> +   writer_signal ();   /* kick the writer thread */
> + }
> +
> +=item 3. Implement writer thread
> +
> +You must also supply another thread which picks up data off the ring
> +and writes it to the socket (see C<nbd_aio_get_fd>).  If there an

s/there/there is/

> +error when writing to the socket, call C<nbd_concurrent_writer_error>
> +with the C<errno>.
> +
> +You have a choice of whether to implement one thread per nbd_handle or
> +one thread shared between all handles.
> +
> +=item 4. Modify main loop
> +
> +Finally your main loop can unconditionally call
> +C<nbd_aio_notify_write> when C<nbd_aio_get_direction> returns C<WRITE>
> +or C<BOTH> (since the concurrent thread can always enqueue more data
> +and so is always "ready to write").

Will we ever actually reach a state that is blocked on a write
completion for aio_get_direction to ever even request a WRITE or BOTH?
Or will the state machine always manage to churn through requests in
their entirety and back to state READY which is just a READ state?

> +
> +=back
> +
>  =head1 ENCRYPTION AND AUTHENTICATION
>  
>  The NBD protocol and libnbd supports TLS (sometimes incorrectly called
> diff --git a/generator/generator b/generator/generator
> index db7c10f..2b48c67 100755
> --- a/generator/generator
> +++ b/generator/generator
> @@ -1094,6 +1094,35 @@ C<\"qemu:dirty-bitmap:...\"> for qemu-nbd
>  (see qemu-nbd I<-B> option).  See also C<nbd_block_status>.";
>    };
>  
> +  "set_concurrent_writer", {
> +    default_call with
> +    args = [ Opaque "data";
> +             CallbackPersist ("writer", [Opaque "data";
> +                                         BytesIn ("buf", "len")]) ];
> +    ret = RErr;
> +    permitted_states = [ Created; Connecting; Connected ];
> +    shortdesc = "set a concurrent writer thread";
> +    longdesc = "\
> +Provide an optional concurrent writer thread for better performance.
> +See L<libnbd(3)/Concurrent writer thread> for how to use this.";
> +  };
> +
> +  "concurrent_writer_error", {
> +    default_call with
> +    args = [ Int "err" ]; ret = RErr;
> +    shortdesc = "signal an error from the concurrent writer thread";
> +    longdesc = "\
> +This can be called from the concurrent writer thread to signal
> +that there was an error writing to the socket.  As there is no
> +way to recover from such errors, the connection will move to the
> +dead state soon after.
> +
> +The parameter is the C<errno> returned by the failed L<send(2)> call.
> +It must be non-zero.
> +
> +See L<libnbd(3)/Concurrent writer thread> for how to use this.";
> +  };
> +
>    "connect_uri", {

>  
> +int
> +nbd_unlocked_set_concurrent_writer (struct nbd_handle *h,
> +                                    void *data, writer_cb writer)
> +{
> +  /* I suppose we could allow this, but it seems more likely that
> +   * it's an error rather than intentional.
> +   */
> +  if (h->writer != NULL) {
> +    set_error (EINVAL, "concurrent writer was already set for this handle");
> +    return -1;
> +  }
> +
> +  h->writer = writer;
> +  h->writer_data = data;
> +  return 0;
> +}

Is it worth a get_concurrent_writer()?  Not strictly necessary, though.

> +
> +int
> +nbd_unlocked_concurrent_writer_error (struct nbd_handle *h, int err)
> +{
> +  if (err != 0) {
> +    set_error (EINVAL, "concurrent writer error parameter must be non-zero");
> +    return -1;
> +  }
> +
> +  /* Ignore second and subsequent calls, record only the first error. */
> +  if (h->writer_error == 0)
> +    h->writer_error = err;
> +
> +  return 0;
> +}
> +

> +++ b/lib/socket.c
> @@ -46,10 +46,24 @@ socket_send (struct nbd_handle *h,
>  {
>    ssize_t r;
>  
> -  r = send (sock->u.fd, buf, len, 0);
> -  if (r == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
> -    set_error (errno, "send");
> -  return r;
> +  if (!h->writer) {
> +    r = send (sock->u.fd, buf, len, 0);
> +    if (r == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
> +      set_error (errno, "send");
> +    return r;
> +  }
> +  else if (h->writer_error) {
> +    /* Concurrent writer thread signaled an error earlier, so
> +     * return that here.
> +     */
> +    set_error (h->writer_error, "concurrent writer thread error");
> +    return -1;
> +  }
> +  else {
> +    /* Pass the buffer to the concurrent writer thread. */
> +    h->writer (h->writer_data, buf, len);
> +    return len;

[1] So h->writer is NOT allowed to fail directly (if it fails, it must
call nbd_concurrent_writer_error instead). Stems from the fact that the
generator doesn't allow callbacks with a return type, but livable.  But
may warrant extra wording in the documentation.

> +  }
>  }
>  
>  static int
> diff --git a/podwrapper.pl.in b/podwrapper.pl.in
> index 2471807..ecff2d6 100755
> --- a/podwrapper.pl.in
> +++ b/podwrapper.pl.in
> @@ -324,7 +324,8 @@ foreach (@lines) {
>      die "$progname: $input: line too long:\n$_\n"
>          if length $_ > 76 &&
>          substr ($_, 0, 1) ne ' ' &&
> -        ! m/https?:/;
> +        ! m/https?:/ &&
> +        ! m/connected and finished handshaking/;
>  }
>  
>  # Output man page.
> 

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.           +1-919-301-3226
Virtualization:  qemu.org | libvirt.org

-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 488 bytes
Desc: OpenPGP digital signature
URL: <http://listman.redhat.com/archives/libguestfs/attachments/20190603/ecd1611e/attachment.sig>


More information about the Libguestfs mailing list