[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

Re: [Libguestfs] [nbdkit PATCH 3/6] connections: Add read/write lock over client I/O



On Thu, Nov 16, 2017 at 09:26:54PM -0600, Eric Blake wrote:
> In preparation for parallel processing, we need to be sure that
> two threads belonging to the same connection cannot interleave
> their I/O except at message boundaries.  Add a mutex around
> all reads and writes that must occur as a group (for now, there
> is no contention for either mutex).
> 
> Signed-off-by: Eric Blake <eblake redhat com>
> ---
>  src/connections.c | 27 +++++++++++++++++++++++++--
>  1 file changed, 25 insertions(+), 2 deletions(-)
> 
> diff --git a/src/connections.c b/src/connections.c
> index dada9aa..dd43a9a 100644
> --- a/src/connections.c
> +++ b/src/connections.c
> @@ -62,6 +62,8 @@
>  /* Connection structure. */
>  struct connection {
>    pthread_mutex_t request_lock;
> +  pthread_mutex_t read_lock;
> +  pthread_mutex_t write_lock;
>    void *handle;
>    void *crypto_session;
> 
> @@ -206,6 +208,8 @@ new_connection (int sockin, int sockout)
>    conn->sockin = sockin;
>    conn->sockout = sockout;
>    pthread_mutex_init (&conn->request_lock, NULL);
> +  pthread_mutex_init (&conn->read_lock, NULL);
> +  pthread_mutex_init (&conn->write_lock, NULL);
> 
>    conn->recv = raw_recv;
>    conn->send = raw_send;
> @@ -223,6 +227,8 @@ free_connection (struct connection *conn)
>    conn->close (conn);
> 
>    pthread_mutex_destroy (&conn->request_lock);
> +  pthread_mutex_destroy (&conn->read_lock);
> +  pthread_mutex_destroy (&conn->write_lock);
> 
>    /* Don't call the plugin again if quit has been set because the main
>     * thread will be in the process of unloading it.  The plugin.unload
> @@ -888,19 +894,23 @@ recv_request_send_reply (struct connection *conn)
>    CLEANUP_FREE char *buf = NULL;
> 
>    /* Read the request packet. */
> +  pthread_mutex_lock (&conn->read_lock);
>    r = conn->recv (conn, &request, sizeof request);
>    if (r == -1) {
>      nbdkit_error ("read request: %m");
> +    pthread_mutex_unlock (&conn->read_lock);
>      return -1;
>    }
>    if (r == 0) {
>      debug ("client closed input socket, closing connection");
> +    pthread_mutex_unlock (&conn->read_lock);
>      return 0;                   /* disconnect */
>    }
> 
>    magic = be32toh (request.magic);
>    if (magic != NBD_REQUEST_MAGIC) {
>      nbdkit_error ("invalid request: 'magic' field is incorrect (0x%x)", magic);
> +    pthread_mutex_unlock (&conn->read_lock);
>      return -1;
>    }
> 
> @@ -913,14 +923,18 @@ recv_request_send_reply (struct connection *conn)
> 
>    if (cmd == NBD_CMD_DISC) {
>      debug ("client sent disconnect command, closing connection");
> +    pthread_mutex_unlock (&conn->read_lock);
>      return 0;                   /* disconnect */
>    }
> 
>    /* Validate the request. */
>    if (!validate_request (conn, cmd, flags, offset, count, &error)) {
>      if (cmd == NBD_CMD_WRITE &&
> -        skip_over_write_buffer (conn->sockin, count) < 0)
> +        skip_over_write_buffer (conn->sockin, count) < 0) {
> +      pthread_mutex_unlock (&conn->read_lock);
>        return -1;
> +    }
> +    pthread_mutex_unlock (&conn->read_lock);
>      goto send_reply;
>    }
> 
> @@ -931,8 +945,11 @@ recv_request_send_reply (struct connection *conn)
>        perror ("malloc");
>        error = ENOMEM;
>        if (cmd == NBD_CMD_WRITE &&
> -          skip_over_write_buffer (conn->sockin, count) < 0)
> +          skip_over_write_buffer (conn->sockin, count) < 0) {
> +        pthread_mutex_unlock (&conn->read_lock);
>          return -1;
> +      }
> +      pthread_mutex_unlock (&conn->read_lock);
>        goto send_reply;
>      }
>    }
> @@ -946,9 +963,11 @@ recv_request_send_reply (struct connection *conn)
>      }
>      if (r == -1) {
>        nbdkit_error ("read data: %m");
> +      pthread_mutex_unlock (&conn->read_lock);
>        return -1;
>      }
>    }
> +  pthread_mutex_unlock (&conn->read_lock);
> 
>    /* Perform the request.  Only this part happens inside the request lock. */
>    if (quit) {
> @@ -962,6 +981,7 @@ recv_request_send_reply (struct connection *conn)
> 
>    /* Send the reply packet. */
>   send_reply:
> +  pthread_mutex_lock (&conn->write_lock);
>    reply.magic = htobe32 (NBD_REPLY_MAGIC);
>    reply.handle = request.handle;
>    reply.error = htobe32 (nbd_errno (error));
> @@ -978,6 +998,7 @@ recv_request_send_reply (struct connection *conn)
>    r = conn->send (conn, &reply, sizeof reply);
>    if (r == -1) {
>      nbdkit_error ("write reply: %m");
> +    pthread_mutex_unlock (&conn->write_lock);
>      return -1;
>    }
> 
> @@ -986,9 +1007,11 @@ recv_request_send_reply (struct connection *conn)
>      r = conn->send (conn, buf, count);
>      if (r == -1) {
>        nbdkit_error ("write data: %m");
> +      pthread_mutex_unlock (&conn->write_lock);
>        return -1;
>      }
>    }
> +  pthread_mutex_unlock (&conn->write_lock);
> 
>    return 1;                     /* command processed ok */
>  }

There's nothing wrong with this patch, but it might be easier to use
an attribute((cleanup)) handler to deal with the unlocking.  See these
links for how we do it in libguestfs:

https://github.com/libguestfs/libguestfs/blob/50ca24b634f59f1a14fd230aa4893d7408347d76/common/utils/cleanups.h#L27
https://github.com/libguestfs/libguestfs/blob/50ca24b634f59f1a14fd230aa4893d7408347d76/lib/guestfs-internal.h#L81-L87
https://github.com/libguestfs/libguestfs/blob/50ca24b634f59f1a14fd230aa4893d7408347d76/lib/errors.c#L179

Rich.

-- 
Richard Jones, Virtualization Group, Red Hat http://people.redhat.com/~rjones
Read my programming and virtualization blog: http://rwmj.wordpress.com
virt-builder quickly builds VMs from scratch
http://libguestfs.org/virt-builder.1.html


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]