[Libguestfs] [libnbd PATCH v4 11/11] rust: Add some examples

Richard W.M. Jones rjones at redhat.com
Wed Aug 2 16:53:17 UTC 2023


On Wed, Aug 02, 2023 at 12:40:56PM +0000, Tage Johansson wrote:
> This patch adds a few examples in rust/examples/. The examples are
> compiled and run as part of the test suite.
> ---
>  rust/Makefile.am                       |   3 +
>  rust/examples/concurrent-read-write.rs | 135 +++++++++++++++++++++++++
>  rust/examples/connect-command.rs       |  39 +++++++
>  rust/examples/fetch-first-sector.rs    |  38 +++++++
>  rust/examples/get-size.rs              |  29 ++++++
>  rust/run-tests.sh                      |   7 ++
>  6 files changed, 251 insertions(+)
>  create mode 100644 rust/examples/concurrent-read-write.rs
>  create mode 100644 rust/examples/connect-command.rs
>  create mode 100644 rust/examples/fetch-first-sector.rs
>  create mode 100644 rust/examples/get-size.rs
> 
> diff --git a/rust/Makefile.am b/rust/Makefile.am
> index b954b22..d75163d 100644
> --- a/rust/Makefile.am
> +++ b/rust/Makefile.am
> @@ -32,6 +32,9 @@ source_files = \
>  	src/types.rs \
>  	src/utils.rs \
>  	src/async_handle.rs \
> +	examples/connect-command.rs \
> +	examples/get-size.rs \
> +	examples/fetch-first-sector.rs \

This doesn't list all the source files: it is missing
examples/concurrent-read-write.rs.

If you split examples/connect-command.rs, examples/get-size.rs and
examples/fetch-first-sector.rs out into a separate patch (since those
don't depend on the async code), and moved that patch earlier in the
series, then it could go upstream earlier.

>  	libnbd-sys/Cargo.toml \
>  	libnbd-sys/build.rs \
>  	$(NULL)
> diff --git a/rust/examples/concurrent-read-write.rs b/rust/examples/concurrent-read-write.rs
> new file mode 100644
> index 0000000..a1c3e8a
> --- /dev/null
> +++ b/rust/examples/concurrent-read-write.rs
> @@ -0,0 +1,135 @@
> +//! Example usage with nbdkit:
> +//!
> +//!     nbdkit -U - memory 100M \
> +//!       --run 'cargo run --example concurrent-read-write -- $unixsocket'
> +//!
> +//! This will read and write randomly over the first megabyte of the
> +//! plugin using multi-conn, multiple threads and multiple requests in
> +//! flight on each thread.
> +
> +#![deny(warnings)]
> +use rand::prelude::*;
> +use std::env;
> +use std::path::PathBuf;
> +use std::sync::Arc;
> +use tokio::task::JoinSet;
> +
> +/// Number of simultaneous connections to the NBD server.
> +///
> +/// Note that some servers only support a limited number of
> +/// simultaneous connections, and/or have a configurable thread pool
> +/// internally, and if you exceed those limits then something will break.
> +const NR_MULTI_CONN: usize = 8;
> +
> +/// Number of commands that can be "in flight" at the same time on each
> +/// connection.  (Therefore the total number of requests in flight may
> +/// be up to NR_MULTI_CONN * MAX_IN_FLIGHT).
> +const MAX_IN_FLIGHT: usize = 16;
> +
> +/// The size of large reads and writes, must be > 512.
> +const BUFFER_SIZE: usize = 1024;
> +
> +/// Number of commands we issue (per [task][tokio::task]).
> +const NR_CYCLES: usize = 32;
> +
> +/// Statistics gathered during the run.
> +#[derive(Debug, Default)]
> +struct Stats {
> +    /// The total number of requests made.
> +    requests: usize,
> +}
> +
> +#[tokio::main]
> +async fn main() -> anyhow::Result<()> {
> +    let args = env::args_os().collect::<Vec<_>>();
> +    if args.len() != 2 {
> +        anyhow::bail!("Usage: {:?} socket", args[0]);
> +    }
> +    let socket = &args[1];
> +
> +    // We begin by making a connection to the server to get the export size
> +    // and ensure that it supports multiple connections and is writable.
> +    let nbd = libnbd::Handle::new()?;
> +    nbd.connect_unix(&socket)?;
> +    let export_size = nbd.get_size()?;
> +    anyhow::ensure!(
> +        (BUFFER_SIZE as u64) < export_size,
> +        "export is {export_size}B, must be larger than {BUFFER_SIZE}B"
> +    );
> +    anyhow::ensure!(
> +        !nbd.is_read_only()?,
> +        "error: this NBD export is read-only"
> +    );
> +    anyhow::ensure!(
> +        nbd.can_multi_conn()?,
> +        "error: this NBD export does not support multi-conn"
> +    );
> +    drop(nbd); // Close the connection.
> +
> +    // Start the worker tasks, one per connection.
> +    let mut tasks = JoinSet::new();
> +    for i in 0..NR_MULTI_CONN {
> +        tasks.spawn(run_thread(i, socket.clone().into(), export_size));
> +    }
> +
> +    // Wait for the tasks to complete.
> +    let mut stats = Stats::default();
> +    while !tasks.is_empty() {
> +        let this_stats = tasks.join_next().await.unwrap().unwrap()?;
> +        stats.requests += this_stats.requests;
> +    }
> +
> +    // Make sure the number of requests that were required matches what
> +    // we expect.
> +    assert_eq!(stats.requests, NR_MULTI_CONN * NR_CYCLES);
> +
> +    Ok(())
> +}
> +
> +async fn run_thread(
> +    task_idx: usize,
> +    socket: PathBuf,
> +    export_size: u64,
> +) -> anyhow::Result<Stats> {
> +    // Start a new connection to the server.
> +    // We shall spawn many commands concurrently on different tasks and those
> +    // futures must be `'static`, hence we wrap the handle in an [Arc].
> +    let nbd = Arc::new(libnbd::AsyncHandle::new()?);
> +    nbd.connect_unix(socket).await?;
> +
> +    let mut rng = SmallRng::seed_from_u64(44 as u64);
> +
> +    // Issue commands.
> +    let mut stats = Stats::default();
> +    let mut join_set = JoinSet::new();
> +    //tokio::time::sleep(std::time::Duration::from_secs(1)).await;
> +    while stats.requests < NR_CYCLES || !join_set.is_empty() {
> +        while stats.requests < NR_CYCLES && join_set.len() < MAX_IN_FLIGHT {
> +            // If we want to issue another request, do so.  Note that we reuse
> +            // the same buffer for multiple in-flight requests.  It doesn't
> +            // matter here because we're just trying to write random stuff,
> +            // but that would be Very Bad in a real application.
> +            // Simulate a mix of large and small requests.
> +            let size = if rng.gen() { BUFFER_SIZE } else { 512 };
> +            let offset = rng.gen_range(0..export_size - size as u64);
> +
> +            let mut buf = [0u8; BUFFER_SIZE];
> +            let nbd = nbd.clone();
> +            if rng.gen() {
> +                join_set.spawn(async move {
> +                    nbd.pread(&mut buf, offset, None).await
> +                });
> +            } else {
> +                // Fill the buf with random data.
> +                rng.fill(&mut buf);
> +                join_set
> +                    .spawn(async move { nbd.pwrite(&buf, offset, None).await });
> +            }
> +            stats.requests += 1;
> +        }
> +        join_set.join_next().await.unwrap().unwrap()?;
> +    }
> +
> +    if task_idx == 0 {}
> +    Ok(stats)
> +}
> diff --git a/rust/examples/connect-command.rs b/rust/examples/connect-command.rs
> new file mode 100644
> index 0000000..db4adbe
> --- /dev/null
> +++ b/rust/examples/connect-command.rs
> @@ -0,0 +1,39 @@
> +//! This example shows how to run an NBD server
> +//! (nbdkit) as a subprocess of libnbd.
> +
> +
> +fn main() -> libnbd::Result<()> {
> +    // Create the libnbd handle.
> +    let handle = libnbd::Handle::new()?;
> +
> +    // Run nbdkit as a subprocess.
> +    let args = [
> +        "nbdkit",
> +        // You must use ‘-s’ (which tells nbdkit to serve
> +        // a single connection on stdin/stdout).
> +        "-s",
> +        // It is recommended to use ‘--exit-with-parent’
> +        // to ensure nbdkit is always cleaned up even
> +        // if the main program crashes.
> +        "--exit-with-parent",
> +        // Use this to enable nbdkit debugging.
> +        "-v",
> +        // The nbdkit plugin name - this is a RAM disk.
> +        "memory",
> +        "size=1M",
> +    ];
> +    handle.connect_command(&args)?;
> +
> +    // Write some random data to the first sector.
> +    let wbuf: Vec<u8> = (0..512).into_iter().map(|i| (i % 13) as u8).collect();
> +    handle.pwrite(&wbuf, 0, None)?;
> +
> +    // Read the first sector back.
> +    let mut rbuf = [0; 512];
> +    handle.pread(&mut rbuf, 0, None)?;
> +
> +    // What was read must be exactly the same as what was written.
> +    assert_eq!(wbuf.as_slice(), rbuf.as_slice());
> +
> +    Ok(())
> +}
> diff --git a/rust/examples/fetch-first-sector.rs b/rust/examples/fetch-first-sector.rs
> new file mode 100644
> index 0000000..9efb47a
> --- /dev/null
> +++ b/rust/examples/fetch-first-sector.rs
> @@ -0,0 +1,38 @@
> +//! This example shows how to connect to an NBD server
> +//! and fetch and print the first sector (usually the
> +//! boot sector or partition table or filesystem
> +//! superblock).
> +//!
> +//! You can test it with nbdkit like this:
> +//!
> +//!     nbdkit -U - floppy . \
> +//!       --run 'cargo run --example fetch-first-sector -- $unixsocket'
> +//!
> +//! The nbdkit floppy plugin creates an MBR disk so the
> +//! first sector is the partition table.
> +
> +use pretty_hex::pretty_hex;
> +use std::env;
> +
> +fn main() -> anyhow::Result<()> {
> +    let nbd = libnbd::Handle::new()?;
> +
> +    let args = env::args_os().collect::<Vec<_>>();
> +    if args.len() != 2 {
> +        anyhow::bail!("Usage: {:?} socket", args[0]);
> +    }
> +    let socket = &args[1];
> +
> +    // Connect to the NBD server over a
> +    // Unix domain socket.
> +    nbd.connect_unix(socket)?;
> +
> +    // Read the first sector synchronously.
> +    let mut buf = [0; 512];
> +    nbd.pread(&mut buf, 0, None)?;
> +
> +    // Print the sector in hexdump like format.
> +    print!("{}", pretty_hex(&buf));
> +
> +    Ok(())
> +}
> diff --git a/rust/examples/get-size.rs b/rust/examples/get-size.rs
> new file mode 100644
> index 0000000..7f31df5
> --- /dev/null
> +++ b/rust/examples/get-size.rs
> @@ -0,0 +1,29 @@
> +//! This example shows how to connect to an NBD
> +//! server and read the size of the disk.
> +//!
> +//! You can test it with nbdkit like this:
> +//!
> +//!     nbdkit -U - memory 1M \
> +//!       --run 'cargo run --example get-size -- $unixsocket'
> +
> +use std::env;
> +
> +fn main() -> anyhow::Result<()> {
> +    let nbd = libnbd::Handle::new()?;
> +
> +    let args = env::args_os().collect::<Vec<_>>();
> +    if args.len() != 2 {
> +        anyhow::bail!("Usage: {:?} socket", args[0]);
> +    }
> +    let socket = &args[1];
> +
> +    // Connect to the NBD server over a
> +    // Unix domain socket.
> +    nbd.connect_unix(socket)?;
> +
> +    // Read the size in bytes and print it.
> +    let size = nbd.get_size()?;
> +    println!("{:?}: size = {size} bytes", socket);
> +
> +    Ok(())
> +}
> diff --git a/rust/run-tests.sh b/rust/run-tests.sh
> index da7852a..bdc0f16 100755
> --- a/rust/run-tests.sh
> +++ b/rust/run-tests.sh
> @@ -25,6 +25,13 @@ requires nbdkit --version

You'll also need to "requires" that the plugins exist, since not all
plugins are available on every platform.  So add:

  requires nbdkit floppy --version
  requires nbdkit memory --version

(https://libguestfs.org/nbdkit-probing.1.html)

>  if [ -z "$VG" ]; then
>      $CARGO test -- --nocapture
> +    $CARGO run --example connect-command
> +    nbdkit -U - memory 1M \
> +        --run "$CARGO"' run --example get-size -- $unixsocket'

This fancy quoting does work, but it's probably better and safer to use:

  export CARGO
  nbdkit -U - memory 1M \
         --run '$CARGO run --example get-size -- $unixsocket'

and the same applies here:

> +    nbdkit -U - floppy . \
> +        --run "$CARGO"' run --example fetch-first-sector -- $unixsocket'
> +    nbdkit -U - memory 10M \
> +        --run "$CARGO"' run --example concurrent-read-write -- $unixsocket'
>  else
>      $CARGO test --config "target.'cfg(all())'.runner = \"$VG\"" -- --nocapture
>  fi
> -- 
> 2.41.0

Rich.

-- 
Richard Jones, Virtualization Group, Red Hat http://people.redhat.com/~rjones
Read my programming and virtualization blog: http://rwmj.wordpress.com
nbdkit - Flexible, fast NBD server with plugins
https://gitlab.com/nbdkit/nbdkit


More information about the Libguestfs mailing list