[virt-tools-list] [vhostmd virtio PATCH v3 4/6] Add virtio functions

Trapp, Michael michael.trapp at sap.com
Thu Nov 15 12:13:41 UTC 2018



On 15.11.18, 00:35, "Jim Fehlig" <jfehlig at suse.com> wrote:

    On 11/12/18 8:12 AM, Michael Trapp wrote:
    > On the vhostmd side, virtio channels are Unix domain sockets from QEMU
    > that are created when a VM starts and removed when the VM is stopped.
    > 
    > Basically this implementation
    > - monitors a directory for new virtio channels (channel names contain VM UUID)
    > - for valid UUIDs, also known by libvirtd, it connects to the UDS
    > - buffers VM/HOST metrics and handles request/response on the sockets
    > 
    > It provides the functions
    > virtio_init()
    >      init function of virtio layer
    > virtio_run()
    >      the start_routine for the virtio-thread to handle the virtio based
    >      communication
    > virtio_metrics_update()
    >      called for each VM/HOST metrics update to update the virtio internal
    >      metrics buffer.
    > virtio_stop()
    >      stop the virtio-thread
    > 
    > ---
    >    Here is a brief description of the concept of vhostmd / virtio interaction
    > 
    >   Vhostmd calls the virtio API functions
    > 
    >    *** virtio API ***
    > 
    >      --> virtio_init()
    >          Initialize the virtio layer.
    >          Called once before the virtio thread starts.
    > 
    >      --> virtio_run()
    >          Start routine of the virtio thread.
    > 
    >      --> virtio_stop()
    >          Set virtio_status to stop the virtio thread.
    > 
    >      --> virtio_metrics_update()
    >          Add/update the metric buffer of a VM/host.
    >          It must be called for every change of VM/host metrics.
    > 
    >    *** virtio internal ***
    > 
    >    virtio internal code runs in the virtio thread - see virtio_run().
    > 
    >    Access to mbuffers within the virtio thread is
    >        - read the mbuffer content
    >            see vio_channel_update() -> vio_mbuffer_find()
    >        - check if a VM is 'known' by vhostmd
    >            see vio_channel_readdir() -> vio_mbuffer_find()
    >        - expire mbuffers
    >            see virtio_run() -> vio_mbuffer_expire()
    >            Expiration timeout is >= (3 * 'vhostmd update_period')
    >            A VM expires when vhostmd has not received an update for
    >            'expiration_time' seconds.
    > 
    >    The mbuffer (metrics buffer) structs of VMs and host are maintained in
    >    a btree (mbuffer.root).
    >    Every mbuffer access is exclusive - see mbuffer_mutex.
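
A minimal sketch of the expiration rule described above. The helper names are hypothetical and not part of the patch. Two things are assumptions: that the lower bound of 3 * update_period is enforced by raising a smaller configured value, and that the timestamp compared in vio_mbuffer_expire() is computed as (now - expiration period).

```c
#include <assert.h>
#include <stdbool.h>
#include <time.h>

/* Hypothetical helper: effective expiration period in seconds.
 * Assumption: a requested value below 3 * update_period is raised
 * to that minimum. */
static time_t exp_period_effective(time_t requested, time_t update_period)
{
    assert(update_period > 0);

    time_t min_period = 3 * update_period;

    return requested < min_period ? min_period : requested;
}

/* Hypothetical helper: an mbuffer counts as expired when its last update
 * is older than exp_ts. The comparison mirrors
 * 'b->last_update < mbuffer_exp_ts' from the patch; computing exp_ts as
 * (now - exp_period) is an assumption. */
static bool mbuffer_is_expired(time_t last_update, time_t now,
                               time_t exp_period)
{
    time_t exp_ts = now - exp_period;

    return last_update < exp_ts;
}
```

With an update_period of 5 seconds and a configured period of 10, the effective period in this sketch is 15 seconds, so a VM whose last update is 20 seconds old would be expired while one updated 10 seconds ago would not.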
    > 
    >    *** tests ***
    > 
    >    So far I've tested vhostmd with virtio support in a setup with 100 Alpine VMs,
    >    each VM continuously polling the metrics every 5 seconds, for several hours.
    >    To have a more dynamic test environment all VMs were stopped/started
    >    several times.
    > 
    >    Besides the dependencies on the vu_buffer functions, the virtio code does not call
    >    any libvirt functions and can also be run/tested without vhostmd and libvirt.
    >    I've also checked this variant in combination with valgrind.
    >    But at the moment the required test code and the UDS server program are not
    >    part of this patch.
    > 
    >   include/util.h   |   4 +
    >   include/virtio.h |  50 +++
    >   vhostmd/virtio.c | 900 +++++++++++++++++++++++++++++++++++++++++++++++
    >   3 files changed, 954 insertions(+)
    >   create mode 100644 include/virtio.h
    >   create mode 100644 vhostmd/virtio.c
    > 
    > diff --git a/include/util.h b/include/util.h
    > index c0bd19a..17ff09c 100644
    > --- a/include/util.h
    > +++ b/include/util.h
    > @@ -26,8 +26,12 @@
    >   
    >   #ifdef __GNUC__
    >   #define ATTRIBUTE_UNUSED __attribute__((unused))
    > +#define ATTRIBUTE_OPTIMIZE_O0 __attribute__((optimize("O0")))
    > +#define ATTRIBUTE_NOINLINE __attribute__((noinline))
    >   #else
    >   #define ATTRIBUTE_UNUSED
    > +#define ATTRIBUTE_OPTIMIZE_O0
    > +#define ATTRIBUTE_NOINLINE
    >   #endif
    >   
    >   typedef enum {
    > diff --git a/include/virtio.h b/include/virtio.h
    > new file mode 100644
    > index 0000000..b10dab5
    > --- /dev/null
    > +++ b/include/virtio.h
    > @@ -0,0 +1,50 @@
    > +/*
    > + * Copyright (C) 2018 SAP SE
    > + *
    > + * This library is free software; you can redistribute it and/or
    > + * modify it under the terms of the GNU Lesser General Public
    > + * License as published by the Free Software Foundation; either
    > + * version 2.1 of the License, or (at your option) any later version.
    > + *
    > + * This library is distributed in the hope that it will be useful,
    > + * but WITHOUT ANY WARRANTY; without even the implied warranty of
    > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    > + * Lesser General Public License for more details.
    > + *
    > + * You should have received a copy of the GNU Lesser General Public
    > + * License along with this library; if not, write to the Free Software
    > + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
    > + *
    > + * Author: Michael Trapp <michael.trapp at sap.com>
    > + */
    > +
    > +#ifndef __VIRTIO_H__
    > +#define __VIRTIO_H__
    > +
    > +/*
    > + * Initialize virtio layer
    > + */
    > +int virtio_init(const char *virtio_path,
    > +                int max_channel,
    > +                int expiration_period);
    > +
    > +/*
    > + * Main virtio function
    > + * 'start_routine' of pthread_create()
    > + */
    > +void *virtio_run(void *arg);
    > +
    > +/*
    > + * Update the metrics response buffer of a VM/host
    > + */
    > +int virtio_metrics_update(const char *buf,
    > +                          int len,
    > +                          const char *uuid,
    > +                          metric_context ctx);
    > +
    > +/*
    > + * Stop virtio thread
    > + */
    > +void virtio_stop(void);
    > +
    > +#endif                          /* __VIRTIO_H__ */
    > diff --git a/vhostmd/virtio.c b/vhostmd/virtio.c
    > new file mode 100644
    > index 0000000..f5e6306
    > --- /dev/null
    > +++ b/vhostmd/virtio.c
    > @@ -0,0 +1,900 @@
    > +/*
    > + * Copyright (C) 2018 SAP SE
    > + *
    > + * This library is free software; you can redistribute it and/or
    > + * modify it under the terms of the GNU Lesser General Public
    > + * License as published by the Free Software Foundation; either
    > + * version 2.1 of the License, or (at your option) any later version.
    > + *
    > + * This library is distributed in the hope that it will be useful,
    > + * but WITHOUT ANY WARRANTY; without even the implied warranty of
    > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    > + * Lesser General Public License for more details.
    > + *
    > + * You should have received a copy of the GNU Lesser General Public
    > + * License along with this library; if not, write to the Free Software
    > + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
    > + *
    > + * Author: Michael Trapp <michael.trapp at sap.com>
    > + */
    > +
    > +#include <config.h>
    > +
    > +#include <errno.h>
    > +#include <sys/un.h>
    > +#include <sys/socket.h>
    > +#include <sys/epoll.h>
    > +#include <unistd.h>
    > +#include <fcntl.h>
    > +#include <search.h>
    > +#include <dirent.h>
    > +#include <pthread.h>
    > +#include <libvirt/libvirt.h>
    > +
    > +#include "util.h"
    > +#include "metric.h"
    > +#include "virtio.h"
    > +
    > +
    > +#define DEFAULT_VU_BUFFER_SIZE 1024
    > +#define VIRTIO_PREFIX_LEN 21UL
    > +#define VIRTIO_NAME_BUFLEN (VIRTIO_PREFIX_LEN + VIR_UUID_STRING_BUFLEN)
    > +
    > +
    > +typedef struct {
    > +    int fd;
    > +    char uuid[VIR_UUID_STRING_BUFLEN];
    > +    vu_buffer *request;
    > +    vu_buffer *response;
    > +} vio_channel;
    > +
    > +typedef struct {
    > +    char uuid[VIR_UUID_STRING_BUFLEN];
    > +    time_t last_update;
    > +    vu_buffer *xml;
    > +} vio_mbuffer;
    > +
    > +typedef enum {
    > +    REQ_INCOMPLETE,
    > +    REQ_INVALID,
    > +    REQ_GET_XML
    > +} REQUEST_T;
    > +
    > +
    > +static vu_buffer *mbuffer_host = NULL;
    > +static volatile void *mbuffer_root = NULL;
    > +static int mbuffer_max_num = 0;
    > +static volatile int mbuffer_count = 0;
    > +#ifdef ENABLE_DEBUG
    > +static int mbuffer_idx = 0;
    > +#endif
    > +static time_t mbuffer_exp_period = 0;
    > +static time_t mbuffer_exp_ts = 0;
    > +
    > +static void *channel_root = NULL;
    > +static char *channel_path = NULL;
    > +static const char *channel_prefix = "org.github.vhostmd.1.";
    > +static int channel_max_num = 0;
    > +static int channel_count = 0;
    > +
    > +static int epoll_fd = -1;
    > +static struct epoll_event *epoll_events = NULL;
    > +static const size_t max_virtio_path_len =
    > +    sizeof(((struct sockaddr_un *) 0)->sun_path) - 1;
    > +static pthread_mutex_t mbuffer_mtx;
    > +
    > +static enum {
    > +    VIRTIO_INIT,
    > +    VIRTIO_ACTIVE,
    > +    VIRTIO_STOP,
    > +    VIRTIO_ERROR
    > +} virtio_status = VIRTIO_INIT;
    > +
    > +
    > +/*
    > + * static functions
    > + */
    > +
    > +static int vio_channel_compare(const void *a, const void *b);
    > +static void vio_channel_delete(const void *node, VISIT which, int depth);
    > +
    > +static vio_channel *vio_channel_find(const char *uuid);
    > +static vio_channel *vio_channel_add(const char *uuid);
    > +static int vio_channel_open(vio_channel * c);
    > +static void vio_channel_close(vio_channel * c);
    > +static int vio_channel_update(vio_channel * c);
    > +static int vio_channel_readdir(const char *path);
    > +static void vio_channel_recv(vio_channel * c);
    > +static void vio_channel_send(vio_channel * c, uint32_t ep_event);
    > +
    > +static int vio_mbuffer_compare(const void *a, const void *b);
    > +static void vio_mbuffer_delete(const void *node, VISIT which, int depth);
    > +static void vio_mbuffer_expire(const void *node, VISIT which, int depth);
    > +static void **vio_mbuffer_get_root(void);
    > +#ifdef ENABLE_DEBUG
    > +static void vio_mbuffer_print(const void *node, VISIT which, int depth);
    > +#endif
    > +
    > +static vio_mbuffer *vio_mbuffer_find(const char *uuid);
    > +static vio_mbuffer *vio_mbuffer_add(const char *uuid);
    > +static REQUEST_T vio_check_request(vio_channel * c);
    > +
    > +static void vio_handle_io(unsigned epoll_wait_ms);
    > +
    > +int virtio_cleanup(void);
    > +/*
    > + * update response buffer of a channel
    > + */
    > +static int vio_channel_update(vio_channel * c)
    > +{
    > +    static const char *metrics_start_str = "<metrics>\n";
    > +    static const char *metrics_end_str = "</metrics>\n\n";
    > +
    > +    int rc = 0;
    > +    vio_mbuffer *b = NULL;
    > +
    > +    if (c == NULL)
    > +        return -1;
    > +
    > +    vu_buffer_erase(c->response);
    > +    vu_buffer_add(c->response, metrics_start_str, -1);
    > +
    > +    pthread_mutex_lock(&mbuffer_mtx);
    > +
    > +    if (mbuffer_host->content && mbuffer_host->use)
    > +        vu_buffer_add(c->response, mbuffer_host->content, -1);
    > +    else
    > +        vu_buffer_add(c->response, "host metrics not available", -1);
    > +
    > +    b = vio_mbuffer_find(c->uuid);
    > +    if (b && b->xml->use)
    > +        vu_buffer_add(c->response, b->xml->content, -1);
    > +    else
    > +        rc = -1;
    > +
    > +    pthread_mutex_unlock(&mbuffer_mtx);
    > +
    > +    vu_buffer_add(c->response, metrics_end_str, -1);
    > +
    > +#ifdef ENABLE_DEBUG
    > +    vu_log(VHOSTMD_DEBUG, "New response for %s (%u)\n>>>%s<<<\n",
    > +           c->uuid, c->response->use, c->response->content);
    > +#endif
    > +    return rc;
    > +}
    > +
    > +/*
    > + * close channel and free allocated buffers
    > + */
    > +static void vio_channel_close(vio_channel * c)
    > +{
    > +    if (c != NULL) {
    > +        channel_count--;
    > +
    > +        if (c->fd >= 0) {
    > +            vu_log(VHOSTMD_INFO, "Closed channel '%s%s%s' (%d/%d)",
    > +                   channel_path, channel_prefix, c->uuid, channel_count,
    > +                   channel_max_num);
    > +            close(c->fd);
    > +        }
    > +
    > +        if (c->request)
    > +            vu_buffer_delete(c->request);
    > +        if (c->response)
    > +            vu_buffer_delete(c->response);
    > +
    > +        tdelete((const void *) c, &channel_root, vio_channel_compare);
    > +        free(c);
    > +    }
    > +}
    > +
    > +/*
    > + * connect channel and add the socket to the epoll descriptor
    > + */
    > +static int vio_channel_open(vio_channel * c)
    > +{
    > +    struct sockaddr_un address;
    > +    const size_t max_path_len =
    > +        sizeof(((struct sockaddr_un *) 0)->sun_path) - 1;
    > +    struct epoll_event evt;
    > +    int len = (int) (strlen(channel_path) + VIRTIO_PREFIX_LEN + strlen(c->uuid));
    > +    int flags;
    > +
    > +    bzero(&address, sizeof(address));
    > +    address.sun_family = AF_LOCAL;
    > +
    > +    if (len >= (int) max_path_len) {
    > +        vu_log(VHOSTMD_ERR, "Channel '%s%s%s' - name too long (%d/%lu)",
    > +               channel_path, channel_prefix, c->uuid, len, max_path_len);
    > +        return -1;
    > +    }
    > +
    > +    len = snprintf(address.sun_path, max_path_len, "%s%s%s", channel_path,
    > +                   channel_prefix, c->uuid);
    > +
    > +    if (len >= (int) max_path_len || len <= 0) {
    > +        vu_log(VHOSTMD_ERR, "Channel '%s%s%s' - name is invalid (%lu)",
    > +               channel_path, channel_prefix, c->uuid, max_path_len);
    > +        return -1;
    > +    }
    > +
    > +    if ((c->fd = socket(AF_LOCAL, SOCK_STREAM, 0)) < 0) {
    > +        vu_log(VHOSTMD_ERR, "Channel '%s' - socket() failed (%s)",
    > +               address.sun_path, strerror(errno));
    > +        return -1;
    > +    }
    > +
    > +    flags = fcntl(c->fd, F_GETFL, 0);
    > +    if (flags < 0) {
    > +        vu_log(VHOSTMD_ERR, "Channel '%s' - fcntl() failed (%s)",
    > +               address.sun_path, strerror(errno));
    > +        return -1;
    > +    }
    > +
    > +    flags |= O_NONBLOCK;
    > +    if (fcntl(c->fd, F_SETFL, flags) == -1) {
    > +        vu_log(VHOSTMD_ERR, "Channel '%s' - fcntl() failed (%s)",
    > +               address.sun_path, strerror(errno));
    > +        return -1;
    > +    }
    > +
    > +    if (connect(c->fd, (struct sockaddr *) &address,
    > +                (socklen_t) sizeof(address)) < 0) {
    > +        vu_log(VHOSTMD_ERR, "Channel '%s' - connect() failed (%s)",
    > +               address.sun_path, strerror(errno));
    > +        return -1;
    > +    }
    > +
    > +    evt.data.ptr = c;
    > +    evt.events = EPOLLIN;
    > +
    > +    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, c->fd, &evt) == -1) {
    > +        vu_log(VHOSTMD_ERR, "Could not add channel '%s' - epoll_ctl() failed (%s)",
    > +               address.sun_path, strerror(errno));
    > +        return -1;
    > +    }
    > +
    > +    vu_log(VHOSTMD_INFO, "Opened channel '%s' (%d/%d)",
    > +           address.sun_path, channel_count, channel_max_num);
    > +
    > +    return 0;
    > +}
    > +
    > +/*
    > + * lookup UDS sockets in the directory
    > + * for valid type/name/mbuffer connect and register channel
    > + */
    
    For a long time libvirt has been able to generate a socket path for unix 
    channels. The standard path prefix is /var/lib/libvirt/qemu/channel/target. When 
    a domain is started, a subdir is created with the name 'domain-<domid>-<domname>', 
    where each unix socket is created based on the name attribute of the target element. So 
    e.g. a domain with id '5' and name 'foobar' containing the following channel config
    
         <channel type='unix'>
           <source mode='bind'/>
           <target type='virtio' name='org.qemu.guest_agent.0'/>
         </channel>
         <channel type='unix'>
           <source mode='bind'/>
           <target type='virtio' name='org.github.vhostmd.1'/>
         </channel>
    
    will result in
    
    /var/lib/libvirt/qemu/channel/target/domain-5-foobar/org.qemu.guest_agent.0
    /var/lib/libvirt/qemu/channel/target/domain-5-foobar/org.github.vhostmd.1
    
    Within the VM you have
    
    /dev/virtio-ports/org.qemu.guest_agent.0
    /dev/virtio-ports/org.github.vhostmd.1
    
    For consistency with other channels like the guest agent it would be nice to not 
    require specifying the channel path in the source element. I realize the 
    importance of uuid throughout this patch, but would it be possible to make this 
    work using libvirt's naming scheme? Sorry for not noticing this earlier :-(.
    
    Regards,
    Jim

That's quite interesting and, besides the fact that it integrates into the existing naming scheme and directory structure
of qemu, it would reduce administration effort and the potential for misconfiguration.
From my understanding the vu_vm.id of a VM must be unique on the host, and based on that I can switch my internal 'index'
from uuid to id and use the config you suggested. I guess we can rely on the fact that the unix socket of a virtio channel is immediately closed
and removed from the filesystem with the 'virsh destroy' command, right?
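
To make the switch from uuid to id concrete, here is a minimal sketch of how the socket path could be derived under libvirt's naming scheme. It assumes the directory name is exactly 'domain-<domid>-<domname>' as described above; whether libvirt alters the name part in some cases is not covered here. vio_channel_path() and vio_domain_id_from_dir() are hypothetical helpers for illustration only and are not part of the patch.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <sys/un.h>

#define CHANNEL_BASE "/var/lib/libvirt/qemu/channel/target"
#define CHANNEL_NAME "org.github.vhostmd.1"

/* Hypothetical helper: build
 *   <base>/domain-<id>-<name>/org.github.vhostmd.1
 * into 'out'. Returns 0 on success, -1 if the name is empty or the
 * result does not fit into 'out' or into sockaddr_un.sun_path. */
static int vio_channel_path(char *out, size_t out_len,
                            int dom_id, const char *dom_name)
{
    const size_t max_len = sizeof(((struct sockaddr_un *) 0)->sun_path) - 1;
    int len;

    assert(out != NULL && dom_name != NULL);

    if (strlen(dom_name) == 0)
        return -1;

    len = snprintf(out, out_len, "%s/domain-%d-%s/%s",
                   CHANNEL_BASE, dom_id, dom_name, CHANNEL_NAME);
    if (len <= 0 || (size_t) len >= out_len || (size_t) len >= max_len)
        return -1;

    return 0;
}

/* Hypothetical helper: extract <id> from a directory name of the form
 * "domain-<id>-<name>". Returns the id, or -1 if the name does not match. */
static int vio_domain_id_from_dir(const char *dir_name)
{
    int id = -1;
    char sep = 0;

    assert(dir_name != NULL);

    if (sscanf(dir_name, "domain-%d%c", &id, &sep) != 2 ||
        sep != '-' || id < 0)
        return -1;

    return id;
}
```

For the example domain with id '5' and name 'foobar', the first helper would produce the vhostmd path listed above, and the second would return 5 for the directory name 'domain-5-foobar'.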

Regards
Michael
    
    > +static int vio_channel_readdir(const char *path)
    > +{
    > +    struct dirent *ent;
    > +    DIR *dir = NULL;
    > +
    > +    if ((dir = opendir(path)) == NULL) {
    > +        vu_log(VHOSTMD_ERR, "opendir(%s) failed (%s)", path, strerror(errno));
    > +        return -1;
    > +    }
    > +
    > +    while ((ent = readdir(dir)) != NULL) {
    > +
    > +        if (ent->d_type == DT_SOCK &&
    > +            strncmp(ent->d_name, channel_prefix, VIRTIO_PREFIX_LEN) == 0 &&
    > +            strnlen(ent->d_name, VIRTIO_NAME_BUFLEN) ==
    > +                (VIRTIO_NAME_BUFLEN - 1)) {
    > +
    > +            const char *uuid = &ent->d_name[VIRTIO_PREFIX_LEN];
    > +
    > +            vio_channel *c = vio_channel_find(uuid);
    > +
    > +            if (c == NULL) {
    > +                if (channel_count >= channel_max_num) {
    > +                    closedir(dir);
    > +                    vu_log(VHOSTMD_ERR, "Could not add channel '%s%s%s'"
    > +                           " - too many VMs (%d/%d)",
    > +                           path, channel_prefix, uuid, channel_count,
    > +                           channel_max_num);
    > +                    return -1;
    > +                }
    > +
    > +                pthread_mutex_lock(&mbuffer_mtx);
    > +
    > +                /* don't add the channel if there is no mbuffer for this VM */
    > +                if (vio_mbuffer_find(uuid) != NULL) {
    > +#ifdef ENABLE_DEBUG
    > +                    vu_log(VHOSTMD_DEBUG, "New channel %s%s\n", path, ent->d_name);
    > +#endif
    > +                    c = vio_channel_add(uuid);
    > +
    > +                    if (c == NULL)
    > +                        vu_log(VHOSTMD_ERR, "Could not add channel '%s%s'", path,
    > +                               ent->d_name);
    > +                }
    > +
    > +                pthread_mutex_unlock(&mbuffer_mtx);
    > +            }
    > +        }
    > +    }
    > +    closedir(dir);
    > +
    > +    return 0;
    > +}
    > +
    > +/*
    > + * channel - btree - compare function
    > + */
    > +static int vio_channel_compare(const void *a, const void *b)
    > +{
    > +    if (a == NULL || b == NULL)
    > +        return 1;
    > +
    > +    return strncmp(((const vio_channel *) a)->uuid, ((const vio_channel *) b)->uuid,
    > +                   (size_t) VIR_UUID_STRING_BUFLEN);
    > +}
    > +
    > +/*
    > + * channel - btree/twalk - action function
    > + * delete entries
    > + */
    > +static void vio_channel_delete(const void *node, VISIT which, int depth ATTRIBUTE_UNUSED)
    > +{
    > +    if (which == endorder || which == leaf) {
    > +        if (node) {
    > +            struct epoll_event evt;
    > +            vio_channel *c = *(vio_channel * const *) node;
    > +            if (c) {
    > +                epoll_ctl(epoll_fd, EPOLL_CTL_DEL, c->fd, &evt);
    > +                vio_channel_close(c);
    > +            }
    > +        }
    > +    }
    > +}
    > +
    > +/*
    > + * mbuffer - btree - compare function
    > + */
    > +static int vio_mbuffer_compare(const void *a, const void *b)
    > +{
    > +    if (a == NULL || b == NULL)
    > +        return 1;
    > +
    > +    return strncmp(((const vio_mbuffer *) a)->uuid, ((const vio_mbuffer *) b)->uuid,
    > +                   (size_t) VIR_UUID_STRING_BUFLEN);
    > +}
    > +
    > +/*
    > + * mbuffer - btree/twalk - action function
    > + * delete entries
    > + */
    > +static void vio_mbuffer_delete(const void *node, VISIT which, int depth ATTRIBUTE_UNUSED)
    > +{
    > +    if (which == endorder || which == leaf) {
    > +        if (node) {
    > +            vio_mbuffer *b = *(vio_mbuffer * const *) node;
    > +
    > +            if (b) {
    > +                if (b->xml)
    > +                    vu_buffer_delete(b->xml);
    > +                tdelete((const void *) b, vio_mbuffer_get_root(),
    > +                        vio_mbuffer_compare);
    > +                free(b);
    > +                mbuffer_count--;
    > +            }
    > +        }
    > +    }
    > +}
    > +
    > +/*
    > + * mbuffer - btree/twalk - action function
    > + * expire entries
    > + */
    > +static void vio_mbuffer_expire(const void *node, VISIT which, int depth ATTRIBUTE_UNUSED)
    > +{
    > +    if (which == endorder || which == leaf) {
    > +        if (node) {
    > +            vio_mbuffer *b = *(vio_mbuffer * const *) node;
    > +
    > +            /*  remove expired mbuffer
    > +             *  an mbuffer expires when the last update is older
    > +             *  than the expiration_period
    > +             *
    > +             *  action function does not support custom arguments
    > +             *  --> use a static variable: mbuffer_exp_ts
    > +             */
    > +            if (b && b->last_update < mbuffer_exp_ts) {
    > +                vio_channel *c = vio_channel_find(b->uuid);
    > +
    > +                if (c != NULL)
    > +                    vio_channel_close(c);
    > +
    > +#ifdef ENABLE_DEBUG
    > +                vu_log(VHOSTMD_DEBUG, "Expire mbuffer '%s' (%d/%d)",
    > +                       b->uuid, mbuffer_count, mbuffer_max_num);
    > +#endif
    > +                if (b->xml)
    > +                    vu_buffer_delete(b->xml);
    > +                tdelete((const void *) b, vio_mbuffer_get_root(),
    > +                        vio_mbuffer_compare);
    > +                free(b);
    > +                mbuffer_count--;
    > +            }
    > +        }
    > +    }
    > +}
    > +
    > +/* gcc's -Wall -Werror require a cast from (volatile void *) to (void *)
    > + * but this discards the volatile. The function attributes 'optimize_O0'
    > + * and 'noinline' should avoid any optimization for this access.
    > + */
    > + */
    > +ATTRIBUTE_OPTIMIZE_O0
    > +ATTRIBUTE_NOINLINE
    > +static void **vio_mbuffer_get_root(void)
    > +{
    > +    return (void *) &mbuffer_root;
    > +}
    > +
    > +#ifdef ENABLE_DEBUG
    > +static void vio_mbuffer_print(const void *node,
    > +                              VISIT which,
    > +                              int depth ATTRIBUTE_UNUSED)
    > +{
    > +    if (which == endorder || which == leaf) {
    > +        if (node) {
    > +            vio_mbuffer *b = *(vio_mbuffer **) node;
    > +
    > +            if (b) {
    > +                vu_log(VHOSTMD_DEBUG, "\t%4d %s %lu\n",
    > +                       ++mbuffer_idx, b->uuid, b->last_update);
    > +            }
    > +        }
    > +    }
    > +}
    > +#endif
    > +
    > +/*
    > + * lookup metrics buffer in internal btree
    > + */
    > +static vio_mbuffer *vio_mbuffer_find(const char *uuid)
    > +{
    > +    vio_mbuffer b;
    > +    void *p;
    > +
    > +    strncpy(b.uuid, uuid, sizeof(b.uuid));
    > +
    > +    p = tfind((const void *) &b, vio_mbuffer_get_root(), vio_mbuffer_compare);
    > +    if (p == NULL)
    > +        return NULL;
    > +
    > +    return *(vio_mbuffer **) p;
    > +}
    > +
    > +/*
    > + * add metrics buffer to internal btree
    > + */
    > +static vio_mbuffer *vio_mbuffer_add(const char *uuid)
    > +{
    > +    vio_mbuffer *b = NULL;
    > +    void *p = NULL;
    > +
    > +    if (mbuffer_count >= mbuffer_max_num) {
    > +        vu_log(VHOSTMD_ERR,
    > +               "Could not add metrics buffer '%s' - too many VMs (%d/%d)",
    > +               uuid, mbuffer_count, mbuffer_max_num);
    > +
    > +#ifdef ENABLE_DEBUG
    > +        mbuffer_idx = 0;
    > +        vu_log(VHOSTMD_DEBUG, "exp_ts %lu, allocated mbuffer:\n", mbuffer_exp_ts);
    > +        twalk((const void *) *vio_mbuffer_get_root(), vio_mbuffer_print);
    > +#endif
    > +
    > +        return NULL;
    > +    }
    > +
    > +    b = (vio_mbuffer *) calloc(1UL, sizeof(vio_mbuffer));
    > +    if (b == NULL)
    > +        goto error;
    > +
    > +    strncpy(b->uuid, uuid, sizeof(b->uuid));
    > +    b->xml = NULL;
    > +
    > +    if (vu_buffer_create(&b->xml, DEFAULT_VU_BUFFER_SIZE) != 0) {
    > +        free(b);
    > +        goto error;
    > +    }
    > +
    > +    p = tsearch((const void *) b, vio_mbuffer_get_root(), vio_mbuffer_compare);
    > +    if (p == NULL) {
    > +        vu_buffer_delete(b->xml);
    > +        free(b);
    > +        goto error;
    > +    }
    > +
    > +    mbuffer_count++;
    > +    return b;
    > +
    > +  error:
    > +    vu_log(VHOSTMD_ERR, "Could not add metrics buffer '%s' (%d/%d)",
    > +           uuid, mbuffer_count, mbuffer_max_num);
    > +
    > +    return NULL;
    > +}
    > +
    > +/*
    > + * lookup virtio channel in internal btree
    > + */
    > +static vio_channel *vio_channel_find(const char *uuid)
    > +{
    > +    vio_channel c;
    > +    void *p;
    > +
    > +    strncpy(c.uuid, uuid, sizeof(c.uuid));
    > +
    > +    p = tfind((const void *) &c, &channel_root, vio_channel_compare);
    > +    if (p == NULL)
    > +        return NULL;
    > +
    > +    return *(vio_channel **) p;
    > +}
    > +
    > +/*
    > + * add virtio channel to internal btree
    > + */
    > +static vio_channel *vio_channel_add(const char *uuid)
    > +{
    > +    vio_channel *c = NULL;
    > +    void *p = NULL;
    > +
    > +    c = (vio_channel *) calloc(1UL, sizeof(vio_channel));
    > +    if (c == NULL)
    > +        goto error;
    > +
    > +    channel_count++;
    > +
    > +    strncpy(c->uuid, uuid, sizeof(c->uuid));
    > +    c->fd = -1;
    > +    c->request = NULL;
    > +    c->response = NULL;
    > +
    > +    p = tsearch((const void *) c, &channel_root, vio_channel_compare);
    > +    if (p == NULL)
    > +        goto error;
    > +
    > +    if (vio_channel_open(c) != 0 ||
    > +        vu_buffer_create(&c->request, 512) != 0 ||
    > +        vu_buffer_create(&c->response, DEFAULT_VU_BUFFER_SIZE) != 0)
    > +        goto error;
    > +
    > +    return c;
    > +
    > +  error:
    > +    if (c)
    > +        vio_channel_close(c);
    > +
    > +    return NULL;
    > +}
    > +
    > +static REQUEST_T vio_check_request(vio_channel * c)
    > +{
    > +    const char xml_req_n[] = "GET /metrics/XML\n\n";
    > +    const char xml_req_rn[] = "GET /metrics/XML\r\n\r\n";
    > +
    > +    if (strcmp(c->request->content, xml_req_n) == 0 ||
    > +        strcmp(c->request->content, xml_req_rn) == 0) {
    > +        // valid request
    > +        vu_buffer_erase(c->request);
    > +        return REQ_GET_XML;
    > +    } else if (c->request->use >= (c->request->size - 1) ||
    > +               strstr(c->request->content, "\n\n") ||
    > +               strstr(c->request->content, "\r\n\r\n")) {
    > +        // invalid request -> reset buffer
    > +        vu_buffer_erase(c->request);
    > +
    > +        vu_buffer_erase(c->response);
    > +        vu_buffer_add(c->response, "INVALID REQUEST\n\n", -1);
    > +        return REQ_INVALID;
    > +    } else {
    > +        // fragment
    > +        c->request->use = (unsigned) strnlen(c->request->content,
    > +                                             (size_t) c->request->size);
    > +    }
    > +
    > +    return REQ_INCOMPLETE;
    > +}
    > +
    > +static void vio_channel_recv(vio_channel * c)
    > +{
    > +    struct epoll_event evt;
    > +    ssize_t rc = 0;
    > +    REQUEST_T req_type = REQ_INCOMPLETE;
    > +
    > +    do {
    > +        char *buf = &c->request->content[c->request->use];
    > +        size_t len = c->request->size - c->request->use - 1;
    > +
    > +        rc = recv(c->fd, buf, len, 0);
    > +
    > +        if (rc > 0) {
    > +            req_type = vio_check_request(c);
    > +        }
    > +    } while (rc > 0 && req_type == REQ_INCOMPLETE);
    > +
    > +    if (req_type == REQ_GET_XML) {
    > +        if (vio_channel_update(c)) {
    > +            // no metrics available -> close channel
    > +            epoll_ctl(epoll_fd, EPOLL_CTL_DEL, c->fd, &evt);
    > +            vio_channel_close(c);
    > +        } else
    > +            vio_channel_send(c, EPOLLIN);
    > +    } else if (req_type == REQ_INVALID)
    > +        vio_channel_send(c, EPOLLIN);
    > +}
    > +
    > +static void vio_channel_send(vio_channel * c, uint32_t ep_event)
    > +{
    > +    struct epoll_event evt;
    > +    int len;
    > +
    > +    while ((len = (int) (c->response->use - c->response->pos)) > 0)
    > +    {
    > +        char *buf = &c->response->content[c->response->pos];
    > +        ssize_t rc = send(c->fd, buf, (size_t) len, 0);
    > +
    > +        if (rc > 0)
    > +            c->response->pos += (unsigned) rc;
    > +        else
    > +            break;
    > +    }
    > +
    > +    if (ep_event == EPOLLOUT) {
    > +        if (c->response->use <= c->response->pos) {
    > +            // next request
    > +            evt.data.ptr = c;
    > +            evt.events = EPOLLIN;
    > +            epoll_ctl(epoll_fd, EPOLL_CTL_MOD, c->fd, &evt);
    > +        }
    > +    } else if (ep_event == EPOLLIN) {
    > +        if (c->response->use > c->response->pos) {
    > +            // incomplete response
    > +            evt.data.ptr = c;
    > +            evt.events = EPOLLOUT;
    > +            epoll_ctl(epoll_fd, EPOLL_CTL_MOD, c->fd, &evt);
    > +        }
    > +    }
    > +}
    > +
    > +static void vio_handle_io(unsigned epoll_wait_ms)
    > +{
    > +    int i = 0;
    > +    uint64_t ts_end, ts_now;
    > +    struct epoll_event evt;
    > +    struct timespec ts;
    > +
    > +    clock_gettime(CLOCK_MONOTONIC, &ts);
    > +    ts_now = (uint64_t) (ts.tv_sec * 1000 + ts.tv_nsec / 1000000);
    > +    ts_end = ts_now + epoll_wait_ms;
    > +
    > +    while (ts_now < ts_end) {
    > +        int wait_ms = (int) (ts_end - ts_now);
    > +        int n =
    > +            epoll_wait(epoll_fd, epoll_events, channel_max_num + 1, wait_ms);
    > +
    > +        for (i = 0; i < n; i++) {
    > +            vio_channel *c = (epoll_events + i)->data.ptr;
    > +
    > +            if ((epoll_events + i)->events & EPOLLHUP) {
    > +                epoll_ctl(epoll_fd, EPOLL_CTL_DEL, c->fd, &evt);
    > +                vio_channel_close(c);
    > +            } else if ((epoll_events + i)->events & EPOLLIN) {
    > +                vio_channel_recv(c);
    > +            } else if ((epoll_events + i)->events & EPOLLOUT) {
    > +                vio_channel_send(c, EPOLLOUT);
    > +            }
    > +        }
    > +
    > +        clock_gettime(CLOCK_MONOTONIC, &ts);
    > +        ts_now = (uint64_t) ts.tv_sec * 1000 + (uint64_t) ts.tv_nsec / 1000000;
    > +    }
    > +}
    > +
    > +/*
    > + * Initialize virtio layer
    > + */
    > +int virtio_init(const char *virtio_path,
    > +                int max_channel,
    > +                int expiration_period)
    > +{
    > +    if (virtio_status == VIRTIO_INIT) {
    > +        pthread_mutex_init(&mbuffer_mtx, NULL);
    > +
    > +        channel_max_num = max_channel;
    > +        mbuffer_max_num = max_channel;
    > +        mbuffer_exp_period = expiration_period;
    > +
    > +        if (mbuffer_host == NULL)
    > +            if (vu_buffer_create (&mbuffer_host, DEFAULT_VU_BUFFER_SIZE) != 0)
    > +                goto error;
    > +
    > +        if (epoll_fd == -1) {
    > +
    > +            epoll_events = calloc((size_t) (channel_max_num + 1),
    > +                                  sizeof(struct epoll_event));
    > +            if (epoll_events == NULL)
    > +                goto error;
    > +
    > +            epoll_fd = epoll_create(1);
    > +            if (epoll_fd == -1)
    > +                goto error;
    > +
    > +            if (virtio_path == NULL || virtio_path[0] == '\0' ||
    > +                (strnlen(virtio_path, max_virtio_path_len) +
    > +                 VIR_UUID_STRING_BUFLEN) > (max_virtio_path_len - 2)) {
    > +
    > +                vu_log(VHOSTMD_ERR, "Invalid virtio_path");
    > +                goto error;
    > +            }
    > +
    > +            if (channel_path == NULL) {
    > +                channel_path = calloc(1UL, max_virtio_path_len + 2);
    > +                if (channel_path == NULL)
    > +                    goto error;
    > +
    > +                strncpy(channel_path, virtio_path,
    > +                        max_virtio_path_len - VIR_UUID_STRING_BUFLEN);
    > +
    > +                if (channel_path[strlen(channel_path) - 1] != '/')
    > +                    channel_path[strlen(channel_path)] = '/';
    > +            }
    > +        }
    > +
    > +        virtio_status = VIRTIO_ACTIVE;
    > +        vu_log(VHOSTMD_INFO,
    > +               "Virtio using path '%s', max_channels %d, expiration_time %ld",
    > +               channel_path, channel_max_num, mbuffer_exp_period);
    > +    }
    > +
    > +    return 0;
    > +
    > +  error:
    > +    vu_log(VHOSTMD_ERR, "Virtio initialization failed");
    > +    virtio_status = VIRTIO_ERROR;
    > +
    > +    return -1;
    > +}
    > +
    > +/*
    > + * Cleanup virtio layer
    > + */
    > +int virtio_cleanup(void)
    > +{
    > +    if (virtio_status == VIRTIO_STOP) {
    > +
    > +        if (epoll_fd != -1) {
    > +            close(epoll_fd);
    > +            epoll_fd = -1;
    > +        }
    > +
    > +        if (channel_root) {
    > +            twalk(channel_root, vio_channel_delete);
    > +            tdestroy(channel_root, free);
    > +            channel_root = NULL;
    > +            channel_count = 0;
    > +        }
    > +
    > +        if (channel_path) {
    > +            free(channel_path);
    > +            channel_path = NULL;
    > +        }
    > +
    > +        if (*vio_mbuffer_get_root()) {
    > +            twalk((const void *) *vio_mbuffer_get_root(), vio_mbuffer_delete);
    > +            tdestroy((void *) *vio_mbuffer_get_root(), free);
    > +            mbuffer_root = NULL;
    > +            mbuffer_count = 0;
    > +        }
    > +
    > +        if (mbuffer_host) {
    > +            vu_buffer_delete((vu_buffer *) mbuffer_host);
    > +            mbuffer_host = NULL;
    > +        }
    > +
    > +        if (epoll_events) {
    > +            free(epoll_events);
    > +            epoll_events = NULL;
    > +        }
    > +
    > +        pthread_mutex_destroy(&mbuffer_mtx);
    > +
    > +        virtio_status = VIRTIO_INIT;
    > +
    > +        return 0;
    > +    }
    > +    return -1;
    > +}
    > +
    > +/*
    > + * Main virtio function
    > + * 'start_routine' of pthread_create()
    > + */
    > +void *virtio_run(void *arg ATTRIBUTE_UNUSED)
    > +{
    > +    if (virtio_status != VIRTIO_ACTIVE) {
    > +        vu_log(VHOSTMD_ERR, "Virtio was not initialized");
    > +        return NULL;
    > +    }
    > +
    > +    while (virtio_status == VIRTIO_ACTIVE) {
    > +        if (channel_count < channel_max_num)
    > +            vio_channel_readdir(channel_path);
    > +
    > +        vio_handle_io(3000);    // process available requests
    > +
    > +        // remove expired metrics buffers
    > +        mbuffer_exp_ts = time(NULL) - mbuffer_exp_period;
    > +
    > +        pthread_mutex_lock(&mbuffer_mtx);
    > +
    > +        if (*vio_mbuffer_get_root())
    > +            twalk((const void *) *vio_mbuffer_get_root(), vio_mbuffer_expire);
    > +
    > +        pthread_mutex_unlock(&mbuffer_mtx);
    > +    }
    > +
    > +    virtio_cleanup();
    > +
    > +    return NULL;
    > +}
    > +
    > +/*
    > + * Update the metrics response buffer of a VM/host
    > + */
    > +int virtio_metrics_update(const char *buf,
    > +                          int len,
    > +                          const char *uuid,
    > +                          metric_context ctx)
    > +{
    > +    int rc = -1;
    > +    vio_mbuffer *b;
    > +
    > +    if (buf == NULL || len == 0 || virtio_status != VIRTIO_ACTIVE ||
    > +        (ctx == METRIC_CONTEXT_VM && uuid == NULL))
    > +        return -1;
    > +
    > +    pthread_mutex_lock(&mbuffer_mtx);
    > +
    > +    if (ctx == METRIC_CONTEXT_HOST) {
    > +        vu_buffer_erase(mbuffer_host);
    > +        vu_buffer_add(mbuffer_host, buf, len);
    > +#ifdef ENABLE_DEBUG
    > +        vu_log(VHOSTMD_DEBUG, "New content for HOST (%d)\n>>>%s<<<\n",
    > +               len, mbuffer_host->content);
    > +#endif
    > +        rc = 0;
    > +    }
    > +    else if (ctx == METRIC_CONTEXT_VM) {
    > +        if ((b = vio_mbuffer_find(uuid)) == NULL) {
    > +            // for a new VM create a new mbuffer
    > +            b = vio_mbuffer_add(uuid);
    > +        }
    > +
    > +        if (b != NULL) {
    > +            vu_buffer_erase(b->xml);
    > +            vu_buffer_add(b->xml, buf, len);
    > +            // refresh the timestamp used to expire stale mbuffers
    > +            b->last_update = time(NULL);
    > +#ifdef ENABLE_DEBUG
    > +            vu_log(VHOSTMD_DEBUG, "New content for %s (%d)\n>>>%s<<<\n",
    > +                   uuid, len, b->xml->content);
    > +#endif
    > +            rc = 0;
    > +        }
    > +    }
    > +
    > +    pthread_mutex_unlock(&mbuffer_mtx);
    > +
    > +    return rc;
    > +}
    > +
    > +/*
    > + * Stop virtio thread
    > + */
    > +void virtio_stop(void)
    > +{
    > +    if (virtio_status == VIRTIO_ACTIVE)
    > +        virtio_status = VIRTIO_STOP;
    > +}
    > 
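
The wait loop in vio_handle_io() keeps calling epoll_wait() with whatever time is left until a CLOCK_MONOTONIC deadline. A minimal standalone sketch of that pattern follows. The names mono_now_ms(), remaining_ms(), run_with_deadline() and nap_step() are hypothetical and not part of the patch, and nap_step() only stands in for epoll_wait() plus event dispatch. The sketch widens tv_sec before multiplying and clamps the remaining time to the int range that epoll_wait() expects.

```c
#define _POSIX_C_SOURCE 200809L

#include <assert.h>
#include <limits.h>
#include <stdint.h>
#include <time.h>

/* Current CLOCK_MONOTONIC time in milliseconds (hypothetical helper). */
static uint64_t mono_now_ms(void)
{
    struct timespec ts;

    clock_gettime(CLOCK_MONOTONIC, &ts);
    /* Widen before multiplying so a 32-bit time_t cannot overflow. */
    return (uint64_t) ts.tv_sec * 1000u + (uint64_t) ts.tv_nsec / 1000000u;
}

/* Milliseconds left until 'deadline', clamped to [0, INT_MAX]. */
static int remaining_ms(uint64_t deadline, uint64_t now)
{
    uint64_t left;

    if (now >= deadline)
        return 0;

    left = deadline - now;
    return left > (uint64_t) INT_MAX ? INT_MAX : (int) left;
}

/*
 * Run 'step' repeatedly for roughly 'budget_ms' milliseconds and pass it
 * the time still available. Returns the number of steps executed.
 */
static unsigned run_with_deadline(unsigned budget_ms,
                                  void (*step)(int timeout_ms))
{
    uint64_t now = mono_now_ms();
    uint64_t deadline = now + budget_ms;
    unsigned steps = 0;

    while (now < deadline) {
        step(remaining_ms(deadline, now));
        steps++;
        now = mono_now_ms();
    }

    return steps;
}

/* Placeholder step: sleeps for at most 5 ms of the offered timeout. */
static void nap_step(int timeout_ms)
{
    struct timespec ts = { 0, 0 };
    int ms;

    assert(timeout_ms >= 0);
    ms = timeout_ms < 5 ? timeout_ms : 5;
    ts.tv_nsec = (long) ms * 1000000L;
    nanosleep(&ts, NULL);
}
```

Calling run_with_deadline(3000, nap_step) would mirror the vio_handle_io(3000) call in virtio_run(): the loop returns once the monotonic clock has passed the deadline, regardless of how many times the step returned early.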
    
    