[virt-tools-list] [vhostmd virtio PATCH v3 4/6] Add virtio functions

Michael Trapp Michael.Trapp at sap.com
Mon Nov 12 15:12:59 UTC 2018


On the vhostmd side, virtio channels are Unix domain sockets created by QEMU
when a VM is started and removed when the VM is stopped.

Basically, this implementation
- monitors a directory for new virtio channels (channel names contain the VM
  UUID; see the naming sketch below)
- connects to the UDS for valid UUIDs that are also known to libvirtd
- buffers VM/host metrics and handles request/response traffic on the sockets
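
As a side note, the mapping from a channel socket name to the VM UUID follows
the naming convention used in vhostmd/virtio.c below; here is a minimal
illustrative sketch (the helper name is hypothetical, the prefix is the one
used in the patch):

    #include <string.h>
    #include <stddef.h>

    #define CHANNEL_PREFIX "org.github.vhostmd.1."

    /* illustrative helper, not part of this patch:
     * channel sockets are named "<prefix><VM-UUID>" and the UUID
     * starts right after the prefix */
    static const char *channel_name_to_uuid(const char *d_name)
    {
        size_t plen = strlen(CHANNEL_PREFIX);

        if (strncmp(d_name, CHANNEL_PREFIX, plen) != 0)
            return NULL;        /* not a vhostmd channel */

        return d_name + plen;   /* UUID part of the socket name */
    }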

It provides the functions
virtio_init()
    initialize the virtio layer
virtio_run()
    the start_routine of the virtio thread, which handles the virtio-based
    communication
virtio_metrics_update()
    called on every VM/host metrics update to refresh the virtio-internal
    metrics buffer
virtio_stop()
    stop the virtio thread

---
  Here is a brief description of the vhostmd / virtio interaction concept.

 Vhostmd calls the following virtio API functions:

  *** virtio API ***

    --> virtio_init()
        Initialize the virtio layer.
        Called once before the virtio thread starts.

    --> virtio_run()
        Start routine of the virtio thread.

    --> virtio_stop()
        Set virtio_status to stop the virtio thread.

    --> virtio_metrics_update()
        Add or update the metrics buffer of a VM/host.
        It must be called for every change of the VM/host metrics
        (a usage sketch follows after this list).
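
  To illustrate the intended call sequence, here is a minimal, hypothetical
  sketch of how a daemon could drive this API (the helper names, the channel
  directory and the limits are placeholders, not taken from this patch):

      #include <pthread.h>
      #include <string.h>

      #include "metric.h"     /* metric_context, METRIC_CONTEXT_* */
      #include "virtio.h"

      static pthread_t virtio_thread;

      /* once at startup: init the layer, then start the virtio thread */
      static int start_virtio(const char *channel_dir, int max_vms,
                              int update_period)
      {
          /* expiration period >= 3 * update_period, as described above */
          if (virtio_init(channel_dir, max_vms, 3 * update_period) != 0)
              return -1;

          return pthread_create(&virtio_thread, NULL, virtio_run, NULL);
      }

      /* on every metrics refresh */
      static void publish_metrics(const char *host_xml,
                                  const char *vm_xml, const char *vm_uuid)
      {
          virtio_metrics_update(host_xml, (int) strlen(host_xml),
                                NULL, METRIC_CONTEXT_HOST);
          virtio_metrics_update(vm_xml, (int) strlen(vm_xml),
                                vm_uuid, METRIC_CONTEXT_VM);
      }

      /* on shutdown: ask the thread to stop and wait for its cleanup */
      static void stop_virtio(void)
      {
          virtio_stop();
          pthread_join(virtio_thread, NULL);
      }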

  *** virtio internal ***

  virtio internal code runs in the virtio thread - see virtio_run().

  Access to mbuffers within the virtio thread consists of
      - reading the mbuffer content
          see vio_channel_update() -> vio_mbuffer_find()
      - checking whether a VM is 'known' to vhostmd
          see vio_channel_readdir() -> vio_mbuffer_find()
      - expiring mbuffers (see the sketch below)
          see virtio_run() -> vio_mbuffer_expire()
          The expiration timeout is >= (3 * 'vhostmd update_period').
          A VM expires when vhostmd has not received an update for
          'expiration_time' seconds.

  The mbuffer (metrics buffer) structs of the VMs and the host are maintained
  in a btree (mbuffer_root).
  Every mbuffer access is exclusive - see mbuffer_mutex.
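
  As an illustration (not part of the patch), the expiration check performed
  for each mbuffer during the twalk() boils down to the following comparison;
  the helper name is hypothetical:

      #include <time.h>

      /* an mbuffer is expired when its last update is older than the
       * expiration period, where expiration_period >= 3 * update_period;
       * in the patch this comparison lives in vio_mbuffer_expire() and
       * uses the static mbuffer_exp_ts = time(NULL) - mbuffer_exp_period */
      static int mbuffer_is_expired(time_t last_update, time_t now,
                                    time_t expiration_period)
      {
          return last_update < (now - expiration_period);
      }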

  *** tests ***

  So far I have tested vhostmd with virtio support in a setup with 100 Alpine
  VMs, each VM continuously polling the metrics every 5 seconds, for several
  hours. To make the test environment more dynamic, all VMs were stopped and
  started several times.

  Aside from the dependency on the vu_buffer functions, the virtio code does
  not call any libvirt functions and can also be run/tested without vhostmd
  and libvirt. I have also checked this variant in combination with valgrind,
  but at the moment the required test code and the UDS server program are not
  part of this patch.

 include/util.h   |   4 +
 include/virtio.h |  50 +++
 vhostmd/virtio.c | 900 +++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 954 insertions(+)
 create mode 100644 include/virtio.h
 create mode 100644 vhostmd/virtio.c

diff --git a/include/util.h b/include/util.h
index c0bd19a..17ff09c 100644
--- a/include/util.h
+++ b/include/util.h
@@ -26,8 +26,12 @@
 
 #ifdef __GNUC__
 #define ATTRIBUTE_UNUSED __attribute__((unused))
+#define ATTRIBUTE_OPTIMIZE_O0 __attribute__((optimize("O0")))
+#define ATTRIBUTE_NOINLINE __attribute__((noinline))
 #else
 #define ATTRIBUTE_UNUSED
+#define ATTRIBUTE_OPTIMIZE_O0
+#define ATTRIBUTE_NOINLINE
 #endif
 
 typedef enum {
diff --git a/include/virtio.h b/include/virtio.h
new file mode 100644
index 0000000..b10dab5
--- /dev/null
+++ b/include/virtio.h
@@ -0,0 +1,50 @@
+/*
+ * Copyright (C) 2018 SAP SE
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
+ *
+ * Author: Michael Trapp <michael.trapp at sap.com>
+ */
+
+#ifndef __VIRTIO_H__
+#define __VIRTIO_H__
+
+/*
+ * Initialize virtio layer
+ */
+int virtio_init(const char *virtio_path,
+                int max_channel,
+                int expiration_period);
+
+/*
+ * Main virtio function
+ * 'start_routine' of pthread_create()
+ */
+void *virtio_run(void *arg);
+
+/*
+ * Update the metrics response buffer of a VM/host
+ */
+int virtio_metrics_update(const char *buf,
+                          int len,
+                          const char *uuid,
+                          metric_context ctx);
+
+/*
+ * Stop virtio thread
+ */
+void virtio_stop(void);
+
+#endif                          /* __VIRTIO_H__ */
diff --git a/vhostmd/virtio.c b/vhostmd/virtio.c
new file mode 100644
index 0000000..f5e6306
--- /dev/null
+++ b/vhostmd/virtio.c
@@ -0,0 +1,900 @@
+/*
+ * Copyright (C) 2018 SAP SE
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
+ *
+ * Author: Michael Trapp <michael.trapp at sap.com>
+ */
+
+#include <config.h>
+
+#include <errno.h>
+#include <sys/un.h>
+#include <sys/socket.h>
+#include <sys/epoll.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <search.h>
+#include <dirent.h>
+#include <pthread.h>
+#include <libvirt/libvirt.h>
+
+#include "util.h"
+#include "metric.h"
+#include "virtio.h"
+
+
+#define DEFAULT_VU_BUFFER_SIZE 1024
+#define VIRTIO_PREFIX_LEN 21UL
+#define VIRTIO_NAME_BUFLEN (VIRTIO_PREFIX_LEN + VIR_UUID_STRING_BUFLEN)
+
+
+typedef struct {
+    int fd;
+    char uuid[VIR_UUID_STRING_BUFLEN];
+    vu_buffer *request;
+    vu_buffer *response;
+} vio_channel;
+
+typedef struct {
+    char uuid[VIR_UUID_STRING_BUFLEN];
+    time_t last_update;
+    vu_buffer *xml;
+} vio_mbuffer;
+
+typedef enum {
+    REQ_INCOMPLETE,
+    REQ_INVALID,
+    REQ_GET_XML
+} REQUEST_T;
+
+
+static vu_buffer *mbuffer_host = NULL;
+static volatile void *mbuffer_root = NULL;
+static int mbuffer_max_num = 0;
+static volatile int mbuffer_count = 0;
+#ifdef ENABLE_DEBUG
+static int mbuffer_idx = 0;
+#endif
+static time_t mbuffer_exp_period = 0;
+static time_t mbuffer_exp_ts = 0;
+
+static void *channel_root = NULL;
+static char *channel_path = NULL;
+static const char *channel_prefix = "org.github.vhostmd.1.";
+static int channel_max_num = 0;
+static int channel_count = 0;
+
+static int epoll_fd = -1;
+static struct epoll_event *epoll_events = NULL;
+static const size_t max_virtio_path_len =
+    sizeof(((struct sockaddr_un *) 0)->sun_path) - 1;
+static pthread_mutex_t mbuffer_mtx;
+
+static enum {
+    VIRTIO_INIT,
+    VIRTIO_ACTIVE,
+    VIRTIO_STOP,
+    VIRTIO_ERROR
+} virtio_status = VIRTIO_INIT;
+
+
+/*
+ * static functions
+ */
+
+static int vio_channel_compare(const void *a, const void *b);
+static void vio_channel_delete(const void *node, VISIT which, int depth);
+
+static vio_channel *vio_channel_find(const char *uuid);
+static vio_channel *vio_channel_add(const char *uuid);
+static int vio_channel_open(vio_channel * c);
+static void vio_channel_close(vio_channel * c);
+static int vio_channel_update(vio_channel * c);
+static int vio_channel_readdir(const char *path);
+static void vio_channel_recv(vio_channel * c);
+static void vio_channel_send(vio_channel * c, uint32_t ep_event);
+
+static int vio_mbuffer_compare(const void *a, const void *b);
+static void vio_mbuffer_delete(const void *node, VISIT which, int depth);
+static void vio_mbuffer_expire(const void *node, VISIT which, int depth);
+static void **vio_mbuffer_get_root(void);
+#ifdef ENABLE_DEBUG
+static void vio_mbuffer_print(const void *node, VISIT which, int depth);
+#endif
+
+static vio_mbuffer *vio_mbuffer_find(const char *uuid);
+static vio_mbuffer *vio_mbuffer_add(const char *uuid);
+static REQUEST_T vio_check_request(vio_channel * c);
+
+static void vio_handle_io(unsigned epoll_wait_ms);
+
+int virtio_cleanup(void);
+/*
+ * update response buffer of a channel
+ */
+static int vio_channel_update(vio_channel * c)
+{
+    static const char *metrics_start_str = "<metrics>\n";
+    static const char *metrics_end_str = "</metrics>\n\n";
+
+    int rc = 0;
+    vio_mbuffer *b = NULL;
+
+    if (c == NULL)
+        return -1;
+
+    vu_buffer_erase(c->response);
+    vu_buffer_add(c->response, metrics_start_str, -1);
+
+    pthread_mutex_lock(&mbuffer_mtx);
+
+    if (mbuffer_host->content && mbuffer_host->use)
+        vu_buffer_add(c->response, mbuffer_host->content, -1);
+    else
+        vu_buffer_add(c->response, "host metrics not available", -1);
+
+    b = vio_mbuffer_find(c->uuid);
+    if (b && b->xml->use)
+        vu_buffer_add(c->response, b->xml->content, -1);
+    else
+        rc = -1;
+
+    pthread_mutex_unlock(&mbuffer_mtx);
+
+    vu_buffer_add(c->response, metrics_end_str, -1);
+
+#ifdef ENABLE_DEBUG
+    vu_log(VHOSTMD_DEBUG, "New response for %s (%u)\n>>>%s<<<\n",
+           c->uuid, c->response->use, c->response->content);
+#endif
+    return rc;
+}
+
+/*
+ * close channel and free allocated buffers
+ */
+static void vio_channel_close(vio_channel * c)
+{
+    if (c != NULL) {
+        channel_count--;
+
+        if (c->fd >= 0) {
+            vu_log(VHOSTMD_INFO, "Closed channel '%s%s%s' (%d/%d)",
+                   channel_path, channel_prefix, c->uuid, channel_count,
+                   channel_max_num);
+            close(c->fd);
+        }
+
+        if (c->request)
+            vu_buffer_delete(c->request);
+        if (c->response)
+            vu_buffer_delete(c->response);
+
+        tdelete((const void *) c, &channel_root, vio_channel_compare);
+        free(c);
+    }
+}
+
+/*
+ * connect channel and add the socket to the epoll descriptor
+ */
+static int vio_channel_open(vio_channel * c)
+{
+    struct sockaddr_un address;
+    const size_t max_path_len =
+        sizeof(((struct sockaddr_un *) 0)->sun_path) - 1;
+    struct epoll_event evt;
+    int len = (int) (strlen(channel_path) + VIRTIO_PREFIX_LEN + strlen(c->uuid));
+    int flags;
+
+    bzero(&address, sizeof(address));
+    address.sun_family = AF_LOCAL;
+
+    if (len >= (int) max_path_len) {
+        vu_log(VHOSTMD_ERR, "Channel '%s%s%s' - name too long (%d/%lu)",
+               channel_path, channel_prefix, c->uuid, len, max_path_len);
+        return -1;
+    }
+
+    len = snprintf(address.sun_path, max_path_len, "%s%s%s", channel_path,
+                   channel_prefix, c->uuid);
+
+    if (len >= (int) max_path_len || len <= 0) {
+        vu_log(VHOSTMD_ERR, "Channel '%s%s%s' - name is invalid (%lu)",
+               channel_path, channel_prefix, c->uuid, max_path_len);
+        return -1;
+    }
+
+    if ((c->fd = socket(AF_LOCAL, SOCK_STREAM, 0)) < 0) {
+        vu_log(VHOSTMD_ERR, "Channel '%s' - socket() failed (%s)",
+               address.sun_path, strerror(errno));
+        return -1;
+    }
+
+    flags = fcntl(c->fd, F_GETFL, 0);
+    if (flags < 0) {
+        vu_log(VHOSTMD_ERR, "Channel '%s' - fcntl() failed (%s)",
+               address.sun_path, strerror(errno));
+        return -1;
+    }
+
+    flags |= O_NONBLOCK;
+    if (fcntl(c->fd, F_SETFL, flags) == -1) {
+        vu_log(VHOSTMD_ERR, "Channel '%s' - fcntl() failed (%s)",
+               address.sun_path, strerror(errno));
+        return -1;
+    }
+
+    if (connect(c->fd, (struct sockaddr *) &address,
+                (socklen_t) sizeof(address)) < 0) {
+        vu_log(VHOSTMD_ERR, "Channel '%s' - connect() failed (%s)",
+               address.sun_path, strerror(errno));
+        return -1;
+    }
+
+    evt.data.ptr = c;
+    evt.events = EPOLLIN;
+
+    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, c->fd, &evt) == -1) {
+        vu_log(VHOSTMD_ERR, "Could not add channel '%s' - epoll_ctl() failed (%s)",
+               address.sun_path, strerror(errno));
+        return -1;
+    }
+
+    vu_log(VHOSTMD_INFO, "Opened channel '%s' (%d/%d)",
+           address.sun_path, channel_count, channel_max_num);
+
+    return 0;
+}
+
+/*
+ * lookup UDS sockets in the directory
+ * for valid type/name/mbuffer connect and register channel
+ */
+static int vio_channel_readdir(const char *path)
+{
+    struct dirent *ent;
+    DIR *dir = NULL;
+
+    if ((dir = opendir(path)) == NULL) {
+        vu_log(VHOSTMD_ERR, "opendir(%s) failed (%s)", path, strerror(errno));
+        return -1;
+    }
+
+    while ((ent = readdir(dir)) != NULL) {
+
+        if (ent->d_type == DT_SOCK &&
+            strncmp(ent->d_name, channel_prefix, VIRTIO_PREFIX_LEN) == 0 &&
+            strnlen(ent->d_name, VIRTIO_NAME_BUFLEN) ==
+                (VIRTIO_NAME_BUFLEN - 1)) {
+
+            const char *uuid = &ent->d_name[VIRTIO_PREFIX_LEN];
+
+            vio_channel *c = vio_channel_find(uuid);
+
+            if (c == NULL) {
+                if (channel_count >= channel_max_num) {
+                    closedir(dir);
+                    vu_log(VHOSTMD_ERR, "Could not add channel '%s%s%s'"
+                           " - too many VMs (%d/%d)",
+                           path, channel_prefix, uuid, channel_count,
+                           channel_max_num);
+                    return -1;
+                }
+
+                pthread_mutex_lock(&mbuffer_mtx);
+
+                /* don't add the channel if there is no mbuffer for this VM */
+                if (vio_mbuffer_find(uuid) != NULL) {
+#ifdef ENABLE_DEBUG
+                    vu_log(VHOSTMD_DEBUG, "New channel %s%s\n", path, ent->d_name);
+#endif
+                    c = vio_channel_add(uuid);
+
+                    if (c == NULL)
+                        vu_log(VHOSTMD_ERR, "Could not add channel '%s%s'", path,
+                               ent->d_name);
+                }
+
+                pthread_mutex_unlock(&mbuffer_mtx);
+            }
+        }
+    }
+    closedir(dir);
+
+    return 0;
+}
+
+/*
+ * channel - btree - compare function
+ */
+static int vio_channel_compare(const void *a, const void *b)
+{
+    if (a == NULL || b == NULL)
+        return 1;
+
+    return strncmp(((const vio_channel *) a)->uuid, ((const vio_channel *) b)->uuid,
+                   (size_t) VIR_UUID_STRING_BUFLEN);
+}
+
+/*
+ * channel - btree/twalk - action function
+ * delete entries
+ */
+static void vio_channel_delete(const void *node, VISIT which, int depth ATTRIBUTE_UNUSED)
+{
+    if (which == endorder || which == leaf) {
+        if (node) {
+            struct epoll_event evt;
+            vio_channel *c = *(vio_channel * const *) node;
+            if (c) {
+                epoll_ctl(epoll_fd, EPOLL_CTL_DEL, c->fd, &evt);
+                vio_channel_close(c);
+            }
+        }
+    }
+}
+
+/*
+ * mbuffer - btree - compare function
+ */
+static int vio_mbuffer_compare(const void *a, const void *b)
+{
+    if (a == NULL || b == NULL)
+        return 1;
+
+    return strncmp(((const vio_mbuffer *) a)->uuid, ((const vio_mbuffer *) b)->uuid,
+                   (size_t) VIR_UUID_STRING_BUFLEN);
+}
+
+/*
+ * mbuffer - btree/twalk - action function
+ * delete entries
+ */
+static void vio_mbuffer_delete(const void *node, VISIT which, int depth ATTRIBUTE_UNUSED)
+{
+    if (which == endorder || which == leaf) {
+        if (node) {
+            vio_mbuffer *b = *(vio_mbuffer * const *) node;
+
+            if (b) {
+                if (b->xml)
+                    vu_buffer_delete(b->xml);
+                tdelete((const void *) b, vio_mbuffer_get_root(),
+                        vio_mbuffer_compare);
+                free(b);
+                mbuffer_count--;
+            }
+        }
+    }
+}
+
+/*
+ * mbuffer - btree/twalk - action function
+ * expire entries
+ */
+static void vio_mbuffer_expire(const void *node, VISIT which, int depth ATTRIBUTE_UNUSED)
+{
+    if (which == endorder || which == leaf) {
+        if (node) {
+            vio_mbuffer *b = *(vio_mbuffer * const *) node;
+
+            /*  remove expired mbuffer
+             *  an mbuffer expires when the last update is older
+             *  than the expiration_period
+             *
+             *  action function does not support custom arguments
+             *  --> use a static variable: exp_ts
+             */
+            if (b && b->last_update < mbuffer_exp_ts) {
+                vio_channel *c = vio_channel_find(b->uuid);
+
+                if (c != NULL)
+                    vio_channel_close(c);
+
+#ifdef ENABLE_DEBUG
+                vu_log(VHOSTMD_DEBUG, "Expire mbuffer '%s' (%d/%d)",
+                       b->uuid, mbuffer_count, mbuffer_max_num);
+#endif
+                if (b->xml)
+                    vu_buffer_delete(b->xml);
+                tdelete((const void *) b, vio_mbuffer_get_root(),
+                        vio_mbuffer_compare);
+                free(b);
+                mbuffer_count--;
+            }
+        }
+    }
+}
+
+/* gcc's -Wall -Werror require a cast from (volatile void *) to (void *),
+ * but this discards the volatile qualifier. The function attributes
+ * 'optimize_O0' and 'noinline' should avoid any optimization of this access.
+ */
+ATTRIBUTE_OPTIMIZE_O0
+ATTRIBUTE_NOINLINE
+static void **vio_mbuffer_get_root(void)
+{
+    return (void *) &mbuffer_root;
+}
+
+#ifdef ENABLE_DEBUG
+static void vio_mbuffer_print(const void *node,
+                              VISIT which,
+                              int depth ATTRIBUTE_UNUSED)
+{
+    if (which == endorder || which == leaf) {
+        if (node) {
+            vio_mbuffer *b = *(vio_mbuffer **) node;
+
+            if (b) {
+                vu_log(VHOSTMD_DEBUG, "\t%4d %s %lu\n",
+                       ++mbuffer_idx, b->uuid, b->last_update);
+            }
+        }
+    }
+}
+#endif
+
+/*
+ * lookup metrics buffer in internal btree
+ */
+static vio_mbuffer *vio_mbuffer_find(const char *uuid)
+{
+    vio_mbuffer b;
+    void *p;
+
+    strncpy(b.uuid, uuid, sizeof(b.uuid));
+
+    p = tfind((const void *) &b, vio_mbuffer_get_root(), vio_mbuffer_compare);
+    if (p == NULL)
+        return NULL;
+
+    return *(vio_mbuffer **) p;
+}
+
+/*
+ * add metrics buffer to internal btree
+ */
+static vio_mbuffer *vio_mbuffer_add(const char *uuid)
+{
+    vio_mbuffer *b = NULL;
+    void *p = NULL;
+
+    if (mbuffer_count >= mbuffer_max_num) {
+        vu_log(VHOSTMD_ERR,
+               "Could not add metrics buffer '%s' - too many VMs (%d/%d)",
+               uuid, mbuffer_count, mbuffer_max_num);
+
+#ifdef ENABLE_DEBUG
+        mbuffer_idx = 0;
+        vu_log(VHOSTMD_DEBUG, "exp_ts %lu, allocated mbuffer:\n", mbuffer_exp_ts);
+        twalk((const void *) *vio_mbuffer_get_root(), vio_mbuffer_print);
+#endif
+
+        return NULL;
+    }
+
+    b = (vio_mbuffer *) calloc(1UL, sizeof(vio_mbuffer));
+    if (b == NULL)
+        goto error;
+
+    strncpy(b->uuid, uuid, sizeof(b->uuid));
+    b->xml = NULL;
+
+    if (vu_buffer_create(&b->xml, DEFAULT_VU_BUFFER_SIZE) != 0) {
+        free(b);
+        goto error;
+    }
+
+    p = tsearch((const void *) b, vio_mbuffer_get_root(), vio_mbuffer_compare);
+    if (p == NULL) {
+        vu_buffer_delete(b->xml);
+        free(b);
+        goto error;
+    }
+
+    mbuffer_count++;
+    return b;
+
+  error:
+    vu_log(VHOSTMD_ERR, "Could not add metrics buffer '%s' (%d/%d)",
+           uuid, mbuffer_count, mbuffer_max_num);
+
+    return NULL;
+}
+
+/*
+ * lookup virtio channel in internal btree
+ */
+static vio_channel *vio_channel_find(const char *uuid)
+{
+    vio_channel c;
+    void *p;
+
+    strncpy(c.uuid, uuid, sizeof(c.uuid));
+
+    p = tfind((const void *) &c, &channel_root, vio_channel_compare);
+    if (p == NULL)
+        return NULL;
+
+    return *(vio_channel **) p;
+}
+
+/*
+ * add virtio channel to internal btree
+ */
+static vio_channel *vio_channel_add(const char *uuid)
+{
+    vio_channel *c = NULL;
+    void *p = NULL;
+
+    c = (vio_channel *) calloc(1UL, sizeof(vio_channel));
+    if (c == NULL)
+        goto error;
+
+    channel_count++;
+
+    strncpy(c->uuid, uuid, sizeof(c->uuid));
+    c->fd = -1;
+    c->request = NULL;
+    c->response = NULL;
+
+    p = tsearch((const void *) c, &channel_root, vio_channel_compare);
+    if (p == NULL)
+        goto error;
+
+    if (vio_channel_open(c) != 0 ||
+        vu_buffer_create(&c->request, 512) != 0 ||
+        vu_buffer_create(&c->response, DEFAULT_VU_BUFFER_SIZE) != 0)
+        goto error;
+
+    return c;
+
+  error:
+    if (c)
+        vio_channel_close(c);
+
+    return NULL;
+}
+
+static REQUEST_T vio_check_request(vio_channel * c)
+{
+    const char xml_req_n[] = "GET /metrics/XML\n\n";
+    const char xml_req_rn[] = "GET /metrics/XML\r\n\r\n";
+
+    if (strcmp(c->request->content, xml_req_n) == 0 ||
+        strcmp(c->request->content, xml_req_rn) == 0) {
+        // valid request
+        vu_buffer_erase(c->request);
+        return REQ_GET_XML;
+    } else if (c->request->use >= (c->request->size - 1) ||
+               strstr(c->request->content, "\n\n") ||
+               strstr(c->request->content, "\r\n\r\n")) {
+        // invalid request -> reset buffer
+        vu_buffer_erase(c->request);
+
+        vu_buffer_erase(c->response);
+        vu_buffer_add(c->response, "INVALID REQUEST\n\n", -1);
+        return REQ_INVALID;
+    } else {
+        // fragment
+        c->request->use = (unsigned) strnlen(c->request->content,
+                                             (size_t) c->request->size);
+    }
+
+    return REQ_INCOMPLETE;
+}
+
+static void vio_channel_recv(vio_channel * c)
+{
+    struct epoll_event evt;
+    ssize_t rc = 0;
+    REQUEST_T req_type = REQ_INCOMPLETE;
+
+    do {
+        char *buf = &c->request->content[c->request->use];
+        size_t len = c->request->size - c->request->use - 1;
+
+        rc = recv(c->fd, buf, len, 0);
+
+        if (rc > 0) {
+            req_type = vio_check_request(c);
+        }
+    } while (rc > 0 && req_type == REQ_INCOMPLETE);
+
+    if (req_type == REQ_GET_XML) {
+        if (vio_channel_update(c)) {
+            // no metrics available -> close channel
+            epoll_ctl(epoll_fd, EPOLL_CTL_DEL, c->fd, &evt);
+            vio_channel_close(c);
+        } else
+            vio_channel_send(c, EPOLLIN);
+    } else if (req_type == REQ_INVALID)
+        vio_channel_send(c, EPOLLIN);
+}
+
+static void vio_channel_send(vio_channel * c, uint32_t ep_event)
+{
+    struct epoll_event evt;
+    int len;
+
+    while ((len = (int) (c->response->use - c->response->pos)) > 0)
+    {
+        char *buf = &c->response->content[c->response->pos];
+        ssize_t rc = send(c->fd, buf, (size_t) len, 0);
+
+        if (rc > 0)
+            c->response->pos += (unsigned) rc;
+        else
+            break;
+    }
+
+    if (ep_event == EPOLLOUT) {
+        if (c->response->use <= c->response->pos) {
+            // next request
+            evt.data.ptr = c;
+            evt.events = EPOLLIN;
+            epoll_ctl(epoll_fd, EPOLL_CTL_MOD, c->fd, &evt);
+        }
+    } else if (ep_event == EPOLLIN) {
+        if (c->response->use > c->response->pos) {
+            // incomplete response
+            evt.data.ptr = c;
+            evt.events = EPOLLOUT;
+            epoll_ctl(epoll_fd, EPOLL_CTL_MOD, c->fd, &evt);
+        }
+    }
+}
+
+static void vio_handle_io(unsigned epoll_wait_ms)
+{
+    int i = 0;
+    uint64_t ts_end, ts_now;
+    struct epoll_event evt;
+    struct timespec ts;
+
+    clock_gettime(CLOCK_MONOTONIC, &ts);
+    ts_now = (uint64_t) (ts.tv_sec * 1000 + ts.tv_nsec / 1000000);
+    ts_end = ts_now + epoll_wait_ms;
+
+    while (ts_now < ts_end) {
+        int wait_ms = (int) (ts_end - ts_now);
+        int n =
+            epoll_wait(epoll_fd, epoll_events, channel_max_num + 1, wait_ms);
+
+        for (i = 0; i < n; i++) {
+            vio_channel *c = (epoll_events + i)->data.ptr;
+
+            if ((epoll_events + i)->events & EPOLLHUP) {
+                epoll_ctl(epoll_fd, EPOLL_CTL_DEL, c->fd, &evt);
+                vio_channel_close(c);
+            } else if ((epoll_events + i)->events & EPOLLIN) {
+                vio_channel_recv(c);
+            } else if ((epoll_events + i)->events & EPOLLOUT) {
+                vio_channel_send(c, EPOLLOUT);
+            }
+        }
+
+        clock_gettime(CLOCK_MONOTONIC, &ts);
+        ts_now = (uint64_t) (ts.tv_sec * 1000 + ts.tv_nsec / 1000000);
+    }
+}
+
+/*
+ * Initialize virtio layer
+ */
+int virtio_init(const char *virtio_path,
+                int max_channel,
+                int expiration_period)
+{
+    if (virtio_status == VIRTIO_INIT) {
+        pthread_mutex_init(&mbuffer_mtx, NULL);
+
+        channel_max_num = max_channel;
+        mbuffer_max_num = max_channel;
+        mbuffer_exp_period = expiration_period;
+
+        if (mbuffer_host == NULL)
+            if (vu_buffer_create (&mbuffer_host, DEFAULT_VU_BUFFER_SIZE) != 0)
+                goto error;
+
+        if (epoll_fd == -1) {
+
+            epoll_events = calloc((size_t) (channel_max_num + 1),
+                                  sizeof(struct epoll_event));
+            if (epoll_events == NULL)
+                goto error;
+
+            epoll_fd = epoll_create(1);
+            if (epoll_fd == -1)
+                goto error;
+
+            if (virtio_path == NULL ||
+                (strnlen(virtio_path, max_virtio_path_len) +
+                 VIR_UUID_STRING_BUFLEN) > (max_virtio_path_len - 2)) {
+
+                vu_log(VHOSTMD_ERR, "Invalid virtio_path");
+                goto error;
+            }
+
+            if (channel_path == NULL) {
+                channel_path = calloc(1UL, max_virtio_path_len + 2);
+                if (channel_path == NULL)
+                    goto error;
+
+                strncpy(channel_path, virtio_path,
+                        max_virtio_path_len - VIR_UUID_STRING_BUFLEN);
+
+                if (channel_path[strlen(channel_path) - 1] != '/')
+                    channel_path[strlen(channel_path)] = '/';
+            }
+        }
+
+        virtio_status = VIRTIO_ACTIVE;
+        vu_log(VHOSTMD_INFO,
+               "Virtio using path '%s', max_channels %d, expiration_time %ld",
+               channel_path, channel_max_num, mbuffer_exp_period);
+    }
+
+    return 0;
+
+  error:
+    vu_log(VHOSTMD_ERR, "Virtio initialization failed");
+    virtio_status = VIRTIO_ERROR;
+
+    return -1;
+}
+
+/*
+ * Cleanup virtio layer
+ */
+int virtio_cleanup(void)
+{
+    if (virtio_status == VIRTIO_STOP) {
+
+        if (epoll_fd != -1) {
+            close(epoll_fd);
+            epoll_fd = -1;
+        }
+
+        if (channel_root) {
+            twalk(channel_root, vio_channel_delete);
+            tdestroy(channel_root, free);
+            channel_root = NULL;
+            channel_count = 0;
+        }
+
+        if (channel_path) {
+            free(channel_path);
+            channel_path = NULL;
+        }
+
+        if (*vio_mbuffer_get_root()) {
+            twalk((const void *) *vio_mbuffer_get_root(), vio_mbuffer_delete);
+            tdestroy((void *) *vio_mbuffer_get_root(), free);
+            mbuffer_root = NULL;
+            mbuffer_count = 0;
+        }
+
+        if (mbuffer_host) {
+            vu_buffer_delete((vu_buffer *) mbuffer_host);
+            mbuffer_host = NULL;
+        }
+
+        if (epoll_events)
+            free(epoll_events);
+
+        pthread_mutex_destroy(&mbuffer_mtx);
+
+        virtio_status = VIRTIO_INIT;
+
+        return 0;
+    }
+    return -1;
+}
+
+/*
+ * Main virtio function
+ * 'start_routine' of pthread_create()
+ */
+void *virtio_run(void *arg ATTRIBUTE_UNUSED)
+{
+    if (virtio_status != VIRTIO_ACTIVE) {
+        vu_log(VHOSTMD_ERR, "Virtio was not initialized");
+        return NULL;
+    }
+
+    while (virtio_status == VIRTIO_ACTIVE) {
+        if (channel_count < channel_max_num)
+            vio_channel_readdir(channel_path);
+
+        vio_handle_io(3000);    // process available requests
+
+        // remove expired metrics buffers
+        mbuffer_exp_ts = time(NULL) - mbuffer_exp_period;
+
+        pthread_mutex_lock(&mbuffer_mtx);
+
+        if (*vio_mbuffer_get_root())
+            twalk((const void *) *vio_mbuffer_get_root(), vio_mbuffer_expire);
+
+        pthread_mutex_unlock(&mbuffer_mtx);
+    }
+
+    virtio_cleanup();
+
+    return NULL;
+}
+
+/*
+ * Update the metrics response buffer of a VM/host
+ */
+int virtio_metrics_update(const char *buf,
+                          int len,
+                          const char *uuid,
+                          metric_context ctx)
+{
+    int rc = -1;
+    vio_mbuffer *b;
+
+    if (buf == NULL || len == 0 || virtio_status != VIRTIO_ACTIVE ||
+        (ctx == METRIC_CONTEXT_VM && uuid == NULL))
+        return -1;
+
+    pthread_mutex_lock(&mbuffer_mtx);
+
+    if (ctx == METRIC_CONTEXT_HOST) {
+        vu_buffer_erase(mbuffer_host);
+        vu_buffer_add(mbuffer_host, buf, len);
+#ifdef ENABLE_DEBUG
+        vu_log(VHOSTMD_DEBUG, "New content for HOST (%u)\n>>>%s<<<\n",
+               len, mbuffer_host->content);
+#endif
+        rc = 0;
+    }
+    else if (ctx == METRIC_CONTEXT_VM) {
+        if ((b = vio_mbuffer_find(uuid)) == NULL) {
+            // for a new VM create a new mbuffer
+            b = vio_mbuffer_add(uuid);
+        }
+
+        if (b != NULL) {
+            vu_buffer_erase(b->xml);
+            vu_buffer_add(b->xml, buf, len);
+            // update the timestamp so that the mbuffer can be expired
+            b->last_update = time(NULL);
+#ifdef ENABLE_DEBUG
+            vu_log(VHOSTMD_DEBUG, "New content for %s (%u)\n>>>%s<<<\n",
+                   uuid, len, b->xml->content);
+#endif
+            rc = 0;
+        }
+    }
+
+    pthread_mutex_unlock(&mbuffer_mtx);
+
+    return rc;
+}
+
+/*
+ * Stop virtio thread
+ */
+void virtio_stop(void)
+{
+    if (virtio_status == VIRTIO_ACTIVE)
+        virtio_status = VIRTIO_STOP;
+}
-- 
2.17.1 (Apple Git-112)



