[Libguestfs] [nbdkit PATCH 3/6] connections: Add read/write lock over client I/O
Richard W.M. Jones
rjones at redhat.com
Fri Nov 17 08:59:34 UTC 2017
On Thu, Nov 16, 2017 at 09:26:54PM -0600, Eric Blake wrote:
> In preparation for parallel processing, we need to be sure that
> two threads belonging to the same connection cannot interleave
> their I/O except at message boundaries. Add a mutex around
> all reads and writes that must occur as a group (for now, there
> is no contention for either mutex).
>
> Signed-off-by: Eric Blake <eblake at redhat.com>
> ---
> src/connections.c | 27 +++++++++++++++++++++++++--
> 1 file changed, 25 insertions(+), 2 deletions(-)
>
> diff --git a/src/connections.c b/src/connections.c
> index dada9aa..dd43a9a 100644
> --- a/src/connections.c
> +++ b/src/connections.c
> @@ -62,6 +62,8 @@
> /* Connection structure. */
> struct connection {
> pthread_mutex_t request_lock;
> + pthread_mutex_t read_lock;
> + pthread_mutex_t write_lock;
> void *handle;
> void *crypto_session;
>
> @@ -206,6 +208,8 @@ new_connection (int sockin, int sockout)
> conn->sockin = sockin;
> conn->sockout = sockout;
> pthread_mutex_init (&conn->request_lock, NULL);
> + pthread_mutex_init (&conn->read_lock, NULL);
> + pthread_mutex_init (&conn->write_lock, NULL);
>
> conn->recv = raw_recv;
> conn->send = raw_send;
> @@ -223,6 +227,8 @@ free_connection (struct connection *conn)
> conn->close (conn);
>
> pthread_mutex_destroy (&conn->request_lock);
> + pthread_mutex_destroy (&conn->read_lock);
> + pthread_mutex_destroy (&conn->write_lock);
>
> /* Don't call the plugin again if quit has been set because the main
> * thread will be in the process of unloading it. The plugin.unload
> @@ -888,19 +894,23 @@ recv_request_send_reply (struct connection *conn)
> CLEANUP_FREE char *buf = NULL;
>
> /* Read the request packet. */
> + pthread_mutex_lock (&conn->read_lock);
> r = conn->recv (conn, &request, sizeof request);
> if (r == -1) {
> nbdkit_error ("read request: %m");
> + pthread_mutex_unlock (&conn->read_lock);
> return -1;
> }
> if (r == 0) {
> debug ("client closed input socket, closing connection");
> + pthread_mutex_unlock (&conn->read_lock);
> return 0; /* disconnect */
> }
>
> magic = be32toh (request.magic);
> if (magic != NBD_REQUEST_MAGIC) {
> nbdkit_error ("invalid request: 'magic' field is incorrect (0x%x)", magic);
> + pthread_mutex_unlock (&conn->read_lock);
> return -1;
> }
>
> @@ -913,14 +923,18 @@ recv_request_send_reply (struct connection *conn)
>
> if (cmd == NBD_CMD_DISC) {
> debug ("client sent disconnect command, closing connection");
> + pthread_mutex_unlock (&conn->read_lock);
> return 0; /* disconnect */
> }
>
> /* Validate the request. */
> if (!validate_request (conn, cmd, flags, offset, count, &error)) {
> if (cmd == NBD_CMD_WRITE &&
> - skip_over_write_buffer (conn->sockin, count) < 0)
> + skip_over_write_buffer (conn->sockin, count) < 0) {
> + pthread_mutex_unlock (&conn->read_lock);
> return -1;
> + }
> + pthread_mutex_unlock (&conn->read_lock);
> goto send_reply;
> }
>
> @@ -931,8 +945,11 @@ recv_request_send_reply (struct connection *conn)
> perror ("malloc");
> error = ENOMEM;
> if (cmd == NBD_CMD_WRITE &&
> - skip_over_write_buffer (conn->sockin, count) < 0)
> + skip_over_write_buffer (conn->sockin, count) < 0) {
> + pthread_mutex_unlock (&conn->read_lock);
> return -1;
> + }
> + pthread_mutex_unlock (&conn->read_lock);
> goto send_reply;
> }
> }
> @@ -946,9 +963,11 @@ recv_request_send_reply (struct connection *conn)
> }
> if (r == -1) {
> nbdkit_error ("read data: %m");
> + pthread_mutex_unlock (&conn->read_lock);
> return -1;
> }
> }
> + pthread_mutex_unlock (&conn->read_lock);
>
> /* Perform the request. Only this part happens inside the request lock. */
> if (quit) {
> @@ -962,6 +981,7 @@ recv_request_send_reply (struct connection *conn)
>
> /* Send the reply packet. */
> send_reply:
> + pthread_mutex_lock (&conn->write_lock);
> reply.magic = htobe32 (NBD_REPLY_MAGIC);
> reply.handle = request.handle;
> reply.error = htobe32 (nbd_errno (error));
> @@ -978,6 +998,7 @@ recv_request_send_reply (struct connection *conn)
> r = conn->send (conn, &reply, sizeof reply);
> if (r == -1) {
> nbdkit_error ("write reply: %m");
> + pthread_mutex_unlock (&conn->write_lock);
> return -1;
> }
>
> @@ -986,9 +1007,11 @@ recv_request_send_reply (struct connection *conn)
> r = conn->send (conn, buf, count);
> if (r == -1) {
> nbdkit_error ("write data: %m");
> + pthread_mutex_unlock (&conn->write_lock);
> return -1;
> }
> }
> + pthread_mutex_unlock (&conn->write_lock);
>
> return 1; /* command processed ok */
> }
There's nothing wrong with this patch, but it might be easier to use
an attribute((cleanup)) handler to deal with the unlocking. See these
links for how we do it in libguestfs:
https://github.com/libguestfs/libguestfs/blob/50ca24b634f59f1a14fd230aa4893d7408347d76/common/utils/cleanups.h#L27
https://github.com/libguestfs/libguestfs/blob/50ca24b634f59f1a14fd230aa4893d7408347d76/lib/guestfs-internal.h#L81-L87
https://github.com/libguestfs/libguestfs/blob/50ca24b634f59f1a14fd230aa4893d7408347d76/lib/errors.c#L179
Rich.
--
Richard Jones, Virtualization Group, Red Hat http://people.redhat.com/~rjones
Read my programming and virtualization blog: http://rwmj.wordpress.com
virt-builder quickly builds VMs from scratch
http://libguestfs.org/virt-builder.1.html
More information about the Libguestfs
mailing list