[Libguestfs] [PATCH libnbd discussion only 4/5] api: Implement concurrent writer.

Richard W.M. Jones rjones at redhat.com
Mon Jun 3 15:29:47 UTC 2019


---
 docs/libnbd.pod     | 73 +++++++++++++++++++++++++++++++++++++++++++++
 generator/generator | 52 +++++++++++++++++++++++++++-----
 lib/handle.c        | 32 ++++++++++++++++++++
 lib/internal.h      |  7 +++++
 lib/socket.c        | 22 +++++++++++---
 podwrapper.pl.in    |  3 +-
 6 files changed, 177 insertions(+), 12 deletions(-)

diff --git a/docs/libnbd.pod b/docs/libnbd.pod
index 7cbb9cd..ab74be3 100644
--- a/docs/libnbd.pod
+++ b/docs/libnbd.pod
@@ -385,6 +385,79 @@ If you are issuing multiple in-flight requests (see above) and
 limiting the number, then the limit should be applied to each
 individual NBD connection.
 
+=head2 Concurrent writer thread
+
+To achieve the maximum possible performance from libnbd and NBD
+servers, you should use a concurrent writer thread in addition to
+the techniques described above.  This feature allows requests to be
+issued on the NBD socket at the same time that replies are being read
+from the socket.  In other words, L<send(2)> and L<recv(2)> calls
+will run at the same time on the same socket.
+
+There is a full example using a concurrent writer available at
+L<https://github.com/libguestfs/libnbd/blob/master/examples/concurrent-writer.c>
+
+To implement this, you change your ordinary AIO code in four ways:
+
+=over 4
+
+=item 1. Call nbd_set_concurrent_writer
+
+ struct writer_data {
+   struct nbd_handle *nbd;
+   /* other data here as required */
+ } data;
+ 
+ nbd_set_concurrent_writer (nbd, &data, writer);
+
+This function can be called either after the handle is created or
+after the connection and handshaking have completed, but only once
+per handle: a second call fails with C<EINVAL>.
+
+=item 2. Implement non-blocking writer callback
+
+C<writer> is a I<non-blocking> callback which enqueues the buffer into
+a ring or similar FIFO structure:
+
+ struct ring_item {
+   struct writer_data *data;
+   void *buf;                 /* malloc'd copy, free it after sending */
+   size_t len;
+ };
+ 
+ void writer (void *data, const void *buf, size_t len)
+ {
+   struct ring_item item;
+ 
+   /* add (data, buf, len) to a shared ring */
+   item.data = data;
+   item.buf = malloc (len);
+   memcpy (item.buf, buf, len);
+   item.len = len;
+   ring_add (&item);
+ 
+   writer_signal ();   /* kick the writer thread */
+ }
+
+=item 3. Implement writer thread
+
+You must also supply another thread which takes data off the ring
+and writes it to the socket (see C<nbd_aio_get_fd>).  If there is an
+error when writing to the socket, call C<nbd_concurrent_writer_error>
+with the (non-zero) C<errno>.
+
+You have a choice of whether to implement one thread per nbd_handle or
+one thread shared between all handles.
+
+=item 4. Modify main loop
+
+Finally, your main loop can call C<nbd_aio_notify_write>
+unconditionally whenever C<nbd_aio_get_direction> returns C<WRITE> or
+C<BOTH>, without polling the socket for writability (the writer
+callback can always enqueue more data, so it is always "ready to write").
+
+=back
+
 =head1 ENCRYPTION AND AUTHENTICATION
 
 The NBD protocol and libnbd supports TLS (sometimes incorrectly called
diff --git a/generator/generator b/generator/generator
index db7c10f..2b48c67 100755
--- a/generator/generator
+++ b/generator/generator
@@ -1094,6 +1094,35 @@ C<\"qemu:dirty-bitmap:...\"> for qemu-nbd
 (see qemu-nbd I<-B> option).  See also C<nbd_block_status>.";
   };
 
+  "set_concurrent_writer", {
+    default_call with
+    args = [ Opaque "data";
+             CallbackPersist ("writer", [Opaque "data";
+                                         BytesIn ("buf", "len")]) ];
+    ret = RErr;
+    permitted_states = [ Created; Connecting; Connected ];
+    shortdesc = "set a concurrent writer thread";
+    longdesc = "\
+Provide an optional concurrent writer thread for better performance.
+See L<libnbd(3)/Concurrent writer thread> for how to use this.";
+  };
+
+  "concurrent_writer_error", {
+    default_call with
+    args = [ Int "err" ]; ret = RErr;
+    shortdesc = "signal an error from the concurrent writer thread";
+    longdesc = "\
+This can be called from the concurrent writer thread to signal
+that there was an error writing to the socket.  As there is no
+way to recover from such errors, the connection will move to the
+dead state soon after.
+
+The parameter is the C<errno> value set by the failed L<send(2)> call.
+It must be non-zero.  Only the first error reported is recorded.
+
+See L<libnbd(3)/Concurrent writer thread> for how to use this.";
+  };
+
   "connect_uri", {
     default_call with
     args = [ String "uri" ]; ret = RErr;
@@ -3157,12 +3186,13 @@ let print_python_binding name { args; ret } =
             pr "  PyObject *py_%s = PyList_New (%s);\n" n len;
             pr "  for (size_t i = 0; i < %s; ++i)\n" len;
             pr "    PyList_SET_ITEM (py_%s, i, PyLong_FromUnsignedLong (%s[i]));\n" n n
+         | BytesIn _ -> ()
          | Opaque n ->
             pr "  struct %s_%s_data *_data = %s;\n" name cb_name n
          | String n
          | UInt64 n -> ()
          (* The following not yet implemented for callbacks XXX *)
-         | ArrayAndLen _ | Bool _ | BytesIn _ | BytesOut _
+         | ArrayAndLen _ | Bool _ | BytesOut _
          | BytesPersistIn _ | BytesPersistOut _ | Callback _ | CallbackPersist _
          | Flags _ | Int _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
          | UInt _ | UInt32 _ -> assert false
@@ -3173,11 +3203,12 @@ let print_python_binding name { args; ret } =
        List.iter (
          function
          | ArrayAndLen (UInt32 n, len) -> pr " \"O\""
+         | BytesIn (n, len) -> pr " \"y#\""
          | Opaque n -> pr " \"O\""
          | String n -> pr " \"s\""
          | UInt64 n -> pr " \"K\""
          (* The following not yet implemented for callbacks XXX *)
-         | ArrayAndLen _ | Bool _ | BytesIn _ | BytesOut _
+         | ArrayAndLen _ | Bool _ | BytesOut _
          | BytesPersistIn _ | BytesPersistOut _ | Callback _ | CallbackPersist _
          | Flags _ | Int _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
          | UInt _ | UInt32 _ -> assert false
@@ -3186,11 +3217,12 @@ let print_python_binding name { args; ret } =
        List.iter (
          function
          | ArrayAndLen (UInt32 n, _) -> pr ", py_%s" n
+         | BytesIn (n, len) -> pr ", %s, (int) %s" n len
          | Opaque _ -> pr ", _data->data"
          | String n
          | UInt64 n -> pr ", %s" n
          (* The following not yet implemented for callbacks XXX *)
-         | ArrayAndLen _ | Bool _ | BytesIn _ | BytesOut _
+         | ArrayAndLen _ | Bool _ | BytesOut _
          | BytesPersistIn _ | BytesPersistOut _ | Callback _ | CallbackPersist _
          | Flags _ | Int _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
          | UInt _ | UInt32 _ -> assert false
@@ -3217,11 +3249,12 @@ let print_python_binding name { args; ret } =
          function
          | ArrayAndLen (UInt32 n, _) ->
             pr "  Py_DECREF (py_%s);\n" n
+         | BytesIn _
          | String _
          | UInt64 _
          | Opaque _ -> ()
          (* The following not yet implemented for callbacks XXX *)
-         | ArrayAndLen _ | Bool _ | BytesIn _ | BytesOut _
+         | ArrayAndLen _ | Bool _ | BytesOut _
          | BytesPersistIn _ | BytesPersistOut _ | Callback _ | CallbackPersist _
          | Flags _ | Int _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
          | UInt _ | UInt32 _ -> assert false
@@ -3899,10 +3932,11 @@ let print_ocaml_binding (name, { args; ret }) =
        let argnames =
          List.map (
            function
-           | ArrayAndLen (UInt32 n, _) | String n | UInt64 n | Opaque n ->
+           | ArrayAndLen (UInt32 n, _) | BytesIn (n, _)
+           | String n | UInt64 n | Opaque n ->
               n ^ "v"
            (* The following not yet implemented for callbacks XXX *)
-           | ArrayAndLen _ | Bool _ | BytesIn _ | BytesOut _
+           | ArrayAndLen _ | Bool _ | BytesOut _
            | BytesPersistIn _ | BytesPersistOut _
            | Callback _ | CallbackPersist _
            | Flags _ | Int _ | Int64 _ | Path _
@@ -3928,6 +3962,9 @@ let print_ocaml_binding (name, { args; ret }) =
          | ArrayAndLen (UInt32 n, count) ->
             pr "  %sv = nbd_internal_ocaml_alloc_int32_array (%s, %s);\n"
                n n count;
+         | BytesIn (n, len) ->
+            pr "  %sv = caml_alloc_string (%s);\n" n len;
+            pr "  memcpy (String_val (%sv), %s, %s);\n" n n len
          | String n ->
             pr "  %sv = caml_copy_string (%s);\n" n n
          | UInt64 n ->
@@ -3937,7 +3974,7 @@ let print_ocaml_binding (name, { args; ret }) =
             pr "  fnv = *_%s->cb;\n" n;
             pr "  %sv = *_%s->data;\n" n n
          (* The following not yet implemented for callbacks XXX *)
-         | ArrayAndLen _ | Bool _ | BytesIn _ | BytesOut _
+         | ArrayAndLen _ | Bool _ | BytesOut _
          | BytesPersistIn _ | BytesPersistOut _ | Callback _ | CallbackPersist _
          | Flags _ | Int _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
          | UInt _ | UInt32 _ -> assert false
@@ -4192,6 +4229,7 @@ let generate_ocaml_nbd_c () =
   pr "\n";
   pr "#include <stdio.h>\n";
   pr "#include <stdlib.h>\n";
+  pr "#include <string.h>\n";
   pr "\n";
   pr "#include <libnbd.h>\n";
   pr "\n";
diff --git a/lib/handle.c b/lib/handle.c
index cc311ba..cc5d40f 100644
--- a/lib/handle.c
+++ b/lib/handle.c
@@ -215,6 +215,38 @@ nbd_add_close_callback (struct nbd_handle *h, nbd_close_callback cb, void *data)
   return ret;
 }
 
+/* Install the concurrent writer callback.  Once set, socket_send passes
+ * outgoing buffers to writer (data, buf, len) rather than calling send.
+ */
+int
+nbd_unlocked_set_concurrent_writer (struct nbd_handle *h,
+                                    void *data, writer_cb writer)
+{
+  /* Refuse a second writer: more likely a bug than intentional. */
+  if (h->writer == NULL) {
+    h->writer_data = data;
+    h->writer = writer;
+    return 0;
+  }
+  set_error (EINVAL, "concurrent writer was already set for this handle");
+  return -1;
+}
+
+/* Record err (must be non-zero) so that socket_send reports it. */
+int
+nbd_unlocked_concurrent_writer_error (struct nbd_handle *h, int err)
+{
+  /* Zero is reserved to mean "no error" in h->writer_error. */
+  if (err == 0) {
+    set_error (EINVAL, "concurrent writer error parameter must be non-zero");
+    return -1;
+  }
+  /* Keep only the first error; later calls are ignored. */
+  if (h->writer_error == 0)
+    h->writer_error = err;
+  return 0;
+}
+
 const char *
 nbd_unlocked_get_package_name (struct nbd_handle *h)
 {
diff --git a/lib/internal.h b/lib/internal.h
index c8e5094..c41741d 100644
--- a/lib/internal.h
+++ b/lib/internal.h
@@ -43,6 +43,8 @@ struct close_callback;
 struct socket;
 struct command_in_flight;
 
+typedef void (*writer_cb) (void *data, const void *buf, size_t len);
+
 struct nbd_handle {
   /* Lock protecting concurrent access to the handle. */
   pthread_mutex_t lock;
@@ -90,6 +92,11 @@ struct nbd_handle {
   /* The socket or a wrapper if using GnuTLS. */
   struct socket *sock;
 
+  /* Writer callback if using concurrent writer. */
+  void *writer_data;
+  writer_cb writer;
+  int writer_error;
+
   /* Generic way to read into a buffer - set rbuf to point to a
    * buffer, rlen to the amount of data you expect, and in the state
    * machine call recv_into_rbuf.
diff --git a/lib/socket.c b/lib/socket.c
index f48e455..c6fba6d 100644
--- a/lib/socket.c
+++ b/lib/socket.c
@@ -46,10 +46,24 @@ socket_send (struct nbd_handle *h,
 {
   ssize_t r;
 
-  r = send (sock->u.fd, buf, len, 0);
-  if (r == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
-    set_error (errno, "send");
-  return r;
+  if (!h->writer) {
+    r = send (sock->u.fd, buf, len, 0);
+    if (r == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
+      set_error (errno, "send");
+    return r;
+  }
+  else if (h->writer_error) {
+    /* Concurrent writer thread signaled an error earlier, so
+     * return that here.
+     */
+    set_error (h->writer_error, "concurrent writer thread error");
+    return -1;
+  }
+  else {
+    /* Pass the buffer to the concurrent writer thread. */
+    h->writer (h->writer_data, buf, len);
+    return len;
+  }
 }
 
 static int
diff --git a/podwrapper.pl.in b/podwrapper.pl.in
index 2471807..ecff2d6 100755
--- a/podwrapper.pl.in
+++ b/podwrapper.pl.in
@@ -324,7 +324,8 @@ foreach (@lines) {
     die "$progname: $input: line too long:\n$_\n"
         if length $_ > 76 &&
         substr ($_, 0, 1) ne ' ' &&
-        ! m/https?:/;
+        ! m/https?:/ &&
+        ! m/connected and finished handshaking/;
 }
 
 # Output man page.
-- 
2.21.0




More information about the Libguestfs mailing list