[Libguestfs] [libnbd PATCH v8 06/10] rust: async: Create an async friendly handle type

Tage Johansson tage.j.lists at posteo.net
Sat Aug 26 16:24:03 UTC 2023


On 8/24/2023 11:55 PM, Eric Blake wrote:
> On Sun, Aug 20, 2023 at 02:16:25PM +0000, Tage Johansson wrote:
>> Create another handle type: AsyncHandle, which makes use of Rust's
>> builtin asynchronous functions (see
>> <https://doc.rust-lang.org/std/keyword.async.html>) and runs on top of
>> the Tokio runtime (see<https://docs.rs/tokio>). For every asynchronous
>> command, like aio_connect(), a corresponding `async` method is created
>> on the handle. In this case it would be:
>>      async fn connect(...) -> Result<(), ...>
>> When called, it will poll the file descriptor until the command is
>> complete, and then return with a result. All the synchronous
>> counterparts (like nbd_connect()) are excluded from this handle type
>> as they are unnecessary and since they might interfere with the polling
>> made by the Tokio runtime. For more details about how the asynchronous
>> commands are executed, please see the comments in
>> rust/src/async_handle.rs.
>> ---
>>   generator/Rust.ml        | 249 +++++++++++++++++++++++++++++++++++-
>>   generator/Rust.mli       |   2 +
>>   generator/generator.ml   |   2 +
>>   rust/Cargo.toml          |   4 +-
>>   rust/Makefile.am         |   2 +
>>   rust/src/async_handle.rs | 268 +++++++++++++++++++++++++++++++++++++++
>>   rust/src/lib.rs          |   8 ++
>>   rust/src/utils.rs        |   9 ++
>>   scripts/git.orderfile    |   1 +
>>   9 files changed, 538 insertions(+), 7 deletions(-)
>>   create mode 100644 rust/src/async_handle.rs
>>
>> diff --git a/generator/Rust.ml b/generator/Rust.ml
>> index 431c814..1bc81f0 100644
>> --- a/generator/Rust.ml
>> +++ b/generator/Rust.ml
>> @@ -61,11 +61,12 @@ let print_rust_flags { flag_prefix; flags } =
>>   let rec to_upper_snake_case s =
>>     let s = String.uppercase_ascii s in
>>     let s = explode s in
>> -  let s = filter_map (
>> -    function
>> -    |'-' -> Some "_" | ':' -> None
>> -    | ch -> Some (String.make 1 ch)
>> -  ) s in
>> +  let s =
>> +    filter_map
>> +      (function
>> +        | '-' -> Some "_" | ':' -> None | ch -> Some (String.make 1 ch))
>> +      s
>> +  in
>>     String.concat "" s
> This looks like it is just reformatting.  While cleanup patches are
> okay (and I trust Rich's take on OCaml style more than my own), it's
> cleaner to do them in separate patches.
>
>>   
>>   (* Split a string into a list of chars.  In later OCaml we could
>> @@ -75,7 +76,7 @@ and explode str =
>>     let r = ref [] in
>>     for i = 0 to String.length str - 1 do
>>       let c = String.unsafe_get str i in
>> -    r := c :: !r;
>> +    r := c :: !r
>>     done;
>>     List.rev !r
>>   
>> @@ -564,3 +565,239 @@ let generate_rust_bindings () =
>>     pr "impl Handle {\n";
>>     List.iter print_rust_handle_method handle_calls;
>>     pr "}\n\n"
>> +
>> +(*********************************************************)
>> +(* The rest of the file conserns the asynchronous API.   *)
> concerns
>
>> +(*                                                       *)
>> +(* See the comments in rust/src/async_handle.rs for more *)
>> +(* information about how it works.                       *)
>> +(*********************************************************)
>> +
>> +let excluded_handle_calls : NameSet.t =
>> +  NameSet.of_list
>> +    [
>> +      "aio_get_fd";
>> +      "aio_get_direction";
>> +      "aio_notify_read";
>> +      "aio_notify_write";
>> +      "clear_debug_callback";
>> +      "get_debug";
>> +      "poll";
>> +      "poll2";
>> +      "set_debug";
>> +      "set_debug_callback";
>> +    ]
>> +
>> +(* A mapping with names as keys. *)
>> +module NameMap = Map.Make (String)
>> +
>> +(* Strip "aio_" from the beginning of a string. *)
>> +let strip_aio name : string =
>> +  if String.starts_with ~prefix:"aio_" name then
>> +    String.sub name 4 (String.length name - 4)
>> +  else failwithf "Asynchronous call %s must begin with aio_" name
>> +
>> +(* A map with all asynchronous handle calls. The keys are names with "aio_"
>> +   stripped, the values are a tuple with the actual name (with "aio_"), the
>> +   [call] and the [async_kind]. *)
>> +let async_handle_calls : ((string * call) * async_kind) NameMap.t =
> Do we need a 2-deep nested tuple, or can we use (string * call * async_kind)?
>
>> +  handle_calls
>> +  |> List.filter (fun (n, _) -> not (NameSet.mem n excluded_handle_calls))
>> +  |> List.filter_map (fun (name, call) ->
>> +         call.async_kind
>> +         |> Option.map (fun async_kind ->
>> +                (strip_aio name, ((name, call), async_kind))))
>> +  |> List.to_seq |> NameMap.of_seq
>> +
>> +(* A mapping with all synchronous (not asynchronous) handle calls. Excluded
>> +   are also all synchronous calls that has an asynchronous counterpart. So if
> s/has/have/
>
>> +   "foo" is the name of a handle call and an asynchronous call "aio_foo"
>> +   exists, then "foo" will not b in this map. *)
> s/b /be /
>
>> +let sync_handle_calls : call NameMap.t =
>> +  handle_calls
>> +  |> List.filter (fun (n, _) -> not (NameSet.mem n excluded_handle_calls))
>> +  |> List.filter (fun (name, _) ->
>> +         (not (NameMap.mem name async_handle_calls))
>> +         && not
>> +              (String.starts_with ~prefix:"aio_" name
>> +              && NameMap.mem (strip_aio name) async_handle_calls))
>> +  |> List.to_seq |> NameMap.of_seq
>> +
>> +(* Get the Rust type for an argument in the asynchronous API. Like
>> +   [rust_arg_type] but no static lifetime on some buffers. *)
>> +let rust_async_arg_type : arg -> string = function
>> +  | BytesPersistIn _ -> "&[u8]"
>> +  | BytesPersistOut _ -> "&mut [u8]"
>> +  | x -> rust_arg_type x
>> +
>> +(* Get the Rust type for an optional argument in the asynchronous API. Like
>> +   [rust_optarg_type] but no static lifetime on some closures. *)
>> +let rust_async_optarg_type : optarg -> string = function
>> +  | OClosure x -> sprintf "Option<%s>" (rust_async_arg_type (Closure x))
>> +  | x -> rust_optarg_type x
>> +
>> +(* A string of the argument list for a method on the handle, with both
>> +   mandatory and optional arguments. *)
>> +let rust_async_handle_call_args { args; optargs } : string =
>> +  let rust_args_names =
>> +    List.map rust_arg_name args @ List.map rust_optarg_name optargs
>> +  and rust_args_types =
>> +    List.map rust_async_arg_type args
>> +    @ List.map rust_async_optarg_type optargs
>> +  in
>> +  String.concat ", "
>> +    (List.map2 (sprintf "%s: %s") rust_args_names rust_args_types)
>> +
>> +(* Print the Rust function for a not asynchronous handle call. *)
> s/not asynchronous/synchronous/
>
>> +let print_rust_sync_handle_call name call =
>> +  print_rust_handle_call_comment call;
>> +  pr "pub fn %s(&self, %s) -> %s\n" name
>> +    (rust_async_handle_call_args call)
>> +    (rust_ret_type call);
>> +  print_ffi_call name "self.data.handle.handle" call;
>> +  pr "\n"
>> +
>> +(* Print the Rust function for an asynchronous handle call with a completion
>> +   callback. (Note that "callback" might be abbreviated with "cb" in the
>> +   following code. *)
>> +let print_rust_async_handle_call_with_completion_cb name (aio_name, call) =
>> +  (* An array of all optional arguments. Useful because we need to deel with
> s/deel/deal/
>
>> +     the index of the completion callback. *)
>> +  let optargs = Array.of_list call.optargs in
>> +  (* The index of the completion callback in [optargs] *)
>> +  let completion_cb_index =
>> +    Array.find_map
>> +      (fun (i, optarg) ->
>> +        match optarg with
>> +        | OClosure { cbname } ->
>> +            if cbname = "completion" then Some i else None
>> +        | _ -> None)
>> +      (Array.mapi (fun x y -> (x, y)) optargs)
>> +  in
>> +  let completion_cb_index =
>> +    match completion_cb_index with
>> +    | Some x -> x
>> +    | None ->
>> +        failwithf
>> +          "The handle call %s is claimed to have a completion callback among \
>> +           its optional arguments by the async_kind field, but so does not \
> s/so/that/
>
>> +           seem to be the case."
>> +          aio_name
>> +  in
>> +++ b/rust/Cargo.toml
>> @@ -48,9 +48,11 @@ os_socketaddr = "0.2.4"
>>   thiserror = "1.0.40"
>>   log = { version = "0.4.19", optional = true }
>>   libc = "0.2.147"
>> +tokio = { optional = true, version = "1.29.1", default-features = false, features = ["rt", "sync", "net"] }
>> +epoll = "4.3.3"
>>   
>>   [features]
>> -default = ["log"]
>> +default = ["log", "tokio"]
> It looks like you intend for tokio to be an optional dependency (you
> always get the bare-bones Rust bindings, but if tokio is installed,
> you also get the AsyncHandle bindings).  Do we need to document that
> in README at all?  Is there an easy way to set up CI tests to cover
> builds both with and without tokio, so that we can ensure we don't
> break builds on someone who chooses not to install the optional
> dependency?



`tokio` and `log` are so-called Cargo features
<https://doc.rust-lang.org/cargo/reference/features.html>. They can be
enabled or disabled by whoever includes the crate as a dependency. It has
nothing to do with whether tokio or log is installed on the system.


It does make sense, though, to test that compilation works even with these
features disabled. So I've made a new patch which adds "cargo check
--no-default-features" to the test suite. In fact, it turned out that with
the features disabled a warning caused compilation to fail (lib.rs has
#![deny(warnings)]), so I've fixed that as well.


>> +++ b/rust/src/async_handle.rs
>> @@ -0,0 +1,268 @@
>> +
>> +#![allow(unused_imports)] // XXX: remove this
> How hard is it to fix this line?
>
>> +use crate::sys;
>> +use crate::Handle;
>> +use crate::{Error, FatalErrorKind, Result};
>> +use crate::{AIO_DIRECTION_BOTH, AIO_DIRECTION_READ, AIO_DIRECTION_WRITE};
>> +use epoll::Events;
>> +use std::sync::Arc;
>> +use std::sync::Mutex;
>> +use tokio::io::{unix::AsyncFd, Interest, Ready as IoReady};
>> +use tokio::sync::{broadcast, Notify};
>> +use tokio::task;
>> +
>> +/// A custom result type with a shared [crate::Error] as default error type.
>> +pub type SharedResult<T, E = Arc<Error>> = Result<T, E>;
>> +
>> +/// An NBD handle using Rust's `async` functionality on top of the
>> +/// [Tokio](https://docs.rs/tokio/) runtime.
>> +pub struct AsyncHandle {
>> +    /// Data shared both by this struct and the polling task.
>> +    pub(crate) data: Arc<HandleData>,
>> +
>> +    /// A task which soely purpose is to poll the NBD handle.
> s/soely/sole/
>
>> +    polling_task: tokio::task::AbortHandle,
>> +}
>> +
>> +pub(crate) struct HandleData {
>> +    /// The underlying handle.
>> +    pub handle: Handle,
>> +
>> +    /// A list of all pending commands.
>> +    ///
>> +    /// For every pending command (commands in flight), a predicate will be
>> +    /// stored in this list. Whenever some progress is made on the file
>> +    /// descriptor, the predicate is called with a reference to the handle
>> +    /// and a reference to the result of that call to `aio_notify_*`.
>> +    /// Iff the predicate returns [true], the command is considered completed
>> +    /// and removed from this list.
>> +    ///
>> +    /// If the polling task dies for some reason, this [SharedResult] will be
>> +    /// set to some error.
>> +    pub pending_commands: Mutex<
>> +        SharedResult<
>> +            Vec<
>> +                Box<
>> +                    dyn FnMut(&Handle, &SharedResult<()>) -> bool
>> +                        + Send
>> +                        + Sync
>> +                        + 'static,
>> +                >,
>> +            >,
>> +        >,
>> +    >,
>> +
>> +    /// A notifier used by commands to notify the polling task when a new
>> +    /// asynchronous command is issued.
>> +    pub new_command: Notify,
>> +}
>> +
>> +impl AsyncHandle {
>> +    pub fn new() -> Result<Self> {
>> +        let handle_data = Arc::new(HandleData {
>> +            handle: Handle::new()?,
>> +            pending_commands: Mutex::new(Ok(Vec::new())),
>> +            new_command: Notify::new(),
>> +        });
>> +
>> +        let handle_data_2 = handle_data.clone();
>> +        let polling_task = task::spawn(async move {
>> +            // The polling task should never finish without an error. If the
>> +            // handle is dropped, the task is aborted so it'll not return in
> s/it'll not/it won't/
>
>> +            // that case either.
>> +            let Err(err) = polling_task(&handle_data_2).await else {
>> +                unreachable!()
>> +            };
>> +            let err = Arc::new(Error::Fatal(err));
>> +            // Call the completion predicates for all pending commands with the
>> +            // error.
>> +            let mut pending_cmds =
>> +                handle_data_2.pending_commands.lock().unwrap();
>> +            let res = Err(err);
>> +            for f in pending_cmds.as_mut().unwrap().iter_mut() {
>> +                f(&handle_data_2.handle, &res);
>> +            }
>> +            *pending_cmds = Err(res.unwrap_err());
>> +        })
>> +        .abort_handle();
>> +        Ok(Self {
>> +            data: handle_data,
>> +            polling_task,
>> +        })
>> +    }
>> +
>> +    /// Get the underlying C pointer to the handle.
>> +    pub(crate) fn raw_handle(&self) -> *mut sys::nbd_handle {
>> +        self.data.handle.raw_handle()
>> +    }
>> +
>> +    /// Call this method when a new command is issued. As argument is passed a
> Not sure if you meant 'An argument is passed' or something else here.
>
>> +    /// predicate which should return [true] iff the command is completed.
>> +    pub(crate) fn add_command(
>> +        &self,
>> +        mut completion_predicate: impl FnMut(&Handle, &SharedResult<()>) -> bool
>> +            + Send
>> +            + Sync
>> +            + 'static,
>> +    ) -> SharedResult<()> {
>> +        if !completion_predicate(&self.data.handle, &Ok(())) {
>> +            let mut pending_cmds_lock =
>> +                self.data.pending_commands.lock().unwrap();
>> +            pending_cmds_lock
>> +                .as_mut()
>> +                .map_err(|e| e.clone())?
>> +                .push(Box::new(completion_predicate));
>> +            self.data.new_command.notify_one();
>> +        }
>> +        Ok(())
>> +    }
>> +}
>> +
>> +impl Drop for AsyncHandle {
>> +    fn drop(&mut self) {
>> +        self.polling_task.abort();
>> +    }
>> +}
>> +
>> +/// Get the read/write direction that the handle wants on the file descriptor.
>> +fn get_fd_interest(handle: &Handle) -> Option<Interest> {
>> +    match handle.aio_get_direction() {
>> +        0 => None,
>> +        AIO_DIRECTION_READ => Some(Interest::READABLE),
>> +        AIO_DIRECTION_WRITE => Some(Interest::WRITABLE),
>> +        AIO_DIRECTION_BOTH => Some(Interest::READABLE | Interest::WRITABLE),
>> +        _ => unreachable!(),
>> +    }
>> +}
>> +
>> +/// A task that will run as long as the handle is alive. It will poll the
>> +/// file descriptor when new data is availlable.
> available
>
>> +async fn polling_task(handle_data: &HandleData) -> Result<(), FatalErrorKind> {
>> +    let HandleData {
>> +        handle,
>> +        pending_commands,
>> +        new_command,
>> +    } = handle_data;
>> +    let fd = handle.aio_get_fd().map_err(Error::to_fatal)?;
>> +    // XXX: Might the file descriptor ever be changed?
>> +    let tokio_fd = AsyncFd::new(fd)?;
> Regarding the XXX, my understanding is that aio_get_fd() returns the
> same fd for the life of the handle once a connection is established,
> so you can drop the comment.  Changing the fd would imply creating a
> new socket, but we don't have automatic internal reconnect built into
> libnbd at this time.  (You can do external reconnect by opening a new
> NBD handle - but then it's obvious that you will call aio_get_fd() on
> the new handle)
>
>> +    let epfd = epoll::create(false)?;
>> +    epoll::ctl(
>> +        epfd,
>> +        epoll::ControlOptions::EPOLL_CTL_ADD,
>> +        fd,
>> +        epoll::Event::new(Events::EPOLLIN | Events::EPOLLOUT, 42),
>> +    )?;
>> +
>> +    // The following loop does approximately the following things:
>> +    //
>> +    // 1. Determine what Libnbd wants to do next on the file descriptor,
>> +    //    (read/write/both/none), and store that in [fd_interest].
>> +    // 2. Wait for either:
>> +    //   a) That interest to be available on the file descriptor in which case:
>> +    //     I.   Call the correct `aio_notify_*` method.
>> +    //     II.  Execute step 1.
>> +    //     III. Send the result of the call to `aio_notify_*` on
>> +    //          [result_channel] to notify pending commands that some progress
>> +    //          has been made.
>> +    //     IV.  Resume execution from step 2.
>> +    //   b) A notification was received on [new_command] signaling that a new
>> +    //      command was registered and that the intrest on the file descriptor
> interest
>
>> +    //      might has changed. Resume execution from step 1.
> s/has/have/
>
>> +    loop {
>> +        let Some(fd_interest) = get_fd_interest(handle) else {
>> +            // The handle does not wait for any data of the file descriptor,
>> +            // so we wait until some command is issued.
>> +            new_command.notified().await;
>> +            continue;
>> +        };
>> +
>> +        if pending_commands
>> +            .lock()
>> +            .unwrap()
>> +            .as_ref()
>> +            .unwrap()
>> +            .is_empty()
>> +        {
>> +            // No command is pending so there is no point to do anything.
>> +            new_command.notified().await;
>> +            continue;
>> +        }
>> +
>> +        // Wait for the requested interest to be available on the fd.
>> +        let mut ready_guard = tokio_fd.ready(fd_interest).await?;
>> +        let readyness = ready_guard.ready();
> Typical spelling is readiness; would affect later lines of code
>
>> +        let res = if readyness.is_readable() && fd_interest.is_readable() {
>> +            handle.aio_notify_read()
>> +        } else if readyness.is_writable() && fd_interest.is_writable() {
>> +            handle.aio_notify_write()
>> +        } else {
>> +            continue;
>> +        };
>> +        let res = match res {
>> +            Ok(()) => Ok(()),
>> +            Err(e @ Error::Recoverable(_)) => Err(Arc::new(e)),
>> +            Err(Error::Fatal(e)) => return Err(e),
>> +        };
>> +
>> +        // Call the completion predicates of all pending commands.
>> +        let mut pending_cmds_lock = pending_commands.lock().unwrap();
>> +        let pending_cmds = pending_cmds_lock.as_mut().unwrap();
>> +        let mut i = 0;
>> +        while i < pending_cmds.len() {
>> +            if (pending_cmds[i])(handle, &res) {
>> +                let _ = pending_cmds.swap_remove(i);
>> +            } else {
>> +                i += 1;
>> +            }
>> +        }
>> +        drop(pending_cmds_lock);
>> +
>> +        // Use epoll to check the current read/write availability on the fd.
>> +        // This is needed because Tokio does only support edge-triggered
> s/does only support/supports only/
>
>> +        // notifications but Libnbd requires level-triggered notifications.
>> +        let mut revent = epoll::Event { data: 0, events: 0 };
>> +        // Setting timeout to 0 means that it will return immediately.
>> +        epoll::wait(epfd, 0, std::slice::from_mut(&mut revent))?;
>> +        let revents = Events::from_bits(revent.events).unwrap();
>> +        if !revents.contains(Events::EPOLLIN) {
>> +            ready_guard.clear_ready_matching(IoReady::READABLE);
>> +        }
>> +        if !revents.contains(Events::EPOLLOUT) {
>> +            ready_guard.clear_ready_matching(IoReady::WRITABLE);
>> +        }
>> +        ready_guard.retain_ready();
>> +    }
>> +}
>> diff --git a/rust/src/lib.rs b/rust/src/lib.rs
>> index a6f3131..56316b4 100644
>> --- a/rust/src/lib.rs
>> +++ b/rust/src/lib.rs
>> @@ -17,11 +17,19 @@
>>   
>>   #![deny(warnings)]
>>   
>> +#[cfg(feature = "tokio")]
>> +mod async_bindings;
>> +#[cfg(feature = "tokio")]
>> +mod async_handle;
>>   mod bindings;
>>   mod error;
>>   mod handle;
>>   pub mod types;
>>   mod utils;
>> +#[cfg(feature = "tokio")]
>> +pub use async_bindings::*;
>> +#[cfg(feature = "tokio")]
>> +pub use async_handle::{AsyncHandle, SharedResult};
>>   pub use bindings::*;
>>   pub use error::{Error, ErrorKind, FatalErrorKind, Result};
>>   pub use handle::Handle;
>> diff --git a/rust/src/utils.rs b/rust/src/utils.rs
>> index b8200c1..8984ebb 100644
>> --- a/rust/src/utils.rs
>> +++ b/rust/src/utils.rs
>> @@ -21,3 +21,12 @@ use std::ffi::c_void;
>>   pub unsafe extern "C" fn drop_data<T>(data: *mut c_void) {
>>       drop(Box::from_raw(data as *mut T))
>>   }
>> +
>> +/// Turn a [FnOnce] (with a single `&mut` argument) to a [FnMut]
>> +/// which panics on the second invocation.
>> +pub fn fn_once_to_fn_mut<T, U>(
>> +    f: impl FnOnce(&mut T) -> U,
>> +) -> impl FnMut(&mut T) -> U {
>> +    let mut f = Some(f);
>> +    move |x| (f.take().unwrap())(x)
>> +}
>> diff --git a/scripts/git.orderfile b/scripts/git.orderfile
>> index b988d87..60ec56d 100644
>> --- a/scripts/git.orderfile
>> +++ b/scripts/git.orderfile
>> @@ -69,6 +69,7 @@ rust/src/types.rs
>>   rust/src/utils.rs
>>   rust/src/lib.rs
>>   rust/src/handle.rs
>> +rust/src/async_handle.rs
>>   rust/libnbd-sys/*
>>   rust/examples/*
>>   rust/tests/*
>> -- 
>> 2.41.0
>>
>> _______________________________________________
>> Libguestfs mailing list
>> Libguestfs at redhat.com
>> https://listman.redhat.com/mailman/listinfo/libguestfs
>>
> I'm still learning Rust, so a lot of this I just have to trust, but
> overall the patch seems like a good framework.  While I definitely
> found some typos to fix, I'm less certain about whether there are any
> major implementation flaws.


Thanks for the comments. I have corrected all the spelling and grammatical
errors you pointed out. Version 9 of the patch series will follow soon.


--

Best regards,

Tage

-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://listman.redhat.com/archives/libguestfs/attachments/20230826/34407f16/attachment.htm>


More information about the Libguestfs mailing list