[virt-tools-list] [vhostmd v2 2/5] Add virtio functions

Michael Trapp Michael.Trapp at sap.com
Fri Oct 5 14:34:04 UTC 2018


On the vhostmd side, virtio channels are Unix domain sockets created by
QEMU when a VM is started and removed when the VM is stopped.

Basically this implementation
- monitors a directory for new virtio channels (channel names contain the VM UUID)
- connects to the UDS for UUIDs that are also known to libvirtd
- buffers VM/host metrics and handles request/response on the sockets

It provides the following functions:
virtio_init()
    init function of the virtio layer
virtio_run()
    the start_routine of the virtio thread, which handles the
    virtio-based communication
virtio_metrics_update()
    called on each VM/host metrics update to refresh the virtio-internal
    metrics buffer
virtio_stop()
    stop the virtio thread

---
  Here is a brief description of the vhostmd / virtio interaction concept.

 Vhostmd calls the virtio API functions

  *** virtio API ***

    --> virtio_init()
        Initialize the virtio layer.
        Called once before the virtio thread starts.

    --> virtio_run()
        Start routine of the virtio thread.

    --> virtio_stop()
        Set virtio_status to stop the virtio thread.

    --> virtio_metrics_update()
        Add/update the metric buffer of a VM/host.
        It must be called for every change of VM/host metrics.

  *** virtio internal ***

  virtio internal code runs in the virtio thread - see virtio_run().

  Mbuffer accesses within the virtio thread are:
      - read the mbuffer content
          see vio_channel_update() -> vio_mbuffer_find()
      - check whether a VM is 'known' to vhostmd
          see vio_channel_readdir() -> vio_mbuffer_find()
      - expire mbuffers
          see virtio_run() -> vio_mbuffer_expire()
          The expiration timeout is >= (3 * 'vhostmd update_period')

  The mbuffer (metrics buffer) structs of the VMs and the host are
  maintained in a btree (mbuffer.root).
  Every mbuffer access is exclusive - see mbuffer_mtx.

  *** tests ***

  So far I've tested vhostmd with virtio support in a setup with 100 Alpine
  VMs, each continuously polling the metrics every 5 sec, for several hours.
  To get a more dynamic test environment, all VMs were stopped/started
  several times.

  Apart from its dependency on the vu_buffer functions, the virtio code does
  not call any libvirt functions and can also be run/tested without vhostmd
  and libvirt. I've also checked this variant in combination with valgrind.
  However, the required test code and the UDS server program are not yet
  part of this patch.

 include/virtio.h |  50 +++
 vhostmd/virtio.c | 885 +++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 935 insertions(+)
 create mode 100644 include/virtio.h
 create mode 100644 vhostmd/virtio.c

diff --git a/include/virtio.h b/include/virtio.h
new file mode 100644
index 0000000..90b3d81
--- /dev/null
+++ b/include/virtio.h
@@ -0,0 +1,50 @@
+/*
+ * Copyright (C) 2018 SAP SE
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
+ *
+ * Author: Michael Trapp <michael.trapp at sap.com>
+ */
+
+#ifndef __VIRTIO_H__
+#define __VIRTIO_H__
+
+/*
+ * Initialize virtio layer
+ */
+int virtio_init(const char *virtio_path,
+                unsigned max_channel,
+                unsigned expiration_period);
+
+/*
+ * Main virtio function
+ * 'start_routine' of pthread_create()
+ */
+void *virtio_run(void *arg);
+
+/*
+ * Update the metrics response buffer of a VM/host
+ */
+int virtio_metrics_update(const char *buf,
+                          unsigned int len,
+                          const char *uuid,
+                          metric_context ctx);
+
+/*
+ * Stop virtio thread
+ */
+void virtio_stop(void);
+
+#endif                          /* __VIRTIO_H__ */
diff --git a/vhostmd/virtio.c b/vhostmd/virtio.c
new file mode 100644
index 0000000..a813c55
--- /dev/null
+++ b/vhostmd/virtio.c
@@ -0,0 +1,885 @@
+/*
+ * Copyright (C) 2018 SAP SE
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
+ *
+ * Author: Michael Trapp <michael.trapp at sap.com>
+ */
+
+#include <config.h>
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <stdarg.h>
+#include <string.h>
+#include <stdint.h>
+#include <stddef.h>
+#include <errno.h>
+#include <getopt.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <strings.h>
+#include <sys/un.h>
+#include <sys/socket.h>
+#include <sys/epoll.h>
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <search.h>
+#include <dirent.h>
+#include <time.h>
+#include <pthread.h>
+#include <libvirt/libvirt.h>
+
+#include "metric.h"
+#include "util.h"
+
+#define DEFAULT_VU_BUFFER_SIZE 1024
+#define MAX_REQUEST_LEN 512
+
+#define VIRTIO_PREFIX_LEN 21
+#define VIRTIO_NAME_BUFLEN (VIRTIO_PREFIX_LEN + VIR_UUID_STRING_BUFLEN)
+
+
+typedef struct {
+    int fd;
+    char uuid[VIR_UUID_STRING_BUFLEN];
+    vu_buffer *request, *response;
+} vio_channel;
+
+typedef struct {
+    char uuid[VIR_UUID_STRING_BUFLEN];
+    time_t last_update;
+    vu_buffer *xml;
+} vio_mbuffer;
+
+static struct {
+    volatile vu_buffer *host;
+    volatile void *root;
+    int max_num;
+    volatile int count;
+    int idx;
+    time_t exp_period, exp_ts;
+} mbuffer = {NULL, NULL, 0, 0, 0, 0, 0};
+
+static struct {
+    void *root;
+    char *path;
+    const char *prefix;
+    int max_num;
+    int count;
+} channel = {NULL, NULL, "org.github.vhostmd.1.", 0, 0};
+
+
+static int epoll_fd = -1;
+static struct epoll_event *epoll_events = NULL;
+static const unsigned max_virtio_path_len =
+    sizeof(((struct sockaddr_un *) 0)->sun_path) - 1;
+static pthread_mutex_t mbuffer_mtx;
+
+static enum {
+    VIRTIO_INIT,
+    VIRTIO_ACTIVE,
+    VIRTIO_STOP,
+    VIRTIO_ERROR
+} virtio_status = VIRTIO_INIT;
+
+
+/*
+ * static functions
+ */
+
+static int vio_channel_compare(const void *a, const void *b);
+static void vio_channel_delete(const void *node, VISIT which, int depth);
+
+static vio_channel *vio_channel_find(const char *uuid);
+static vio_channel *vio_channel_add(const char *uuid);
+static int vio_channel_open(vio_channel * c);
+static void vio_channel_close(vio_channel * c);
+static int vio_channel_update(vio_channel * c);
+static int vio_channel_readdir(const char *path);
+
+static int vio_mbuffer_compare(const void *a, const void *b);
+static void vio_mbuffer_delete(const void *node, VISIT which, int depth);
+static void vio_mbuffer_expire(const void *node, VISIT which, int depth);
+#ifdef ENABLE_DEBUG
+static void vio_mbuffer_print(const void *node, VISIT which, int depth);
+#endif
+
+static vio_mbuffer *vio_mbuffer_find(const char *uuid);
+static vio_mbuffer *vio_mbuffer_add(const char *uuid);
+
+static void vio_hanlde_io(unsigned epoll_wait_ms);
+
+/*
+ * update response buffer of a channel
+ */
+static int vio_channel_update(vio_channel * c)
+{
+    static const char *metrics_start_str = "<metrics>\n";
+    static const char *metrics_end_str = "</metrics>\n\n";
+
+    int rc = 0;
+    vio_mbuffer *b = NULL;
+
+    if (c == NULL)
+        return -1;
+
+    vu_buffer_erase(c->response);
+    vu_buffer_add(c->response, metrics_start_str, -1);
+
+    pthread_mutex_lock(&mbuffer_mtx);
+
+    if (mbuffer.host->content && mbuffer.host->use)
+        vu_buffer_add(c->response, mbuffer.host->content, -1);
+    else
+        vu_buffer_add(c->response, "host metrics not available", -1);
+
+    b = vio_mbuffer_find(c->uuid);
+    if (b && b->xml->use)
+        vu_buffer_add(c->response, b->xml->content, -1);
+    else
+        rc = -1;
+
+    pthread_mutex_unlock(&mbuffer_mtx);
+
+    vu_buffer_add(c->response, metrics_end_str, -1);
+
+#ifdef ENABLE_DEBUG
+    vu_log(VHOSTMD_DEBUG, "DEBUG: new response for %s (%u)\n>>>%s<<<\n",
+           c->uuid, c->response->use, c->response->content);
+#endif
+    return rc;
+}
+
+/*
+ * close channel and free allocated buffers
+ */
+static void vio_channel_close(vio_channel * c)
+{
+    if (c != NULL) {
+        if (c->fd >= 0) {
+            vu_log(VHOSTMD_INFO, "INFO: closed channel '%s%s' (%d/%d)",
+                   channel.prefix, c->uuid, channel.count, channel.max_num);
+            close(c->fd);
+        }
+
+        if (c->request)
+            vu_buffer_delete(c->request);
+        if (c->response)
+            vu_buffer_delete(c->response);
+
+        tdelete((const void *) c, &channel.root, vio_channel_compare);
+        free(c);
+        channel.count--;
+    }
+}
+
+/*
+ * connect channel and add the socket to the epoll descriptor
+ */
+static int vio_channel_open(vio_channel * c)
+{
+    struct sockaddr_un address;
+    const unsigned max_path_len =
+        sizeof(((struct sockaddr_un *) 0)->sun_path) - 1;
+    struct epoll_event evt;
+    unsigned len = strlen(channel.path) + VIRTIO_PREFIX_LEN + strlen(c->uuid);
+    int flags;
+
+    bzero(&address, sizeof(address));
+    address.sun_family = AF_LOCAL;
+
+    if (len >= max_path_len) {
+        vu_log(VHOSTMD_ERR, "ERROR: channel '%s%s%s' name too long (%u/%u)",
+               channel.path, channel.prefix, c->uuid, len, max_path_len);
+        return -1;
+    }
+
+    len =
+        snprintf(address.sun_path, max_path_len, "%s%s%s", channel.path,
+                 channel.prefix, c->uuid);
+
+    if (len >= max_path_len || len == 0) {
+        vu_log(VHOSTMD_ERR,
+               "ERROR: channel '%s%s%s' - name is too long (%u/%u)",
+               channel.path, channel.prefix, c->uuid, len, max_path_len);
+        return -1;
+    }
+
+    if ((c->fd = socket(AF_LOCAL, SOCK_STREAM, 0)) < 0) {
+        vu_log(VHOSTMD_ERR, "ERROR: channel '%s' - socket() failed (%s)",
+               address.sun_path, strerror(errno));
+        return -1;
+    }
+
+    flags = fcntl(c->fd, F_GETFL, 0);
+    if (flags < 0) {
+        vu_log(VHOSTMD_ERR, "ERROR: channel '%s' fcntl() failed (%s)",
+               address.sun_path, strerror(errno));
+        return -1;
+    }
+
+    flags |= O_NONBLOCK;
+    if (fcntl(c->fd, F_SETFL, flags) == -1) {
+        vu_log(VHOSTMD_ERR, "ERROR: channel '%s' fcntl() failed (%s)",
+               address.sun_path, strerror(errno));
+        return -1;
+    }
+
+    if (connect(c->fd, (struct sockaddr *) &address, sizeof(address)) < 0) {
+        vu_log(VHOSTMD_ERR, "ERROR: channel '%s' - connect() failed (%s)",
+               address.sun_path, strerror(errno));
+        return -1;
+    }
+
+    evt.data.ptr = c;
+    evt.events = EPOLLIN;
+
+    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, c->fd, &evt) == -1) {
+        vu_log(VHOSTMD_ERR,
+               "ERROR: could not add channel '%s' - epoll_ctl() failed (%s)",
+               address.sun_path, strerror(errno));
+        return -1;
+    }
+
+    vu_log(VHOSTMD_INFO, "INFO: opened channel '%s' (%d/%d)",
+           address.sun_path, channel.count, channel.max_num);
+
+    return 0;
+}
+
+/*
+ * look up UDS sockets in the directory
+ * for entries with a valid type/name/mbuffer, connect and register a channel
+ */
+static int vio_channel_readdir(const char *path)
+{
+    struct dirent *ent;
+    DIR *dir = NULL;
+
+    if ((dir = opendir(path)) == NULL) {
+        vu_log(VHOSTMD_ERR, "ERROR: opendir(%s) failed (%s)", path,
+               strerror(errno));
+        return -1;
+    }
+
+    while ((ent = readdir(dir)) != NULL) {
+
+        if (ent->d_type == DT_SOCK &&
+            strncmp(ent->d_name, channel.prefix, VIRTIO_PREFIX_LEN) == 0 &&
+            strnlen(ent->d_name,
+                    VIRTIO_NAME_BUFLEN) == (VIRTIO_NAME_BUFLEN - 1)) {
+
+            const char *uuid = &ent->d_name[VIRTIO_PREFIX_LEN];
+
+            vio_channel *c = vio_channel_find(uuid);
+
+            if (c == NULL) {
+                if (channel.count >= channel.max_num) {
+                    closedir(dir);
+                    vu_log(VHOSTMD_ERR, "ERROR: could not add channel '%s%s%s'"
+                           " - too many VMs (%d/%d)",
+                           path, channel.prefix, uuid, channel.count,
+                           channel.max_num);
+                    return -1;
+                }
+
+                pthread_mutex_lock(&mbuffer_mtx);
+
+                /* don't add the channel if there is no mbuffer for this VM */
+                if (vio_mbuffer_find(uuid) != NULL) {
+#ifdef ENABLE_DEBUG
+                    vu_log(VHOSTMD_DEBUG, "DEBUG: new channel %s%s\n", path,
+                           ent->d_name);
+#endif
+                    c = vio_channel_add(uuid);
+
+                    if (c == NULL)
+                        vu_log(VHOSTMD_ERR,
+                               "ERROR: could not add channel '%s%s'", path,
+                               ent->d_name);
+                }
+
+                pthread_mutex_unlock(&mbuffer_mtx);
+            }
+        }
+    }
+    closedir(dir);
+
+    return 0;
+}
+
+/*
+ * channel - btree - compare function
+ */
+static int vio_channel_compare(const void *a, const void *b)
+{
+    if (a == NULL || b == NULL)
+        return 1;
+
+    return strncmp(((vio_channel *) a)->uuid, ((vio_channel *) b)->uuid,
+                   VIR_UUID_STRING_BUFLEN);
+}
+
+/*
+ * channel - btree/twalk - action function
+ * delete entries
+ */
+static void vio_channel_delete(const void *node, VISIT which, int depth)
+{
+    if (which == endorder || which == leaf) {
+        if (node) {
+            struct epoll_event evt;
+            vio_channel *c = *(vio_channel **) node;
+            if (c) {
+                epoll_ctl(epoll_fd, EPOLL_CTL_DEL, c->fd, &evt);
+                vio_channel_close(c);
+            }
+        }
+    }
+}
+
+/*
+ * mbuffer - btree - compare function
+ */
+static int vio_mbuffer_compare(const void *a, const void *b)
+{
+    if (a == NULL || b == NULL)
+        return 1;
+
+    return strncmp(((vio_mbuffer *) a)->uuid, ((vio_mbuffer *) b)->uuid,
+                   VIR_UUID_STRING_BUFLEN);
+}
+
+/*
+ * mbuffer - btree/twalk - action function
+ * delete entries
+ */
+static void vio_mbuffer_delete(const void *node, VISIT which, int depth)
+{
+    if (which == endorder || which == leaf) {
+        if (node) {
+            vio_mbuffer *b = *(vio_mbuffer **) node;
+
+            if (b) {
+                if (b->xml)
+                    vu_buffer_delete(b->xml);
+                tdelete((const void *) b, (void **) &mbuffer.root,
+                        vio_mbuffer_compare);
+                free(b);
+                mbuffer.count--;
+            }
+        }
+    }
+}
+
+/*
+ * mbuffer - btree/twalk - action function
+ * expire entries
+ */
+static void vio_mbuffer_expire(const void *node, VISIT which, int depth)
+{
+    if (which == endorder || which == leaf) {
+        if (node) {
+            vio_mbuffer *b = *(vio_mbuffer **) node;
+
+            /*  remove expired mbuffer
+             *  a mbuffer expires when the last update is older
+             *  than the expiration_period
+             *
+             *  action function does not support custom arguments
+             *  --> use a static variable: exp_ts
+             */
+            if (b && b->last_update < mbuffer.exp_ts) {
+                vio_channel *c = vio_channel_find(b->uuid);
+
+                if (c != NULL)
+                    vio_channel_close(c);
+
+#ifdef ENABLE_DEBUG
+                vu_log(VHOSTMD_DEBUG, "DEBUG: expire mbuffer '%s' (%d/%d)",
+                       b->uuid, mbuffer.count, mbuffer.max_num);
+#endif
+                if (b->xml)
+                    vu_buffer_delete(b->xml);
+                tdelete((const void *) b, (void **) &mbuffer.root,
+                        vio_mbuffer_compare);
+                free(b);
+                mbuffer.count--;
+            }
+        }
+    }
+}
+
+#ifdef ENABLE_DEBUG
+static void vio_mbuffer_print(const void *node, VISIT which, int depth)
+{
+    if (which == endorder || which == leaf) {
+        if (node) {
+            vio_mbuffer *b = *(vio_mbuffer **) node;
+
+            if (b) {
+                vu_log(VHOSTMD_DEBUG, "DEBUG: %4d %s %lu\n",
+                       ++mbuffer.idx, b->uuid, b->last_update);
+            }
+        }
+    }
+}
+#endif
+
+/*
+ * lookup metrics buffer in internal btree
+ */
+static vio_mbuffer *vio_mbuffer_find(const char *uuid)
+{
+    vio_mbuffer b;
+    void *p;
+
+    strncpy(b.uuid, uuid, sizeof(b.uuid));
+
+    p = tfind((const void *) &b, (void **) &mbuffer.root, vio_mbuffer_compare);
+    if (p == NULL)
+        return NULL;
+
+    return *(vio_mbuffer **) p;
+}
+
+/*
+ * add metrics buffer to internal btree
+ */
+static vio_mbuffer *vio_mbuffer_add(const char *uuid)
+{
+    vio_mbuffer *b = NULL;
+    void *p = NULL;
+
+    if (mbuffer.count >= mbuffer.max_num) {
+        vu_log(VHOSTMD_ERR,
+               "ERROR: could not add metrics buffer '%s' - too many VMs (%d/%d)",
+               uuid, mbuffer.count, mbuffer.max_num);
+
+#ifdef ENABLE_DEBUG
+        mbuffer.idx = 0;
+        vu_log(VHOSTMD_DEBUG, "DEBUG: exp_ts %lu, allocated mbuffer:\n",
+               mbuffer.exp_ts);
+        twalk(mbuffer.root, vio_mbuffer_print);
+#endif
+
+        return NULL;
+    }
+
+    b = (vio_mbuffer *) calloc(1, sizeof(vio_mbuffer));
+    if (b == NULL)
+        goto error;
+
+    strncpy(b->uuid, uuid, sizeof(b->uuid));
+    b->xml = NULL;
+
+    if (vu_buffer_create(&b->xml, DEFAULT_VU_BUFFER_SIZE) != 0) {
+        free(b);
+        goto error;
+    }
+
+    p = tsearch((const void *) b, (void **) &mbuffer.root, vio_mbuffer_compare);
+    if (p == NULL) {
+        vu_buffer_delete(b->xml);
+        free(b);
+        goto error;
+    }
+
+    mbuffer.count++;
+    return b;
+
+  error:
+
+    vu_log(VHOSTMD_ERR, "ERROR: vio_mbuffer_add(%s) failed (%d/%d)",
+           uuid, mbuffer.count, mbuffer.max_num);
+
+    return NULL;
+}
+
+/*
+ * lookup virtio channel in internal btree
+ */
+static vio_channel *vio_channel_find(const char *uuid)
+{
+    vio_channel c;
+    void *p;
+
+    strncpy(c.uuid, uuid, sizeof(c.uuid));
+
+    p = tfind((const void *) &c, &channel.root, vio_channel_compare);
+    if (p == NULL)
+        return NULL;
+
+    return *(vio_channel **) p;
+}
+
+/*
+ * add virtio channel to internal btree
+ */
+static vio_channel *vio_channel_add(const char *uuid)
+{
+    vio_channel *c = NULL;
+    void *p = NULL;
+
+    c = (vio_channel *) calloc(1, sizeof(vio_channel));
+    if (c == NULL)
+        goto error;
+
+    channel.count++;
+
+    strncpy(c->uuid, uuid, sizeof(c->uuid));
+    c->fd = -1;
+    c->request = NULL;
+    c->response = NULL;
+
+    p = tsearch((const void *) c, &channel.root, vio_channel_compare);
+    if (p == NULL)
+        goto error;
+
+    if (vio_channel_open(c) != 0 ||
+        vu_buffer_create(&c->request, 512) != 0 ||
+        vu_buffer_create(&c->response, DEFAULT_VU_BUFFER_SIZE) != 0)
+        goto error;
+
+    return c;
+
+  error:
+
+    vu_log(VHOSTMD_ERR, "ERROR: vio_channel_add(%s%s) failed", channel.prefix,
+           uuid);
+
+    if (c)
+        vio_channel_close(c);
+
+    return NULL;
+}
+
+static void vio_hanlde_io(unsigned epoll_wait_ms)
+{
+    int i = 0;
+    uint64_t ts_end, ts_now;
+    struct epoll_event evt;
+    struct timespec ts;
+
+    clock_gettime(CLOCK_MONOTONIC, &ts);
+    ts_now = ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
+    ts_end = ts_now + epoll_wait_ms;
+
+    while (ts_now < ts_end) {
+        int wait_ms = (int) (ts_end - ts_now);
+        int n =
+            epoll_wait(epoll_fd, epoll_events, channel.max_num + 1, wait_ms);
+
+        for (i = 0; i < n; i++) {
+            vio_channel *c = (epoll_events + i)->data.ptr;
+
+            if ((epoll_events + i)->events & EPOLLHUP) {
+                epoll_ctl(epoll_fd, EPOLL_CTL_DEL, c->fd, &evt);
+                vio_channel_close(c);
+            } else if ((epoll_events + i)->events & EPOLLIN) {
+                unsigned send_response = 0;
+                int rc = 0;
+
+                do {
+                    char *buf = &c->request->content[c->request->use];
+                    int len = c->request->size - c->request->use - 1;
+
+                    rc = recv(c->fd, buf, len, 0);
+
+                    if (rc > 0) {
+                        const char xml_req_n[] = "GET /metrics/XML\n\n";
+                        const char xml_req_rn[] = "GET /metrics/XML\r\n\r\n";
+
+                        if (strncmp
+                            (c->request->content, xml_req_n,
+                             strlen(xml_req_n)) == 0
+                            || strncmp(c->request->content, xml_req_rn,
+                                       strlen(xml_req_rn)) == 0) {
+                            // valid request
+                            vu_buffer_erase(c->request);
+
+                            if (vio_channel_update(c)) {        // no metrics available -> close channel
+                                epoll_ctl(epoll_fd, EPOLL_CTL_DEL, c->fd, &evt);
+                                vio_channel_close(c);
+                            } else
+                                send_response = 1;
+                        } else if (c->request->use >= (c->request->size - 1) ||
+                                   strstr(c->request->content, "\n\n")) {
+                            // invalid request -> reset buffer
+                            vu_buffer_erase(c->request);
+
+                            vu_buffer_erase(c->response);
+                            vu_buffer_add(c->response, "INVALID REQUEST\n\n",
+                                          -1);
+                            send_response = 1;
+                        } else {
+                            // fragment
+                            c->request->use =
+                                strnlen(c->request->content, c->request->size);
+                        }
+                    }
+                } while (rc > 0 && send_response == 0);
+
+                if (send_response) {
+                    do {
+                        char *buf = &c->response->content[c->response->pos];
+                        int len = c->response->use - c->response->pos;
+
+                        rc = send(c->fd, buf, len, 0);
+                        if (rc > 0)
+                            c->response->pos += rc;
+                    } while ((c->response->pos < c->response->use) && (rc > 0));
+
+                    // incomplete response
+                    if (c->response->use > c->response->pos) {
+                        evt.data.ptr = c;
+                        evt.events = EPOLLOUT;
+                        epoll_ctl(epoll_fd, EPOLL_CTL_MOD, c->fd, &evt);
+                    }
+                }
+            } else if ((epoll_events + i)->events & EPOLLOUT) {
+                int rc = 0;
+                do {
+                    char *buf = &c->response->content[c->response->pos];
+                    int len = c->response->use - c->response->pos;
+
+                    rc = send(c->fd, buf, len, 0);
+                    if (rc > 0)
+                        c->response->pos += rc;
+                } while ((c->response->pos < c->response->use) && (rc > 0));
+
+                if (c->response->use <= c->response->pos) {
+                    evt.data.ptr = c;
+                    evt.events = EPOLLIN;
+                    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, c->fd, &evt);
+                }
+            }
+        }
+
+        clock_gettime(CLOCK_MONOTONIC, &ts);
+        ts_now = ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
+    }
+}
+
+/*
+ * Initialize virtio layer
+ */
+int virtio_init(const char *virtio_path,
+                unsigned max_channel,
+                unsigned expiration_period)
+{
+    if (virtio_status == VIRTIO_INIT) {
+        pthread_mutex_init(&mbuffer_mtx, NULL);
+
+        channel.max_num = max_channel;
+        mbuffer.max_num = max_channel;
+        mbuffer.exp_period = expiration_period;
+
+        if (mbuffer.host == NULL)
+            if (vu_buffer_create
+                ((vu_buffer **) & (mbuffer.host), DEFAULT_VU_BUFFER_SIZE) != 0)
+                goto error;
+
+        if (epoll_fd == -1) {
+
+            epoll_events =
+                calloc(channel.max_num + 1, sizeof(struct epoll_event));
+            if (epoll_events == NULL)
+                goto error;
+
+            epoll_fd = epoll_create(1);
+            if (epoll_fd == -1)
+                goto error;
+
+            if (virtio_path == NULL ||
+                (strnlen(virtio_path, max_virtio_path_len) +
+                 VIR_UUID_STRING_BUFLEN) > (max_virtio_path_len - 2)) {
+
+                vu_log(VHOSTMD_ERR, "ERROR: invalid virtio_path");
+                goto error;
+            }
+
+            if (channel.path == NULL) {
+                channel.path = calloc(1, max_virtio_path_len + 2);
+                if (channel.path == NULL)
+                    goto error;
+
+                strncpy(channel.path, virtio_path,
+                        max_virtio_path_len - VIR_UUID_STRING_BUFLEN);
+
+                if (channel.path[strlen(channel.path) - 1] != '/')
+                    channel.path[strlen(channel.path)] = '/';
+            }
+        }
+
+        virtio_status = VIRTIO_ACTIVE;
+    }
+
+    return 0;
+
+  error:
+    vu_log(VHOSTMD_ERR, "ERROR: virtio_init() initialization failed");
+    virtio_status = VIRTIO_ERROR;
+
+    return -1;
+}
+
+/*
+ * Cleanup virtio layer
+ */
+int virtio_cleanup(void)
+{
+    if (virtio_status == VIRTIO_STOP) {
+
+        if (epoll_fd != -1) {
+            close(epoll_fd);
+            epoll_fd = -1;
+        }
+
+        if (channel.root) {
+            twalk(channel.root, vio_channel_delete);
+            tdestroy(channel.root, free);
+            channel.root = NULL;
+            channel.count = 0;
+        }
+
+        if (channel.path) {
+            free(channel.path);
+            channel.path = NULL;
+        }
+
+        if (mbuffer.root) {
+            twalk((void *) mbuffer.root, vio_mbuffer_delete);
+            tdestroy((void *) mbuffer.root, free);
+            mbuffer.root = NULL;
+            mbuffer.count = 0;
+        }
+
+        if (mbuffer.host) {
+            vu_buffer_delete((vu_buffer *) mbuffer.host);
+            mbuffer.host = NULL;
+        }
+
+        if (epoll_events)
+            free(epoll_events);
+
+        pthread_mutex_destroy(&mbuffer_mtx);
+
+        virtio_status = VIRTIO_INIT;
+
+        return 0;
+    }
+    return -1;
+}
+
+/*
+ * Main virtio function
+ * 'start_routine' of pthread_create()
+ */
+void *virtio_run(void *arg)
+{
+    if (virtio_status != VIRTIO_ACTIVE) {
+        vu_log(VHOSTMD_ERR, "ERROR: virtio was not initialized");
+        return NULL;
+    }
+
+    while (virtio_status == VIRTIO_ACTIVE) {
+        if (channel.count < channel.max_num)
+            vio_channel_readdir(channel.path);
+
+        vio_hanlde_io(3000);    // process available requests
+
+        // remove expired metrics buffers
+        mbuffer.exp_ts = time(NULL) - mbuffer.exp_period;
+
+        pthread_mutex_lock(&mbuffer_mtx);
+
+        if (mbuffer.root)
+            twalk((void *) mbuffer.root, vio_mbuffer_expire);
+
+        pthread_mutex_unlock(&mbuffer_mtx);
+    }
+
+    virtio_cleanup();
+
+    return NULL;
+}
+
+/*
+ * Update the metrics response buffer of a VM/host
+ */
+int virtio_metrics_update(const char *buf,
+                          unsigned int len,
+                          const char *uuid,
+                          metric_context ctx)
+{
+    int rc = -1;
+    vio_mbuffer *b;
+
+    if (buf == NULL || len == 0 || virtio_status != VIRTIO_ACTIVE ||
+        (ctx == METRIC_CONTEXT_VM && uuid == NULL))
+        return -1;
+
+    pthread_mutex_lock(&mbuffer_mtx);
+
+    switch (ctx) {
+        case METRIC_CONTEXT_HOST:
+            vu_buffer_erase((vu_buffer *) mbuffer.host);
+            vu_buffer_add((vu_buffer *) mbuffer.host, buf, len);
+#ifdef ENABLE_DEBUG
+            vu_log(VHOSTMD_DEBUG,
+                   "DEBUG: new content for HOST (%u)\n>>>%s<<<\n",
+                   mbuffer.host->use, mbuffer.host->content);
+#endif
+            rc = 0;
+            break;
+        case METRIC_CONTEXT_VM:
+            if ((b = vio_mbuffer_find(uuid)) == NULL) {
+                // for a new VM create a new mbuffer
+                b = vio_mbuffer_add(uuid);
+            }
+
+            if (b != NULL) {
+                vu_buffer_erase(b->xml);
+                vu_buffer_add(b->xml, buf, len);
+                // update the timestamp so the mbuffer can be expired later
+                b->last_update = time(NULL);
+#ifdef ENABLE_DEBUG
+                vu_log(VHOSTMD_DEBUG,
+                       "DEBUG: new content for %s (%u)\n>>>%s<<<\n", uuid,
+                       b->xml->use, b->xml->content);
+#endif
+                rc = 0;
+            }
+            break;
+    }
+
+    pthread_mutex_unlock(&mbuffer_mtx);
+
+    return rc;
+}
+
+/*
+ * Stop virtio thread
+ */
+void virtio_stop(void)
+{
+    if (virtio_status == VIRTIO_ACTIVE)
+        virtio_status = VIRTIO_STOP;
+}
-- 
2.17.1 (Apple Git-112)



