[Libguestfs] [PATCH libnbd discussion only 5/5] examples: Add concurrent writer example.

Richard W.M. Jones rjones at redhat.com
Mon Jun 3 15:29:48 UTC 2019


---
 .gitignore                           |   1 +
 examples/Makefile.am                 |  12 +
 examples/concurrent-writer.c         | 450 +++++++++++++++++++++++++++
 examples/threaded-reads-and-writes.c |   2 +-
 4 files changed, 464 insertions(+), 1 deletion(-)

diff --git a/.gitignore b/.gitignore
index 30438c1..e4dad91 100644
--- a/.gitignore
+++ b/.gitignore
@@ -41,6 +41,7 @@ Makefile.in
 /docs/libnbd-api.3
 /docs/libnbd-api.pod
 /examples/batched-read-write
+/examples/concurrent-writer
 /examples/threaded-reads-and-writes
 /examples/simple-fetch-first-sector
 /examples/simple-reads-and-writes
diff --git a/examples/Makefile.am b/examples/Makefile.am
index b933873..b5f7e44 100644
--- a/examples/Makefile.am
+++ b/examples/Makefile.am
@@ -21,6 +21,7 @@ EXTRA_DIST = LICENSE-FOR-EXAMPLES
 
 noinst_PROGRAMS = \
 	batched-read-write \
+	concurrent-writer \
 	simple-fetch-first-sector \
 	simple-reads-and-writes \
 	threaded-reads-and-writes
@@ -54,6 +55,17 @@ threaded_reads_and_writes_LDADD = \
 	$(top_builddir)/lib/libnbd.la \
 	$(PTHREAD_LIBS)
 
+concurrent_writer_SOURCES = \
+	concurrent-writer.c
+concurrent_writer_CPPFLAGS = \
+	-I$(top_srcdir)/include
+concurrent_writer_CFLAGS = \
+	$(WARNINGS_CFLAGS) \
+	$(PTHREAD_CFLAGS)
+concurrent_writer_LDADD = \
+	$(top_builddir)/lib/libnbd.la \
+	$(PTHREAD_LIBS)
+
 batched_read_write_SOURCES = \
 	batched-read-write.c
 batched_read_write_CPPFLAGS = \
diff --git a/examples/concurrent-writer.c b/examples/concurrent-writer.c
new file mode 100644
index 0000000..11a9f22
--- /dev/null
+++ b/examples/concurrent-writer.c
@@ -0,0 +1,450 @@
+/* Example usage with nbdkit:
+ *
+ * nbdkit -U - memory 1M --run './concurrent-writer $unixsocket'
+ *
+ * This will read and write randomly over the first megabyte of the
+ * plugin using multi-conn, multiple threads, multiple requests in
+ * flight on each connection, and concurrent writer threads.
+ *
+ * To run it against a remote server over TCP (note this will destroy
+ * the first megabyte of the remote disk):
+ *
+ * ./concurrent-writer hostname port
+ *  or
+ * ./concurrent-writer nbd://hostname:port
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+#include <signal.h>
+#include <poll.h>
+#include <time.h>
+#include <assert.h>
+#include <sys/socket.h>
+#include <pthread.h>
+
+#include <libnbd.h>
+
+static int64_t exportsize;      /* Set once in main from nbd_get_size. */
+
+/* Number of simultaneous connections to the NBD server.  The number
+ * of threads is NR_MULTI_CONN * 2 because each connection has one
+ * reader thread plus one concurrent writer thread.  Note that some
+ * servers only support a limited number of simultaneous connections,
+ * and/or have a configurable thread pool internally, and if you
+ * exceed those limits then something will break.
+ */
+#define NR_MULTI_CONN 8
+
+/* Number of commands that can be "in flight" at the same time on each
+ * connection.  (Therefore the total number of requests in flight may
+ * be up to NR_MULTI_CONN * MAX_IN_FLIGHT).  qemu's NBD client can
+ * have up to 16 requests in flight.
+ *
+ * Some servers do not support multiple requests in flight and may
+ * deadlock or even crash if this is larger than 1, but common NBD
+ * servers should be OK.
+ */
+#define MAX_IN_FLIGHT 16
+
+/* Number of commands we issue (per thread). */
+#define NR_CYCLES 1000000
+
+/* Reader thread. */
+struct reader_status {
+  size_t i;                     /* Thread index, 0 .. NR_MULTI_CONN-1 */
+  int argc;                     /* Command line parameters, copied from main. */
+  char **argv;                  /* Used by the thread to connect to the server. */
+  int status;                   /* Return status: 0 = OK, -1 = failed. */
+  unsigned requests;            /* Total number of requests retired. */
+  unsigned most_in_flight;      /* Most requests seen in flight. */
+};
+
+static void *start_reader_thread (void *arg);
+
+/* Connect once to check that the export is writable and supports
+ * multi-conn, then run NR_MULTI_CONN reader threads and summarize their
+ * results.  Exits with EXIT_FAILURE if setup or any thread fails.
+ */
+int
+main (int argc, char *argv[])
+{
+  struct nbd_handle *nbd;
+  pthread_t reader_threads[NR_MULTI_CONN];
+  struct reader_status reader_status[NR_MULTI_CONN];
+  size_t i;
+  int err;
+  unsigned requests = 0, most_in_flight = 0, errors = 0;
+
+  srand (time (NULL));
+
+  if (argc < 2 || argc > 3) {
+    fprintf (stderr, "%s uri | socket | hostname port\n", argv[0]);
+    exit (EXIT_FAILURE);
+  }
+
+  nbd = nbd_create ();
+  if (nbd == NULL) {
+    fprintf (stderr, "%s\n", nbd_get_error ());
+    exit (EXIT_FAILURE);
+  }
+
+  /* Connect first to check if the server supports writes and multi-conn. */
+  if (argc == 2) {
+    if (strstr (argv[1], "://")) {
+      if (nbd_connect_uri (nbd, argv[1]) == -1) {
+        fprintf (stderr, "%s\n", nbd_get_error ());
+        exit (EXIT_FAILURE);
+      }
+    }
+    else if (nbd_connect_unix (nbd, argv[1]) == -1) {
+      fprintf (stderr, "%s\n", nbd_get_error ());
+      exit (EXIT_FAILURE);
+    }
+  }
+  else if (nbd_connect_tcp (nbd, argv[1], argv[2]) == -1) {
+    fprintf (stderr, "%s\n", nbd_get_error ());
+    exit (EXIT_FAILURE);
+  }
+
+  exportsize = nbd_get_size (nbd);
+  if (exportsize == -1) {
+    fprintf (stderr, "%s\n", nbd_get_error ());
+    exit (EXIT_FAILURE);
+  }
+
+  if (nbd_read_only (nbd) == 1) {
+    fprintf (stderr, "%s: error: this NBD export is read-only\n", argv[0]);
+    exit (EXIT_FAILURE);
+  }
+
+  if (nbd_can_multi_conn (nbd) == 0) {
+    fprintf (stderr, "%s: error: "
+             "this NBD export does not support multi-conn\n", argv[0]);
+    exit (EXIT_FAILURE);
+  }
+
+  nbd_close (nbd);
+
+  /* Start the reader threads. */
+  for (i = 0; i < NR_MULTI_CONN; ++i) {
+    reader_status[i].i = i;
+    reader_status[i].argc = argc;
+    reader_status[i].argv = argv;
+    reader_status[i].status = 0;
+    reader_status[i].requests = 0;
+    reader_status[i].most_in_flight = 0;
+    err = pthread_create (&reader_threads[i], NULL,
+                          start_reader_thread, &reader_status[i]);
+    if (err != 0) {
+      errno = err;
+      perror ("pthread_create");
+      exit (EXIT_FAILURE);
+    }
+  }
+
+  /* Wait for the threads to exit. */
+  for (i = 0; i < NR_MULTI_CONN; ++i) {
+    err = pthread_join (reader_threads[i], NULL);
+    if (err != 0) {
+      errno = err;
+      perror ("pthread_join");
+      exit (EXIT_FAILURE);
+    }
+    if (reader_status[i].status != 0) {
+      fprintf (stderr, "thread %zu failed with status %d\n",
+               i, reader_status[i].status);
+      errors++;
+    }
+    requests += reader_status[i].requests;
+    if (reader_status[i].most_in_flight > most_in_flight)
+      most_in_flight = reader_status[i].most_in_flight;
+  }
+
+  /* Make sure the number of requests that were retired matches what
+   * we expect.  A failed thread stops early (and was already reported
+   * above), so only check this when every thread succeeded.
+   */
+  assert (errors > 0 || requests == NR_MULTI_CONN * NR_CYCLES);
+
+  printf ("most requests seen in flight = %u (per thread) "
+          "vs MAX_IN_FLIGHT = %d\n",
+          most_in_flight, MAX_IN_FLIGHT);
+
+  exit (errors == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
+}
+
+struct queue {
+  struct queue *next;           /* Next item in the list, or NULL at the end. */
+  void *buf;                    /* malloc'd copy of the data to send. */
+  size_t len;                   /* Number of bytes of buf still to send. */
+};
+
+/* Concurrent writer thread (one per libnbd handle). */
+struct writer_data {
+  size_t i;                     /* Thread index, 0 .. NR_MULTI_CONN-1 */
+  struct nbd_handle *nbd;       /* NBD handle, shared with the reader thread. */
+  struct queue *q, *q_end;      /* Head and tail of queue of items to write. */
+  pthread_mutex_t q_lock;       /* Lock protecting q and q_end. */
+  pthread_cond_t q_cond;        /* Signalled when an item is enqueued. */
+};
+
+static void *start_writer_thread (void *arg);
+static void writer (void *data, const void *buf, size_t len);
+
+/* Reader thread (ARG is its struct reader_status).  Opens a new
+ * connection, starts its concurrent writer thread, then issues
+ * NR_CYCLES random reads and writes, recording the outcome in ARG.
+ */
+static void *
+start_reader_thread (void *arg)
+{
+  struct nbd_handle *nbd;
+  struct pollfd fds[1];
+  struct reader_status *status = arg;
+  struct writer_data writer_data;
+  pthread_t writer_thread;
+  char buf[512];
+  size_t i, j, in_flight;  /* in_flight counts number of requests in flight */
+  uint64_t offset, handle, handles[MAX_IN_FLIGHT];
+  int err, dir, r, cmd;
+  bool want_to_send;
+
+  nbd = nbd_create ();
+  if (nbd == NULL) {
+    fprintf (stderr, "%s\n", nbd_get_error ());
+    exit (EXIT_FAILURE);
+  }
+
+  if (status->argc == 2) {
+    if (strstr (status->argv[1], "://")) {
+      if (nbd_connect_uri (nbd, status->argv[1]) == -1) {
+        fprintf (stderr, "%s\n", nbd_get_error ());
+        exit (EXIT_FAILURE);
+      }
+    }
+    else if (nbd_connect_unix (nbd, status->argv[1]) == -1) {
+      fprintf (stderr, "%s\n", nbd_get_error ());
+      exit (EXIT_FAILURE);
+    }
+  }
+  else if (nbd_connect_tcp (nbd, status->argv[1], status->argv[2]) == -1) {
+    fprintf (stderr, "%s\n", nbd_get_error ());
+    exit (EXIT_FAILURE);
+  }
+
+  for (i = 0; i < sizeof buf; ++i)
+    buf[i] = rand ();
+
+  /* Start the concurrent writer thread, one per handle.  NOTE(review):
+   * it is never stopped or joined although writer_data lives on this
+   * thread's stack - confirm the intended lifetime.
+   */
+  writer_data.i = status->i;
+  writer_data.nbd = nbd;
+  writer_data.q = writer_data.q_end = NULL;
+  pthread_mutex_init (&writer_data.q_lock, NULL);
+  pthread_cond_init (&writer_data.q_cond, NULL);
+
+  err = pthread_create (&writer_thread, NULL,
+                        start_writer_thread, &writer_data);
+  if (err != 0) {
+    errno = err;
+    perror ("pthread_create");
+    exit (EXIT_FAILURE);
+  }
+
+  if (nbd_set_concurrent_writer (nbd, &writer_data, writer) == -1) {
+    fprintf (stderr, "%s\n", nbd_get_error ());
+    exit (EXIT_FAILURE);
+  }
+
+  /* Issue commands. */
+  in_flight = 0;
+  i = NR_CYCLES;
+  while (i > 0 || in_flight > 0) {
+    if (nbd_aio_is_dead (nbd) || nbd_aio_is_closed (nbd)) {
+      fprintf (stderr, "thread %zu: connection is dead or closed\n",
+               status->i);
+      goto error;
+    }
+
+    /* We want to send if there are requests left to issue, there is
+     * room in flight, and the connection is READY to accept one.
+     */
+    want_to_send =
+      i > 0 && in_flight < MAX_IN_FLIGHT && nbd_aio_is_ready (nbd);
+
+    fds[0].fd = nbd_aio_get_fd (nbd);
+    fds[0].events = want_to_send ? POLLOUT : 0;
+    fds[0].revents = 0;
+
+    dir = nbd_aio_get_direction (nbd);
+    if ((dir & LIBNBD_AIO_DIRECTION_WRITE) != 0) {
+      /* The concurrent writer is always writable, so we don't have to
+       * test the socket in poll.  Calling nbd_aio_notify_write can
+       * change the state, so we must restart the loop afterwards.
+       */
+      nbd_aio_notify_write (nbd);
+      continue;
+    }
+
+    if ((dir & LIBNBD_AIO_DIRECTION_READ) != 0)
+      fds[0].events |= POLLIN;
+
+    if (poll (fds, 1, -1) == -1) {
+      perror ("poll");
+      goto error;
+    }
+
+    if ((dir & LIBNBD_AIO_DIRECTION_READ) != 0 &&
+        (fds[0].revents & POLLIN) != 0)
+      nbd_aio_notify_read (nbd);
+
+    /* If we can issue another request, do so.  We reuse the same buffer
+     * for multiple in-flight requests, which is fine for writing random
+     * data but would be Very Bad in a real application.
+     */
+    if (want_to_send && (fds[0].revents & POLLOUT) != 0 &&
+        nbd_aio_is_ready (nbd)) {
+      offset = rand () % (exportsize - sizeof buf);
+      cmd = rand () & 1;
+      if (cmd == 0)
+        handle = nbd_aio_pwrite (nbd, buf, sizeof buf, offset, 0);
+      else
+        handle = nbd_aio_pread (nbd, buf, sizeof buf, offset, 0);
+      if (handle == -1) {
+        fprintf (stderr, "%s\n", nbd_get_error ());
+        goto error;
+      }
+      handles[in_flight] = handle;
+      i--;
+      in_flight++;
+      if (in_flight > status->most_in_flight)
+        status->most_in_flight = in_flight;
+    }
+
+    /* If a command is ready to retire, retire it. */
+    for (j = 0; j < in_flight; ++j) {
+      r = nbd_aio_command_completed (nbd, handles[j]);
+      if (r == -1) {
+        fprintf (stderr, "%s\n", nbd_get_error ());
+        goto error;
+      }
+      if (r) {
+        memmove (&handles[j], &handles[j+1],
+                 sizeof (handles[0]) * (in_flight - j - 1));
+        j--;                    /* re-examine the entry moved into slot j */
+        in_flight--;
+        status->requests++;
+      }
+    }
+  }
+
+  if (nbd_shutdown (nbd) == -1) {
+    fprintf (stderr, "%s\n", nbd_get_error ());
+    exit (EXIT_FAILURE);
+  }
+
+  nbd_close (nbd);
+
+  printf ("thread %zu: finished OK\n", status->i);
+
+  status->status = 0;
+  pthread_exit (status);
+
+ error:
+  fprintf (stderr, "thread %zu: failed\n", status->i);
+  status->status = -1;
+  pthread_exit (status);
+}
+
+/* Concurrent writer callback.  This runs in the reader thread: it
+ * copies LEN bytes from BUF into a new queue item and appends it to
+ * the queue in DATA (a struct writer_data) for the writer thread to
+ * send.  Allocation failures are reported to libnbd, not returned.
+ */
+static void
+writer (void *data, const void *buf, size_t len)
+{
+  struct writer_data *writer_data = data;
+  struct queue *item;
+  int err;
+
+  item = malloc (sizeof *item);
+  if (item != NULL)
+    item->buf = malloc (len);
+  if (item == NULL || item->buf == NULL) {
+    err = errno;                /* save it: free may change errno */
+    free (item);
+    nbd_concurrent_writer_error (writer_data->nbd, err);
+    return;
+  }
+  item->next = NULL;
+  memcpy (item->buf, buf, len);
+  item->len = len;
+
+  /* Append the item to the tail and wake the writer thread. */
+  pthread_mutex_lock (&writer_data->q_lock);
+  if (writer_data->q_end == NULL)
+    writer_data->q = item;
+  else
+    writer_data->q_end->next = item;
+  writer_data->q_end = item;
+  pthread_cond_signal (&writer_data->q_cond);
+  pthread_mutex_unlock (&writer_data->q_lock);
+}
+
+/* Writer thread (ARG is the struct writer_data shared with the reader).
+ * Sends queued items on the socket; returns only if poll or send fails.
+ */
+static void *
+start_writer_thread (void *arg)
+{
+  struct writer_data *writer_data = arg;
+  struct nbd_handle *nbd = writer_data->nbd;
+  struct queue *item;
+  struct pollfd fds[1];
+  const char *p;                /* char *: ISO C has no void * arithmetic */
+  ssize_t r;
+  int err;
+  int fd = nbd_aio_get_fd (nbd);
+
+  for (;;) {
+    /* Pick next job off the queue. */
+    pthread_mutex_lock (&writer_data->q_lock);
+    while (writer_data->q == NULL)
+      pthread_cond_wait (&writer_data->q_cond, &writer_data->q_lock);
+    item = writer_data->q;
+    writer_data->q = item->next;
+    if (writer_data->q == NULL)
+      writer_data->q_end = NULL;
+    pthread_mutex_unlock (&writer_data->q_lock);
+
+    for (p = item->buf; item->len > 0; p += r, item->len -= r) {
+      /* Wait for the socket to become ready to write. */
+      fds[0].fd = fd;
+      fds[0].events = POLLOUT;
+      fds[0].revents = 0;
+      if (poll (fds, 1, -1) == -1) goto error;
+
+      r = send (fd, p, item->len, 0);
+      if (r == -1) goto error;
+    }
+
+    free (item->buf);
+    free (item);
+  }
+
+ error:
+  err = errno;                  /* save it: free may change errno */
+  free (item->buf);             /* don't leak the item we failed to send */
+  free (item);
+  nbd_concurrent_writer_error (nbd, err);
+  return NULL;
+}
diff --git a/examples/threaded-reads-and-writes.c b/examples/threaded-reads-and-writes.c
index 3e3fc32..a92e7b5 100644
--- a/examples/threaded-reads-and-writes.c
+++ b/examples/threaded-reads-and-writes.c
@@ -52,7 +52,7 @@ static int64_t exportsize;
 #define MAX_IN_FLIGHT 16
 
 /* Number of commands we issue (per thread). */
-#define NR_CYCLES 10000
+#define NR_CYCLES 1000000
 
 struct thread_status {
   size_t i;                     /* Thread index, 0 .. NR_MULTI_CONN-1 */
-- 
2.21.0




More information about the Libguestfs mailing list