[virt-tools-list] [vhostmd 2/5] Add virtio functions

Jim Fehlig jfehlig at suse.com
Wed Sep 26 20:44:23 UTC 2018


On 8/30/18 4:11 AM, Michael Trapp wrote:
> On the vhostmd side virtio channels are Unix domain sockets from QEMU
> which are created during a VM start and removed when the VM is stopped.
> 
> Basically this implementation
> - monitors a directory for new virtio channels (channel names contain VM UUID)
> - for valid UUIDs, also known by libvirtd, it connects to the UDS
> - buffers VM/HOST metrics and handles request/response on the sockets
> 
> It provides the functions
> virtio_init()
>      init function of virtio layer
> virtio_run()
>      the start_routine for the virtio-thread to handle the virtio based
>      communication
> virtio_metrics_update()
>      called for each VM/HOST metrics update to update the virtio internal
>      metrics buffer.
> virtio_stop()
>      stop the virtio-thread
> 
> ---
> 
>    Here is a brief description of the concept of vhostmd / virtio interaction
> 
>    Vhostmd calls the virtio API functions
> 
>    *** virtio API ***
> 
>      --> virtio_init()
>          Initialize the virtio layer.
>          Called once before the virtio thread starts.
> 
>      --> virtio_run()
>          Start routine of the virtio thread.
> 
>      --> virtio_stop()
>          Reset virtio_status to stop the virtio thread.
> 
>      --> virtio_metrics_update()
>          This adds/updates the metrics buffer of a VM/host.
>          It must be called for every change of VM/host metrics.
> 
>    *** virtio internal ***
> 
>    All virtio internal code runs in the virtio thread - see virtio_run().
> 
>    Access to mbuffers within the virtio thread is
>        - read the mbuffer content
>            see vio_channel_update() -> vio_mbuffer_find()
>        - check if a VM is 'known' by vhostmd
>            see vio_channel_readdir() -> vio_mbuffer_find()
>        - expire mbuffers
>            see virtio_run() -> vio_mbuffer_expire()
>            Expiration timeout is >= (3 * 'vhostmd update_period')
> 
>    The mbuffer (metrics buffer) structs of VMs and host are maintained in
>    a btree (mbuffer.root).
>    Every mbuffer access is exclusive - see mbuffer_mutex.

Thanks for all the details. What causes an mbuffer to expire? I suppose the 
associated VM has shut down and its buffer is no longer being updated?
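
For reference, my reading of the expiry test in the patch (virtio_run() sets 
exp_ts = time(NULL) - exp_period, and vio_mbuffer_expire() compares last_update 
against it) boils down to the sketch below. The helper name and its parameters 
are hypothetical and only there to make the comparison explicit.

```c
#include <time.h>

/* Hypothetical helper, not part of the patch: returns 1 if a buffer last
 * updated at 'last_update' is older than 'exp_period' seconds relative to
 * 'now', 0 otherwise. */
int mbuffer_is_expired(time_t last_update, time_t now, time_t exp_period)
{
    time_t exp_ts = now - exp_period;  /* mirrors mbuffer.exp_ts in virtio_run() */

    return last_update < exp_ts;
}
```

If that reading is right, a buffer expires once virtio_metrics_update() has not 
been called for its UUID for longer than the expiration period.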

The design and code seem fine to me. So far I have only found small issues and 
nits. The small issues are in the form of compilation errors :-). I'm using gcc 
8.2.1 and '-Wall -Werror'.

> 
>    *** tests ***
> 
>    So far I've tested vhostmd with virtio support in a setup with 100 Alpine VMs,
>    each VM continuously polling the metrics every 5 seconds, for several hours.
>    To have a more dynamic test environment all VMs were stopped/started
>    several times.
> 
>    Besides the dependencies on the vu_buffer functions, virtio code does not call
>    any libvirt functions and can also be run/tested without vhostmd and libvirt.
>    I've also checked this variant in combination with valgrind.
>    But at the moment the required test code and the UDS server program are not
>    part of this patch.

Cool! It is obvious all the test effort paid off in the general quality of the code.

> 
>   include/virtio.h |  53 ++++
>   vhostmd/virtio.c | 833 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
>   2 files changed, 886 insertions(+)
>   create mode 100644 include/virtio.h
>   create mode 100644 vhostmd/virtio.c
> 
> diff --git a/include/virtio.h b/include/virtio.h
> new file mode 100644
> index 0000000..2de4c72
> --- /dev/null
> +++ b/include/virtio.h
> @@ -0,0 +1,53 @@
> +/*
> + * Copyright (C) 2018 SAP SE
> + *
> + * This library is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * This library is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with this library; if not, write to the Free Software
> + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
> + *
> + * Author: Michael Trapp <michael.trapp at sap.com>
> + */
> +
> +#ifndef __VIRTIO_H__
> +#define __VIRTIO_H__
> +
> +/*
> + * Initialize virtio layer
> + */
> +int virtio_init(const char *virtio_path, unsigned max_channel, unsigned expiration_period);
> +
> +/*
> + * Cleanup virtio layer
> + */
> +int virtio_cleanup(void);
> +
> +/*
> + * Main virtio function
> + * 'start_routine' of pthread_create()
> + */
> +void *virtio_run(void *arg);
> +
> +/*
> + * Update the metrics response buffer of a VM/host
> + */
> +int virtio_metrics_update(const char     *buf,
> +                          unsigned int    len,
> +                          const char     *uuid,
> +                          metric_context  ctx);

No need to align the parameter names IMO. The extra whitespace can be removed.

> +
> +/*
> + * Stop virtio thread
> + */
> +void virtio_stop(void);
> +
> +#endif /* __VIRTIO_H__ */
> diff --git a/vhostmd/virtio.c b/vhostmd/virtio.c
> new file mode 100644
> index 0000000..7a56dd2
> --- /dev/null
> +++ b/vhostmd/virtio.c
> @@ -0,0 +1,833 @@
> +/*
> + * Copyright (C) 2018 SAP SE
> + *
> + * This library is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * This library is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with this library; if not, write to the Free Software
> + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
> + *
> + * Author: Michael Trapp <michael.trapp at sap.com>
> + */
> +
> +#include <config.h>
> +
> +#include <stdlib.h>
> +#include <stdio.h>
> +#include <stdarg.h>
> +#include <string.h>
> +#include <stdint.h>
> +#include <stddef.h>
> +#include <errno.h>
> +#include <getopt.h>
> +#include <sys/types.h>
> +#include <sys/wait.h>
> +#include <strings.h>
> +#include <sys/un.h>
> +#include <sys/socket.h>
> +#include <sys/epoll.h>
> +#include <arpa/inet.h>
> +#include <netinet/in.h>
> +#include <netinet/tcp.h>
> +#include <unistd.h>
> +#include <fcntl.h>
> +#include <search.h>

Are all of these includes needed? E.g. netinet/tcp.h looks suspicious.

> +#include <dirent.h>
> +#include <time.h>
> +#include <pthread.h>
> +#include <libvirt/libvirt.h>
> +
> +#include "metric.h"
> +#include "util.h"
> +
> +#define DEFAULT_VU_BUFFER_SIZE 1024
> +#define MAX_REQUEST_LEN 512
> +
> +#define VIRTIO_PREFIX_LEN 21
> +#define VIRTIO_NAME_BUFLEN (VIRTIO_PREFIX_LEN + VIR_UUID_STRING_BUFLEN)
> +
> +
> +typedef struct {
> +    int  fd;
> +    char uuid[VIR_UUID_STRING_BUFLEN];
> +    vu_buffer *request,
> +              *response;

I'd prefer a separate declaration for each member, i.e. its own 
'vu_buffer *response;' line.

> +} vio_channel;
> +
> +typedef struct {
> +    char   uuid[VIR_UUID_STRING_BUFLEN];
> +    time_t last_update;
> +    vu_buffer *xml;
> +} vio_mbuffer;
> +
> +static struct {
> +    volatile vu_buffer *host;
> +    volatile void *root;
> +    int   max_num;
> +    volatile int   count;
> +    int   idx;
> +    time_t exp_period,
> +           exp_ts;
> +} mbuffer = { NULL, NULL, 0, 0, 0, 0, 0 };
> +
> +static struct {
> +    void *root;
> +    char *path;
> +    const char *prefix;
> +    int   max_num;
> +    int   count;
> +} channel = { NULL, NULL, "org.github.vhostmd.1.", 0, 0 };
> +

Same comment about aligning names in these structs. The extra whitespace between 
type and name can be removed.

> +
> +static int epoll_fd = -1;
> +static struct epoll_event *epoll_events = NULL;
> +static const unsigned max_virtio_path_len = sizeof(((struct sockaddr_un *) 0)->sun_path) - 1;
> +static pthread_mutex_t mbuffer_mtx;
> +
> +static enum {
> +    VIRTIO_INIT,
> +    VIRTIO_ACTIVE,
> +    VIRTIO_STOP,
> +    VIRTIO_ERROR
> +} virtio_status = VIRTIO_INIT;
> +
> +
> +/*
> + * static functions
> + */
> +
> +static int vio_channel_compare(const void *a, const void *b);
> +static void vio_channel_delete(const void *node, const VISIT which, const int depth);
> +
> +static vio_channel *vio_channel_find(const char *uuid);
> +static vio_channel *vio_channel_add(const char *uuid);
> +static int vio_channel_open(vio_channel * c);
> +static void vio_channel_close(vio_channel * c);
> +static int vio_channel_update(vio_channel * c);
> +static int vio_channel_readdir(const char const *path);

virtio.c:115:43: error: duplicate ‘const’ declaration specifier 
[-Werror=duplicate-decl-specifier]
  static int vio_channel_readdir(const char const *path);
                                            ^~~~~
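
In 'const char const *path' both qualifiers apply to the pointed-to char, which 
is what gcc complains about. A minimal sketch of the two valid spellings, in 
case the intent was a const pointer to const data (the function names are 
hypothetical, not from the patch):

```c
#include <string.h>

/* Pointer to const char: the characters may not be modified through 'path'. */
size_t path_len(const char *path)
{
    return strlen(path);
}

/* Const pointer to const char: neither the characters nor the parameter
 * itself may be modified inside the function. */
size_t path_len_strict(const char *const path)
{
    return strlen(path);
}
```

Either form compiles cleanly; for a prototype the first is probably all that is 
needed.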

> +
> +static int vio_mbuffer_compare(const void *a, const void *b);
> +static void vio_mbuffer_delete(const void *node, const VISIT which, const int depth);
> +static void vio_mbuffer_expire(const void *node, const VISIT which, const int depth);
> +static void vio_mbuffer_print(const void *node, const VISIT which, const int depth);
> +
> +static vio_mbuffer *vio_mbuffer_find(const char *uuid);
> +static vio_mbuffer *vio_mbuffer_add(const char *uuid);
> +
> +
> +static void vio_hanlde_io(unsigned epoll_wait_ms);
> +
> +/*
> + * update response buffer of a channel
> + */
> +static int vio_channel_update(vio_channel *c)
> +{
> +    static const char *metrics_start_str = "<metrics>\n";
> +    static const char *metrics_end_str   = "</metrics>\n\n";
> +
> +    int rc = 0;
> +    vio_mbuffer *b = NULL;
> +
> +    if (c == NULL)
> +        return -1;
> +
> +    vu_buffer_erase(c->response);
> +    vu_buffer_add(c->response, metrics_start_str, -1);
> +
> +    pthread_mutex_lock(&mbuffer_mtx);
> +
> +    if (mbuffer.host->content && mbuffer.host->use)
> +        vu_buffer_add(c->response, mbuffer.host->content, -1);
> +    else
> +        vu_buffer_add(c->response, "host metrics not available", -1);
> +
> +    b = vio_mbuffer_find(c->uuid);
> +    if (b && b->xml->use)
> +        vu_buffer_add(c->response, b->xml->content, -1);
> +    else
> +        rc = -1;
> +
> +    pthread_mutex_unlock(&mbuffer_mtx);
> +
> +    vu_buffer_add(c->response, metrics_end_str, -1);
> +
> +#ifdef ENABLE_DEBUG
> +    vu_log(VHOSTMD_DEBUG, "DEBUG: new response for %s (%u)\n>>>%s<<<\n",
> +           c->uuid, c->response->use, c->response->content);
> +#endif
> +    return rc;
> +}
> +
> +/*
> + * close channel and free allocated buffers
> + */
> +static void vio_channel_close(vio_channel *c)
> +{
> +    if (c != NULL) {
> +        if (c->fd >= 0) {
> +            vu_log(VHOSTMD_INFO, "INFO: closed channel '%s%s' (%d/%d)",
> +                   channel.prefix, c->uuid, channel.count, channel.max_num);
> +            close(c->fd);
> +        }
> +
> +        if (c->request)
> +            vu_buffer_delete(c->request);
> +        if (c->response)
> +            vu_buffer_delete(c->response);
> +
> +        tdelete((const void *) c, &channel.root, vio_channel_compare);
> +        free(c);
> +        channel.count--;
> +    }
> +}
> +
> +/*
> + * connect channel and add the socket to the epoll desriptor
> + */
> +static int vio_channel_open(vio_channel *c)
> +{
> +    struct sockaddr_un address;
> +    const unsigned max_path_len = sizeof(((struct sockaddr_un *) 0)->sun_path) - 1;
> +    struct epoll_event evt;
> +    unsigned len = strlen(channel.path) + VIRTIO_PREFIX_LEN + strlen(c->uuid);
> +    int flags;
> +
> +    bzero(&address, sizeof(address));
> +    address.sun_family = AF_LOCAL;
> +
> +    if (len >= max_path_len) {
> +        vu_log(VHOSTMD_ERR, "ERROR: channel '%s%s%s' name too long (%u/%u)",
> +               channel.path, channel.prefix, c->uuid, len, max_path_len);
> +        return -1;
> +    }
> +
> +    len = snprintf(address.sun_path, max_path_len, "%s%s%s", channel.path, channel.prefix, c->uuid);
> +
> +    if (len >= max_path_len || len == 0) {
> +        vu_log(VHOSTMD_ERR, "ERROR: channel '%s%s%s' - name is too long (%u/%u)",
> +               channel.path, channel.prefix, c->uuid, len, max_path_len);
> +        return -1;
> +    }
> +
> +    if ((c->fd = socket(AF_LOCAL, SOCK_STREAM, 0)) < 0) {
> +        vu_log(VHOSTMD_ERR, "ERROR: channel '%s' - socket() failed (%s)",
> +               address.sun_path, strerror(errno));
> +        return -1;
> +    }
> +
> +    flags = fcntl(c->fd, F_GETFL, 0);
> +    if (flags < 0) {
> +        vu_log(VHOSTMD_ERR, "ERROR: channel '%s' fcntl() failed (%s)",
> +               address.sun_path, strerror(errno));
> +        return -1;
> +    }
> +
> +    flags |= flags | O_NONBLOCK;
> +    if (fcntl(c->fd, F_SETFL, flags) == -1) {
> +        vu_log(VHOSTMD_ERR, "ERROR: channel '%s' fcntl() failed (%s)",
> +               address.sun_path, strerror(errno));
> +        return -1;
> +    }
> +
> +    if (connect(c->fd, (struct sockaddr *) &address, sizeof(address)) < 0) {
> +        vu_log(VHOSTMD_ERR, "ERROR: channel '%s' - connect() failed (%s)",
> +               address.sun_path, strerror(errno));
> +        return -1;
> +    }
> +
> +    evt.data.ptr = c;
> +    evt.events   = EPOLLIN;

I'll stop mentioning that there is no need to align types, names, operators, 
etc. :-)

> +
> +    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, c->fd, &evt) == -1) {
> +        vu_log(VHOSTMD_ERR, "ERROR: could not add channel '%s' - epoll_ctl() failed (%s)",
> +               address.sun_path, strerror(errno));
> +        return -1;
> +    }
> +
> +    vu_log(VHOSTMD_INFO, "INFO: opened channel '%s' (%d/%d)",
> +           address.sun_path, channel.count, channel.max_num);
> +
> +    return 0;
> +}
> +
> +/*
> + * lookup UDS sockets in the directory
> + * for valid type/name/mbuffer connect and register channel
> + */
> +static int vio_channel_readdir(const char const *path)

virtio.c:265:43: error: duplicate ‘const’ declaration specifier 
[-Werror=duplicate-decl-specifier]
  static int vio_channel_readdir(const char const *path)

> +{
> +    struct dirent *ent;
> +    DIR           *dir = NULL;
> +
> +    if ((dir = opendir(path)) == NULL) {
> +        vu_log(VHOSTMD_ERR, "ERROR: opendir(%s) failed (%s)", path, strerror(errno));
> +        return -1;
> +    }
> +
> +    while ((ent = readdir(dir)) != NULL) {
> +
> +        if (ent->d_type == DT_SOCK &&
> +            strncmp(ent->d_name, channel.prefix, VIRTIO_PREFIX_LEN) == 0 &&
> +            strnlen(ent->d_name, VIRTIO_NAME_BUFLEN) == (VIRTIO_NAME_BUFLEN - 1)) {
> +
> +            const char *uuid = &ent->d_name[VIRTIO_PREFIX_LEN];
> +
> +            vio_channel *c = vio_channel_find(uuid);
> +
> +            if (c == NULL) {
> +                if (channel.count >= channel.max_num) {
> +                    closedir(dir);
> +                    vu_log(VHOSTMD_ERR, "ERROR: could not add channel '%s%s%s'"
> +                           " - too many VMs (%d/%d)",
> +                           path, channel.prefix, uuid, channel.count, channel.max_num);
> +                    return -1;
> +                }
> +
> +                pthread_mutex_lock(&mbuffer_mtx);
> +
> +                /* don't add the channel if there is no mbuffer for this VM */
> +                if (vio_mbuffer_find(uuid) != NULL) {
> +#ifdef ENABLE_DEBUG
> +                    vu_log(VHOSTMD_DEBUG, "DEBUG: new channel %s%s\n", path, ent->d_name);
> +#endif
> +                    c = vio_channel_add(uuid);
> +
> +                    if (c == NULL)
> +                        vu_log(VHOSTMD_ERR, "ERROR: could not add channel '%s%s'",
> +                               path, ent->d_name);
> +                }
> +
> +                pthread_mutex_unlock(&mbuffer_mtx);
> +            }
> +        }
> +    }
> +    closedir(dir);
> +
> +    return 0;
> +}
> +
> +/*
> + * channel - btree - compare function
> + */
> +static int vio_channel_compare(const void *a, const void *b)
> +{
> +    if (a == NULL || b == NULL)
> +        return 1;
> +
> +    return strncmp(((vio_channel *) a)->uuid, ((vio_channel *) b)->uuid, VIR_UUID_STRING_BUFLEN);
> +}
> +
> +/*
> + * channel - btree/twalk - action function
> + * delete entries
> + */
> +static void vio_channel_delete(const void *node, const VISIT which, const int depth)

virtio.c:332:79: error: unused parameter ‘depth’ [-Werror=unused-parameter]
  static void vio_channel_delete(const void *node, const VISIT which, const int 
depth)
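
The twalk() action signature is fixed, so the parameter cannot be dropped. One 
common way to silence the warning is an explicit cast to void. A small 
standalone sketch, assuming plain string keys (count_node(), count_keys() and 
the static counter are hypothetical names):

```c
#include <search.h>
#include <string.h>

static int visited;  /* twalk() offers no user argument, so count in a static */

static int compare_str(const void *a, const void *b)
{
    return strcmp(a, b);
}

/* Action function: counts every node exactly once. */
static void count_node(const void *node, const VISIT which, const int depth)
{
    (void) depth;  /* explicitly mark the parameter as intentionally unused */

    if (node && (which == endorder || which == leaf))
        visited++;
}

/* Builds a tree from 'n' distinct keys, walks it, and returns the node count. */
int count_keys(const char **keys, int n)
{
    void *root = NULL;
    int i;

    visited = 0;
    for (i = 0; i < n; i++)
        tsearch(keys[i], &root, compare_str);

    twalk(root, count_node);

    /* release the tree nodes again; the keys belong to the caller */
    for (i = 0; i < n; i++)
        tdelete(keys[i], &root, compare_str);

    return visited;
}
```

A gcc-specific alternative would be '__attribute__((unused))' on the parameter.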

> +{
> +    if (which == endorder || which == leaf) {
> +        if (node) {
> +            struct epoll_event evt;
> +            vio_channel *c = *(vio_channel **) node;
> +            if (c) {
> +                epoll_ctl(epoll_fd, EPOLL_CTL_DEL, c->fd, &evt);
> +                vio_channel_close(c);
> +            }
> +        }
> +    }
> +}
> +
> +/*
> + * mbuffer - btree - compare function
> + */
> +static int vio_mbuffer_compare(const void *a, const void *b)
> +{
> +    if (a == NULL || b == NULL)
> +        return 1;
> +
> +    return strncmp(((vio_mbuffer *) a)->uuid, ((vio_mbuffer *) b)->uuid, VIR_UUID_STRING_BUFLEN);
> +}
> +
> +/*
> + * mbuffer - btree/twalk - action function
> + * delete entries
> + */
> +static void vio_mbuffer_delete(const void *node, const VISIT which, const int depth)

virtio.c:361:79: error: unused parameter ‘depth’ [-Werror=unused-parameter]
  static void vio_mbuffer_delete(const void *node, const VISIT which, const int 
depth)

> +{
> +    if (which == endorder || which == leaf) {
> +        if (node) {
> +            vio_mbuffer *b = *(vio_mbuffer **) node;
> +
> +            if (b) {
> +                if (b->xml)
> +                    vu_buffer_delete(b->xml);
> +                tdelete((const void *) b, &mbuffer.root, vio_mbuffer_compare);

virtio.c:370:43: error: passing argument 2 of ‘tdelete’ from incompatible 
pointer type [-Werror=incompatible-pointer-types]
                  tdelete((const void *) b, &mbuffer.root, vio_mbuffer_compare);
In file included from virtio.c:42:
/usr/include/search.h:138:14: note: expected ‘void ** restrict’ but argument is 
of type ‘volatile void **’
  extern void *tdelete (const void *__restrict __key,
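
Since the description says every mbuffer access is serialized by the mutex, one 
possible fix is to drop 'volatile' from the root pointer altogether; the mutex 
already provides the needed ordering, and a plain 'void *' matches what the 
tsearch() family expects. A minimal sketch under that assumption (the struct, 
mutex and function names are hypothetical stand-ins, not the patch's code):

```c
#include <pthread.h>
#include <search.h>
#include <string.h>

/* Hypothetical stand-in for the patch's 'mbuffer' state: the root is a plain
 * 'void *', and every access happens with the mutex held. */
static struct {
    void *root;
    int count;
} tree = { NULL, 0 };

static pthread_mutex_t tree_mtx = PTHREAD_MUTEX_INITIALIZER;

static int compare_uuid(const void *a, const void *b)
{
    return strcmp(a, b);
}

/* Returns 1 if 'uuid' is stored in the tree, 0 otherwise. */
int tree_contains(const char *uuid)
{
    int found;

    pthread_mutex_lock(&tree_mtx);
    found = tfind(uuid, &tree.root, compare_uuid) != NULL;
    pthread_mutex_unlock(&tree_mtx);
    return found;
}

/* Inserts 'uuid' (the caller keeps ownership of the string).
 * Returns 0 on success or if already present, -1 on allocation failure. */
int tree_add(const char *uuid)
{
    int rc = 0;

    pthread_mutex_lock(&tree_mtx);
    if (tfind(uuid, &tree.root, compare_uuid) == NULL) {
        if (tsearch(uuid, &tree.root, compare_uuid) != NULL)
            tree.count++;
        else
            rc = -1;
    }
    pthread_mutex_unlock(&tree_mtx);
    return rc;
}

/* Removes 'uuid'. Returns 0 if it was removed, -1 if it was not found. */
int tree_remove(const char *uuid)
{
    int rc = -1;

    pthread_mutex_lock(&tree_mtx);
    if (tdelete(uuid, &tree.root, compare_uuid) != NULL) {
        tree.count--;
        rc = 0;
    }
    pthread_mutex_unlock(&tree_mtx);
    return rc;
}
```

With a non-volatile root, '&tree.root' has type 'void **' and can be passed to 
tsearch(), tfind() and tdelete() without casts.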

> +                free(b);
> +                mbuffer.count--;
> +            }
> +        }
> +    }
> +}
> +
> +/*
> + * mbuffer - btree/twalk - action function
> + * expire entries
> + */
> +static void vio_mbuffer_expire(const void *node, const VISIT which, const int depth)
> +{
> +    if (which == endorder || which == leaf) {
> +        if (node) {
> +            vio_mbuffer *b = *(vio_mbuffer **) node;
> +
> +            // remove expired mbuffer
> +            // action function does not support custom arguments
> +            // --> use a static variable: exp_ts
> +            if (b && b->last_update < mbuffer.exp_ts) {
> +                vio_channel *c = vio_channel_find(b->uuid);
> +
> +                if (c != NULL)
> +                    vio_channel_close(c);
> +
> +#ifdef ENABLE_DEBUG
> +                vu_log(VHOSTMD_DEBUG, "DEBUG: expire mbuffer '%s' (%d/%d)",
> +                       b->uuid, mbuffer.count, mbuffer.max_num);
> +#endif
> +                if (b->xml)
> +                    vu_buffer_delete(b->xml);
> +                tdelete((const void *) b, &mbuffer.root, vio_mbuffer_compare);

Same errors in this function as vio_mbuffer_delete().

> +                free(b);
> +                mbuffer.count--;
> +            }
> +        }
> +    }
> +}
> +
> +static void vio_mbuffer_print(const void *node, const VISIT which, const int depth)

Unused depth param.

> +{
> +    if (which == endorder || which == leaf) {
> +        if (node) {
> +            vio_mbuffer *b = *(vio_mbuffer **) node;
> +
> +            if (b) {
> +#ifdef ENABLE_DEBUG
> +                vu_log(VHOSTMD_DEBUG, "DEBUG: %4d %s %lu\n",
> +                       ++mbuffer.idx, b->uuid, b->last_update);
> +#endif
> +            }
> +        }
> +    }
> +}
> +
> +/*
> + * lookup metrics buffer in internal btree
> + */
> +static vio_mbuffer *vio_mbuffer_find(const char *uuid)
> +{
> +    vio_mbuffer b;
> +    void *p;
> +
> +    strncpy(b.uuid, uuid, sizeof(b.uuid));
> +
> +    p = tfind((const void *) &b, &mbuffer.root, vio_mbuffer_compare);

virtio.c:437:34: error: passing argument 2 of ‘tfind’ from incompatible pointer 
type [-Werror=incompatible-pointer-types]
      p = tfind((const void *) &b, &mbuffer.root, vio_mbuffer_compare);
                                   ^~~~~~~~~~~~~
In file included from virtio.c:42:
/usr/include/search.h:134:14: note: expected ‘void * const*’ but argument is of 
type ‘volatile void **’
  extern void *tfind (const void *__key, void *const *__rootp,
               ^~~~~

> +    if (p == NULL)
> +        return NULL;
> +
> +    return *(vio_mbuffer **) p;
> +}
> +
> +/*
> + * add metrics buffer to internal btree
> + */
> +static vio_mbuffer *vio_mbuffer_add(const char *uuid)
> +{
> +    vio_mbuffer *b = NULL;
> +    void         *p = NULL;
> +
> +    if (mbuffer.count >= mbuffer.max_num) {
> +        vu_log(VHOSTMD_ERR, "ERROR: could not add metrics buffer '%s' - too many VMs (%d/%d)",
> +               uuid, mbuffer.count, mbuffer.max_num);
> +
> +#ifdef ENABLE_DEBUG
> +        mbuffer.idx = 0;
> +        vu_log(VHOSTMD_DEBUG, "DEBUG: exp_ts %lu, allocated mbuffer:\n",
> +               mbuffer.exp_ts);
> +        twalk(mbuffer.root, vio_mbuffer_print);

virtio.c:460:22: error: passing argument 1 of ‘twalk’ discards ‘volatile’ 
qualifier from pointer target type [-Werror=discarded-qualifiers]
          twalk(mbuffer.root, vio_mbuffer_print);
                ~~~~~~~^~~~~
In file included from virtio.c:42:
/usr/include/search.h:150:32: note: expected ‘const void *’ but argument is of 
type ‘volatile void *’
  extern void twalk (const void *__root, __action_fn_t __action);
                     ~~~~~~~~~~~~^~~~~~

There are quite a few more compilation errors throughout this file. I'll stop 
pointing them out as I'm sure you'll be able to find them once you adjust your 
compiler settings.

> +#endif
> +
> +        return NULL;
> +    }
> +
> +    b = (vio_mbuffer *) calloc(1, sizeof(vio_mbuffer));
> +    if (b == NULL)
> +        goto error;
> +
> +    strncpy(b->uuid, uuid, sizeof(b->uuid));
> +    b->xml = NULL;
> +
> +    if (vu_buffer_create(&b->xml, DEFAULT_VU_BUFFER_SIZE) != 0) {
> +        free(b);
> +        goto error;
> +    }
> +
> +    p = tsearch((const void *) b, &mbuffer.root, vio_mbuffer_compare);
> +    if (p == NULL) {
> +        vu_buffer_delete(b->xml);
> +        free(b);
> +        goto error;
> +    }
> +
> +    mbuffer.count++;
> +    return b;
> +
> +  error:
> +
> +    vu_log(VHOSTMD_ERR, "ERROR: vio_mbuffer_add(%s) failed (%d/%d)",
> +           uuid, mbuffer.count, mbuffer.max_num);
> +
> +    return NULL;
> +}
> +
> +/*
> + * lookup virtio channel in internal btree
> + */
> +static vio_channel *vio_channel_find(const char *uuid)
> +{
> +    vio_channel c;
> +    void *p;
> +
> +    strncpy(c.uuid, uuid, sizeof(c.uuid));
> +
> +    p = tfind((const void *) &c, &channel.root, vio_channel_compare);
> +    if (p == NULL)
> +        return NULL;
> +
> +    return *(vio_channel **) p;
> +}
> +
> +/*
> + * add virtio channel to internal btree
> + */
> +static vio_channel *vio_channel_add(const char *uuid)
> +{
> +    vio_channel *c = NULL;
> +    void         *p = NULL;
> +
> +    c = (vio_channel *) calloc(1, sizeof(vio_channel));
> +    if (c == NULL)
> +        goto error;
> +
> +    channel.count++;
> +
> +    strncpy(c->uuid, uuid, sizeof(c->uuid));
> +    c->fd = -1;
> +    c->request  = NULL;
> +    c->response = NULL;
> +
> +    p = tsearch((const void *) c, &channel.root, vio_channel_compare);
> +    if (p == NULL)
> +        goto error;
> +
> +    if (vio_channel_open(c) != 0 ||
> +        vu_buffer_create(&c->request, 512) != 0 ||
> +        vu_buffer_create(&c->response, DEFAULT_VU_BUFFER_SIZE) != 0)
> +        goto error;
> +
> +    return c;
> +
> +error:
> +
> +    vu_log(VHOSTMD_ERR, "ERROR: vio_channel_add(%s%s) failed", channel.prefix, uuid);

Empty line between 'error:' and 'vu_log' can be removed.

> +
> +    if (c)
> +        vio_channel_close(c);
> +
> +    return NULL;
> +}
> +
> +static void vio_hanlde_io(unsigned epoll_wait_ms)
> +{
> +    int i = 0;
> +    uint64_t ts_end, ts_now;
> +    struct epoll_event evt;
> +    struct timespec    ts;
> +
> +    clock_gettime(CLOCK_MONOTONIC, &ts);
> +    ts_now = ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
> +    ts_end = ts_now + epoll_wait_ms;
> +
> +    while (ts_now < ts_end) {
> +        int wait_ms = (int) (ts_end - ts_now);
> +        int n = epoll_wait(epoll_fd, epoll_events, channel.max_num +1, wait_ms);
> +
> +        for (i = 0; i < n; i++) {
> +            vio_channel *c = (epoll_events + i)->data.ptr;
> +
> +            if ((epoll_events + i)->events & EPOLLHUP) {
> +                epoll_ctl(epoll_fd, EPOLL_CTL_DEL, c->fd, &evt);
> +                vio_channel_close(c);
> +            }
> +            else if ((epoll_events + i)->events & EPOLLIN) {

This is nitpick stuff, but I'd prefer the 'else' on the same line as the closing 
brace of the if, e.g.

     } else if (blabla) {

> +                unsigned send_response = 0;
> +                int rc = 0;
> +
> +                do {
> +                    char *buf = &c->request->content[c->request->use];
> +                    int   len = c->request->size - c->request->use - 1;
> +
> +                    rc = recv(c->fd, buf, len, 0);
> +
> +                    if (rc > 0) {
> +                        const char xml_request[] = "GET /metrics/XML\n\n";
> +
> +                        if (strncmp(c->request->content, xml_request, strlen(xml_request)) == 0) {
> +                            // valid request
> +                            vu_buffer_erase(c->request);
> +
> +                            if (vio_channel_update(c)) {        // no metrics available -> close channel
> +                                epoll_ctl(epoll_fd, EPOLL_CTL_DEL, c->fd, &evt);
> +                                vio_channel_close(c);
> +                            }
> +                            else
> +                                send_response = 1;
> +                        }
> +                        else if (c->request->use >= (c->request->size - 1) ||
> +                                 strstr(c->request->content, "\n\n")) {
> +                            // invalid request -> reset buffer
> +                            vu_buffer_erase(c->request);
> +
> +                            vu_buffer_erase(c->response);
> +                            vu_buffer_add(c->response, "INVALID REQUEST\n\n", -1);
> +                            send_response = 1;
> +                        }
> +                        else {
> +                            // fragment
> +                            c->request->use = strnlen(c->request->content, c->request->size);
> +                        }
> +                    }
> +                } while (rc > 0 && send_response == 0);
> +
> +                if (send_response) {
> +                    do {
> +                        char *buf = &c->response->content[c->response->pos];
> +                        int   len = c->response->use - c->response->pos;
> +
> +                        rc = send(c->fd, buf, len, 0);
> +                        if (rc > 0)
> +                            c->response->pos += rc;
> +                    } while ((c->response->pos < c->response->use) && (rc > 0));
> +
> +                    // incomplete response
> +                    if (c->response->use > c->response->pos) {
> +                        evt.data.ptr = c;
> +                        evt.events   = EPOLLOUT;
> +                        epoll_ctl(epoll_fd, EPOLL_CTL_MOD, c->fd, &evt);
> +                    }
> +                }
> +            }
> +            else if ((epoll_events + i)->events & EPOLLOUT) {
> +                int rc = 0;
> +                do {
> +                    char *buf = &c->response->content[c->response->pos];
> +                    int   len = c->response->use - c->response->pos;
> +
> +                    rc = send(c->fd, buf, len, 0);
> +                    if (rc > 0)
> +                        c->response->pos += rc;
> +                } while ((c->response->pos < c->response->use) && (rc > 0));
> +
> +                if (c->response->use <= c->response->pos) {
> +                    evt.data.ptr = c;
> +                    evt.events   = EPOLLIN;
> +                    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, c->fd, &evt);
> +                }
> +            }
> +        }
> +
> +        clock_gettime(CLOCK_MONOTONIC, &ts);
> +        ts_now = ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
> +    }

This loop is a little intense, but I've glanced through it twice and didn't 
notice any problems.
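
Not a request for change, but the send loop appears twice (in the EPOLLIN and 
EPOLLOUT branches), so a small helper might make the function easier to follow. 
A rough sketch, assuming a non-blocking socket; send_pending() and its return 
convention are hypothetical, not something the patch defines:

```c
#include <errno.h>
#include <sys/socket.h>
#include <sys/types.h>

/* Hypothetical helper: push buf[*pos..use) into a non-blocking socket.
 * Returns 1 when everything was sent, 0 if the socket would block (the
 * caller should then wait for EPOLLOUT), -1 on any other failure,
 * including a send() that makes no progress. */
int send_pending(int fd, const char *buf, size_t use, size_t *pos)
{
    while (*pos < use) {
        ssize_t rc = send(fd, buf + *pos, use - *pos, MSG_NOSIGNAL);

        if (rc > 0) {
            *pos += (size_t) rc;           /* partial or full write */
        } else if (rc < 0 && errno == EINTR) {
            continue;                      /* interrupted, retry */
        } else if (rc < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
            return 0;                      /* kernel buffer full */
        } else {
            return -1;
        }
    }
    return 1;
}
```

Both branches could then call the helper and switch the epoll interest between 
EPOLLIN and EPOLLOUT depending on the return value.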

Looking good!

Regards,
Jim

> +}
> +
> +/*
> + * Initialize virtio layer
> + */
> +int virtio_init(const char *virtio_path, unsigned max_channel, unsigned expiration_period)
> +{
> +    if (virtio_status == VIRTIO_INIT) {
> +        pthread_mutex_init(&mbuffer_mtx, NULL);
> +
> +        channel.max_num = max_channel;
> +        mbuffer.max_num = max_channel;
> +        mbuffer.exp_period = expiration_period;
> +
> +        if (mbuffer.host == NULL)
> +            if (vu_buffer_create(&mbuffer.host, DEFAULT_VU_BUFFER_SIZE) != 0)
> +                goto error;
> +
> +        if (epoll_fd == -1) {
> +
> +            epoll_events = calloc(channel.max_num + 1, sizeof(struct epoll_event));
> +            if (epoll_events == NULL)
> +                goto error;
> +
> +            epoll_fd = epoll_create(1);
> +            if (epoll_fd == -1)
> +                goto error;
> +
> +            if (virtio_path == NULL ||
> +                (strnlen(virtio_path, max_virtio_path_len) + VIR_UUID_STRING_BUFLEN) > (max_virtio_path_len - 2)) {
> +
> +                vu_log(VHOSTMD_ERR, "ERROR: invalid virtio_path");
> +                goto error;
> +            }
> +
> +            channel.path = calloc(1, max_virtio_path_len + 2);
> +            if (channel.path == NULL)
> +                goto error;
> +
> +            strncpy(channel.path, virtio_path, max_virtio_path_len - VIR_UUID_STRING_BUFLEN);
> +
> +            if (channel.path[strlen(channel.path) - 1] != '/')
> +                channel.path[strlen(channel.path)] = '/';
> +        }
> +
> +        virtio_status = VIRTIO_ACTIVE;
> +    }
> +
> +    return 0;
> +
> +  error:
> +    vu_log(VHOSTMD_ERR, "ERROR: virtio_init() initialization failed");
> +    virtio_status = VIRTIO_ERROR;
> +
> +    return -1;
> +}
> +
> +/*
> + * Cleanup virtio layer
> + */
> +int virtio_cleanup(void)
> +{
> +    if (virtio_status == VIRTIO_STOP) {
> +
> +        if (epoll_fd != -1) {
> +            close(epoll_fd);
> +            epoll_fd = -1;
> +        }
> +
> +        if (channel.root) {
> +            twalk(channel.root, vio_channel_delete);
> +            tdestroy(channel.root, free);
> +            channel.count = 0;
> +        }
> +
> +        if (mbuffer.root) {
> +            twalk(mbuffer.root, vio_mbuffer_delete);
> +            tdestroy(mbuffer.root, free);
> +            mbuffer.count = 0;
> +        }
> +
> +        if (epoll_events)
> +            free(epoll_events);
> +
> +        pthread_mutex_destroy(&mbuffer_mtx);
> +
> +        virtio_status = VIRTIO_INIT;
> +
> +        return 0;
> +    }
> +    return -1;
> +}
> +
> +/*
> + * Main virtio function
> + * 'start_routine' of pthread_create()
> + */
> +void *virtio_run(void *arg)
> +{
> +    if (virtio_status != VIRTIO_ACTIVE) {
> +        vu_log(VHOSTMD_ERR, "ERROR: virtio_loop() not initialized");
> +        return -1;
> +    }
> +
> +    while (virtio_status == VIRTIO_ACTIVE) {
> +        if (channel.count < channel.max_num)
> +            vio_channel_readdir(channel.path);
> +
> +        vio_hanlde_io(3000);   // process avaible requests
> +
> +        // remove expired metrics buffers
> +        mbuffer.exp_ts = time(NULL) - mbuffer.exp_period;
> +
> +        pthread_mutex_lock(&mbuffer_mtx);
> +
> +        if (mbuffer.root)
> +            twalk(mbuffer.root, vio_mbuffer_expire);
> +
> +        pthread_mutex_unlock(&mbuffer_mtx);
> +    }
> +
> +    return 0;
> +}
> +
> +/*
> + * Update the metrics response buffer of a VM/host
> + */
> +int virtio_metrics_update(const char     *buf,
> +                          unsigned int    len,
> +                          const char     *uuid,
> +                          metric_context  ctx)
> +{
> +    int rc = -1;
> +    vio_mbuffer *b;
> +
> +    if (buf == NULL || len == 0)
> +        return -1;
> +
> +    pthread_mutex_lock(&mbuffer_mtx);
> +
> +    switch (ctx) {
> +        case METRIC_CONTEXT_HOST:
> +            vu_buffer_erase(mbuffer.host);
> +            vu_buffer_add(mbuffer.host, buf, len);
> +#ifdef ENABLE_DEBUG
> +            vu_log(VHOSTMD_DEBUG, "DEBUG: new content for HOST (%u)\n>>>%s<<<\n",
> +                   mbuffer.host->len, mbuffer.host->content);
> +#endif
> +            rc = 0;
> +            break;
> +        case METRIC_CONTEXT_VM:
> +            if ((b = vio_mbuffer_find(uuid)) == NULL)
> +                b = vio_mbuffer_add(uuid);
> +            if (b != NULL) {
> +                vu_buffer_erase(b->xml);
> +                vu_buffer_add(b->xml, buf, len);
> +                b->last_update = time(NULL);
> +#ifdef ENABLE_DEBUG
> +                vu_log(VHOSTMD_DEBUG, "DEBUG: new content for %s (%u)\n>>>%s<<<\n",
> +                       uuid, b->xml.len, b->xml->content);
> +#endif
> +                rc = 0;
> +            }
> +            break;
> +    }
> +
> +    pthread_mutex_unlock(&mbuffer_mtx);
> +
> +    return rc;
> +}
> +
> +/*
> + * Stop virtio thread
> + */
> +void *virtio_stop(void)
> +{
> +    if (virtio_status == VIRTIO_ACTIVE)
> +        virtio_status = VIRTIO_STOP;
> +}
> 