[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

[Libguestfs] [nbdkit PATCH 3/6] connections: Add read/write lock over client I/O



In preparation for parallel processing, we need to be sure that
two threads belonging to the same connection cannot interleave
their I/O except at message boundaries.  Add two mutexes, one held
across all reads and one held across all writes that must occur as
a group (for now, there is no contention for either mutex).

Signed-off-by: Eric Blake <eblake redhat com>
---
 src/connections.c | 27 +++++++++++++++++++++++++--
 1 file changed, 25 insertions(+), 2 deletions(-)

diff --git a/src/connections.c b/src/connections.c
index dada9aa..dd43a9a 100644
--- a/src/connections.c
+++ b/src/connections.c
@@ -62,6 +62,8 @@
 /* Connection structure. */
 struct connection {
   pthread_mutex_t request_lock;
+  pthread_mutex_t read_lock;
+  pthread_mutex_t write_lock;
   void *handle;
   void *crypto_session;

@@ -206,6 +208,8 @@ new_connection (int sockin, int sockout)
   conn->sockin = sockin;
   conn->sockout = sockout;
   pthread_mutex_init (&conn->request_lock, NULL);
+  pthread_mutex_init (&conn->read_lock, NULL);
+  pthread_mutex_init (&conn->write_lock, NULL);

   conn->recv = raw_recv;
   conn->send = raw_send;
@@ -223,6 +227,8 @@ free_connection (struct connection *conn)
   conn->close (conn);

   pthread_mutex_destroy (&conn->request_lock);
+  pthread_mutex_destroy (&conn->read_lock);
+  pthread_mutex_destroy (&conn->write_lock);

   /* Don't call the plugin again if quit has been set because the main
    * thread will be in the process of unloading it.  The plugin.unload
@@ -888,19 +894,23 @@ recv_request_send_reply (struct connection *conn)
   CLEANUP_FREE char *buf = NULL;

   /* Read the request packet. */
+  pthread_mutex_lock (&conn->read_lock);
   r = conn->recv (conn, &request, sizeof request);
   if (r == -1) {
     nbdkit_error ("read request: %m");
+    pthread_mutex_unlock (&conn->read_lock);
     return -1;
   }
   if (r == 0) {
     debug ("client closed input socket, closing connection");
+    pthread_mutex_unlock (&conn->read_lock);
     return 0;                   /* disconnect */
   }

   magic = be32toh (request.magic);
   if (magic != NBD_REQUEST_MAGIC) {
     nbdkit_error ("invalid request: 'magic' field is incorrect (0x%x)", magic);
+    pthread_mutex_unlock (&conn->read_lock);
     return -1;
   }

@@ -913,14 +923,18 @@ recv_request_send_reply (struct connection *conn)

   if (cmd == NBD_CMD_DISC) {
     debug ("client sent disconnect command, closing connection");
+    pthread_mutex_unlock (&conn->read_lock);
     return 0;                   /* disconnect */
   }

   /* Validate the request. */
   if (!validate_request (conn, cmd, flags, offset, count, &error)) {
     if (cmd == NBD_CMD_WRITE &&
-        skip_over_write_buffer (conn->sockin, count) < 0)
+        skip_over_write_buffer (conn->sockin, count) < 0) {
+      pthread_mutex_unlock (&conn->read_lock);
       return -1;
+    }
+    pthread_mutex_unlock (&conn->read_lock);
     goto send_reply;
   }

@@ -931,8 +945,11 @@ recv_request_send_reply (struct connection *conn)
       perror ("malloc");
       error = ENOMEM;
       if (cmd == NBD_CMD_WRITE &&
-          skip_over_write_buffer (conn->sockin, count) < 0)
+          skip_over_write_buffer (conn->sockin, count) < 0) {
+        pthread_mutex_unlock (&conn->read_lock);
         return -1;
+      }
+      pthread_mutex_unlock (&conn->read_lock);
       goto send_reply;
     }
   }
@@ -946,9 +963,11 @@ recv_request_send_reply (struct connection *conn)
     }
     if (r == -1) {
       nbdkit_error ("read data: %m");
+      pthread_mutex_unlock (&conn->read_lock);
       return -1;
     }
   }
+  pthread_mutex_unlock (&conn->read_lock);

   /* Perform the request.  Only this part happens inside the request lock. */
   if (quit) {
@@ -962,6 +981,7 @@ recv_request_send_reply (struct connection *conn)

   /* Send the reply packet. */
  send_reply:
+  pthread_mutex_lock (&conn->write_lock);
   reply.magic = htobe32 (NBD_REPLY_MAGIC);
   reply.handle = request.handle;
   reply.error = htobe32 (nbd_errno (error));
@@ -978,6 +998,7 @@ recv_request_send_reply (struct connection *conn)
   r = conn->send (conn, &reply, sizeof reply);
   if (r == -1) {
     nbdkit_error ("write reply: %m");
+    pthread_mutex_unlock (&conn->write_lock);
     return -1;
   }

@@ -986,9 +1007,11 @@ recv_request_send_reply (struct connection *conn)
     r = conn->send (conn, buf, count);
     if (r == -1) {
       nbdkit_error ("write data: %m");
+      pthread_mutex_unlock (&conn->write_lock);
       return -1;
     }
   }
+  pthread_mutex_unlock (&conn->write_lock);

   return 1;                     /* command processed ok */
 }
-- 
2.13.6


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]