[libvirt] [PATCH 1/2] esx: Add libcurl based stream driver

Matthias Bolte matthias.bolte at googlemail.com
Sun Mar 30 19:03:02 UTC 2014


This allows to implement libvirt functions that use streams, such as
virDoaminScreenshot, without the need to store the downloaded data in
a temporary file first. The stream driver directly interacts with
libcurl to send and receive data.

The driver uses the libcurl multi interface that allows to do a transfer
in multiple curl_multi_perform() calls. The easy interface would do the
whole transfer in a single curl_easy_perform() call. This doesn't work
with the libvirt stream API that is driven by multiple calls to the
virStreamSend() and virStreamRecv() functions.

The curl_multi_wait() function is used to do blocking operations. But it
was added in libcurl 7.28.0. For older versions it is emulated using the
socket callback of the multi interface.

The current driver only supports blocking operations. There is already
some code in place for non-blocking mode but it's incomplete. As you can
tell from the copyright date I implemeted this in 2012, but never came
around to publish it then. I did some work in 2013 and now it's 2014 and
I don't want to hold it back any longer.
---
 po/POTFILES.in       |   1 +
 src/Makefile.am      |   1 +
 src/esx/esx_stream.c | 478 +++++++++++++++++++++++++++++++++++++++++++++++++++
 src/esx/esx_stream.h |  32 ++++
 src/esx/esx_vi.c     | 222 +++++++++++++++++++++++-
 src/esx/esx_vi.h     |  19 +-
 6 files changed, 749 insertions(+), 4 deletions(-)
 create mode 100644 src/esx/esx_stream.c
 create mode 100644 src/esx/esx_stream.h

diff --git a/po/POTFILES.in b/po/POTFILES.in
index 5a4112a..1d9d6d0 100644
--- a/po/POTFILES.in
+++ b/po/POTFILES.in
@@ -41,6 +41,7 @@ src/esx/esx_network_driver.c
 src/esx/esx_storage_backend_iscsi.c
 src/esx/esx_storage_backend_vmfs.c
 src/esx/esx_storage_driver.c
+src/esx/esx_stream.c
 src/esx/esx_util.c
 src/esx/esx_vi.c
 src/esx/esx_vi_methods.c
diff --git a/src/Makefile.am b/src/Makefile.am
index 55427ed..dbe6488 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -720,6 +720,7 @@ ESX_DRIVER_SOURCES =							\
 		esx/esx_storage_backend_iscsi.c esx/esx_storage_backend_iscsi.h	\
 		esx/esx_device_monitor.c esx/esx_device_monitor.h	\
 		esx/esx_secret_driver.c esx/esx_secret_driver.h		\
+		esx/esx_stream.c esx/esx_stream.h			\
 		esx/esx_nwfilter_driver.c esx/esx_nwfilter_driver.h	\
 		esx/esx_util.c esx/esx_util.h				\
 		esx/esx_vi.c esx/esx_vi.h				\
diff --git a/src/esx/esx_stream.c b/src/esx/esx_stream.c
new file mode 100644
index 0000000..fb9abbc
--- /dev/null
+++ b/src/esx/esx_stream.c
@@ -0,0 +1,478 @@
+/*
+ * esx_stream.c: libcurl based stream driver
+ *
+ * Copyright (C) 2012-2014 Matthias Bolte <matthias.bolte at googlemail.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library.  If not, see
+ * <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#include <config.h>
+
+#include "internal.h"
+#include "datatypes.h"
+#include "viralloc.h"
+#include "virstring.h"
+#include "esx_stream.h"
+
+#define VIR_FROM_THIS VIR_FROM_ESX
+
+/*
+ * This libcurl based stream driver cannot use a libcurl easy handle alone
+ * because curl_easy_perform would do the whole transfer before it returns.
+ * But there is no place in the stream handling concept that would allow for
+ * such a call to be made. The stream is driven by esxStream(Send|Recv) which
+ * is probably called multiple times to send/receive the stream in chunks.
+ * Therefore, a libcurl multi handle is used that allows to perform the data
+ * transfer in chunks and also allows to support non-blocking operations.
+ *
+ * In the upload direction esxStreamSend is called to push data into the
+ * stream and libcurl will call esxVI_CURL_ReadStream to pull data out of
+ * the stream to upload it via HTTP(S). To realize this esxStreamSend calls
+ * esxStreamTransfer that uses esxVI_MultiCURL_(Wait|Perform) to drive the
+ * transfer and makes libcurl read up the data passed to esxStreamSend.
+ *
+ * In the download direction esxStreamRecv is called to pull data out of the
+ * stream and libcurl will call esxVI_CURL_WriteStream to push data into the
+ * stream that it has downloaded via HTTP(S). To realize this esxStreamRecv
+ * calls esxStreamTransfer that uses esxVI_MultiCURL_(Wait|Perform) to drive
+ * the transfer and makes libcurl write to the buffer passed to esxStreamRecv.
+ *
+ * The download direction requires some extra logic because libcurl might
+ * call esxVI_CURL_WriteStream with more data than there is space left in the
+ * buffer passed to esxStreamRecv. But esxVI_CURL_WriteStream is not allowed
+ * to handle only a part of the incoming data, it needs to handle it all at
+ * once. Therefore the stream driver manages a backlog buffer that holds the
+ * extra data that didn't fit into the esxStreamRecv buffer anymore. The next
+ * time esxStreamRecv is called it'll read the data from the backlog buffer
+ * first before asking libcurl for more data.
+ *
+ * Typically libcurl will call esxVI_CURL_WriteStream with up to 16kb data
+ * this means that the typically maximum backlog size should be 16kb as well.
+ */
+
+enum _esxStreamMode {
+    ESX_STREAM_MODE_UPLOAD = 1,
+    ESX_STREAM_MODE_DOWNLOAD = 2
+};
+
+typedef struct _esxStreamPrivate esxStreamPrivate;
+typedef enum _esxStreamMode esxStreamMode;
+
+struct _esxStreamPrivate {
+    esxVI_CURL *curl;
+    int mode;
+
+    /* Backlog of downloaded data that has not been esxStreamRecv'ed yet */
+    char *backlog;
+    size_t backlog_size;
+    size_t backlog_used;
+
+    /* Buffer given to esxStream(Send|Recv) to (read|write) data (from|to) */
+    char *buffer;
+    size_t buffer_size;
+    size_t buffer_used;
+};
+
+static size_t
+esxVI_CURL_ReadStream(char *output, size_t size, size_t nmemb, void *userdata)
+{
+    esxStreamPrivate *priv = userdata;
+    size_t output_size = size * nmemb;
+    size_t output_used = 0;
+
+    if (output_size > priv->buffer_used)
+        output_used = priv->buffer_used;
+    else
+        output_used = output_size;
+
+    memcpy(output, priv->buffer + priv->buffer_size - priv->buffer_used,
+           output_used);
+
+    priv->buffer_used -= output_used;
+
+    return output_used;
+}
+
+static size_t
+esxVI_CURL_WriteStream(char *input, size_t size, size_t nmemb, void *userdata)
+{
+    esxStreamPrivate *priv = userdata;
+    size_t input_size = size * nmemb;
+    size_t input_used = priv->buffer_size - priv->buffer_used;
+
+    if (input_size == 0)
+        return input_size;
+
+    if (input_used > input_size)
+        input_used = input_size;
+
+    /* Fill buffer */
+    memcpy(priv->buffer + priv->buffer_used, input, input_used);
+    priv->buffer_used += input_used;
+
+    /* Move rest to backlog */
+    if (input_size > input_used) {
+        size_t input_remaining = input_size - input_used;
+        size_t backlog_remaining = priv->backlog_size - priv->backlog_used;
+
+        if (!priv->backlog) {
+            priv->backlog_size = input_remaining;
+            priv->backlog_used = 0;
+
+            if (VIR_ALLOC_N(priv->backlog, priv->backlog_size) < 0)
+                return 0;
+        } else if (input_remaining > backlog_remaining) {
+            priv->backlog_size += input_remaining - backlog_remaining;
+
+            if (VIR_REALLOC_N(priv->backlog, priv->backlog_size) < 0)
+                return 0;
+        }
+
+        memcpy(priv->backlog + priv->backlog_used, input + input_used,
+               input_remaining);
+
+        priv->backlog_used += input_remaining;
+    }
+
+    return input_size;
+}
+
+/* Returns -1 on error, 0 if it needs to be called again, and 1 if it's done for now */
+static int
+esxStreamTransfer(esxStreamPrivate *priv, bool blocking)
+{
+    int runningHandles = 0;
+    long responseCode = 0;
+    int status;
+    CURLcode errorCode;
+
+    if (blocking) {
+        if (esxVI_MultiCURL_Wait(priv->curl->multi, &runningHandles) < 0)
+            return -1;
+    } else {
+        if (esxVI_MultiCURL_Perform(priv->curl->multi, &runningHandles) < 0)
+            return -1;
+    }
+
+    if (runningHandles == 0) {
+        /* Transfer is done check for result */
+        status = esxVI_MultiCURL_CheckFirstMessage(priv->curl->multi,
+                                                   &responseCode, &errorCode);
+
+        if (status == 0) {
+            /* No message, transfer finished successfully */
+            return 1;
+        }
+
+        if (status < 0) {
+            virReportError(VIR_ERR_INTERNAL_ERROR,
+                           _("Could not complete transfer: %s (%d)"),
+                           curl_easy_strerror(errorCode), errorCode);
+            return -1;
+        }
+
+        if (responseCode != 200 && responseCode != 206) {
+            virReportError(VIR_ERR_INTERNAL_ERROR,
+                           _("Unexpected HTTP response code %lu"),
+                           responseCode);
+            return -1;
+        }
+
+        return 1;
+    }
+
+    return blocking ? 0 : 1;
+}
+
+static int
+esxStreamSend(virStreamPtr stream, const char *data, size_t nbytes)
+{
+    int result = -1;
+    esxStreamPrivate *priv = stream->privateData;
+    int status;
+
+    if (nbytes == 0)
+        return 0;
+
+    if (!priv) {
+        virReportError(VIR_ERR_INTERNAL_ERROR, "%s", _("Stream is not open"));
+        return -1;
+    }
+
+    if (priv->mode != ESX_STREAM_MODE_UPLOAD) {
+        virReportError(VIR_ERR_INVALID_ARG, "%s", _("Not an upload stream"));
+        return -1;
+    }
+
+    virMutexLock(&priv->curl->lock);
+
+    priv->buffer = (char *)data;
+    priv->buffer_size = nbytes;
+    priv->buffer_used = nbytes;
+
+    if (stream->flags & VIR_STREAM_NONBLOCK) {
+        if (esxStreamTransfer(priv, false) < 0)
+            goto cleanup;
+
+        if (priv->buffer_used < priv->buffer_size)
+            result = priv->buffer_size - priv->buffer_used;
+        else
+            result = -2;
+    } else /* blocking */ {
+        do {
+            status = esxStreamTransfer(priv, true);
+
+            if (status < 0)
+                goto cleanup;
+
+            if (status > 0)
+                break;
+        } while (priv->buffer_used > 0);
+
+        result = priv->buffer_size - priv->buffer_used;
+    }
+
+ cleanup:
+    virMutexUnlock(&priv->curl->lock);
+
+    return result;
+}
+
+static int
+esxStreamRecv(virStreamPtr stream, char *data, size_t nbytes)
+{
+    int result = -1;
+    esxStreamPrivate *priv = stream->privateData;
+    int status;
+
+    if (nbytes == 0)
+        return 0;
+
+    if (!priv) {
+        virReportError(VIR_ERR_INTERNAL_ERROR, "%s", _("Stream is not open"));
+        return -1;
+    }
+
+    if (priv->mode != ESX_STREAM_MODE_DOWNLOAD) {
+        virReportError(VIR_ERR_INVALID_ARG, "%s", _("Not a download stream"));
+        return -1;
+    }
+
+    virMutexLock(&priv->curl->lock);
+
+    priv->buffer = data;
+    priv->buffer_size = nbytes;
+    priv->buffer_used = 0;
+
+    if (priv->backlog_used > 0) {
+        if (priv->buffer_size > priv->backlog_used)
+            priv->buffer_used = priv->backlog_used;
+        else
+            priv->buffer_used = priv->buffer_size;
+
+        memcpy(priv->buffer, priv->backlog, priv->buffer_used);
+        memmove(priv->backlog, priv->backlog + priv->buffer_used,
+                priv->backlog_used - priv->buffer_used);
+
+        priv->backlog_used -= priv->buffer_used;
+        result = priv->buffer_used;
+    } else if (stream->flags & VIR_STREAM_NONBLOCK) {
+        if (esxStreamTransfer(priv, false) < 0)
+            goto cleanup;
+
+        if (priv->buffer_used > 0)
+            result = priv->buffer_used;
+        else
+            result = -2;
+    } else /* blocking */ {
+        do {
+            status = esxStreamTransfer(priv, true);
+
+            if (status < 0)
+                goto cleanup;
+
+            if (status > 0)
+                break;
+        } while (priv->buffer_used < priv->buffer_size);
+
+        result = priv->buffer_used;
+    }
+
+ cleanup:
+    virMutexUnlock(&priv->curl->lock);
+
+    return result;
+}
+
+static void
+esxFreeStreamPrivate(esxStreamPrivate **priv)
+{
+    if (!priv || !*priv)
+        return;
+
+    esxVI_CURL_Free(&(*priv)->curl);
+    VIR_FREE((*priv)->backlog);
+    VIR_FREE(*priv);
+}
+
+static int
+esxStreamClose(virStreamPtr stream, bool finish)
+{
+    int result = 0;
+    esxStreamPrivate *priv = stream->privateData;
+
+    if (!priv)
+        return 0;
+
+    virMutexLock(&priv->curl->lock);
+
+    if (finish && priv->backlog_used > 0) {
+        virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+                       _("Stream has untransferred data left"));
+        result = -1;
+    }
+
+    stream->privateData = NULL;
+
+    virMutexUnlock(&priv->curl->lock);
+
+    esxFreeStreamPrivate(&priv);
+
+    return result;
+}
+
+static int
+esxStreamFinish(virStreamPtr stream)
+{
+    return esxStreamClose(stream, true);
+}
+
+static int
+esxStreamAbort(virStreamPtr stream)
+{
+    return esxStreamClose(stream, false);
+}
+
+virStreamDriver esxStreamDriver = {
+    .streamSend = esxStreamSend,
+    .streamRecv = esxStreamRecv,
+    /* FIXME: streamAddCallback missing */
+    /* FIXME: streamUpdateCallback missing */
+    /* FIXME: streamRemoveCallback missing */
+    .streamFinish = esxStreamFinish,
+    .streamAbort = esxStreamAbort,
+};
+
+static int
+esxStreamOpen(virStreamPtr stream, esxPrivate *priv, const char *url,
+              unsigned long long offset, unsigned long long length, int mode)
+{
+    int result = -1;
+    esxStreamPrivate *streamPriv;
+    char *range = NULL;
+    char *userpwd = NULL;
+    esxVI_MultiCURL *multi = NULL;
+
+    /* FIXME: Although there is already some code in place to deal with
+     *        non-blocking streams it is currently incomplete, so usage
+     *        of the non-blocking mode is denied here for now. */
+    if (stream->flags & VIR_STREAM_NONBLOCK) {
+        virReportError(VIR_ERR_OPERATION_INVALID, "%s",
+                       _("Non-blocking streams are not supported yet"));
+        return -1;
+    }
+
+    if (VIR_ALLOC(streamPriv) < 0)
+        return -1;
+
+    streamPriv->mode = mode;
+
+    if (length > 0) {
+        if (virAsprintf(&range, "%llu-%llu", offset, offset + length - 1) < 0)
+            goto cleanup;
+    } else if (offset > 0) {
+        if (virAsprintf(&range, "%llu-", offset) < 0)
+            goto cleanup;
+    }
+
+    if (esxVI_CURL_Alloc(&streamPriv->curl) < 0 ||
+        esxVI_CURL_Connect(streamPriv->curl, priv->parsedUri) < 0)
+        goto cleanup;
+
+    if (mode == ESX_STREAM_MODE_UPLOAD) {
+        curl_easy_setopt(streamPriv->curl->handle, CURLOPT_UPLOAD, 1);
+        curl_easy_setopt(streamPriv->curl->handle, CURLOPT_READFUNCTION,
+                         esxVI_CURL_ReadStream);
+        curl_easy_setopt(streamPriv->curl->handle, CURLOPT_READDATA, streamPriv);
+    } else {
+        curl_easy_setopt(streamPriv->curl->handle, CURLOPT_UPLOAD, 0);
+        curl_easy_setopt(streamPriv->curl->handle, CURLOPT_HTTPGET, 1);
+        curl_easy_setopt(streamPriv->curl->handle, CURLOPT_WRITEFUNCTION,
+                         esxVI_CURL_WriteStream);
+        curl_easy_setopt(streamPriv->curl->handle, CURLOPT_WRITEDATA, streamPriv);
+    }
+
+    curl_easy_setopt(streamPriv->curl->handle, CURLOPT_URL, url);
+    curl_easy_setopt(streamPriv->curl->handle, CURLOPT_RANGE, range);
+
+#if LIBCURL_VERSION_NUM >= 0x071301 /* 7.19.1 */
+    curl_easy_setopt(streamPriv->curl->handle, CURLOPT_USERNAME,
+                     priv->primary->username);
+    curl_easy_setopt(streamPriv->curl->handle, CURLOPT_PASSWORD,
+                     priv->primary->password);
+#else
+    if (virAsprintf(&userpwd, "%s:%s", priv->primary->username,
+                    priv->primary->password) < 0)
+        goto cleanup;
+
+    curl_easy_setopt(streamPriv->curl->handle, CURLOPT_USERPWD, userpwd);
+#endif
+
+    if (esxVI_MultiCURL_Alloc(&multi) < 0 ||
+        esxVI_MultiCURL_Add(multi, streamPriv->curl) < 0)
+        goto cleanup;
+
+    stream->driver = &esxStreamDriver;
+    stream->privateData = streamPriv;
+
+    result = 0;
+
+ cleanup:
+    if (result < 0) {
+        if (streamPriv->curl && multi != streamPriv->curl->multi)
+            esxVI_MultiCURL_Free(&multi);
+
+        esxFreeStreamPrivate(&streamPriv);
+    }
+
+    VIR_FREE(range);
+    VIR_FREE(userpwd);
+
+    return result;
+}
+
+int
+esxStreamOpenUpload(virStreamPtr stream, esxPrivate *priv, const char *url)
+{
+    return esxStreamOpen(stream, priv, url, 0, 0, ESX_STREAM_MODE_UPLOAD);
+}
+
+int
+esxStreamOpenDownload(virStreamPtr stream, esxPrivate *priv, const char *url,
+                      unsigned long long offset, unsigned long long length)
+{
+    return esxStreamOpen(stream, priv, url, offset, length, ESX_STREAM_MODE_DOWNLOAD);
+}
diff --git a/src/esx/esx_stream.h b/src/esx/esx_stream.h
new file mode 100644
index 0000000..542a75e
--- /dev/null
+++ b/src/esx/esx_stream.h
@@ -0,0 +1,32 @@
+/*
+ * esx_stream.h: libcurl based stream driver
+ *
+ * Copyright (C) 2012-2014 Matthias Bolte <matthias.bolte at googlemail.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library.  If not, see
+ * <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#ifndef __ESX_STREAM_H__
+# define __ESX_STREAM_H__
+
+# include "internal.h"
+# include "esx_private.h"
+
+int esxStreamOpenUpload(virStreamPtr stream, esxPrivate *priv, const char *url);
+int esxStreamOpenDownload(virStreamPtr stream, esxPrivate *priv, const char *url,
+                          unsigned long long offset, unsigned long long length);
+
+#endif /* __ESX_STREAM_H__ */
diff --git a/src/esx/esx_vi.c b/src/esx/esx_vi.c
index 6188139..ba34bfd 100644
--- a/src/esx/esx_vi.c
+++ b/src/esx/esx_vi.c
@@ -2,7 +2,7 @@
  * esx_vi.c: client for the VMware VI API 2.5 to manage ESX hosts
  *
  * Copyright (C) 2010-2012 Red Hat, Inc.
- * Copyright (C) 2009-2012 Matthias Bolte <matthias.bolte at googlemail.com>
+ * Copyright (C) 2009-2012, 2014 Matthias Bolte <matthias.bolte at googlemail.com>
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
@@ -22,6 +22,7 @@
 
 #include <config.h>
 
+#include <poll.h>
 #include <libxml/parser.h>
 #include <libxml/xpathInternals.h>
 
@@ -662,6 +663,68 @@ esxVI_SharedCURL_Remove(esxVI_SharedCURL *shared, esxVI_CURL *curl)
  * MultiCURL
  */
 
+#if ESX_EMULATE_CURL_MULTI_WAIT
+
+static int
+esxVI_MultiCURL_SocketCallback(CURL *handle ATTRIBUTE_UNUSED,
+                               curl_socket_t fd, int action,
+                               void *callback_opaque,
+                               void *socket_opaque ATTRIBUTE_UNUSED)
+{
+    esxVI_MultiCURL *multi = callback_opaque;
+    size_t i;
+    struct pollfd *pollfd = NULL;
+    struct pollfd dummy;
+
+    if (action & CURL_POLL_REMOVE) {
+        for (i = 0; i < multi->npollfds; ++i) {
+            if (multi->pollfds[i].fd == fd) {
+                VIR_DELETE_ELEMENT(multi->pollfds, i, multi->npollfds);
+                break;
+            }
+        }
+    } else {
+        for (i = 0; i < multi->npollfds; ++i) {
+            if (multi->pollfds[i].fd == fd) {
+                pollfd = &multi->pollfds[i];
+                break;
+            }
+        }
+
+        if (pollfd == NULL) {
+            if (VIR_APPEND_ELEMENT(multi->pollfds, multi->npollfds, dummy) < 0) {
+                return 0;
+            }
+
+            pollfd = &multi->pollfds[multi->npollfds - 1];
+        }
+
+        pollfd->fd = fd;
+        pollfd->events = 0;
+
+        if (action & CURL_POLL_IN)
+            pollfd->events |= POLLIN;
+
+        if (action & CURL_POLL_OUT)
+            pollfd->events |= POLLOUT;
+    }
+
+    return 0;
+}
+
+static int
+esxVI_MultiCURL_TimerCallback(CURLM *handle ATTRIBUTE_UNUSED,
+                              long timeout_ms, void *callback_opaque)
+{
+    esxVI_MultiCURL *multi = callback_opaque;
+
+    multi->timeoutPending = true;
+
+    return 0;
+}
+
+#endif
+
 /* esxVI_MultiCURL_Alloc */
 ESX_VI__TEMPLATE__ALLOC(MultiCURL)
 
@@ -677,6 +740,10 @@ ESX_VI__TEMPLATE__FREE(MultiCURL,
     if (item->handle) {
         curl_multi_cleanup(item->handle);
     }
+
+#if ESX_EMULATE_CURL_MULTI_WAIT
+    VIR_FREE(item->pollfds);
+#endif
 })
 
 int
@@ -702,6 +769,15 @@ esxVI_MultiCURL_Add(esxVI_MultiCURL *multi, esxVI_CURL *curl)
                            _("Could not initialize CURL (multi)"));
             return -1;
         }
+
+#if ESX_EMULATE_CURL_MULTI_WAIT
+        curl_multi_setopt(multi->handle, CURLMOPT_SOCKETFUNCTION,
+                          esxVI_MultiCURL_SocketCallback);
+        curl_multi_setopt(multi->handle, CURLMOPT_SOCKETDATA, multi);
+        curl_multi_setopt(multi->handle, CURLMOPT_TIMERFUNCTION,
+                          esxVI_MultiCURL_TimerCallback);
+        curl_multi_setopt(multi->handle, CURLMOPT_TIMERDATA, multi);
+#endif
     }
 
     virMutexLock(&curl->lock);
@@ -750,6 +826,150 @@ esxVI_MultiCURL_Remove(esxVI_MultiCURL *multi, esxVI_CURL *curl)
     return 0;
 }
 
+#if ESX_EMULATE_CURL_MULTI_WAIT
+
+int
+esxVI_MultiCURL_Wait(esxVI_MultiCURL *multi, int *runningHandles)
+{
+    long timeout = -1;
+    CURLMcode errorCode;
+    int rc;
+    size_t i;
+    int action;
+
+    if (multi->timeoutPending) {
+        do {
+            errorCode = curl_multi_socket_action(multi->handle, CURL_SOCKET_TIMEOUT,
+                                                 0, runningHandles);
+        } while (errorCode == CURLM_CALL_MULTI_SOCKET);
+
+        if (errorCode != CURLM_OK) {
+            virReportError(VIR_ERR_INTERNAL_ERROR,
+                           _("Could not trigger socket action: %s (%d)"),
+                           curl_multi_strerror(errorCode), errorCode);
+            return -1;
+        }
+
+        multi->timeoutPending = false;
+    }
+
+    if (multi->npollfds == 0)
+        return 0;
+
+    curl_multi_timeout(multi->handle, &timeout);
+
+    if (timeout < 0) {
+        timeout = 1000; /* default to 1 sec timeout */
+    }
+
+    do {
+        rc = poll(multi->pollfds, multi->npollfds, timeout);
+    } while (rc < 0 && (errno == EAGAIN || errno == EINTR));
+
+    if (rc < 0) {
+        virReportSystemError(errno, "%s", _("Could not wait for transfer"));
+        return -1;
+    }
+
+    for (i = 0; i < multi->npollfds && rc > 0; ++i) {
+        if (multi->pollfds[i].revents == 0)
+            continue;
+
+        --rc;
+        action = 0;
+
+        if (multi->pollfds[i].revents & POLLIN)
+            action |= CURL_POLL_IN;
+
+        if (multi->pollfds[i].revents & POLLOUT)
+            action |= CURL_POLL_OUT;
+
+        do {
+            errorCode = curl_multi_socket_action(multi->handle,
+                                                 multi->pollfds[i].fd, action,
+                                                 runningHandles);
+        } while (errorCode == CURLM_CALL_MULTI_SOCKET);
+
+        if (errorCode != CURLM_OK) {
+            virReportError(VIR_ERR_INTERNAL_ERROR,
+                           _("Could not trigger socket action: %s (%d)"),
+                           curl_multi_strerror(errorCode), errorCode);
+            return -1;
+        }
+    }
+
+    return 0;
+}
+
+#else
+
+int
+esxVI_MultiCURL_Wait(esxVI_MultiCURL *multi, int *runningHandles)
+{
+    long timeout = -1;
+    CURLMcode errorCode;
+
+    curl_multi_timeout(multi->handle, &timeout);
+
+    if (timeout < 0)
+        timeout = 1000; /* default to 1 sec timeout */
+
+    errorCode = curl_multi_wait(multi->handle, NULL, 0, timeout, NULL);
+
+    if (errorCode != CURLM_OK) {
+        virReportError(VIR_ERR_INTERNAL_ERROR,
+                       _("Could not wait for transfer: %s (%d)"),
+                       curl_multi_strerror(errorCode), errorCode);
+        return -1;
+    }
+
+    return esxVI_MultiCURL_Perform(multi, runningHandles);
+}
+
+#endif
+
+int
+esxVI_MultiCURL_Perform(esxVI_MultiCURL *multi, int *runningHandles)
+{
+    CURLMcode errorCode;
+
+    do {
+        errorCode = curl_multi_perform(multi->handle, runningHandles);
+    } while (errorCode == CURLM_CALL_MULTI_PERFORM);
+
+    if (errorCode != CURLM_OK) {
+        virReportError(VIR_ERR_INTERNAL_ERROR,
+                       _("Could not transfer data: %s (%d)"),
+                       curl_multi_strerror(errorCode), errorCode);
+        return -1;
+    }
+
+    return 0;
+}
+
+/* Returns -1 on error, 0 if there is no DONE message, 1 if there is a DONE message */
+int
+esxVI_MultiCURL_CheckFirstMessage(esxVI_MultiCURL *multi, long *responseCode,
+                                  CURLcode *errorCode)
+{
+    int messagesInQueue;
+    CURLMsg* msg = curl_multi_info_read(multi->handle, &messagesInQueue);
+
+    *responseCode = 0;
+
+    if (!msg || msg->msg != CURLMSG_DONE)
+        return 0;
+
+    *errorCode = msg->data.result;
+
+    if (*errorCode != CURLE_OK)
+        return -1;
+
+    curl_easy_getinfo(msg->easy_handle, CURLINFO_RESPONSE_CODE, responseCode);
+
+    return 1;
+}
+
 
 
 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
diff --git a/src/esx/esx_vi.h b/src/esx/esx_vi.h
index 7cc2f71..dac36cd 100644
--- a/src/esx/esx_vi.h
+++ b/src/esx/esx_vi.h
@@ -2,7 +2,7 @@
  * esx_vi.h: client for the VMware VI API 2.5 to manage ESX hosts
  *
  * Copyright (C) 2011 Red Hat, Inc.
- * Copyright (C) 2009-2012 Matthias Bolte <matthias.bolte at googlemail.com>
+ * Copyright (C) 2009-2012, 2014 Matthias Bolte <matthias.bolte at googlemail.com>
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
@@ -33,6 +33,10 @@
 # include "esx_vi_types.h"
 # include "esx_util.h"
 
+/* curl_multi_wait was added in libcurl 7.28.0, emulate it on older versions */
+# define ESX_EMULATE_CURL_MULTI_WAIT (LIBCURL_VERSION_NUM < 0x071C00)
+
+
 
 # define ESX_VI__SOAP__REQUEST_HEADER                                         \
     "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"                            \
@@ -175,7 +179,7 @@ int esxVI_CURL_Upload(esxVI_CURL *curl, const char *url, const char *content);
 struct _esxVI_SharedCURL {
     CURLSH *handle;
     virMutex locks[3]; /* share, cookie, dns */
-    size_t count;
+    size_t count; /* number of added easy handle */
 };
 
 int esxVI_SharedCURL_Alloc(esxVI_SharedCURL **shared);
@@ -191,13 +195,22 @@ int esxVI_SharedCURL_Remove(esxVI_SharedCURL *shared, esxVI_CURL *curl);
 
 struct _esxVI_MultiCURL {
     CURLM *handle;
-    size_t count;
+    size_t count; /* number of added easy handle */
+# if ESX_EMULATE_CURL_MULTI_WAIT
+    struct pollfd *pollfds;
+    size_t npollfds;
+    bool timeoutPending;
+# endif
 };
 
 int esxVI_MultiCURL_Alloc(esxVI_MultiCURL **multi);
 void esxVI_MultiCURL_Free(esxVI_MultiCURL **multi);
 int esxVI_MultiCURL_Add(esxVI_MultiCURL *multi, esxVI_CURL *curl);
 int esxVI_MultiCURL_Remove(esxVI_MultiCURL *multi, esxVI_CURL *curl);
+int esxVI_MultiCURL_Wait(esxVI_MultiCURL *multi, int *runningHandles);
+int esxVI_MultiCURL_Perform(esxVI_MultiCURL *multi, int *runningHandles);
+int esxVI_MultiCURL_CheckFirstMessage(esxVI_MultiCURL *multi, long *responseCode,
+                                      CURLcode *errorCode);
 
 
 
-- 
1.8.1.2




More information about the libvir-list mailing list