[libvirt] [PATCH libvirt-glib 2/5] Add support for creating watches on streams

Christophe Fergeau cfergeau at redhat.com
Mon Nov 28 17:52:43 UTC 2011


On Mon, Nov 28, 2011 at 01:13:45PM +0000, Daniel P. Berrange wrote:
> From: "Daniel P. Berrange" <berrange at redhat.com>
> 
> The GIO GInputStream/GOutputStream async model for I/O does not
> work for working with non-blocking bi-directional streams. To
> allow that to be done more effectively, add an API to allow
> main loop watches to be registered against streams.
> 
> Since the libvirt level virStreamEventAddCallback API only allows
> a single callback to be registered to a stream at any time, the
> GVirStream object needs to be multiplexing of multiple watches into
> a single libvirt level callback.
> 
> Watches can be removed in the normal way with g_source_remove
> 
> * libvirt-gobject/libvirt-gobject-stream.c,
>   libvirt-gobject/libvirt-gobject-stream.h,
>   libvirt-gobject/libvirt-gobject.sym: Add gvir_stream_add_watch
> ---
>  libvirt-gobject/libvirt-gobject-stream.c |  180 ++++++++++++++++++++++++++++++
>  libvirt-gobject/libvirt-gobject-stream.h |   17 +++
>  libvirt-gobject/libvirt-gobject.sym      |    1 +
>  3 files changed, 198 insertions(+), 0 deletions(-)
> 
> diff --git a/libvirt-gobject/libvirt-gobject-stream.c b/libvirt-gobject/libvirt-gobject-stream.c
> index 0d1c2d1..03b2c84 100644
> --- a/libvirt-gobject/libvirt-gobject-stream.c
> +++ b/libvirt-gobject/libvirt-gobject-stream.c
> @@ -46,8 +46,20 @@ struct _GVirStreamPrivate
>      virStreamPtr   handle;
>      GInputStream  *input_stream;
>      GOutputStream  *output_stream;
> +
> +    gboolean eventRegistered;
> +    int eventLast;
> +    GList *sources;
>  };
>  
> +typedef struct {
> +    GSource source;
> +    GVirStreamIOCondition cond;
> +    GVirStreamIOCondition newCond;
> +    GVirStream *stream;
> +} GVirStreamSource;
> +
> +
>  G_DEFINE_TYPE(GVirStream, gvir_stream, G_TYPE_IO_STREAM);
>  
>  
> @@ -186,6 +198,7 @@ static void gvir_stream_finalize(GObject *object)
>  {
>      GVirStream *self = GVIR_STREAM(object);
>      GVirStreamPrivate *priv = self->priv;
> +    GList *tmp;
>  
>      DEBUG("Finalize GVirStream=%p", self);
>  
> @@ -199,6 +212,14 @@ static void gvir_stream_finalize(GObject *object)
>          virStreamFree(priv->handle);
>      }
>  
> +    tmp = priv->sources;
> +    while (tmp) {
> +        GVirStreamSource *source = tmp->data;
> +        g_source_remove(g_source_get_id((GSource*)source));

I think g_source_destroy can be used here

> +        tmp = tmp->next;
> +    }
> +    g_list_free(priv->sources);
> +
>      G_OBJECT_CLASS(gvir_stream_parent_class)->finalize(object);
>  }
>  
> @@ -448,3 +469,162 @@ gvir_stream_send_all(GVirStream *self, GVirStreamSourceFunc func, gpointer user_
>  
>      return r;
>  }
> +
> +
> +static void gvir_stream_handle_events(virStreamPtr st G_GNUC_UNUSED,
> +                                      int events,
> +                                      void *opaque)
> +{
> +    GVirStream *stream = GVIR_STREAM(opaque);
> +    GVirStreamPrivate *priv = stream->priv;
> +    GList *tmp = priv->sources;
> +
> +    while (tmp) {
> +        GVirStreamSource *source = tmp->data;
> +        source->newCond = 0;
> +        if (source->cond & GVIR_STREAM_IO_CONDITION_READABLE) {
> +            if (events & VIR_STREAM_EVENT_READABLE)
> +                source->newCond |= GVIR_STREAM_IO_CONDITION_READABLE;
> +            if (events & VIR_STREAM_EVENT_HANGUP)
> +                source->newCond |= GVIR_STREAM_IO_CONDITION_HANGUP;
> +            if (events & VIR_STREAM_EVENT_ERROR)
> +                source->newCond |= GVIR_STREAM_IO_CONDITION_ERROR;
> +        }
> +        if (source->cond & GVIR_STREAM_IO_CONDITION_WRITABLE) {
> +            if (events & VIR_STREAM_EVENT_WRITABLE)
> +                source->newCond |= GVIR_STREAM_IO_CONDITION_WRITABLE;
> +            if (events & VIR_STREAM_EVENT_HANGUP)
> +                source->newCond |= GVIR_STREAM_IO_CONDITION_HANGUP;
> +            if (events & VIR_STREAM_EVENT_ERROR)
> +                source->newCond |= GVIR_STREAM_IO_CONDITION_ERROR;
> +        }
> +        tmp = tmp->next;
> +    }
> +
> +}
> +
> +
> +static void gvir_stream_update_events(GVirStream *stream)
> +{
> +    GVirStreamPrivate *priv = stream->priv;
> +    int mask = 0;
> +    GList *tmp = priv->sources;
> +
> +    while (tmp) {
> +        GVirStreamSource *source = tmp->data;
> +        if (source->cond & GVIR_STREAM_IO_CONDITION_READABLE)
> +            mask |= VIR_STREAM_EVENT_READABLE;
> +        if (source->cond & GVIR_STREAM_IO_CONDITION_WRITABLE)
> +            mask |= VIR_STREAM_EVENT_WRITABLE;
> +        tmp = tmp->next;
> +    }
> +
> +    if (mask) {
> +        if (priv->eventRegistered) {
> +            virStreamEventUpdateCallback(priv->handle, mask);
> +        } else {
> +            virStreamEventAddCallback(priv->handle, mask,
> +                                      gvir_stream_handle_events,
> +                                      g_object_ref(stream),
> +                                      g_object_unref);
> +            priv->eventRegistered = TRUE;
> +        }
> +    } else {
> +        if (priv->eventRegistered) {
> +            virStreamEventRemoveCallback(priv->handle);
> +            priv->eventRegistered = FALSE;
> +        }
> +    }
> +}
> +
> +static gboolean gvir_stream_source_prepare(GSource *source,
> +                                           gint *timeout)
> +{
> +    GVirStreamSource *gsource = (GVirStreamSource*)source;
> +    if (gsource->newCond) {
> +        *timeout = 0;
> +        return TRUE;
> +    }
> +    *timeout = -1;
> +    return FALSE;
> +}
> +
> +static gboolean gvir_stream_source_check(GSource *source)
> +{
> +    GVirStreamSource *gsource = (GVirStreamSource*)source;
> +    if (gsource->newCond)
> +        return TRUE;
> +    return FALSE;
> +}
> +
> +static gboolean gvir_stream_source_dispatch(GSource *source,
> +                                            GSourceFunc callback,
> +                                            gpointer user_data)
> +{
> +    GVirStreamSource *gsource = (GVirStreamSource*)source;
> +    GVirStreamIOFunc func = (GVirStreamIOFunc)callback;
> +    gboolean ret;
> +    ret = func(gsource->stream, gsource->newCond, user_data);
> +    gsource->newCond = 0;
> +    return ret;
> +}
> +
> +static void gvir_stream_source_finalize(GSource *source)
> +{
> +    GVirStreamSource *gsource = (GVirStreamSource*)source;
> +    GVirStreamPrivate *priv = gsource->stream->priv;
> +    GList *tmp, *prev = NULL;
> +
> +    tmp = priv->sources;
> +    while (tmp) {
> +        if (tmp->data == source) {
> +            if (prev) {
> +                prev->next = tmp->next;
> +            } else {
> +                priv->sources = tmp->next;
> +            }
> +            tmp->next = NULL;
> +            g_list_free(tmp);
> +            break;
> +        }
> +
> +        prev = tmp;
> +        tmp = tmp->next;
> +    }

isn't it doing the same as g_list_remove?

> +
> +    gvir_stream_update_events(gsource->stream);
> +}
> +
> +GSourceFuncs gvir_stream_source_funcs = {
> +    .prepare = gvir_stream_source_prepare,
> +    .check = gvir_stream_source_check,
> +    .dispatch = gvir_stream_source_dispatch,
> +    .finalize = gvir_stream_source_finalize,
> +};
> +
> +gint gvir_stream_add_watch(GVirStream *stream,
> +                           GVirStreamIOCondition cond,
> +                           GVirStreamIOFunc func,
> +                           gpointer opaque,
> +                           GDestroyNotify notify)

Dunno if it's worth having both gvir_stream_add_watch and
gvir_stream_add_watch_full to be consistent with most glib source functions
(g_timeout_add, g_idle_add, g_io_add_watch, ...). The notify argument would
only be in the _full variant.

> +{
> +    GVirStreamPrivate *priv = stream->priv;
> +    gint id;
> +    GVirStreamSource *source = (GVirStreamSource*)g_source_new(&gvir_stream_source_funcs,
> +                                                               sizeof(GVirStreamSource));
> +
> +    source->stream = stream;
> +    source->cond = cond;
> +
> +    priv->sources = g_list_append(priv->sources, source);
> +
> +    gvir_stream_update_events(source->stream);
> +
> +    g_source_set_callback((GSource*)source, (GSourceFunc)func, opaque, notify);
> +    g_source_attach((GSource*)source, g_main_context_default());
> +
> +    id = g_source_get_id((GSource*)source);

g_source_attach returns this id which is of type guint.

> +    g_source_unref((GSource*)source);
> +
> +    return id;
> +}
> diff --git a/libvirt-gobject/libvirt-gobject-stream.h b/libvirt-gobject/libvirt-gobject-stream.h
> index 5a1ee68..e0004b2 100644
> --- a/libvirt-gobject/libvirt-gobject-stream.h
> +++ b/libvirt-gobject/libvirt-gobject-stream.h
> @@ -93,6 +93,23 @@ typedef gint (* GVirStreamSourceFunc)(GVirStream *stream,
>  GType gvir_stream_get_type(void);
>  GType gvir_stream_handle_get_type(void);
>  
> +typedef enum {
> +    GVIR_STREAM_IO_CONDITION_READABLE = (1 << 0),
> +    GVIR_STREAM_IO_CONDITION_WRITABLE = (1 << 1),
> +    GVIR_STREAM_IO_CONDITION_HANGUP   = (1 << 2),
> +    GVIR_STREAM_IO_CONDITION_ERROR    = (1 << 3),
> +} GVirStreamIOCondition;
> +
> +typedef gboolean (*GVirStreamIOFunc)(GVirStream *stream,
> +                                     GVirStreamIOCondition cond,
> +                                     gpointer opaque);
> +
> +gint gvir_stream_add_watch(GVirStream *stream,
> +                           GVirStreamIOCondition cond,
> +                           GVirStreamIOFunc func,
> +                           gpointer opaque,
> +                           GDestroyNotify notify);
> +
>  gssize gvir_stream_receive_all(GVirStream *stream, GVirStreamSinkFunc func, gpointer user_data, GError **error);
>  gssize gvir_stream_receive(GVirStream *stream, gchar *buffer, gsize size, GCancellable *cancellable, GError **error);
>  
> diff --git a/libvirt-gobject/libvirt-gobject.sym b/libvirt-gobject/libvirt-gobject.sym
> index 78b3935..6261865 100644
> --- a/libvirt-gobject/libvirt-gobject.sym
> +++ b/libvirt-gobject/libvirt-gobject.sym
> @@ -126,6 +126,7 @@ LIBVIRT_GOBJECT_0.0.1 {
>  	gvir_stream_get_type;
>  	gvir_stream_receive_all;
>  	gvir_stream_handle_get_type;
> +	gvir_stream_add_watch;
>  
>    local:
>          *;
> -- 
> 1.7.6.4
> 
> --
> libvir-list mailing list
> libvir-list at redhat.com
> https://www.redhat.com/mailman/listinfo/libvir-list
-------------- next part --------------
A non-text attachment was scrubbed...
Name: not available
Type: application/pgp-signature
Size: 198 bytes
Desc: not available
URL: <http://listman.redhat.com/archives/libvir-list/attachments/20111128/d021118a/attachment-0001.sig>


More information about the libvir-list mailing list