[libvirt] Re: [PATCH 07/12] Domain Events - remote driver

Daniel P. Berrange berrange at redhat.com
Sun Oct 19 19:28:59 UTC 2008


On Fri, Oct 17, 2008 at 12:02:13PM -0400, Ben Guthro wrote:
> Deliver local callbacks in response to remote events
> 
>  remote_internal.c |  255 ++++++++++++++++++++++++++++++++++++++++++++++++++++--
>  1 file changed, 248 insertions(+), 7 deletions(-)

> @@ -680,6 +689,26 @@ doRemoteOpen (virConnectPtr conn,
>                (xdrproc_t) xdr_void, (char *) NULL) == -1)
>          goto failed;
>  
> +    if(VIR_ALLOC(priv->domainEvents)<0) {
> +        error(conn, VIR_ERR_INVALID_ARG, _("Error allocating domainEvents"));
> +        goto failed;
> +    }
> +
> +    DEBUG0("Adding Handler for remote events");
> +    /* Set up a callback to listen on the socket data */
> +    if (virEventAddHandle(priv->sock,
> +                          POLLIN | POLLERR | POLLHUP,
> +                          remoteDomainEventFired,
> +                          conn) < 0) {
> +        DEBUG0("virEventAddHandle failed: No addHandleImpl defined. continuing without events.");
> +    }
> +
> +    DEBUG0("Adding Timeout for remote event queue flushing");
> +    if ( (priv->eventFlushTimer = virEventAddTimeout(0,
> +                                                     remoteDomainEventQueueFlush,
> +                                                     conn)) < 0) {

Small bug here - this creates an active timer event, which will fire
immediately and keep firing forever. Simply change the '0' to a '-1' to
register a timeout that's initially disabled, and then use
virEventUpdateTimeout to toggle it on/off only when required.

> +
> +static int remoteDomainEventRegister (virConnectPtr conn,
> +                               void *callback,
> +                               void *opaque)
> +{
> +    struct private_data *priv = conn->privateData;
> +
> +    /* dispatch an rpc - so the server sde can track
> +       how many callbacks are regstered */
> +    remote_domain_events_register_args args;
> +    args.callback = (unsigned long)callback;
> +    args.user_data = (unsigned long)opaque;

This relates back to my comment on the remote_protocol.x file - I feel
we should probably maintain a generic token, rather than passing the
actual pointers over the wire as ints.

>  /*----------------------------------------------------------------------*/
>  
> @@ -4367,6 +4444,7 @@ call (virConnectPtr conn, struct private_data *priv,
>          really_write (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer, len-4) == -1)
>          return -1;
>  
> +retry_read:
>      /* Read and deserialise length word. */
>      if (really_read (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer2, sizeof buffer2) == -1)
>          return -1;
> @@ -4418,10 +4496,19 @@ call (virConnectPtr conn, struct private_data *priv,
>          return -1;
>      }
>  
> -    /* If we extend the server to actually send asynchronous messages, then
> -     * we'll need to change this so that it can recognise an asynch
> -     * message being received at this point.
> -     */
> +    if (hdr.proc == REMOTE_PROC_DOMAIN_EVENT &&
> +        hdr.direction == REMOTE_MESSAGE) {
> +        /* An async message has come in while we were waiting for the
> +         * response. Process it to pull it off the wire, and try again
> +         */
> +        DEBUG0("Encountered an event while waiting for a response");
> +
> +        remoteDomainQueueEvent(conn, &xdr);

Need to call virEventUpdateTimeout() to enable the timer here.

> +/**
> + * remoteDomainReadEvent
> + *
> + * Read the event data off the wire
> + */
> +static int remoteDomainReadEvent(virConnectPtr conn, XDR *xdr,
> +                                 virDomainPtr *dom, int *event,
> +                                 virConnectDomainEventCallback *cb,
> +                                 void **opaque)
> +{
> +    remote_domain_event_ret ret;
> +    memset (&ret, 0, sizeof ret);
> +
> +    /* unmarshall parameters, and process it*/
> +    if (! xdr_remote_domain_event_ret(xdr, &ret) ) {
> +        error (conn, VIR_ERR_RPC, _("remoteDomainProcessEvent: unmarshalling ret"));
> +        return -1;
> +    }
> +
> +    *dom = get_nonnull_domain(conn,ret.dom);
> +    *event = ret.event;
> +    *cb = (virConnectDomainEventCallback)ret.callback;
> +    *opaque = (void *)ret.user_data;
> +
> +    return 0;
> +}
> +
> +static void remoteDomainProcessEvent(virConnectPtr conn, XDR *xdr)
> +{
> +    virDomainPtr dom;
> +    int event;
> +    virConnectDomainEventCallback cb;
> +    void *opaque;
> +    if(!remoteDomainReadEvent(conn, xdr, &dom, &event, &cb, &opaque)) {
> +        DEBUG0("Calling domain event callback (no queue)");
> +        if(cb)
> +            cb(conn,dom,event,opaque);

Needs to call virDomainFree(dom) to release the object.

> +    }
> +}
> +
> +static void remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr)
> +{
> +    virDomainPtr dom;
> +    int event;
> +    virConnectDomainEventCallback cb;
> +    void *opaque;
> +    struct private_data *priv = conn->privateData;
> +
> +    if(!remoteDomainReadEvent(conn, xdr, &dom, &event, &cb, &opaque))
> +    {
> +        if( virDomainEventCallbackQueuePush(priv->domainEvents, dom, event, cb, opaque) < 0 ) {
> +            DEBUG("%s", "Error adding event to queue");
> +        }
> +    }
> +}
> +
> +/** remoteDomainEventFired:
> + *
> + * The callback for monitoring the remote socket
> + * for event data
> + */
> +void remoteDomainEventFired(int fd ATTRIBUTE_UNUSED,
> +                             int event ATTRIBUTE_UNUSED,
> +                             void *opaque)
> +{
> +    char buffer[REMOTE_MESSAGE_MAX];
> +    char buffer2[4];
> +    struct remote_message_header hdr;
> +    XDR xdr;
> +    int len;
> +
> +    virConnectPtr        conn = opaque;
> +    struct private_data *priv = conn->privateData;
> +
> +    DEBUG("%s : Event fired", __FUNCTION__);
> +
> +    /* Read and deserialise length word. */
> +    if (really_read (conn, priv, 0, buffer2, sizeof buffer2) == -1)
> +        return;
> +
> +    xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_DECODE);
> +    if (!xdr_int (&xdr, &len)) {
> +        error (conn, VIR_ERR_RPC, _("xdr_int (length word, reply)"));
> +        return;
> +    }
> +    xdr_destroy (&xdr);
> +
> +    /* Length includes length word - adjust to real length to read. */
> +    len -= 4;
> +
> +    if (len < 0 || len > REMOTE_MESSAGE_MAX) {
> +        error (conn, VIR_ERR_RPC, _("packet received from server too large"));
> +        return;
> +    }
> +
> +    /* Read reply header and what follows (either a ret or an error). */
> +    if (really_read (conn, priv, 0, buffer, len) == -1) {
> +        error (conn, VIR_ERR_RPC, _("error reading buffer from memory"));
> +        return;
> +    }
> +
> +    /* Deserialise reply header. */
> +    xdrmem_create (&xdr, buffer, len, XDR_DECODE);
> +    if (!xdr_remote_message_header (&xdr, &hdr)) {
> +        error (conn, VIR_ERR_RPC, _("invalid header in event firing"));
> +        return;
> +    }
> +
> +    if (hdr.proc == REMOTE_PROC_DOMAIN_EVENT &&
> +        hdr.direction == REMOTE_MESSAGE) {
> +        DEBUG0("Encountered an async event");
> +        remoteDomainProcessEvent(conn, &xdr);
> +    } else {
> +        DEBUG0("invalid proc in event firing");
> +        error (conn, VIR_ERR_RPC, _("invalid proc in event firing"));
> +    }
> +}
> +
> +void remoteDomainEventQueueFlush(int timer ATTRIBUTE_UNUSED,
> +                                 void *opaque)
> +{
> +    virDomainEventPtr domEvent;
> +    virConnectPtr        conn = opaque;
> +    struct private_data *priv = conn->privateData;
> +
> +    DEBUG0("Flushing domain events");
> +    while( (domEvent = virDomainEventCallbackQueuePop(priv->domainEvents)) ) {
> +        DEBUG("   Flushing %p", domEvent);
> +        if(domEvent->cb)
> +            domEvent->cb(domEvent->dom->conn,
> +                         domEvent->dom,
> +                         domEvent->event,
> +                         domEvent->opaque);
Needs to also call virDomainFree(domEvent->dom) to release the object.

> +        VIR_FREE(domEvent);
> +    }

And virEventUpdateTimeout to disable the timer again.

> +}

Here's a small additive patch which takes care of the timer issue:

diff -r 99dad81d37dd src/remote_internal.c
--- a/src/remote_internal.c	Sun Oct 19 13:46:36 2008 -0400
+++ b/src/remote_internal.c	Sun Oct 19 14:06:33 2008 -0400
@@ -704,7 +704,7 @@
     }
 
     DEBUG0("Adding Timeout for remote event queue flushing");
-    if ( (priv->eventFlushTimer = virEventAddTimeout(0,
+    if ( (priv->eventFlushTimer = virEventAddTimeout(-1,
                                                      remoteDomainEventQueueFlush,
                                                      conn)) < 0) {
         DEBUG0("virEventAddTimeout failed: No addTimeoutImpl defined. continuing without events.");
@@ -4504,6 +4504,7 @@
         DEBUG0("Encountered an event while waiting for a response");
 
         remoteDomainQueueEvent(conn, &xdr);
+        virEventUpdateTimeout(priv->eventFlushTimer, 0);
 
         DEBUG0("Retrying read");
         xdr_destroy (&xdr);
@@ -5182,4 +5183,6 @@
                          domEvent->opaque);
         VIR_FREE(domEvent);
     }
-}
+
+    virEventUpdateTimeout(priv->eventFlushTimer, -1);
+}


Regards,
Daniel
-- 
|: Red Hat, Engineering, London   -o-   http://people.redhat.com/berrange/ :|
|: http://libvirt.org  -o-  http://virt-manager.org  -o-  http://ovirt.org :|
|: http://autobuild.org       -o-         http://search.cpan.org/~danberr/ :|
|: GnuPG: 7D3B9505  -o-  F3C9 553F A1DA 4AC2 5648 23C1 B3DF F742 7D3B 9505 :|




More information about the libvir-list mailing list