[Libguestfs] [PATCH libnbd discussion only 5/5] examples: Add concurrent writer example.

Eric Blake eblake at redhat.com
Mon Jun 3 18:56:45 UTC 2019


On 6/3/19 10:29 AM, Richard W.M. Jones wrote:
> ---
>  .gitignore                           |   1 +
>  examples/Makefile.am                 |  12 +
>  examples/concurrent-writer.c         | 450 +++++++++++++++++++++++++++
>  examples/threaded-reads-and-writes.c |   2 +-

Obviously one is copied from the other; should the change to a larger
NR_CYCLES be pushed separately?

> +/* Number of simultaneous connections to the NBD server.  The number
> + * of threads is NR_MULTI_CONN * 2 because there is one thread reading
> + * plus a concurrent writer thread.  Note that some servers only
> + * support a limited number of simultaneous connections, and/or have a
> + * configurable thread pool internally, and if you exceed those limits
> + * then something will break.
> + */
> +#define NR_MULTI_CONN 8
> +

Do things actually break or deadlock, or does it merely reach a point
where performance levels off or starts to degrade from contention, but
where things still continue to work? If a server only has support for X
parallel in-flight commands, it will merely quit read()ing further
requests until the earlier responses are flushed back to the client, but
since our state machine favors response over requests, about all we can
do is run out of memory by queuing up too many requests.

> +static void *
> +start_reader_thread (void *arg)
> +{
> +  struct nbd_handle *nbd;
> +  struct pollfd fds[1];
> +  struct reader_status *status = arg;
> +  struct writer_data writer_data;
> +  pthread_t writer_thread;
> +  int err;
> +  char buf[512];

Fixed-size buffer...


> +  for (i = 0; i < sizeof buf; ++i)
> +    buf[i] = rand ();
> +
> +  /* Start the concurrent writer thread, one per handle. */
> +  writer_data.i = status->i;
> +  writer_data.nbd = nbd;
> +  writer_data.q = writer_data.q_end = NULL;
> +  pthread_mutex_init (&writer_data.q_lock, NULL);
> +
> +  err = pthread_create (&writer_thread, NULL,
> +                        start_writer_thread, &writer_data);

The writer thread may or may not stall when it is only ever sending 512
bytes per command. Should we rework this to have a variable-sized buffer
(maybe even just a binary choice of small or large, where large is
perhaps 256k, as that was the size where I was able to provoke a blocked
send() when developing the fix tested in batched-read-write)?  It may be
that you're not seeing improvements in execution because you haven't
actually saturated the line with enough large writes.

> +  if (err != 0) {
> +    errno = err;
> +    perror ("pthread_create");
> +    exit (EXIT_FAILURE);
> +  }
> +
> +  if (nbd_set_concurrent_writer (nbd, &writer_data, writer) == -1) {
> +    fprintf (stderr, "%s\n", nbd_get_error ());
> +    exit (EXIT_FAILURE);
> +  }
> +
> +  /* Issue commands. */
> +  in_flight = 0;
> +  i = NR_CYCLES;
> +  while (i > 0 || in_flight > 0) {
> +    if (nbd_aio_is_dead (nbd) || nbd_aio_is_closed (nbd)) {
> +      fprintf (stderr, "thread %zu: connection is dead or closed\n",
> +               status->i);
> +      goto error;
> +    }
> +
> +    /* Do we want to send another request and there's room to issue it
> +     * and the connection is in the READY state so it can be used to
> +     * issue a request.
> +     */
> +    want_to_send =
> +      i > 0 && in_flight < MAX_IN_FLIGHT && nbd_aio_is_ready (nbd);

Do we still want to check nbd_aio_is_ready() here, or can we bypass that
and just issue our commands right away (since commit 6af72b87 allows for
a command queue)?

> +
> +    fds[0].fd = nbd_aio_get_fd (nbd);
> +    fds[0].events = want_to_send ? POLLOUT : 0;

Do we really want to be checking for POLLOUT here, or should we leave
that job to the writer thread?

> +    fds[0].revents = 0;
> +
> +    dir = nbd_aio_get_direction (nbd);
> +    if ((dir & LIBNBD_AIO_DIRECTION_WRITE) != 0) {
> +      /* The concurrent writer is always writable, we don't have to
> +       * test the socket in poll.  Since calling nbd_aio_notify_write
> +       * can change the state, after doing it we must restart the
> +       * loop.
> +       */

I'm still not convinced this code is even reachable.

> +      nbd_aio_notify_write (nbd);
> +      continue;
> +    }
> +
> +    if ((dir & LIBNBD_AIO_DIRECTION_READ) != 0)
> +      fds[0].events |= POLLIN;
> +
> +    if (poll (fds, 1, -1) == -1) {
> +      perror ("poll");
> +      goto error;
> +    }
> +
> +    if ((dir & LIBNBD_AIO_DIRECTION_READ) != 0 &&
> +        (fds[0].revents & POLLIN) != 0)
> +      nbd_aio_notify_read (nbd);
> +
> +    /* If we can issue another request, do so.  Note that we reuse the
> +     * same buffer for multiple in-flight requests.  It doesn't matter
> +     * here because we're just trying to write random stuff, but that
> +     * would be Very Bad in a real application.
> +     */
> +    if (want_to_send && (fds[0].revents & POLLOUT) != 0 &&
> +        nbd_aio_is_ready (nbd)) {
> +      offset = rand () % (exportsize - sizeof buf);
> +      cmd = rand () & 1;
> +      if (cmd == 0)
> +        handle = nbd_aio_pwrite (nbd, buf, sizeof buf, offset, 0);
> +      else
> +        handle = nbd_aio_pread (nbd, buf, sizeof buf, offset, 0);

Variable-size packets may make this more interesting or realistic.


> +
> +  nbd_close (nbd);
> +
> +  printf ("thread %zu: finished OK\n", status->i);
> +
> +  status->status = 0;
> +  pthread_exit (status);
> +
> + error:
> +  fprintf (stderr, "thread %zu: failed\n", status->i);
> +  status->status = -1;
> +  pthread_exit (status);

Should we join() the writer thread?

> +}
> +
> +/* This runs in the reader thread and enqueues the data which will be
> + * picked up by the writer thread.
> + */
> +static void
> +writer (void *data, const void *buf, size_t len)
> +{
> +  struct writer_data *writer_data = data;
> +  struct queue *item;
> +
> +  item = malloc (sizeof *item);
> +  if (!item) goto error;
> +  item->next = NULL;
> +  item->buf = malloc (len);
> +  if (item->buf == NULL) {
> +    free (item);
> +    goto error;
> +  }
> +  memcpy (item->buf, buf, len);
> +  item->len = len;
> +
> +  /* Enqueue the item and signal the writer thread. */
> +  pthread_mutex_lock (&writer_data->q_lock);
> +  if (writer_data->q_end == NULL)
> +    writer_data->q = writer_data->q_end = item;
> +  else {
> +    writer_data->q_end->next = item;
> +    writer_data->q_end = item;
> +  }
> +  pthread_cond_signal (&writer_data->q_cond);
> +  pthread_mutex_unlock (&writer_data->q_lock);

Good - you DO need correct locking to feed data to the writer thread in
a safe manner (perhaps there is a way to implement this queuing with
atomics rather than a full mutex, but the point remains that proper
multi-threaded access is a must).

> +  return;
> +
> + error:
> +  nbd_concurrent_writer_error (writer_data->nbd, errno);
> +}
> +
> +static void *
> +start_writer_thread (void *arg)
> +{
> +  struct writer_data *writer_data = arg;
> +  struct nbd_handle *nbd = writer_data->nbd;
> +  struct queue *item;
> +  int fd;
> +  struct pollfd fds[1];
> +  ssize_t r;
> +  void *p;
> +
> +  fd = nbd_aio_get_fd (nbd);
> +

No check for errors?

> +  for (;;) {
> +    /* Pick next job off the queue. */
> +    pthread_mutex_lock (&writer_data->q_lock);
> +    while (writer_data->q == NULL)
> +      pthread_cond_wait (&writer_data->q_cond, &writer_data->q_lock);
> +    item = writer_data->q;
> +    writer_data->q = item->next;
> +    if (writer_data->q == NULL)
> +      writer_data->q_end = NULL;
> +    pthread_mutex_unlock (&writer_data->q_lock);
> +
> +    p = item->buf;
> +    while (item->len > 0) {
> +      /* Wait for the socket to become ready to write. */
> +      fds[0].fd = fd;
> +      fds[0].events = POLLOUT;
> +      fds[0].revents = 0;
> +
> +      if (poll (fds, 1, -1) == -1) goto error;
> +
> +      r = send (fd, p, item->len, 0);
> +      if (r == -1) goto error;
> +
> +      p += r;
> +      item->len -= r;
> +    }
> +
> +    free (item->buf);
> +    free (item);
> +  }

No cleanup except on poll()/send() error? Nothing join()s the thread?

> +
> + error:
> +  nbd_concurrent_writer_error (nbd, errno);
> +  return NULL;
> +}
> diff --git a/examples/threaded-reads-and-writes.c b/examples/threaded-reads-and-writes.c
> index 3e3fc32..a92e7b5 100644
> --- a/examples/threaded-reads-and-writes.c
> +++ b/examples/threaded-reads-and-writes.c
> @@ -52,7 +52,7 @@ static int64_t exportsize;
>  #define MAX_IN_FLIGHT 16
>  
>  /* Number of commands we issue (per thread). */
> -#define NR_CYCLES 10000
> +#define NR_CYCLES 1000000
>  
>  struct thread_status {
>    size_t i;                     /* Thread index, 0 .. NR_MULTI_CONN-1 */
> 

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.           +1-919-301-3226
Virtualization:  qemu.org | libvirt.org

-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 488 bytes
Desc: OpenPGP digital signature
URL: <http://listman.redhat.com/archives/libguestfs/attachments/20190603/922788eb/attachment.sig>


More information about the Libguestfs mailing list