[Libguestfs] [libnbd PATCH 6/8] states: Add nbd_pread_callback API

Eric Blake eblake at redhat.com
Tue Jun 18 00:07:56 UTC 2019


Time to wire up user access to react to structured reads as the chunks
come in, exposing the framework added in the last patch.

I chose to use an int for the callback status rather than an enum for
ease of wiring things up to the various language bindings.  Still, it
required quite a few tweaks to the generator to support an Int type in
a callback.

It's easy to test callback behavior for LIBNBD_READ_DATA/HOLE (the
next patch will add interop tests with qemu-nbd), but at present, I
don't know of any server that responds with
NBD_REPLY_TYPE_ERROR_OFFSET, which in turn makes testing
LIBNBD_READ_ERROR difficult. I used the following hack on top of
qemu.git commit d1bf88e5 to trigger an offset error for local testing:

| diff --git i/nbd/server.c w/nbd/server.c
| index 10faedcfc55d..804dced0bc8d 100644
| --- i/nbd/server.c
| +++ w/nbd/server.c
| @@ -1838,12 +1838,30 @@ static int coroutine_fn nbd_co_send_sparse_read(NBDClient *client,
|
|              trace_nbd_co_send_structured_read_hole(handle, offset + progress,
|                                                     pnum);
| -            set_be_chunk(&chunk.h, final ? NBD_REPLY_FLAG_DONE : 0,
| +            set_be_chunk(&chunk.h, 0,
|                           NBD_REPLY_TYPE_OFFSET_HOLE,
|                           handle, sizeof(chunk) - sizeof(chunk.h));
|              stq_be_p(&chunk.offset, offset + progress);
|              stl_be_p(&chunk.length, pnum);
|              ret = nbd_co_send_iov(client, iov, 1, errp);
| +            if (ret == 0) {
| +                NBDStructuredError chunk;
| +                int64_t off;
| +                struct iovec iov[] = {
| +                    {.iov_base = &chunk, .iov_len = sizeof(chunk)},
| +                    {.iov_base = (char*)"MSG", .iov_len = 3},
| +                    {.iov_base = &off, .iov_len = sizeof(off)},
| +                };
| +
| +                set_be_chunk(&chunk.h, final ? NBD_REPLY_FLAG_DONE : 0,
| +                             NBD_REPLY_TYPE_ERROR_OFFSET,
| +                             handle, sizeof(chunk) - sizeof(chunk.h) +
| +                             3 + sizeof(off));
| +                stl_be_p(&chunk.error, NBD_EPERM);
| +                stw_be_p(&chunk.message_length, 3);
| +                stq_be_p(&off, offset + progress);
| +                ret = nbd_co_send_iov(client, iov, 3, errp);
| +            }
|          } else {
|              ret = blk_pread(exp->blk, offset + progress + exp->dev_offset,
|                              data + progress, pnum);
---
 generator/generator | 113 +++++++++++++++++++++++++++++++++++++++-----
 lib/rw.c            |  32 +++++++++++++
 2 files changed, 134 insertions(+), 11 deletions(-)

diff --git a/generator/generator b/generator/generator
index 2614689..ce77f17 100755
--- a/generator/generator
+++ b/generator/generator
@@ -1305,7 +1305,72 @@ Issue a read command to the NBD server for the range starting
 at C<offset> and ending at C<offset> + C<count> - 1.  NBD
 can only read all or nothing using this call.  The call
 returns when the data has been read fully into C<buf> or there is an
-error.
+error.  See also C<nbd_pread_callback>, if finer visibility is
+required into the server's replies.
+
+The C<flags> parameter must be C<0> for now (it exists for future NBD
+protocol extensions).";
+  };
+
+  "pread_callback", {
+    default_call with
+    args = [ BytesOut ("buf", "count"); UInt64 "offset";
+             Opaque "data";
+             Callback ("chunk", [Opaque "data";
+                                 BytesIn ("buf", "count"); UInt64 "offset";
+                                 Int "status";]);
+             Flags "flags" ];
+    ret = RErr;
+    permitted_states = [ Connected ];
+    shortdesc = "read from the NBD server";
+    longdesc = "\
+Issue a read command to the NBD server for the range starting
+at C<offset> and ending at C<offset> + C<count> - 1.  The server's
+response may be subdivided into chunks which may arrive out of order
+before reassembly into the original buffer; the C<chunk> callback
+is used for notification after each chunk arrives, and may perform
+additional sanity checking on the server's reply.  The callback must not
+call C<nbd_*> APIs on the same handle: the handle lock is held while it
+runs, so doing so would deadlock.  If the callback returns C<-1>, and no
+earlier error has been detected, then the overall read command will
+fail with the same value of C<errno> left after the failing callback;
+but any further chunks will still invoke the callback.
+
+The C<chunk> function is called once per chunk of data received.
+The C<buf> and C<count> parameters represent the subset of the original
+buffer which has just been populated by results from the server. The
+C<offset> parameter represents the absolute offset at which C<buf>
+begins within the image (note that this is not the relative offset
+of C<buf> within the original buffer). The C<status> parameter is
+one of
+
+=over 4
+
+=item C<LIBNBD_READ_DATA> = 1
+
+C<buf> was populated with C<count> bytes of data.
+
+=item C<LIBNBD_READ_HOLE> = 2
+
+C<buf> represents a hole, and contains C<count> NUL bytes.
+
+=item C<LIBNBD_READ_ERROR> = 3
+
+C<count> is 0, and C<buf> represents the offset at which an error
+occurred. The error is visible in C<errno> or by calling
+C<nbd_get_errno>.
+
+=back
+
+It is possible for the C<chunk> function to be called more times than
+you expect (if the server is buggy). It is also possible that the
+C<chunk> function is not called at all (in particular,
+C<LIBNBD_READ_ERROR> is used only when an error is associated with a
+particular offset), but you are guaranteed that the callback was
+called at least once if the overall read succeeds. Libnbd does not
+validate that the server obeyed the requirement that a read call must
+not have overlapping chunks and must not succeed without enough chunks
+to cover the entire request.

 The C<flags> parameter must be C<0> for now (it exists for future NBD
 protocol extensions).";
@@ -1591,6 +1656,26 @@ C<buf> is valid until the command has completed.  Other
 parameters behave as documented in C<nbd_pread>.";
   };

+  "aio_pread_callback", {
+    default_call with
+    args = [ BytesPersistOut ("buf", "count"); UInt64 "offset";
+             Opaque "data";
+             CallbackPersist ("chunk", [Opaque "data";
+                                        BytesIn ("buf", "count");
+                                        UInt64 "offset";
+                                        Int "status";]);
+             Flags "flags" ];
+    ret = RInt64;
+    permitted_states = [ Connected ];
+    shortdesc = "read from the NBD server";
+    longdesc = "\
+Issue a read command to the NBD server.  This returns the
+unique positive 64 bit handle for this command, or C<-1> on
+error.  To check if the command completed, call
+C<nbd_aio_command_completed>.  Parameters behave as documented
+in C<nbd_pread_callback>.";
+  };
+
   "aio_pwrite", {
     default_call with
     args = [ BytesPersistIn ("buf", "count"); UInt64 "offset"; Flags "flags" ];
@@ -3264,7 +3349,8 @@ let print_python_binding name { args; ret } =
             pr "  PyObject *py_%s = PyList_New (%s);\n" n len;
             pr "  for (size_t i = 0; i < %s; ++i)\n" len;
             pr "    PyList_SET_ITEM (py_%s, i, PyLong_FromUnsignedLong (%s[i]));\n" n n
-         | BytesIn _ -> ()
+         | BytesIn _
+         | Int _ -> ()
          | Opaque n ->
             pr "  struct %s_%s_data *_data = %s;\n" name cb_name n
          | String n
@@ -3272,7 +3358,7 @@ let print_python_binding name { args; ret } =
          (* The following not yet implemented for callbacks XXX *)
          | ArrayAndLen _ | Bool _ | BytesOut _
          | BytesPersistIn _ | BytesPersistOut _ | Callback _ | CallbackPersist _
-         | Flags _ | Int _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
+         | Flags _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
          | UInt _ | UInt32 _ -> assert false
        ) args;
        pr "\n";
@@ -3282,13 +3368,14 @@ let print_python_binding name { args; ret } =
          function
          | ArrayAndLen (UInt32 n, len) -> pr " \"O\""
          | BytesIn (n, len) -> pr " \"y#\""
+         | Int n -> pr " \"i\""
          | Opaque n -> pr " \"O\""
          | String n -> pr " \"s\""
          | UInt64 n -> pr " \"K\""
          (* The following not yet implemented for callbacks XXX *)
          | ArrayAndLen _ | Bool _ | BytesOut _
          | BytesPersistIn _ | BytesPersistOut _ | Callback _ | CallbackPersist _
-         | Flags _ | Int _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
+         | Flags _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
          | UInt _ | UInt32 _ -> assert false
        ) args;
        pr " \")\"";
@@ -3297,12 +3384,13 @@ let print_python_binding name { args; ret } =
          | ArrayAndLen (UInt32 n, _) -> pr ", py_%s" n
          | BytesIn (n, len) -> pr ", %s, (int) %s" n len
          | Opaque _ -> pr ", _data->data"
+         | Int n
          | String n
          | UInt64 n -> pr ", %s" n
          (* The following not yet implemented for callbacks XXX *)
          | ArrayAndLen _ | Bool _ | BytesOut _
          | BytesPersistIn _ | BytesPersistOut _ | Callback _ | CallbackPersist _
-         | Flags _ | Int _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
+         | Flags _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
          | UInt _ | UInt32 _ -> assert false
        ) args;
        pr ");\n";
@@ -3332,13 +3420,14 @@ let print_python_binding name { args; ret } =
          | ArrayAndLen (UInt32 n, _) ->
             pr "  Py_DECREF (py_%s);\n" n
          | BytesIn _
+         | Int _
+         | Opaque _
          | String _
-         | UInt64 _
-         | Opaque _ -> ()
+         | UInt64 _ -> ()
          (* The following not yet implemented for callbacks XXX *)
          | ArrayAndLen _ | Bool _ | BytesOut _
          | BytesPersistIn _ | BytesPersistOut _ | Callback _ | CallbackPersist _
-         | Flags _ | Int _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
+         | Flags _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
          | UInt _ | UInt32 _ -> assert false
        ) args;
        pr "  return ret;\n";
@@ -3989,13 +4078,13 @@ let print_ocaml_binding (name, { args; ret }) =
          List.map (
            function
            | ArrayAndLen (UInt32 n, _) | BytesIn (n, _)
-           | String n | UInt64 n | Opaque n ->
+           | Int n | Opaque n | String n | UInt64 n ->
               n ^ "v"
            (* The following not yet implemented for callbacks XXX *)
            | ArrayAndLen _ | Bool _ | BytesOut _
            | BytesPersistIn _ | BytesPersistOut _
            | Callback _ | CallbackPersist _
-           | Flags _ | Int _ | Int64 _ | Path _
+           | Flags _ | Int64 _ | Path _
            | SockAddrAndLen _ | StringList _
            | UInt _ | UInt32 _ -> assert false
          ) args in
@@ -4021,6 +4110,8 @@ let print_ocaml_binding (name, { args; ret }) =
          | BytesIn (n, len) ->
             pr "  %sv = caml_alloc_string (%s);\n" n len;
             pr "  memcpy (String_val (%sv), %s, %s);\n" n n len
+         | Int n ->
+            pr "  %sv = caml_copy_int32 (%s);\n" n n
          | String n ->
             pr "  %sv = caml_copy_string (%s);\n" n n
          | UInt64 n ->
@@ -4032,7 +4123,7 @@ let print_ocaml_binding (name, { args; ret }) =
          (* The following not yet implemented for callbacks XXX *)
          | ArrayAndLen _ | Bool _ | BytesOut _
          | BytesPersistIn _ | BytesPersistOut _ | Callback _ | CallbackPersist _
-         | Flags _ | Int _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
+         | Flags _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
          | UInt _ | UInt32 _ -> assert false
        ) args;

diff --git a/lib/rw.c b/lib/rw.c
index dc81c57..669987e 100644
--- a/lib/rw.c
+++ b/lib/rw.c
@@ -55,6 +55,22 @@ nbd_unlocked_pread (struct nbd_handle *h, void *buf,
   return wait_for_command (h, ch);
 }

+/* Issue a read command with callbacks and wait for the reply. */
+int
+nbd_unlocked_pread_callback (struct nbd_handle *h, void *buf,
+                             size_t count, uint64_t offset,
+                             void *opaque, read_fn read, uint32_t flags)
+{
+  int64_t ch;
+
+  ch = nbd_unlocked_aio_pread_callback (h, buf, count, offset,
+                                        opaque, read, flags);
+  if (ch == -1)
+    return -1;
+
+  return wait_for_command (h, ch);
+}
+
 /* Issue a write command and wait for the reply. */
 int
 nbd_unlocked_pwrite (struct nbd_handle *h, const void *buf,
@@ -231,6 +247,22 @@ nbd_unlocked_aio_pread (struct nbd_handle *h, void *buf,
                                       buf, NULL);
 }

+int64_t
+nbd_unlocked_aio_pread_callback (struct nbd_handle *h, void *buf,
+                                 size_t count, uint64_t offset,
+                                 void *opaque, read_fn read, uint32_t flags)
+{
+  struct command_cb cb = { .opaque = opaque, .fn.read = read, };
+
+  if (flags != 0) {
+    set_error (EINVAL, "invalid flag: %" PRIu32, flags);
+    return -1;
+  }
+
+  return nbd_internal_command_common (h, 0, NBD_CMD_READ, offset, count,
+                                      buf, &cb);
+}
+
 int64_t
 nbd_unlocked_aio_pwrite (struct nbd_handle *h, const void *buf,
                          size_t count, uint64_t offset,
-- 
2.20.1




More information about the Libguestfs mailing list