[virt-tools-list] [vhostmd virtio PATCH v3 4/6] Add virtio functions

Jim Fehlig jfehlig at suse.com
Wed Nov 14 23:34:39 UTC 2018


On 11/12/18 8:12 AM, Michael Trapp wrote:
> At the vhostmd side, virtio channels are Unix domain sockets created by
> QEMU when a VM is started and removed when the VM is stopped.
> 
> Basically this implementation
> - monitors a directory for new virtio channels (channel names contain the VM UUID)
> - for valid UUIDs that are also known to libvirtd, connects to the UDS
> - buffers VM/HOST metrics and handles request/response on the sockets
> 
> It provides the functions
> virtio_init()
>      init function of virtio layer
> virtio_run()
>      the start_routine of the virtio thread, handling the virtio-based
>      communication
> virtio_metrics_update()
>      called for each VM/HOST metrics update to update the virtio internal
>      metrics buffer.
> virtio_stop()
>      stop the virtio-thread
> 
> ---
>    Here is a brief description of the concept of vhostmd / virtio interaction
> 
>   Vhostmd calls the virtio API functions
> 
>    *** virtio API ***
> 
>      --> virtio_init()
>          Initialize the virtio layer.
>          Called once before the virtio thread starts.
> 
>      --> virtio_run()
>          Start routine of the virtio thread.
> 
>      --> virtio_stop()
>          Set virtio_status to stop the virtio thread.
> 
>      --> virtio_metrics_update()
>          Add/update the metric buffer of a VM/host.
>          It must be called for every change of VM/host metrics.
> 
>    *** virtio internal ***
> 
>    virtio internal code runs in the virtio thread - see virtio_run().
> 
>    Mbuffer accesses within the virtio thread are:
>        - reading the mbuffer content
>            see vio_channel_update() -> vio_mbuffer_find()
>        - checking whether a VM is 'known' to vhostmd
>            see vio_channel_readdir() -> vio_mbuffer_find()
>        - expiring mbuffers
>            see virtio_run() -> vio_mbuffer_expire()
>            The expiration timeout is >= (3 * 'vhostmd update_period').
>            A VM expires when vhostmd has not received an update for
>            'expiration_time' seconds.
> 
>    The mbuffer (metrics buffer) structs of the VMs and the host are maintained
>    in a btree (mbuffer_root).
>    Every mbuffer access is exclusive - see mbuffer_mtx.
> 
>    *** tests ***
> 
>    So far I've tested vhostmd with virtio support in a setup with 100 Alpine VMs,
>    each VM continuously polling the metrics every 5 seconds, for several hours.
>    To have a more dynamic test environment, all VMs were stopped/started
>    several times.
> 
>    Apart from its dependency on the vu_buffer functions, the virtio code does not
>    call any libvirt functions and can also be run/tested without vhostmd and libvirt.
>    I've also checked this variant in combination with valgrind.
>    But at the moment the required test code and the UDS server program are not
>    part of this patch.
> 
>   include/util.h   |   4 +
>   include/virtio.h |  50 +++
>   vhostmd/virtio.c | 900 +++++++++++++++++++++++++++++++++++++++++++++++
>   3 files changed, 954 insertions(+)
>   create mode 100644 include/virtio.h
>   create mode 100644 vhostmd/virtio.c
> 
> diff --git a/include/util.h b/include/util.h
> index c0bd19a..17ff09c 100644
> --- a/include/util.h
> +++ b/include/util.h
> @@ -26,8 +26,12 @@
>   
>   #ifdef __GNUC__
>   #define ATTRIBUTE_UNUSED __attribute__((unused))
> +#define ATTRIBUTE_OPTIMIZE_O0 __attribute__((optimize("O0")))
> +#define ATTRIBUTE_NOINLINE __attribute__((noinline))
>   #else
>   #define ATTRIBUTE_UNUSED
> +#define ATTRIBUTE_OPTIMIZE_O0
> +#define ATTRIBUTE_NOINLINE
>   #endif
>   
>   typedef enum {
> diff --git a/include/virtio.h b/include/virtio.h
> new file mode 100644
> index 0000000..b10dab5
> --- /dev/null
> +++ b/include/virtio.h
> @@ -0,0 +1,50 @@
> +/*
> + * Copyright (C) 2018 SAP SE
> + *
> + * This library is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * This library is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with this library; if not, write to the Free Software
> + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
> + *
> + * Author: Michael Trapp <michael.trapp at sap.com>
> + */
> +
> +#ifndef __VIRTIO_H__
> +#define __VIRTIO_H__
> +
> +/*
> + * Initialize virtio layer
> + */
> +int virtio_init(const char *virtio_path,
> +                int max_channel,
> +                int expiration_period);
> +
> +/*
> + * Main virtio function
> + * 'start_routine' of pthread_create()
> + */
> +void *virtio_run(void *arg);
> +
> +/*
> + * Update the metrics response buffer of a VM/host
> + */
> +int virtio_metrics_update(const char *buf,
> +                          int len,
> +                          const char *uuid,
> +                          metric_context ctx);
> +
> +/*
> + * Stop virtio thread
> + */
> +void virtio_stop(void);
> +
> +#endif                          /* __VIRTIO_H__ */
> diff --git a/vhostmd/virtio.c b/vhostmd/virtio.c
> new file mode 100644
> index 0000000..f5e6306
> --- /dev/null
> +++ b/vhostmd/virtio.c
> @@ -0,0 +1,900 @@
> +/*
> + * Copyright (C) 2018 SAP SE
> + *
> + * This library is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * This library is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with this library; if not, write to the Free Software
> + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
> + *
> + * Author: Michael Trapp <michael.trapp at sap.com>
> + */
> +
> +#include <config.h>
> +
> +#include <errno.h>
> +#include <sys/un.h>
> +#include <sys/socket.h>
> +#include <sys/epoll.h>
> +#include <unistd.h>
> +#include <fcntl.h>
> +#include <search.h>
> +#include <dirent.h>
> +#include <pthread.h>
> +#include <libvirt/libvirt.h>
> +
> +#include "util.h"
> +#include "metric.h"
> +#include "virtio.h"
> +
> +
> +#define DEFAULT_VU_BUFFER_SIZE 1024
> +#define VIRTIO_PREFIX_LEN 21UL
> +#define VIRTIO_NAME_BUFLEN (VIRTIO_PREFIX_LEN + VIR_UUID_STRING_BUFLEN)
> +
> +
> +typedef struct {
> +    int fd;
> +    char uuid[VIR_UUID_STRING_BUFLEN];
> +    vu_buffer *request;
> +    vu_buffer *response;
> +} vio_channel;
> +
> +typedef struct {
> +    char uuid[VIR_UUID_STRING_BUFLEN];
> +    time_t last_update;
> +    vu_buffer *xml;
> +} vio_mbuffer;
> +
> +typedef enum {
> +    REQ_INCOMPLETE,
> +    REQ_INVALID,
> +    REQ_GET_XML
> +} REQUEST_T;
> +
> +
> +static vu_buffer *mbuffer_host = NULL;
> +static volatile void *mbuffer_root = NULL;
> +static int mbuffer_max_num = 0;
> +static volatile int mbuffer_count = 0;
> +#ifdef ENABLE_DEBUG
> +static int mbuffer_idx = 0;
> +#endif
> +static time_t mbuffer_exp_period = 0;
> +static time_t mbuffer_exp_ts = 0;
> +
> +static void *channel_root = NULL;
> +static char *channel_path = NULL;
> +static const char *channel_prefix = "org.github.vhostmd.1.";
> +static int channel_max_num = 0;
> +static int channel_count = 0;
> +
> +static int epoll_fd = -1;
> +static struct epoll_event *epoll_events = NULL;
> +static const size_t max_virtio_path_len =
> +    sizeof(((struct sockaddr_un *) 0)->sun_path) - 1;
> +static pthread_mutex_t mbuffer_mtx;
> +
> +static enum {
> +    VIRTIO_INIT,
> +    VIRTIO_ACTIVE,
> +    VIRTIO_STOP,
> +    VIRTIO_ERROR
> +} virtio_status = VIRTIO_INIT;
> +
> +
> +/*
> + * static functions
> + */
> +
> +static int vio_channel_compare(const void *a, const void *b);
> +static void vio_channel_delete(const void *node, VISIT which, int depth);
> +
> +static vio_channel *vio_channel_find(const char *uuid);
> +static vio_channel *vio_channel_add(const char *uuid);
> +static int vio_channel_open(vio_channel * c);
> +static void vio_channel_close(vio_channel * c);
> +static int vio_channel_update(vio_channel * c);
> +static int vio_channel_readdir(const char *path);
> +static void vio_channel_recv(vio_channel * c);
> +static void vio_channel_send(vio_channel * c, uint32_t ep_event);
> +
> +static int vio_mbuffer_compare(const void *a, const void *b);
> +static void vio_mbuffer_delete(const void *node, VISIT which, int depth);
> +static void vio_mbuffer_expire(const void *node, VISIT which, int depth);
> +static void **vio_mbuffer_get_root(void);
> +#ifdef ENABLE_DEBUG
> +static void vio_mbuffer_print(const void *node, VISIT which, int depth);
> +#endif
> +
> +static vio_mbuffer *vio_mbuffer_find(const char *uuid);
> +static vio_mbuffer *vio_mbuffer_add(const char *uuid);
> +static REQUEST_T vio_check_request(vio_channel * c);
> +
> +static void vio_handle_io(unsigned epoll_wait_ms);
> +
> +int virtio_cleanup(void);
> +/*
> + * update response buffer of a channel
> + */
> +static int vio_channel_update(vio_channel * c)
> +{
> +    static const char *metrics_start_str = "<metrics>\n";
> +    static const char *metrics_end_str = "</metrics>\n\n";
> +
> +    int rc = 0;
> +    vio_mbuffer *b = NULL;
> +
> +    if (c == NULL)
> +        return -1;
> +
> +    vu_buffer_erase(c->response);
> +    vu_buffer_add(c->response, metrics_start_str, -1);
> +
> +    pthread_mutex_lock(&mbuffer_mtx);
> +
> +    if (mbuffer_host->content && mbuffer_host->use)
> +        vu_buffer_add(c->response, mbuffer_host->content, -1);
> +    else
> +        vu_buffer_add(c->response, "host metrics not available", -1);
> +
> +    b = vio_mbuffer_find(c->uuid);
> +    if (b && b->xml->use)
> +        vu_buffer_add(c->response, b->xml->content, -1);
> +    else
> +        rc = -1;
> +
> +    pthread_mutex_unlock(&mbuffer_mtx);
> +
> +    vu_buffer_add(c->response, metrics_end_str, -1);
> +
> +#ifdef ENABLE_DEBUG
> +    vu_log(VHOSTMD_DEBUG, "New response for %s (%u)\n>>>%s<<<\n",
> +           c->uuid, c->response->use, c->response->content);
> +#endif
> +    return rc;
> +}
> +
> +/*
> + * close channel and free allocated buffers
> + */
> +static void vio_channel_close(vio_channel * c)
> +{
> +    if (c != NULL) {
> +        channel_count--;
> +
> +        if (c->fd >= 0) {
> +            vu_log(VHOSTMD_INFO, "Closed channel '%s%s%s' (%d/%d)",
> +                   channel_path, channel_prefix, c->uuid, channel_count,
> +                   channel_max_num);
> +            close(c->fd);
> +        }
> +
> +        if (c->request)
> +            vu_buffer_delete(c->request);
> +        if (c->response)
> +            vu_buffer_delete(c->response);
> +
> +        tdelete((const void *) c, &channel_root, vio_channel_compare);
> +        free(c);
> +    }
> +}
> +
> +/*
> + * connect the channel and add the socket to the epoll descriptor
> + */
> +static int vio_channel_open(vio_channel * c)
> +{
> +    struct sockaddr_un address;
> +    const size_t max_path_len =
> +        sizeof(((struct sockaddr_un *) 0)->sun_path) - 1;
> +    struct epoll_event evt;
> +    int len = (int) (strlen(channel_path) + VIRTIO_PREFIX_LEN + strlen(c->uuid));
> +    int flags;
> +
> +    bzero(&address, sizeof(address));
> +    address.sun_family = AF_LOCAL;
> +
> +    if (len >= (int) max_path_len) {
> +        vu_log(VHOSTMD_ERR, "Channel '%s%s%s' - name too long (%d/%lu)",
> +               channel_path, channel_prefix, c->uuid, len, max_path_len);
> +        return -1;
> +    }
> +
> +    len = snprintf(address.sun_path, max_path_len, "%s%s%s", channel_path,
> +                   channel_prefix, c->uuid);
> +
> +    if (len >= (int) max_path_len || len <= 0) {
> +        vu_log(VHOSTMD_ERR, "Channel '%s%s%s' - name is invalid (%lu)",
> +               channel_path, channel_prefix, c->uuid, max_path_len);
> +        return -1;
> +    }
> +
> +    if ((c->fd = socket(AF_LOCAL, SOCK_STREAM, 0)) < 0) {
> +        vu_log(VHOSTMD_ERR, "Channel '%s' - socket() failed (%s)",
> +               address.sun_path, strerror(errno));
> +        return -1;
> +    }
> +
> +    flags = fcntl(c->fd, F_GETFL, 0);
> +    if (flags < 0) {
> +        vu_log(VHOSTMD_ERR, "Channel '%s' - fcntl() failed (%s)",
> +               address.sun_path, strerror(errno));
> +        return -1;
> +    }
> +
> +    flags |= O_NONBLOCK;
> +    if (fcntl(c->fd, F_SETFL, flags) == -1) {
> +        vu_log(VHOSTMD_ERR, "Channel '%s' - fcntl() failed (%s)",
> +               address.sun_path, strerror(errno));
> +        return -1;
> +    }
> +
> +    if (connect(c->fd, (struct sockaddr *) &address,
> +                (socklen_t) sizeof(address)) < 0) {
> +        vu_log(VHOSTMD_ERR, "Channel '%s' - connect() failed (%s)",
> +               address.sun_path, strerror(errno));
> +        return -1;
> +    }
> +
> +    evt.data.ptr = c;
> +    evt.events = EPOLLIN;
> +
> +    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, c->fd, &evt) == -1) {
> +        vu_log(VHOSTMD_ERR, "Could not add channel '%s' - epoll_ctl() failed (%s)",
> +               address.sun_path, strerror(errno));
> +        return -1;
> +    }
> +
> +    vu_log(VHOSTMD_INFO, "Opened channel '%s' (%d/%d)",
> +           address.sun_path, channel_count, channel_max_num);
> +
> +    return 0;
> +}
> +
> +/*
> + * look up UDS sockets in the directory;
> + * for entries with a valid type/name and an existing mbuffer,
> + * connect and register a channel
> + */

For a long time libvirt has been able to generate a socket path for unix 
channels. The standard path prefix is /var/lib/libvirt/qemu/channel/target. When 
a domain is started, a subdirectory named 'domain-<domid>-<domname>' is created, 
in which each unix socket is created based on the name attribute of the target 
element. So e.g. a domain with id '5' and name 'foobar' containing the following channel config

     <channel type='unix'>
       <source mode='bind'/>
       <target type='virtio' name='org.qemu.guest_agent.0'/>
     </channel>
     <channel type='unix'>
       <source mode='bind'/>
       <target type='virtio' name='org.github.vhostmd.1'/>
     </channel>

will result in

/var/lib/libvirt/qemu/channel/target/domain-5-foobar/org.qemu.guest_agent.0
/var/lib/libvirt/qemu/channel/target/domain-5-foobar/org.github.vhostmd.1

Within the VM you have

/dev/virtio-ports/org.qemu.guest_agent.0
/dev/virtio-ports/org.github.vhostmd.1

For consistency with other channels like the guest agent, it would be nice not to 
require specifying the channel path in the source element. I realize the 
importance of the UUID throughout this patch, but would it be possible to make 
this work using libvirt's naming scheme? Sorry for not noticing this earlier :-(.

Regards,
Jim

> +static int vio_channel_readdir(const char *path)
> +{
> +    struct dirent *ent;
> +    DIR *dir = NULL;
> +
> +    if ((dir = opendir(path)) == NULL) {
> +        vu_log(VHOSTMD_ERR, "opendir(%s) failed (%s)", path, strerror(errno));
> +        return -1;
> +    }
> +
> +    while ((ent = readdir(dir)) != NULL) {
> +
> +        if (ent->d_type == DT_SOCK &&
> +            strncmp(ent->d_name, channel_prefix, VIRTIO_PREFIX_LEN) == 0 &&
> +            strnlen(ent->d_name, VIRTIO_NAME_BUFLEN) ==
> +                (VIRTIO_NAME_BUFLEN - 1)) {
> +
> +            const char *uuid = &ent->d_name[VIRTIO_PREFIX_LEN];
> +
> +            vio_channel *c = vio_channel_find(uuid);
> +
> +            if (c == NULL) {
> +                if (channel_count >= channel_max_num) {
> +                    closedir(dir);
> +                    vu_log(VHOSTMD_ERR, "Could not add channel '%s%s%s'"
> +                           " - too many VMs (%d/%d)",
> +                           path, channel_prefix, uuid, channel_count,
> +                           channel_max_num);
> +                    return -1;
> +                }
> +
> +                pthread_mutex_lock(&mbuffer_mtx);
> +
> +                /* don't add the channel if there is no mbuffer for this VM */
> +                if (vio_mbuffer_find(uuid) != NULL) {
> +#ifdef ENABLE_DEBUG
> +                    vu_log(VHOSTMD_DEBUG, "New channel %s%s\n", path, ent->d_name);
> +#endif
> +                    c = vio_channel_add(uuid);
> +
> +                    if (c == NULL)
> +                        vu_log(VHOSTMD_ERR, "Could not add channel '%s%s'", path,
> +                               ent->d_name);
> +                }
> +
> +                pthread_mutex_unlock(&mbuffer_mtx);
> +            }
> +        }
> +    }
> +    closedir(dir);
> +
> +    return 0;
> +}
> +
> +/*
> + * channel - btree - compare function
> + */
> +static int vio_channel_compare(const void *a, const void *b)
> +{
> +    if (a == NULL || b == NULL)
> +        return 1;
> +
> +    return strncmp(((const vio_channel *) a)->uuid, ((const vio_channel *) b)->uuid,
> +                   (size_t) VIR_UUID_STRING_BUFLEN);
> +}
> +
> +/*
> + * channel - btree/twalk - action function
> + * delete entries
> + */
> +static void vio_channel_delete(const void *node, VISIT which, int depth ATTRIBUTE_UNUSED)
> +{
> +    if (which == endorder || which == leaf) {
> +        if (node) {
> +            struct epoll_event evt;
> +            vio_channel *c = *(vio_channel * const *) node;
> +            if (c) {
> +                epoll_ctl(epoll_fd, EPOLL_CTL_DEL, c->fd, &evt);
> +                vio_channel_close(c);
> +            }
> +        }
> +    }
> +}
> +
> +/*
> + * mbuffer - btree - compare function
> + */
> +static int vio_mbuffer_compare(const void *a, const void *b)
> +{
> +    if (a == NULL || b == NULL)
> +        return 1;
> +
> +    return strncmp(((const vio_mbuffer *) a)->uuid, ((const vio_mbuffer *) b)->uuid,
> +                   (size_t) VIR_UUID_STRING_BUFLEN);
> +}
> +
> +/*
> + * mbuffer - btree/twalk - action function
> + * delete entries
> + */
> +static void vio_mbuffer_delete(const void *node, VISIT which, int depth ATTRIBUTE_UNUSED)
> +{
> +    if (which == endorder || which == leaf) {
> +        if (node) {
> +            vio_mbuffer *b = *(vio_mbuffer * const *) node;
> +
> +            if (b) {
> +                if (b->xml)
> +                    vu_buffer_delete(b->xml);
> +                tdelete((const void *) b, vio_mbuffer_get_root(),
> +                        vio_mbuffer_compare);
> +                free(b);
> +                mbuffer_count--;
> +            }
> +        }
> +    }
> +}
> +
> +/*
> + * mbuffer - btree/twalk - action function
> + * expire entries
> + */
> +static void vio_mbuffer_expire(const void *node, VISIT which, int depth ATTRIBUTE_UNUSED)
> +{
> +    if (which == endorder || which == leaf) {
> +        if (node) {
> +            vio_mbuffer *b = *(vio_mbuffer * const *) node;
> +
> +            /*  remove expired mbuffers
> +             *  an mbuffer expires when its last update is older
> +             *  than the expiration_period
> +             *
> +             *  the action function does not support custom arguments
> +             *  --> use a static variable: mbuffer_exp_ts
> +             */
> +            if (b && b->last_update < mbuffer_exp_ts) {
> +                vio_channel *c = vio_channel_find(b->uuid);
> +
> +                if (c != NULL)
> +                    vio_channel_close(c);
> +
> +#ifdef ENABLE_DEBUG
> +                vu_log(VHOSTMD_DEBUG, "Expire mbuffer '%s' (%d/%d)",
> +                       b->uuid, mbuffer_count, mbuffer_max_num);
> +#endif
> +                if (b->xml)
> +                    vu_buffer_delete(b->xml);
> +                tdelete((const void *) b, vio_mbuffer_get_root(),
> +                        vio_mbuffer_compare);
> +                free(b);
> +                mbuffer_count--;
> +            }
> +        }
> +    }
> +}
> +
> +/* gcc's -Wall -Werror requires a cast from (volatile void *) to (void *),
> + * but this discards the volatile qualifier. The function attributes
> + * 'optimize("O0")' and 'noinline' should avoid any optimization of this access.
> + */
> +ATTRIBUTE_OPTIMIZE_O0
> +ATTRIBUTE_NOINLINE
> +static void **vio_mbuffer_get_root(void)
> +{
> +    return (void *) &mbuffer_root;
> +}
> +
> +#ifdef ENABLE_DEBUG
> +static void vio_mbuffer_print(const void *node,
> +                              VISIT which,
> +                              int depth ATTRIBUTE_UNUSED)
> +{
> +    if (which == endorder || which == leaf) {
> +        if (node) {
> +            vio_mbuffer *b = *(vio_mbuffer **) node;
> +
> +            if (b) {
> +                vu_log(VHOSTMD_DEBUG, "\t%4d %s %lu\n",
> +                       ++mbuffer_idx, b->uuid, b->last_update);
> +            }
> +        }
> +    }
> +}
> +#endif
> +
> +/*
> + * lookup metrics buffer in internal btree
> + */
> +static vio_mbuffer *vio_mbuffer_find(const char *uuid)
> +{
> +    vio_mbuffer b;
> +    void *p;
> +
> +    strncpy(b.uuid, uuid, sizeof(b.uuid));
> +
> +    p = tfind((const void *) &b, vio_mbuffer_get_root(), vio_mbuffer_compare);
> +    if (p == NULL)
> +        return NULL;
> +
> +    return *(vio_mbuffer **) p;
> +}
> +
> +/*
> + * add metrics buffer to internal btree
> + */
> +static vio_mbuffer *vio_mbuffer_add(const char *uuid)
> +{
> +    vio_mbuffer *b = NULL;
> +    void *p = NULL;
> +
> +    if (mbuffer_count >= mbuffer_max_num) {
> +        vu_log(VHOSTMD_ERR,
> +               "Could not add metrics buffer '%s' - too many VMs (%d/%d)",
> +               uuid, mbuffer_count, mbuffer_max_num);
> +
> +#ifdef ENABLE_DEBUG
> +        mbuffer_idx = 0;
> +        vu_log(VHOSTMD_DEBUG, "exp_ts %lu, allocated mbuffer:\n", mbuffer_exp_ts);
> +        twalk((const void *) *vio_mbuffer_get_root(), vio_mbuffer_print);
> +#endif
> +
> +        return NULL;
> +    }
> +
> +    b = (vio_mbuffer *) calloc(1UL, sizeof(vio_mbuffer));
> +    if (b == NULL)
> +        goto error;
> +
> +    strncpy(b->uuid, uuid, sizeof(b->uuid));
> +    b->xml = NULL;
> +
> +    if (vu_buffer_create(&b->xml, DEFAULT_VU_BUFFER_SIZE) != 0) {
> +        free(b);
> +        goto error;
> +    }
> +
> +    p = tsearch((const void *) b, vio_mbuffer_get_root(), vio_mbuffer_compare);
> +    if (p == NULL) {
> +        vu_buffer_delete(b->xml);
> +        free(b);
> +        goto error;
> +    }
> +
> +    mbuffer_count++;
> +    return b;
> +
> +  error:
> +    vu_log(VHOSTMD_ERR, "Could not add metrics buffer '%s' (%d/%d)",
> +           uuid, mbuffer_count, mbuffer_max_num);
> +
> +    return NULL;
> +}
> +
> +/*
> + * lookup virtio channel in internal btree
> + */
> +static vio_channel *vio_channel_find(const char *uuid)
> +{
> +    vio_channel c;
> +    void *p;
> +
> +    strncpy(c.uuid, uuid, sizeof(c.uuid));
> +
> +    p = tfind((const void *) &c, &channel_root, vio_channel_compare);
> +    if (p == NULL)
> +        return NULL;
> +
> +    return *(vio_channel **) p;
> +}
> +
> +/*
> + * add virtio channel to internal btree
> + */
> +static vio_channel *vio_channel_add(const char *uuid)
> +{
> +    vio_channel *c = NULL;
> +    void *p = NULL;
> +
> +    c = (vio_channel *) calloc(1UL, sizeof(vio_channel));
> +    if (c == NULL)
> +        goto error;
> +
> +    channel_count++;
> +
> +    strncpy(c->uuid, uuid, sizeof(c->uuid));
> +    c->fd = -1;
> +    c->request = NULL;
> +    c->response = NULL;
> +
> +    p = tsearch((const void *) c, &channel_root, vio_channel_compare);
> +    if (p == NULL)
> +        goto error;
> +
> +    if (vio_channel_open(c) != 0 ||
> +        vu_buffer_create(&c->request, 512) != 0 ||
> +        vu_buffer_create(&c->response, DEFAULT_VU_BUFFER_SIZE) != 0)
> +        goto error;
> +
> +    return c;
> +
> +  error:
> +    if (c)
> +        vio_channel_close(c);
> +
> +    return NULL;
> +}
> +
> +static REQUEST_T vio_check_request(vio_channel * c)
> +{
> +    const char xml_req_n[] = "GET /metrics/XML\n\n";
> +    const char xml_req_rn[] = "GET /metrics/XML\r\n\r\n";
> +
> +    if (strcmp(c->request->content, xml_req_n) == 0 ||
> +        strcmp(c->request->content, xml_req_rn) == 0) {
> +        // valid request
> +        vu_buffer_erase(c->request);
> +        return REQ_GET_XML;
> +    } else if (c->request->use >= (c->request->size - 1) ||
> +               strstr(c->request->content, "\n\n") ||
> +               strstr(c->request->content, "\r\n\r\n")) {
> +        // invalid request -> reset buffer
> +        vu_buffer_erase(c->request);
> +
> +        vu_buffer_erase(c->response);
> +        vu_buffer_add(c->response, "INVALID REQUEST\n\n", -1);
> +        return REQ_INVALID;
> +    } else {
> +        // fragment
> +        c->request->use = (unsigned) strnlen(c->request->content,
> +                                             (size_t) c->request->size);
> +    }
> +
> +    return REQ_INCOMPLETE;
> +}
> +
> +static void vio_channel_recv(vio_channel * c)
> +{
> +    struct epoll_event evt;
> +    ssize_t rc = 0;
> +    REQUEST_T req_type = REQ_INCOMPLETE;
> +
> +    do {
> +        char *buf = &c->request->content[c->request->use];
> +        size_t len = c->request->size - c->request->use - 1;
> +
> +        rc = recv(c->fd, buf, len, 0);
> +
> +        if (rc > 0) {
> +            req_type = vio_check_request(c);
> +        }
> +    } while (rc > 0 && req_type == REQ_INCOMPLETE);
> +
> +    if (req_type == REQ_GET_XML) {
> +        if (vio_channel_update(c)) {
> +            // no metrics available -> close channel
> +            epoll_ctl(epoll_fd, EPOLL_CTL_DEL, c->fd, &evt);
> +            vio_channel_close(c);
> +        } else
> +            vio_channel_send(c, EPOLLIN);
> +    } else if (req_type == REQ_INVALID)
> +        vio_channel_send(c, EPOLLIN);
> +}
> +
> +static void vio_channel_send(vio_channel * c, uint32_t ep_event)
> +{
> +    struct epoll_event evt;
> +    int len;
> +
> +    while ((len = (int) (c->response->use - c->response->pos)) > 0)
> +    {
> +        char *buf = &c->response->content[c->response->pos];
> +        ssize_t rc = send(c->fd, buf, (size_t) len, 0);
> +
> +        if (rc > 0)
> +            c->response->pos += (unsigned) rc;
> +        else
> +            break;
> +    }
> +
> +    if (ep_event == EPOLLOUT) {
> +        if (c->response->use <= c->response->pos) {
> +            // next request
> +            evt.data.ptr = c;
> +            evt.events = EPOLLIN;
> +            epoll_ctl(epoll_fd, EPOLL_CTL_MOD, c->fd, &evt);
> +        }
> +    } else if (ep_event == EPOLLIN) {
> +        if (c->response->use > c->response->pos) {
> +            // incomplete response
> +            evt.data.ptr = c;
> +            evt.events = EPOLLOUT;
> +            epoll_ctl(epoll_fd, EPOLL_CTL_MOD, c->fd, &evt);
> +        }
> +    }
> +}
> +
> +static void vio_handle_io(unsigned epoll_wait_ms)
> +{
> +    int i = 0;
> +    uint64_t ts_end, ts_now;
> +    struct epoll_event evt;
> +    struct timespec ts;
> +
> +    clock_gettime(CLOCK_MONOTONIC, &ts);
> +    ts_now = (uint64_t) (ts.tv_sec * 1000 + ts.tv_nsec / 1000000);
> +    ts_end = ts_now + epoll_wait_ms;
> +
> +    while (ts_now < ts_end) {
> +        int wait_ms = (int) (ts_end - ts_now);
> +        int n =
> +            epoll_wait(epoll_fd, epoll_events, channel_max_num + 1, wait_ms);
> +
> +        for (i = 0; i < n; i++) {
> +            vio_channel *c = (epoll_events + i)->data.ptr;
> +
> +            if ((epoll_events + i)->events & EPOLLHUP) {
> +                epoll_ctl(epoll_fd, EPOLL_CTL_DEL, c->fd, &evt);
> +                vio_channel_close(c);
> +            } else if ((epoll_events + i)->events & EPOLLIN) {
> +                vio_channel_recv(c);
> +            } else if ((epoll_events + i)->events & EPOLLOUT) {
> +                vio_channel_send(c, EPOLLOUT);
> +            }
> +        }
> +
> +        clock_gettime(CLOCK_MONOTONIC, &ts);
> +        ts_now = (uint64_t) (ts.tv_sec * 1000 + ts.tv_nsec / 1000000);
> +    }
> +}
> +
> +/*
> + * Initialize virtio layer
> + */
> +int virtio_init(const char *virtio_path,
> +                int max_channel,
> +                int expiration_period)
> +{
> +    if (virtio_status == VIRTIO_INIT) {
> +        pthread_mutex_init(&mbuffer_mtx, NULL);
> +
> +        channel_max_num = max_channel;
> +        mbuffer_max_num = max_channel;
> +        mbuffer_exp_period = expiration_period;
> +
> +        if (mbuffer_host == NULL)
> +            if (vu_buffer_create (&mbuffer_host, DEFAULT_VU_BUFFER_SIZE) != 0)
> +                goto error;
> +
> +        if (epoll_fd == -1) {
> +
> +            epoll_events = calloc((size_t) (channel_max_num + 1),
> +                                  sizeof(struct epoll_event));
> +            if (epoll_events == NULL)
> +                goto error;
> +
> +            epoll_fd = epoll_create(1);
> +            if (epoll_fd == -1)
> +                goto error;
> +
> +            if (virtio_path == NULL ||
> +                (strnlen(virtio_path, max_virtio_path_len) +
> +                 VIR_UUID_STRING_BUFLEN) > (max_virtio_path_len - 2)) {
> +
> +                vu_log(VHOSTMD_ERR, "Invalid virtio_path");
> +                goto error;
> +            }
> +
> +            if (channel_path == NULL) {
> +                channel_path = calloc(1UL, max_virtio_path_len + 2);
> +                if (channel_path == NULL)
> +                    goto error;
> +
> +                strncpy(channel_path, virtio_path,
> +                        max_virtio_path_len - VIR_UUID_STRING_BUFLEN);
> +
> +                if (channel_path[strlen(channel_path) - 1] != '/')
> +                    channel_path[strlen(channel_path)] = '/';
> +            }
> +        }
> +
> +        virtio_status = VIRTIO_ACTIVE;
> +        vu_log(VHOSTMD_INFO,
> +               "Virtio using path '%s', max_channels %d, expiration_time %ld",
> +               channel_path, channel_max_num, mbuffer_exp_period);
> +    }
> +
> +    return 0;
> +
> +  error:
> +    vu_log(VHOSTMD_ERR, "Virtio initialization failed");
> +    virtio_status = VIRTIO_ERROR;
> +
> +    return -1;
> +}
> +
> +/*
> + * Cleanup virtio layer
> + */
> +int virtio_cleanup(void)
> +{
> +    if (virtio_status == VIRTIO_STOP) {
> +
> +        if (epoll_fd != -1) {
> +            close(epoll_fd);
> +            epoll_fd = -1;
> +        }
> +
> +        if (channel_root) {
> +            twalk(channel_root, vio_channel_delete);
> +            tdestroy(channel_root, free);
> +            channel_root = NULL;
> +            channel_count = 0;
> +        }
> +
> +        if (channel_path) {
> +            free(channel_path);
> +            channel_path = NULL;
> +        }
> +
> +        if (*vio_mbuffer_get_root()) {
> +            twalk((const void *) *vio_mbuffer_get_root(), vio_mbuffer_delete);
> +            tdestroy((void *) *vio_mbuffer_get_root(), free);
> +            mbuffer_root = NULL;
> +            mbuffer_count = 0;
> +        }
> +
> +        if (mbuffer_host) {
> +            vu_buffer_delete((vu_buffer *) mbuffer_host);
> +            mbuffer_host = NULL;
> +        }
> +
> +        if (epoll_events)
> +            free(epoll_events);
> +
> +        pthread_mutex_destroy(&mbuffer_mtx);
> +
> +        virtio_status = VIRTIO_INIT;
> +
> +        return 0;
> +    }
> +    return -1;
> +}
> +
> +/*
> + * Main virtio function
> + * 'start_routine' of pthread_create()
> + */
> +void *virtio_run(void *arg ATTRIBUTE_UNUSED)
> +{
> +    if (virtio_status != VIRTIO_ACTIVE) {
> +        vu_log(VHOSTMD_ERR, "Virtio was not initialized");
> +        return NULL;
> +    }
> +
> +    while (virtio_status == VIRTIO_ACTIVE) {
> +        if (channel_count < channel_max_num)
> +            vio_channel_readdir(channel_path);
> +
> +        vio_handle_io(3000);    // process available requests
> +
> +        // remove expired metrics buffers
> +        mbuffer_exp_ts = time(NULL) - mbuffer_exp_period;
> +
> +        pthread_mutex_lock(&mbuffer_mtx);
> +
> +        if (*vio_mbuffer_get_root())
> +            twalk((const void *) *vio_mbuffer_get_root(), vio_mbuffer_expire);
> +
> +        pthread_mutex_unlock(&mbuffer_mtx);
> +    }
> +
> +    virtio_cleanup();
> +
> +    return NULL;
> +}
> +
> +/*
> + * Update the metrics response buffer of a VM/host
> + */
> +int virtio_metrics_update(const char *buf,
> +                          int len,
> +                          const char *uuid,
> +                          metric_context ctx)
> +{
> +    int rc = -1;
> +    vio_mbuffer *b;
> +
> +    if (buf == NULL || len == 0 || virtio_status != VIRTIO_ACTIVE ||
> +        (ctx == METRIC_CONTEXT_VM && uuid == NULL))
> +        return -1;
> +
> +    pthread_mutex_lock(&mbuffer_mtx);
> +
> +    if (ctx == METRIC_CONTEXT_HOST) {
> +        vu_buffer_erase(mbuffer_host);
> +        vu_buffer_add(mbuffer_host, buf, len);
> +#ifdef ENABLE_DEBUG
> +        vu_log(VHOSTMD_DEBUG, "New content for HOST (%u)\n>>>%s<<<\n",
> +               len, mbuffer_host->content);
> +#endif
> +        rc = 0;
> +    }
> +    else if (ctx == METRIC_CONTEXT_VM) {
> +        if ((b = vio_mbuffer_find(uuid)) == NULL) {
> +            // for a new VM create a new mbuffer
> +            b = vio_mbuffer_add(uuid);
> +        }
> +
> +        if (b != NULL) {
> +            vu_buffer_erase(b->xml);
> +            vu_buffer_add(b->xml, buf, len);
> +            // update the timestamp so the mbuffer can be expired
> +            b->last_update = time(NULL);
> +#ifdef ENABLE_DEBUG
> +            vu_log(VHOSTMD_DEBUG, "New content for %s (%u)\n>>>%s<<<\n",
> +                   uuid, len, b->xml->content);
> +#endif
> +            rc = 0;
> +        }
> +    }
> +
> +    pthread_mutex_unlock(&mbuffer_mtx);
> +
> +    return rc;
> +}
> +
> +/*
> + * Stop virtio thread
> + */
> +void virtio_stop(void)
> +{
> +    if (virtio_status == VIRTIO_ACTIVE)
> +        virtio_status = VIRTIO_STOP;
> +}
> 



