[Libguestfs] [PATCH libnbd v2 4/4] examples: Add concurrent writer example.
Eric Blake
eblake at redhat.com
Tue Jun 4 13:46:20 UTC 2019
On 6/4/19 4:59 AM, Richard W.M. Jones wrote:
> ---
> .gitignore | 1 +
> examples/Makefile.am | 12 +
> examples/concurrent-writer.c | 450 +++++++++++++++++++++++++++++++++++
> 3 files changed, 463 insertions(+)
>
> @@ -0,0 +1,450 @@
> +/* Example usage with nbdkit:
> + *
> + * nbdkit -U - memory 100M --run './concurrent-writer $unixsocket'
> + *
> + * This will read and write randomly over the first megabyte of the
Stale comment.
> + * plugin using multi-conn, multiple threads, multiple requests in
> + * flight on each connection, and concurrent writer threads.
> + *
> + * To run it against a remote server over TCP:
> + *
> + * ./concurrent-writer hostname port
> + * or
> + * ./concurrent-writer nbd://hostname:port
> + */
> +
> +
> +/* Number of simultaneous connections to the NBD server. The number
> + * of threads is NR_MULTI_CONN * 2 because there is one thread reading
> + * plus a concurrent writer thread. Note that some servers only
> + * support a limited number of simultaneous connections, and/or have a
> + * configurable thread pool internally, and if you exceed those limits
> + * then something will break.
Possibly a stale comment. More likely than an outright break, you'll
just reach a point of diminishing returns as connections increase.
> +
> + /* Make sure the number of requests that were required matches what
> + * we expect.
> + */
> + assert (requests == NR_MULTI_CONN * NR_CYCLES);
> +
> + printf ("most requests seen in flight = %u (per thread) "
> + "vs MAX_IN_FLIGHT = %d\n",
> + most_in_flight, MAX_IN_FLIGHT);
Now that we queue commands without regard to whether the server has
received them, this should always equal MAX_IN_FLIGHT. But it doesn't
hurt to keep printing it as a sanity check.
> +
> + exit (errors == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
> +}
> +
> +struct queue {
> + struct queue *next;
> + void *buf;
> + size_t len;
> +};
> +
> +/* Concurrent writer thread (one per libnbd handle). */
> +struct writer_data {
> + size_t i; /* Thread index, 0 .. NR_MULTI_CONN-1 */
> + struct nbd_handle *nbd; /* NBD handle. */
> + struct queue *q, *q_end; /* Queue of items to write. */
> + pthread_mutex_t q_lock; /* Lock on queue. */
> + pthread_cond_t q_cond; /* Condition on queue. */
I'm half-wondering if we could use sem_t instead of pthread_cond_t for
the same effect, and if it would have any noticeable timing differences.
But that should be a separate experiment on top of this one.
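For the record, here's roughly what I'm picturing with sem_t (an
untested sketch; the struct and helper names are mine, not from the
patch). The semaphore count mirrors the queue length, so the consumer
blocks in sem_wait() instead of a cond_wait loop, while the mutex
still protects the list links:

#include <pthread.h>
#include <semaphore.h>
#include <stddef.h>

struct item { struct item *next; void *buf; size_t len; };

struct sem_queue {
  struct item *q, *q_end;     /* queue of items to write */
  pthread_mutex_t q_lock;     /* protects the list links */
  sem_t q_sem;                /* counts queued items */
};

/* Producer: append the item, then bump the semaphore. */
static void
sem_enqueue (struct sem_queue *w, struct item *it)
{
  pthread_mutex_lock (&w->q_lock);
  it->next = NULL;
  if (w->q_end == NULL)
    w->q = w->q_end = it;
  else {
    w->q_end->next = it;
    w->q_end = it;
  }
  pthread_mutex_unlock (&w->q_lock);
  sem_post (&w->q_sem);       /* replaces pthread_cond_signal */
}

/* Consumer: sem_wait blocks until at least one item is queued. */
static struct item *
sem_dequeue (struct sem_queue *w)
{
  struct item *it;

  sem_wait (&w->q_sem);       /* replaces the while/cond_wait loop */
  pthread_mutex_lock (&w->q_lock);
  it = w->q;
  w->q = it->next;
  if (w->q == NULL)
    w->q_end = NULL;
  pthread_mutex_unlock (&w->q_lock);
  return it;
}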
> +};
> +
> +static void *start_writer_thread (void *arg);
> +static int writer (void *data, const void *buf, size_t len);
> +
> +static void *
> +start_reader_thread (void *arg)
> +{
> +
> + dir = nbd_aio_get_direction (nbd);
> + if ((dir & LIBNBD_AIO_DIRECTION_WRITE) != 0) {
> + /* The concurrent writer is always writable, we don't have to
> + * test the socket in poll. Since calling nbd_aio_notify_write
> + * can change the state, after doing it we must restart the
> + * loop.
> + */
> + nbd_aio_notify_write (nbd);
> + continue;
> + }
I'm still not convinced that we can ever see DIRECTION_WRITE here, but
I agree that leaving this in for safety doesn't hurt.
> +
> + if ((dir & LIBNBD_AIO_DIRECTION_READ) != 0)
> + fds[0].events |= POLLIN;
> +
Should this ALWAYS ask for POLLIN, rather than only when
DIRECTION_READ is set? I'm worried that we might deadlock if poll() is
called with fds[0].events == 0 because we sampled
nbd_aio_get_direction() at a moment when the state machine was
transiently not in a state that blocks on read. For this example, the
thread posting nbd_aio_pread is the same thread calling poll(), so I
guess that shouldn't happen (it's more of a concern for my
nbdkit-nbd usage of libnbd).
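Concretely, the defensive variant I have in mind for nbdkit-nbd looks
something like this (a sketch reusing the fd/fds/nbd names and the
error label from the loop above; re-checking the direction after
poll() guards against acting on a stale snapshot):

/* Always ask for POLLIN so poll() can never be entered with
 * events == 0 and block forever on a transiently-idle state machine.
 */
fds[0].fd = fd;
fds[0].events = POLLIN;
fds[0].revents = 0;

if (poll (fds, 1, -1) == -1) {
  perror ("poll");
  goto error;
}

/* Re-check the direction before notifying: it may have changed
 * between our earlier snapshot and poll() returning.
 */
if ((fds[0].revents & POLLIN) != 0 &&
    (nbd_aio_get_direction (nbd) & LIBNBD_AIO_DIRECTION_READ) != 0)
  nbd_aio_notify_read (nbd);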
> + if (poll (fds, 1, -1) == -1) {
> + perror ("poll");
> + goto error;
> + }
> +
> + if ((dir & LIBNBD_AIO_DIRECTION_READ) != 0 &&
> + (fds[0].revents & POLLIN) != 0)
> + nbd_aio_notify_read (nbd);
> +
> + /* If a command is ready to retire, retire it. */
> + for (j = 0; j < in_flight; ++j) {
> + r = nbd_aio_command_completed (nbd, handles[j]);
> + if (r == -1) {
> + fprintf (stderr, "%s\n", nbd_get_error ());
> + goto error;
> + }
> + if (r) {
> + memmove (&handles[j], &handles[j+1],
> + sizeof (handles[0]) * (in_flight - j - 1));
> + j--;
> + in_flight--;
> + status->requests++;
> + }
> + }
> + }
> +
> + if (nbd_shutdown (nbd) == -1) {
> + fprintf (stderr, "%s\n", nbd_get_error ());
> + exit (EXIT_FAILURE);
> + }
> +
> + nbd_close (nbd);
> +
> + printf ("thread %zu: finished OK\n", status->i);
> +
Still no cleanup of the writer thread (more on that below).
> + free (buf);
> + status->status = 0;
> + pthread_exit (status);
> +
> + error:
> + free (buf);
> + fprintf (stderr, "thread %zu: failed\n", status->i);
> + status->status = -1;
> + pthread_exit (status);
> +}
> +
> +/* This runs in the reader thread and enqueues the data which will be
> + * picked up by the writer thread.
> + */
> +static int
> +writer (void *data, const void *buf, size_t len)
> +{
This may change if we introduce a flags parameter (per your other
thread, on the potential races/deadlocks you noticed).
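That is, something along these lines (purely hypothetical; neither the
flags argument nor any flag names exist in libnbd today):

#include <stdint.h>

static int
writer (void *data, const void *buf, size_t len, uint32_t flags)
{
  struct writer_data *writer_data = data;

  /* ... same enqueue logic as below, possibly consulting flags ... */
  return 0;
}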
> + struct writer_data *writer_data = data;
> + struct queue *item;
> +
> + item = malloc (sizeof *item);
> + if (!item) return -1;
> + item->next = NULL;
> + item->buf = malloc (len);
> + if (item->buf == NULL) {
> + free (item);
> + return -1;
> + }
> + memcpy (item->buf, buf, len);
> + item->len = len;
> +
> + /* Enqueue the item and signal the writer thread. */
> + pthread_mutex_lock (&writer_data->q_lock);
> + if (writer_data->q_end == NULL)
> + writer_data->q = writer_data->q_end = item;
> + else {
> + writer_data->q_end->next = item;
> + writer_data->q_end = item;
> + }
> + pthread_cond_signal (&writer_data->q_cond);
> + pthread_mutex_unlock (&writer_data->q_lock);
> +
> + return 0;
> +}
> +
> +static void *
> +start_writer_thread (void *arg)
> +{
> + struct writer_data *writer_data = arg;
> + struct nbd_handle *nbd = writer_data->nbd;
> + struct queue *item;
> + int fd;
> + struct pollfd fds[1];
> + ssize_t r;
> + void *p;
> +
> + fd = nbd_aio_get_fd (nbd);
> + if (fd == -1) {
> + fprintf (stderr, "%s\n", nbd_get_error ());
> + exit (EXIT_FAILURE);
> + }
You already mentioned the potential deadlock here if the writer thread
is started before nbd_connect_*.
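A sketch of one safe ordering (variable names assumed, not copied from
the patch): connect first so nbd_aio_get_fd() has a real fd to return,
and only then create the writer thread:

struct nbd_handle *nbd = nbd_create ();
pthread_t writer_thread;

if (nbd == NULL ||
    nbd_connect_unix (nbd, "/tmp/nbd.sock") == -1) {
  fprintf (stderr, "%s\n", nbd_get_error ());
  exit (EXIT_FAILURE);
}

/* Only now is there a connected fd for the writer thread to poll. */
if (pthread_create (&writer_thread, NULL, start_writer_thread,
                    &writer_data) != 0) {
  perror ("pthread_create");
  exit (EXIT_FAILURE);
}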
> +
> + for (;;) {
> + /* Pick next job off the queue. */
> + pthread_mutex_lock (&writer_data->q_lock);
> + while (writer_data->q == NULL)
> + pthread_cond_wait (&writer_data->q_cond, &writer_data->q_lock);
> + item = writer_data->q;
> + writer_data->q = item->next;
> + if (writer_data->q == NULL)
> + writer_data->q_end = NULL;
> + pthread_mutex_unlock (&writer_data->q_lock);
> +
> + p = item->buf;
> + while (item->len > 0) {
> + /* Wait for the socket to become ready to write. */
> + fds[0].fd = fd;
> + fds[0].events = POLLOUT;
> + fds[0].revents = 0;
> +
> + if (poll (fds, 1, -1) == -1) goto error;
> +
> + r = send (fd, p, item->len, 0);
> + if (r == -1) goto error;
> +
> + p += r;
> + item->len -= r;
> + }
> +
> + free (item->buf);
> + free (item);
> + }
> +
> + error:
> + nbd_concurrent_writer_error (nbd, errno);
Potential use-after-free: if the reader thread calls nbd_close()
without first joining this thread, this thread may still be in
poll()/send() on an fd that has already been closed and freed out from
under it.
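One way to close that hole (a sketch; the zero-length sentinel
convention, the enqueue helper, and the writer_thread variable are my
inventions, not in the patch): have the reader push a sentinel, join
the writer, and only then close the handle:

/* Reader thread, before nbd_close(): wake the writer thread with a
 * zero-length sentinel item (the writer loop would treat len == 0 as
 * "exit"), then join it.  After pthread_join returns, no other thread
 * can touch the handle or its fd, so nbd_close() cannot race with the
 * writer thread's poll()/send().
 */
struct queue *sentinel = calloc (1, sizeof *sentinel);
if (sentinel != NULL) {
  enqueue (&writer_data, sentinel);      /* hypothetical helper */
  pthread_join (writer_thread, NULL);
}
nbd_close (nbd);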
--
Eric Blake, Principal Software Engineer
Red Hat, Inc. +1-919-301-3226
Virtualization: qemu.org | libvirt.org