[Libguestfs] [PATCH nbdkit v2 8/9] curl: Use curl multi interface

Richard W.M. Jones rjones at redhat.com
Fri Jul 28 17:17:52 UTC 2023


See the comment at the top of plugins/curl/pool.c for general
information about how this works.

This makes a very large difference to performance over the previous
implementation.  Note that for the tests below I also applied the next
commit, which changes the behaviour of the connections parameter.

Using this test case:

  $ uri=https://cloud-images.ubuntu.com/lunar/current/lunar-server-cloudimg-amd64.img
  $ nbdkit -r -U - curl $uri ipresolve=v4 --run 'nbdcopy -p $uri null'

The times are as follows:

  multi, connections=64           21.5s
  multi, connections=32           30.2s
  multi, connections=16           56.0s
  before this commit             166s
---
 plugins/curl/curldefs.h |  35 ++--
 plugins/curl/config.c   | 246 ---------------------------
 plugins/curl/curl.c     | 366 +++++++++++++++++++++++++++++++++++-----
 plugins/curl/pool.c     | 346 ++++++++++++++++++++++++++++---------
 4 files changed, 616 insertions(+), 377 deletions(-)

diff --git a/plugins/curl/curldefs.h b/plugins/curl/curldefs.h
index 3c54b7c3c..02b28a133 100644
--- a/plugins/curl/curldefs.h
+++ b/plugins/curl/curldefs.h
@@ -76,16 +76,6 @@ struct curl_handle {
   /* The underlying curl handle. */
   CURL *c;
 
-  /* Index of this handle in the pool (for debugging). */
-  size_t i;
-
-  /* True if the handle is in use by a thread. */
-  bool in_use;
-
-  /* These fields are used/initialized when we create the handle. */
-  bool accept_range;
-  int64_t exportsize;
-
   char errbuf[CURL_ERROR_SIZE];
 
   /* Before doing a read or write operation, set these to point to the
@@ -98,8 +88,30 @@ struct curl_handle {
   const char *read_buf;
   uint32_t read_count;
 
+  /* This field is used by curl_get_size. */
+  bool accept_range;
+
   /* Used by scripts.c */
   struct curl_slist *headers_copy;
+
+  /* Used by pool.c */
+  struct command *cmd;
+};
+
+/* Asynchronous commands that can be sent to the pool thread. */
+enum command_type { EASY_HANDLE, STOP };
+struct command {
+  /* These fields are set by the caller. */
+  enum command_type type;       /* command */
+  struct curl_handle *ch;       /* for EASY_HANDLE, the easy handle */
+
+  /* This field is set to a unique value by send_command_and_wait. */
+  uint64_t id;                  /* serial number */
+
+  /* These fields are used to signal back that the command finished. */
+  pthread_mutex_t mutex;        /* completion mutex */
+  pthread_cond_t cond;          /* completion condition */
+  CURLcode status;              /* status code (CURLE_OK = succeeded) */
 };
 
 /* config.c */
@@ -114,8 +126,7 @@ extern void free_handle (struct curl_handle *);
 extern int pool_get_ready (void);
 extern int pool_after_fork (void);
 extern void pool_unload (void);
-extern struct curl_handle *get_handle (void);
-extern void put_handle (struct curl_handle *ch);
+extern CURLcode send_command_and_wait (struct command *cmd);
 
 /* scripts.c */
 extern int do_scripts (struct curl_handle *ch);
diff --git a/plugins/curl/config.c b/plugins/curl/config.c
index b6e02b85a..46bec2bbb 100644
--- a/plugins/curl/config.c
+++ b/plugins/curl/config.c
@@ -48,8 +48,6 @@
 
 #include <nbdkit-plugin.h>
 
-#include "ascii-ctype.h"
-#include "ascii-string.h"
 #include "cleanup.h"
 
 #include "curldefs.h"
@@ -89,12 +87,6 @@ static const char *user_agent = NULL;
 
 static int debug_cb (CURL *handle, curl_infotype type,
                      const char *data, size_t size, void *);
-static size_t write_cb (char *ptr, size_t size, size_t nmemb, void *opaque);
-static size_t read_cb (void *ptr, size_t size, size_t nmemb, void *opaque);
-static int get_content_length_accept_range (struct curl_handle *ch);
-static bool try_fallback_GET_method (struct curl_handle *ch);
-static size_t header_cb (void *ptr, size_t size, size_t nmemb, void *opaque);
-static size_t error_cb (char *ptr, size_t size, size_t nmemb, void *opaque);
 
 /* Use '-D curl.verbose=1' to set. */
 NBDKIT_DLL_PUBLIC int curl_debug_verbose = 0;
@@ -668,17 +660,9 @@ allocate_handle (void)
   if (user_agent)
     curl_easy_setopt (ch->c, CURLOPT_USERAGENT, user_agent);
 
-  if (get_content_length_accept_range (ch) == -1)
-    goto err;
-
   /* Get set up for reading and writing. */
   curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, NULL);
   curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, NULL);
-  curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, write_cb);
-  curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch);
-  /* These are only used if !readonly but we always register them. */
-  curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, read_cb);
-  curl_easy_setopt (ch->c, CURLOPT_READDATA, ch);
 
   return ch;
 
@@ -742,233 +726,3 @@ debug_cb (CURL *handle, curl_infotype type,
  out:
   return 0;
 }
-
-/* NB: The terminology used by libcurl is confusing!
- *
- * WRITEFUNCTION / write_cb is used when reading from the remote server
- * READFUNCTION / read_cb is used when writing to the remote server.
- *
- * We use the same terminology as libcurl here.
- */
-
-static size_t
-write_cb (char *ptr, size_t size, size_t nmemb, void *opaque)
-{
-  struct curl_handle *ch = opaque;
-  size_t orig_realsize = size * nmemb;
-  size_t realsize = orig_realsize;
-
-  assert (ch->write_buf);
-
-  /* Don't read more than the requested amount of data, even if the
-   * server or libcurl sends more.
-   */
-  if (realsize > ch->write_count)
-    realsize = ch->write_count;
-
-  memcpy (ch->write_buf, ptr, realsize);
-
-  ch->write_count -= realsize;
-  ch->write_buf += realsize;
-
-  return orig_realsize;
-}
-
-static size_t
-read_cb (void *ptr, size_t size, size_t nmemb, void *opaque)
-{
-  struct curl_handle *ch = opaque;
-  size_t realsize = size * nmemb;
-
-  assert (ch->read_buf);
-  if (realsize > ch->read_count)
-    realsize = ch->read_count;
-
-  memcpy (ptr, ch->read_buf, realsize);
-
-  ch->read_count -= realsize;
-  ch->read_buf += realsize;
-
-  return realsize;
-}
-
-/* Get the file size and also whether the remote HTTP server
- * supports byte ranges.
- */
-static int
-get_content_length_accept_range (struct curl_handle *ch)
-{
-  CURLcode r;
-  long code;
-#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T
-  curl_off_t o;
-#else
-  double d;
-#endif
-
-  /* We must run the scripts if necessary and set headers in the
-   * handle.
-   */
-  if (do_scripts (ch) == -1)
-    return -1;
-
-  /* Set this flag in the handle to false.  The callback should set it
-   * to true if byte ranges are supported, which we check below.
-   */
-  ch->accept_range = false;
-
-  /* No Body, not nobody!  This forces a HEAD request. */
-  curl_easy_setopt (ch->c, CURLOPT_NOBODY, 1L);
-  curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb);
-  curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch);
-  r = curl_easy_perform (ch->c);
-  update_times (ch->c);
-  if (r != CURLE_OK) {
-    display_curl_error (ch, r,
-                        "problem doing HEAD request to fetch size of URL [%s]",
-                        url);
-
-    /* Get the HTTP status code, if available. */
-    r = curl_easy_getinfo (ch->c, CURLINFO_RESPONSE_CODE, &code);
-    if (r == CURLE_OK)
-      nbdkit_debug ("HTTP status code: %ld", code);
-    else
-      code = -1;
-
-    /* See comment on try_fallback_GET_method below. */
-    if (code != 403 || !try_fallback_GET_method (ch))
-      return -1;
-  }
-
-  /* Get the content length.
-   *
-   * Note there is some subtlety here: For web servers using chunked
-   * encoding, either the Content-Length header will not be present,
-   * or if present it should be ignored.  (For such servers the only
-   * way to find out the true length would be to read all of the
-   * content, which we don't want to do).
-   *
-   * Curl itself resolves this for us.  It will ignore the
-   * Content-Length header if chunked encoding is used, returning the
-   * length as -1 which we check below (see also
-   * curl:lib/http.c:Curl_http_size).
-   */
-#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T
-  r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &o);
-  if (r != CURLE_OK) {
-    display_curl_error (ch, r,
-                        "could not get length of remote file [%s]", url);
-    return -1;
-  }
-
-  if (o == -1) {
-    nbdkit_error ("could not get length of remote file [%s], "
-                  "is the URL correct?", url);
-    return -1;
-  }
-
-  ch->exportsize = o;
-#else
-  r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &d);
-  if (r != CURLE_OK) {
-    display_curl_error (ch, r,
-                        "could not get length of remote file [%s]", url);
-    return -1;
-  }
-
-  if (d == -1) {
-    nbdkit_error ("could not get length of remote file [%s], "
-                  "is the URL correct?", url);
-    return -1;
-  }
-
-  ch->exportsize = d;
-#endif
-  nbdkit_debug ("content length: %" PRIi64, ch->exportsize);
-
-  /* If this is HTTP, check that byte ranges are supported. */
-  if (ascii_strncasecmp (url, "http://", strlen ("http://")) == 0 ||
-      ascii_strncasecmp (url, "https://", strlen ("https://")) == 0) {
-    if (!ch->accept_range) {
-      nbdkit_error ("server does not support 'range' (byte range) requests");
-      return -1;
-    }
-
-    nbdkit_debug ("accept range supported (for HTTP/HTTPS)");
-  }
-
-  return 0;
-}
-
-/* S3 servers can return 403 Forbidden for HEAD but still respond
- * to GET, so we give it a second chance in that case.
- * https://github.com/kubevirt/containerized-data-importer/issues/2737
- *
- * This function issues a GET request with a writefunction that always
- * returns an error, thus effectively getting the headers but
- * abandoning the transfer as soon as possible after.
- */
-static bool
-try_fallback_GET_method (struct curl_handle *ch)
-{
-  CURLcode r;
-
-  nbdkit_debug ("attempting to fetch headers using GET method");
-
-  curl_easy_setopt (ch->c, CURLOPT_HTTPGET, 1L);
-  curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb);
-  curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch);
-  curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, error_cb);
-  curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch);
-  r = curl_easy_perform (ch->c);
-  update_times (ch->c);
-
-  /* We expect CURLE_WRITE_ERROR here, but CURLE_OK is possible too
-   * (eg if the remote has zero length).  Other errors might happen
-   * but we ignore them since it is a fallback path.
-   */
-  return r == CURLE_OK || r == CURLE_WRITE_ERROR;
-}
-
-static size_t
-header_cb (void *ptr, size_t size, size_t nmemb, void *opaque)
-{
-  struct curl_handle *ch = opaque;
-  size_t realsize = size * nmemb;
-  const char *header = ptr;
-  const char *end = header + realsize;
-  const char *accept_ranges = "accept-ranges:";
-  const char *bytes = "bytes";
-
-  if (realsize >= strlen (accept_ranges) &&
-      ascii_strncasecmp (header, accept_ranges, strlen (accept_ranges)) == 0) {
-    const char *p = strchr (header, ':') + 1;
-
-    /* Skip whitespace between the header name and value. */
-    while (p < end && *p && ascii_isspace (*p))
-      p++;
-
-    if (end - p >= strlen (bytes)
-        && strncmp (p, bytes, strlen (bytes)) == 0) {
-      /* Check that there is nothing but whitespace after the value. */
-      p += strlen (bytes);
-      while (p < end && *p && ascii_isspace (*p))
-        p++;
-
-      if (p == end || !*p)
-        ch->accept_range = true;
-    }
-  }
-
-  return realsize;
-}
-
-static size_t
-error_cb (char *ptr, size_t size, size_t nmemb, void *opaque)
-{
-#ifdef CURL_WRITEFUNC_ERROR
-  return CURL_WRITEFUNC_ERROR;
-#else
-  return 0; /* in older curl, any size < requested will also be an error */
-#endif
-}
diff --git a/plugins/curl/curl.c b/plugins/curl/curl.c
index be42de36f..28cc7bbe6 100644
--- a/plugins/curl/curl.c
+++ b/plugins/curl/curl.c
@@ -48,7 +48,8 @@
 
 #include <nbdkit-plugin.h>
 
-#include "cleanup.h"
+#include "ascii-ctype.h"
+#include "ascii-string.h"
 
 #include "curldefs.h"
 
@@ -118,32 +119,6 @@ curl_close (void *handle)
 
 #define THREAD_MODEL NBDKIT_THREAD_MODEL_PARALLEL
 
-/* Calls get_handle() ... put_handle() to get a handle for the length
- * of the current scope.
- */
-#define GET_HANDLE_FOR_CURRENT_SCOPE(ch) \
-  CLEANUP_PUT_HANDLE struct curl_handle *ch = get_handle ();
-#define CLEANUP_PUT_HANDLE __attribute__ ((cleanup (cleanup_put_handle)))
-static void
-cleanup_put_handle (void *chp)
-{
-  struct curl_handle *ch = * (struct curl_handle **) chp;
-
-  if (ch != NULL)
-    put_handle (ch);
-}
-
-/* Get the file size. */
-static int64_t
-curl_get_size (void *handle)
-{
-  GET_HANDLE_FOR_CURRENT_SCOPE (ch);
-  if (ch == NULL)
-    return -1;
-
-  return ch->exportsize;
-}
-
 /* Multi-conn is safe for read-only connections, but HTTP does not
  * have any concept of flushing so we cannot use it for read-write
  * connections.
@@ -156,23 +131,253 @@ curl_can_multi_conn (void *handle)
   return !! h->readonly;
 }
 
+/* Get the file size. */
+static int get_content_length_accept_range (struct curl_handle *ch);
+static bool try_fallback_GET_method (struct curl_handle *ch);
+static size_t header_cb (void *ptr, size_t size, size_t nmemb, void *opaque);
+static size_t error_cb (char *ptr, size_t size, size_t nmemb, void *opaque);
+
+static int64_t
+curl_get_size (void *handle)
+{
+  struct curl_handle *ch;
+  CURLcode r;
+  long code;
+#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T
+  curl_off_t o;
+#else
+  double d;
+#endif
+  int64_t exportsize;
+
+  /* Get a curl easy handle. */
+  ch = allocate_handle ();
+  if (ch == NULL) goto err;
+
+  /* Prepare to read the headers. */
+  if (get_content_length_accept_range (ch) == -1)
+    goto err;
+
+  /* Send the command to the worker thread and wait. */
+  struct command cmd = {
+    .type = EASY_HANDLE,
+    .ch = ch,
+  };
+
+  r = send_command_and_wait (&cmd);
+  update_times (ch->c);
+  if (r != CURLE_OK) {
+    display_curl_error (ch, r,
+                        "problem doing HEAD request to fetch size of URL [%s]",
+                        url);
+
+    /* Get the HTTP status code, if available. */
+    r = curl_easy_getinfo (ch->c, CURLINFO_RESPONSE_CODE, &code);
+    if (r == CURLE_OK)
+      nbdkit_debug ("HTTP status code: %ld", code);
+    else
+      code = -1;
+
+    /* See comment on try_fallback_GET_method below. */
+    if (code != 403 || !try_fallback_GET_method (ch))
+      goto err;
+  }
+
+  /* Get the content length.
+   *
+   * Note there is some subtlety here: For web servers using chunked
+   * encoding, either the Content-Length header will not be present,
+   * or if present it should be ignored.  (For such servers the only
+   * way to find out the true length would be to read all of the
+   * content, which we don't want to do).
+   *
+   * Curl itself resolves this for us.  It will ignore the
+   * Content-Length header if chunked encoding is used, returning the
+   * length as -1 which we check below (see also
+   * curl:lib/http.c:Curl_http_size).
+   */
+#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T
+  r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &o);
+  if (r != CURLE_OK) {
+    display_curl_error (ch, r,
+                        "could not get length of remote file [%s]", url);
+    goto err;
+  }
+
+  if (o == -1) {
+    nbdkit_error ("could not get length of remote file [%s], "
+                  "is the URL correct?", url);
+    goto err;
+  }
+
+  exportsize = o;
+#else
+  r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &d);
+  if (r != CURLE_OK) {
+    display_curl_error (ch, r,
+                        "could not get length of remote file [%s]", url);
+    goto err;
+  }
+
+  if (d == -1) {
+    nbdkit_error ("could not get length of remote file [%s], "
+                  "is the URL correct?", url);
+    goto err;
+  }
+
+  exportsize = d;
+#endif
+  nbdkit_debug ("content length: %" PRIi64, exportsize);
+
+  /* If this is HTTP, check that byte ranges are supported. */
+  if (ascii_strncasecmp (url, "http://", strlen ("http://")) == 0 ||
+      ascii_strncasecmp (url, "https://", strlen ("https://")) == 0) {
+    if (!ch->accept_range) {
+      nbdkit_error ("server does not support 'range' (byte range) requests");
+      goto err;
+    }
+
+    nbdkit_debug ("accept range supported (for HTTP/HTTPS)");
+  }
+
+  free_handle (ch);
+  return exportsize;
+
+ err:
+  if (ch)
+    free_handle (ch);
+  return -1;
+}
+
+/* Prepare the handle for the HEAD request which curl_get_size sends
+ * to find the file size and whether the server supports byte ranges.
+ */
+static int
+get_content_length_accept_range (struct curl_handle *ch)
+{
+  /* We must run the scripts if necessary and set headers in the
+   * handle.
+   */
+  if (do_scripts (ch) == -1)
+    return -1;
+
+  /* Set this flag in the handle to false.  The callback should set it
+   * to true if byte ranges are supported, which curl_get_size checks.
+   */
+  ch->accept_range = false;
+
+  /* No Body, not nobody!  This forces a HEAD request. */
+  curl_easy_setopt (ch->c, CURLOPT_NOBODY, 1L);
+  curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb);
+  curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch);
+  curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, NULL);
+  curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, NULL);
+  curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, NULL);
+  curl_easy_setopt (ch->c, CURLOPT_READDATA, NULL);
+  return 0;
+}
+
+/* S3 servers can return 403 Forbidden for HEAD but still respond
+ * to GET, so we give it a second chance in that case.
+ * https://github.com/kubevirt/containerized-data-importer/issues/2737
+ *
+ * This function issues a GET request with a writefunction that always
+ * returns an error, thus effectively getting the headers but
+ * abandoning the transfer as soon as possible after.
+ */
+static bool
+try_fallback_GET_method (struct curl_handle *ch)
+{
+  CURLcode r;
+
+  nbdkit_debug ("attempting to fetch headers using GET method");
+
+  curl_easy_setopt (ch->c, CURLOPT_HTTPGET, 1L);
+  curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb);
+  curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch);
+  curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, error_cb);
+  curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch);
+
+  struct command cmd = {
+    .type = EASY_HANDLE,
+    .ch = ch,
+  };
+
+  r = send_command_and_wait (&cmd);
+  update_times (ch->c);
+
+  /* We expect CURLE_WRITE_ERROR here, but CURLE_OK is possible too
+   * (eg if the remote has zero length).  Other errors might happen
+   * but we ignore them since it is a fallback path.
+   */
+  return r == CURLE_OK || r == CURLE_WRITE_ERROR;
+}
+
+static size_t
+header_cb (void *ptr, size_t size, size_t nmemb, void *opaque)
+{
+  struct curl_handle *ch = opaque;
+  size_t realsize = size * nmemb;
+  const char *header = ptr;
+  const char *end = header + realsize;
+  const char *accept_ranges = "accept-ranges:";
+  const char *bytes = "bytes";
+
+  if (realsize >= strlen (accept_ranges) &&
+      ascii_strncasecmp (header, accept_ranges, strlen (accept_ranges)) == 0) {
+    const char *p = strchr (header, ':') + 1;
+
+    /* Skip whitespace between the header name and value. */
+    while (p < end && *p && ascii_isspace (*p))
+      p++;
+
+    if (end - p >= strlen (bytes)
+        && strncmp (p, bytes, strlen (bytes)) == 0) {
+      /* Check that there is nothing but whitespace after the value. */
+      p += strlen (bytes);
+      while (p < end && *p && ascii_isspace (*p))
+        p++;
+
+      if (p == end || !*p)
+        ch->accept_range = true;
+    }
+  }
+
+  return realsize;
+}
+
+static size_t
+error_cb (char *ptr, size_t size, size_t nmemb, void *opaque)
+{
+#ifdef CURL_WRITEFUNC_ERROR
+  return CURL_WRITEFUNC_ERROR;
+#else
+  return 0; /* in older curl, any size < requested will also be an error */
+#endif
+}
+
 /* Read data from the remote server. */
+static size_t write_cb (char *ptr, size_t size, size_t nmemb, void *opaque);
+
 static int
 curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
 {
   CURLcode r;
+  struct curl_handle *ch;
   char range[128];
 
-  GET_HANDLE_FOR_CURRENT_SCOPE (ch);
-  if (ch == NULL)
-    return -1;
+  /* Get a curl easy handle. */
+  ch = allocate_handle ();
+  if (ch == NULL) goto err;
 
   /* Run the scripts if necessary and set headers in the handle. */
-  if (do_scripts (ch) == -1) return -1;
+  if (do_scripts (ch) == -1) goto err;
 
   /* Tell the write_cb where we want the data to be written.  write_cb
    * will update this if the data comes in multiple sections.
    */
+  curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, write_cb);
+  curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch);
   ch->write_buf = buf;
   ch->write_count = count;
 
@@ -183,11 +388,16 @@ curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
             offset, offset + count);
   curl_easy_setopt (ch->c, CURLOPT_RANGE, range);
 
-  /* The assumption here is that curl will look after timeouts. */
-  r = curl_easy_perform (ch->c);
+  /* Send the command to the worker thread and wait. */
+  struct command cmd = {
+    .type = EASY_HANDLE,
+    .ch = ch,
+  };
+
+  r = send_command_and_wait (&cmd);
   if (r != CURLE_OK) {
-    display_curl_error (ch, r, "pread: curl_easy_perform");
-    return -1;
+    display_curl_error (ch, r, "pread");
+    goto err;
   }
   update_times (ch->c);
 
@@ -198,26 +408,67 @@ curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
   /* As far as I understand the cURL API, this should never happen. */
   assert (ch->write_count == 0);
 
+  free_handle (ch);
   return 0;
+
+ err:
+  if (ch)
+    free_handle (ch);
+  return -1;
+}
+
+/* NB: The terminology used by libcurl is confusing!
+ *
+ * WRITEFUNCTION / write_cb is used when reading from the remote server
+ * READFUNCTION / read_cb is used when writing to the remote server.
+ *
+ * We use the same terminology as libcurl here.
+ */
+static size_t
+write_cb (char *ptr, size_t size, size_t nmemb, void *opaque)
+{
+  struct curl_handle *ch = opaque;
+  size_t orig_realsize = size * nmemb;
+  size_t realsize = orig_realsize;
+
+  assert (ch->write_buf);
+
+  /* Don't read more than the requested amount of data, even if the
+   * server or libcurl sends more.
+   */
+  if (realsize > ch->write_count)
+    realsize = ch->write_count;
+
+  memcpy (ch->write_buf, ptr, realsize);
+
+  ch->write_count -= realsize;
+  ch->write_buf += realsize;
+
+  return orig_realsize;
 }
 
 /* Write data to the remote server. */
+static size_t read_cb (void *ptr, size_t size, size_t nmemb, void *opaque);
+
 static int
 curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset)
 {
   CURLcode r;
+  struct curl_handle *ch;
   char range[128];
 
-  GET_HANDLE_FOR_CURRENT_SCOPE (ch);
-  if (ch == NULL)
-    return -1;
+  /* Get a curl easy handle. */
+  ch = allocate_handle ();
+  if (ch == NULL) goto err;
 
   /* Run the scripts if necessary and set headers in the handle. */
-  if (do_scripts (ch) == -1) return -1;
+  if (do_scripts (ch) == -1) goto err;
 
   /* Tell the read_cb where we want the data to be read from.  read_cb
    * will update this if the data comes in multiple sections.
    */
+  curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, read_cb);
+  curl_easy_setopt (ch->c, CURLOPT_READDATA, ch);
   ch->read_buf = buf;
   ch->read_count = count;
 
@@ -228,11 +479,16 @@ curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset)
             offset, offset + count);
   curl_easy_setopt (ch->c, CURLOPT_RANGE, range);
 
-  /* The assumption here is that curl will look after timeouts. */
-  r = curl_easy_perform (ch->c);
+  /* Send the command to the worker thread and wait. */
+  struct command cmd = {
+    .type = EASY_HANDLE,
+    .ch = ch,
+  };
+
+  r = send_command_and_wait (&cmd);
   if (r != CURLE_OK) {
-    display_curl_error (ch, r, "pwrite: curl_easy_perform");
-    return -1;
+    display_curl_error (ch, r, "pwrite");
+    goto err;
   }
   update_times (ch->c);
 
@@ -243,7 +499,31 @@ curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset)
   /* As far as I understand the cURL API, this should never happen. */
   assert (ch->read_count == 0);
 
+  free_handle (ch);
   return 0;
+
+ err:
+  if (ch)
+    free_handle (ch);
+  return -1;
+}
+
+static size_t
+read_cb (void *ptr, size_t size, size_t nmemb, void *opaque)
+{
+  struct curl_handle *ch = opaque;
+  size_t realsize = size * nmemb;
+
+  assert (ch->read_buf);
+  if (realsize > ch->read_count)
+    realsize = ch->read_count;
+
+  memcpy (ptr, ch->read_buf, realsize);
+
+  ch->read_count -= realsize;
+  ch->read_buf += realsize;
+
+  return realsize;
 }
 
 static struct nbdkit_plugin plugin = {
diff --git a/plugins/curl/pool.c b/plugins/curl/pool.c
index eb2d330e1..2974cda3f 100644
--- a/plugins/curl/pool.c
+++ b/plugins/curl/pool.c
@@ -30,11 +30,29 @@
  * SUCH DAMAGE.
  */
 
-/* Curl handle pool.
+/* Worker thread which processes the curl multi interface.
  *
- * To get a libcurl handle, call get_handle().  When you hold the
- * handle, it is yours exclusively to use.  After you have finished
- * with the handle, put it back into the pool by calling put_handle().
+ * The main nbdkit threads (see curl.c) create curl easy handles
+ * initialized with the work they want to carry out.  Note there is
+ * one easy handle per task (eg. per pread/pwrite request).  The easy
+ * handles are not reused.
+ *
+ * The commands + optional easy handle are submitted to the worker
+ * thread over a self-pipe.  (A pipe is convenient here because curl
+ * multi can wait on an extra file descriptor, but not on anything
+ * else such as a pthread condition.)  The curl multi performs the
+ * work of the outstanding easy handles.
+ *
+ * When an easy handle finishes work or errors, we retire the command
+ * by signalling back to the waiting nbdkit thread using a pthread
+ * condition.
+ *
+ * In my experiments, we're almost always I/O bound so I haven't seen
+ * any strong need to use more than one curl multi / worker thread,
+ * although it would be possible to add more in future.
+ *
+ * See also this extremely useful thread:
+ * https://curl.se/mail/lib-2019-03/0100.html
  */
 
 #include <config.h>
@@ -45,9 +63,19 @@
 #include <stdint.h>
 #include <inttypes.h>
 #include <string.h>
+#include <unistd.h>
 #include <assert.h>
 #include <pthread.h>
 
+#ifdef HAVE_STDATOMIC_H
+#include <stdatomic.h>
+#else
+/* Some old platforms lack atomic types, but 32 bit ints are usually
+ * "atomic enough".
+ */
+#define _Atomic /**/
+#endif
+
 #include <curl/curl.h>
 
 #include <nbdkit-plugin.h>
@@ -62,115 +90,281 @@ NBDKIT_DLL_PUBLIC int curl_debug_pool = 0;
 
 unsigned connections = 4;
 
-/* This lock protects access to the curl_handles vector below. */
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
+/* Pipe used to notify background thread that a command is pending in
+ * the queue.  A pointer to the 'struct command' is sent over the
+ * pipe.
+ */
+static int self_pipe[2] = { -1, -1 };
 
-/* List of curl handles.  This is allocated dynamically as more
- * handles are requested.  Currently it does not shrink.  It may grow
- * up to 'connections' in length.
+/* The curl multi handle. */
+static CURLM *multi;
+
+/* List of running easy handles.  We only need to maintain this so we
+ * can remove them from the multi handle when cleaning up.
  */
 DEFINE_VECTOR_TYPE (curl_handle_list, struct curl_handle *);
 static curl_handle_list curl_handles = empty_vector;
 
-/* The condition is used when the curl handles vector is full and
- * we're waiting for a thread to put_handle.
- */
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-static size_t in_use = 0, waiting = 0;
+static const char *
+command_type_to_string (enum command_type type)
+{
+  /* Printable name of a command type, used in debug messages. */
+  if (type == EASY_HANDLE) return "EASY_HANDLE";
+  if (type == STOP) return "STOP";
+
+  abort (); /* not a known command type */
+}
 
 int
 pool_get_ready (void)
 {
+  /* curl_multi_init is not documented to set errno, so no %m here. */
+  multi = curl_multi_init ();
+  if (multi == NULL) {
+    nbdkit_error ("curl_multi_init failed");
+    return -1;
+  }
   return 0;
 }
 
+/* Start and stop the background thread. */
+static pthread_t thread;
+static bool thread_running;
+static void *pool_worker (void *);
+
 int
 pool_after_fork (void)
 {
+  int rc;
+
+  /* Create the pipe used to submit commands to the worker thread. */
+  if (pipe (self_pipe) == -1) {
+    nbdkit_error ("pipe: %m");
+    return -1;
+  }
+
+  /* Launch the background thread where all the curl work is done. */
+  rc = pthread_create (&thread, NULL, pool_worker, NULL);
+  if (rc) {
+    errno = rc;
+    nbdkit_error ("pthread_create: %m");
+    return -1;
+  }
+  thread_running = true;
   return 0;
 }
 
-/* Close and free all handles in the pool. */
+/* Stop the worker thread, then release the pipe, handles and multi. */
 void
 pool_unload (void)
 {
-  size_t i;
+  if (thread_running) {
+    /* Stop the background thread. */
+    struct command cmd = { .type = STOP };
+    send_command_and_wait (&cmd);
+    pthread_join (thread, NULL);
+    thread_running = false;
+  }
 
-  if (curl_debug_pool)
-    nbdkit_debug ("unload_pool: number of curl handles allocated: %zu",
-                  curl_handles.len);
+  if (self_pipe[0] >= 0) {
+    close (self_pipe[0]);
+    self_pipe[0] = -1;
+  }
+  if (self_pipe[1] >= 0) {
+    close (self_pipe[1]);
+    self_pipe[1] = -1;
+  }
 
-  for (i = 0; i < curl_handles.len; ++i)
-    free_handle (curl_handles.ptr[i]);
-  curl_handle_list_reset (&curl_handles);
+  if (multi) {
+    size_t i;
+
+    /* Remove and free any easy handles still in the multi. */
+    for (i = 0; i < curl_handles.len; ++i) {
+      curl_multi_remove_handle (multi, curl_handles.ptr[i]->c);
+      free_handle (curl_handles.ptr[i]);
+    }
+    curl_handle_list_reset (&curl_handles); /* drop the stale pointers */
+    curl_multi_cleanup (multi);
+    multi = NULL;
+  }
 }
 
-/* Get a handle from the pool.
- *
- * It is owned exclusively by the caller until they call put_handle.
+/* Command queue. */
+static _Atomic uint64_t id;     /* next command ID */
+
+/* Send command to the background thread and wait for completion.
+ * This is only called by one of the nbdkit threads.
  */
-struct curl_handle *
-get_handle (void)
+CURLcode
+send_command_and_wait (struct command *cmd)
 {
-  ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock);
-  size_t i;
-  struct curl_handle *ch;
-
- again:
-  /* Look for a handle which is not in_use. */
-  for (i = 0; i < curl_handles.len; ++i) {
-    ch = curl_handles.ptr[i];
-    if (!ch->in_use) {
-      ch->in_use = true;
-      in_use++;
+  CURLcode result;
+
+  /* Completion is signalled back to us through this mutex + cond. */
+  pthread_mutex_init (&cmd->mutex, NULL);
+  pthread_cond_init (&cmd->cond, NULL);
+
+  /* A CURLcode is never negative, so -1 in the status field means
+   * that the background thread has not completed the command yet.
+   */
+  cmd->status = -1;
+  cmd->id = id++;
+
+  /* Pass the command pointer to the background thread. */
+  if (write (self_pipe[1], &cmd, sizeof cmd) != sizeof cmd)
+    abort ();
+
+  /* Sleep until the background thread has filled in the status. */
+  {
+    ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex);
+    while (cmd->status == -1)
+      pthread_cond_wait (&cmd->cond, &cmd->mutex);
+    result = cmd->status;
+  }
+  pthread_cond_destroy (&cmd->cond);
+  pthread_mutex_destroy (&cmd->mutex);
+
+  /* Note the main thread must call nbdkit_error on error! */
+  return result;
+}
+
+/* The background thread: drives the curl multi, dispatches commands. */
+static void check_for_finished_handles (void);
+static void retire_command (struct command *cmd, CURLcode code);
+static void do_easy_handle (struct command *cmd);
+
+static void *
+pool_worker (void *vp)
+{
+  bool stop = false;
+
+  if (curl_debug_pool)
+    nbdkit_debug ("curl: background thread started");
+
+  while (!stop) {
+    struct command *cmd = NULL;
+    struct curl_waitfd extra_fds[1] =
+      { { .fd = self_pipe[0], .events = CURL_WAIT_POLLIN } };
+    CURLMcode mc;
+    int numfds, running_handles, repeats = 0;
+
+    do { /* until a command pointer has been read from the pipe */
+      /* Let curl make progress on the handles in the multi. */
+      mc = curl_multi_perform (multi, &running_handles);
+      if (mc != CURLM_OK) {
+        nbdkit_error ("curl_multi_perform: %s", curl_multi_strerror (mc));
+        abort (); /* XXX We don't expect this to happen */
+      }
+
+      check_for_finished_handles ();
+      /* Wait for activity on a transfer or on the self-pipe. */
+      mc = curl_multi_wait (multi, extra_fds, 1, 1000000, &numfds);
+      if (mc != CURLM_OK) {
+        nbdkit_error ("curl_multi_wait: %s", curl_multi_strerror (mc));
+        abort (); /* XXX We don't expect this to happen */
+      }
+
       if (curl_debug_pool)
-        nbdkit_debug ("get_handle: %zu", ch->i);
-      return ch;
-    }
-  }
+        nbdkit_debug ("curl_multi_wait returned: running_handles=%d numfds=%d",
+                      running_handles, numfds);
+
+      if (numfds == 0) { /* the wait reported no file descriptors */
+        repeats++;
+        if (repeats > 1) /* only back off on consecutive empty waits */
+          nbdkit_nanosleep (1, 0);
+      }
+      else {
+        repeats = 0;
+        if (extra_fds[0].revents == CURL_WAIT_POLLIN) {
+          /* Command waiting.  NOTE(review): '==' misses POLLIN|other. */
+          if (read (self_pipe[0], &cmd, sizeof cmd) != sizeof cmd)
+            abort ();
+        }
+      }
+    } while (!cmd);
 
-  /* If more connections are allowed, then allocate a new handle. */
-  if (curl_handles.len < connections) {
-    ch = allocate_handle ();
-    if (ch == NULL)
-      return NULL;
-    if (curl_handle_list_append (&curl_handles, ch) == -1) {
-      free_handle (ch);
-      return NULL;
-    }
-    ch->i = curl_handles.len - 1;
-    ch->in_use = true;
-    in_use++;
     if (curl_debug_pool)
-      nbdkit_debug ("get_handle: %zu", ch->i);
-    return ch;
-  }
+      nbdkit_debug ("curl: dispatching %s command %" PRIu64,
+                    command_type_to_string (cmd->type), cmd->id);
+
+    switch (cmd->type) {
+    case STOP: /* acknowledge the sender, then leave the outer loop */
+      stop = true;
+      retire_command (cmd, CURLE_OK);
+      break;
 
-  /* Otherwise we have run out of connections so we must wait until
-   * another thread calls put_handle.
-   */
-  assert (in_use == connections);
-  waiting++;
-  while (in_use == connections)
-    pthread_cond_wait (&cond, &lock);
-  waiting--;
+    case EASY_HANDLE: /* hand the command's easy handle to the multi */
+      do_easy_handle (cmd);
+      break;
+    }
+  } /* while (!stop) */
 
-  goto again;
+  if (curl_debug_pool)
+    nbdkit_debug ("curl: background thread stopped");
+
+  return NULL;
 }
 
-/* Return the handle to the pool. */
-void
-put_handle (struct curl_handle *ch)
+/* Retire the command of every easy handle in the multi which has
+ * finished.  Only called from the background thread.
+ */
+static void
+check_for_finished_handles (void)
 {
-  ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock);
+  CURLMsg *msg;
+  int msgs_in_queue;
+
+  while ((msg = curl_multi_info_read (multi, &msgs_in_queue)) != NULL) {
+    size_t i;
+    struct curl_handle *ch = NULL;
+    /* Copy now: msg does not survive curl_multi_remove_handle. */
+    const CURLcode result = msg->data.result;
+
+    if (msg->msg != CURLMSG_DONE) continue;
+    for (i = 0; i < curl_handles.len; ++i) {
+      if (curl_handles.ptr[i]->c == msg->easy_handle) {
+        ch = curl_handles.ptr[i];
+        curl_handle_list_remove (&curl_handles, i);
+        break; /* the vector has shifted; stop scanning it */
+      }
+    }
+    if (ch == NULL) abort ();
+    curl_multi_remove_handle (multi, ch->c);
+    retire_command (ch->cmd, result);
+  }
+}
 
+/* Retire a command: store its CURLcode status and wake the waiter. */
+static void
+retire_command (struct command *cmd, CURLcode status)
+{
   if (curl_debug_pool)
-    nbdkit_debug ("put_handle: %zu", ch->i);
+    nbdkit_debug ("curl: retiring %s command %" PRIu64,
+                  command_type_to_string (cmd->type), cmd->id);
+
+  ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex);
+  cmd->status = status; /* any value other than -1 ends the waiter's loop */
+  pthread_cond_signal (&cmd->cond);
+}
+
+/* Start an EASY_HANDLE command by adding its easy handle to the multi. */
+static void
+do_easy_handle (struct command *cmd)
+{
+  CURLMcode mc;
+
+  /* Lets check_for_finished_handles find the command to retire. */
+  cmd->ch->cmd = cmd;
+  mc = curl_multi_add_handle (multi, cmd->ch->c);
+  if (mc != CURLM_OK) {
+    nbdkit_error ("curl_multi_add_handle: %s", curl_multi_strerror (mc));
+    goto err;
+  }
 
-  ch->in_use = false;
-  in_use--;
+  if (curl_handle_list_append (&curl_handles, cmd->ch) != -1)
+    return;
+  curl_multi_remove_handle (multi, cmd->ch->c); /* untracked: undo the add */
 
-  /* Signal the next thread which is waiting. */
-  if (waiting > 0)
-    pthread_cond_signal (&cond);
+ err:
+  retire_command (cmd, CURLE_OUT_OF_MEMORY);
 }
-- 
2.41.0



More information about the Libguestfs mailing list