[libvirt] [PATCH libvirt-glib 2/5] Add support for creating watches on streams
Christophe Fergeau
cfergeau at redhat.com
Mon Nov 28 17:52:43 UTC 2011
On Mon, Nov 28, 2011 at 01:13:45PM +0000, Daniel P. Berrange wrote:
> From: "Daniel P. Berrange" <berrange at redhat.com>
>
> The GIO GInputStream/GOutputStream async model for I/O does not
> work for working with non-blocking bi-directional streams. To
> allow that to be done more effectively, add an API to allow
> main loop watches to be registered against streams.
>
> Since the libvirt level virStreamEventAddCallback API only allows
> a single callback to be registered to a stream at any time, the
> GVirStream object needs to be multiplexing of multiple watches into
> a single libvirt level callback.
>
> Watches can be removed in the normal way with g_source_remove
>
> * libvirt-gobject/libvirt-gobject-stream.c,
> libvirt-gobject/libvirt-gobject-stream.h,
> libvirt-gobject/libvirt-gobject.sym: Add gvir_stream_add_watch
> ---
> libvirt-gobject/libvirt-gobject-stream.c | 180 ++++++++++++++++++++++++++++++
> libvirt-gobject/libvirt-gobject-stream.h | 17 +++
> libvirt-gobject/libvirt-gobject.sym | 1 +
> 3 files changed, 198 insertions(+), 0 deletions(-)
>
> diff --git a/libvirt-gobject/libvirt-gobject-stream.c b/libvirt-gobject/libvirt-gobject-stream.c
> index 0d1c2d1..03b2c84 100644
> --- a/libvirt-gobject/libvirt-gobject-stream.c
> +++ b/libvirt-gobject/libvirt-gobject-stream.c
> @@ -46,8 +46,20 @@ struct _GVirStreamPrivate
> virStreamPtr handle;
> GInputStream *input_stream;
> GOutputStream *output_stream;
> +
> + gboolean eventRegistered;
> + int eventLast;
> + GList *sources;
> };
>
> +typedef struct {
> + GSource source;
> + GVirStreamIOCondition cond;
> + GVirStreamIOCondition newCond;
> + GVirStream *stream;
> +} GVirStreamSource;
> +
> +
> G_DEFINE_TYPE(GVirStream, gvir_stream, G_TYPE_IO_STREAM);
>
>
> @@ -186,6 +198,7 @@ static void gvir_stream_finalize(GObject *object)
> {
> GVirStream *self = GVIR_STREAM(object);
> GVirStreamPrivate *priv = self->priv;
> + GList *tmp;
>
> DEBUG("Finalize GVirStream=%p", self);
>
> @@ -199,6 +212,14 @@ static void gvir_stream_finalize(GObject *object)
> virStreamFree(priv->handle);
> }
>
> + tmp = priv->sources;
> + while (tmp) {
> + GVirStreamSource *source = tmp->data;
> + g_source_remove(g_source_get_id((GSource*)source));
I think g_source_destroy can be used here
> + tmp = tmp->next;
> + }
> + g_list_free(priv->sources);
> +
> G_OBJECT_CLASS(gvir_stream_parent_class)->finalize(object);
> }
>
> @@ -448,3 +469,162 @@ gvir_stream_send_all(GVirStream *self, GVirStreamSourceFunc func, gpointer user_
>
> return r;
> }
> +
> +
> +static void gvir_stream_handle_events(virStreamPtr st G_GNUC_UNUSED,
> + int events,
> + void *opaque)
> +{
> + GVirStream *stream = GVIR_STREAM(opaque);
> + GVirStreamPrivate *priv = stream->priv;
> + GList *tmp = priv->sources;
> +
> + while (tmp) {
> + GVirStreamSource *source = tmp->data;
> + source->newCond = 0;
> + if (source->cond & GVIR_STREAM_IO_CONDITION_READABLE) {
> + if (events & VIR_STREAM_EVENT_READABLE)
> + source->newCond |= GVIR_STREAM_IO_CONDITION_READABLE;
> + if (events & VIR_STREAM_EVENT_HANGUP)
> + source->newCond |= GVIR_STREAM_IO_CONDITION_HANGUP;
> + if (events & VIR_STREAM_EVENT_ERROR)
> + source->newCond |= GVIR_STREAM_IO_CONDITION_ERROR;
> + }
> + if (source->cond & GVIR_STREAM_IO_CONDITION_WRITABLE) {
> + if (events & VIR_STREAM_EVENT_WRITABLE)
> + source->newCond |= GVIR_STREAM_IO_CONDITION_WRITABLE;
> + if (events & VIR_STREAM_EVENT_HANGUP)
> + source->newCond |= GVIR_STREAM_IO_CONDITION_HANGUP;
> + if (events & VIR_STREAM_EVENT_ERROR)
> + source->newCond |= GVIR_STREAM_IO_CONDITION_ERROR;
> + }
> + tmp = tmp->next;
> + }
> +
> +}
> +
> +
> +static void gvir_stream_update_events(GVirStream *stream)
> +{
> + GVirStreamPrivate *priv = stream->priv;
> + int mask = 0;
> + GList *tmp = priv->sources;
> +
> + while (tmp) {
> + GVirStreamSource *source = tmp->data;
> + if (source->cond & GVIR_STREAM_IO_CONDITION_READABLE)
> + mask |= VIR_STREAM_EVENT_READABLE;
> + if (source->cond & GVIR_STREAM_IO_CONDITION_WRITABLE)
> + mask |= VIR_STREAM_EVENT_WRITABLE;
> + tmp = tmp->next;
> + }
> +
> + if (mask) {
> + if (priv->eventRegistered) {
> + virStreamEventUpdateCallback(priv->handle, mask);
> + } else {
> + virStreamEventAddCallback(priv->handle, mask,
> + gvir_stream_handle_events,
> + g_object_ref(stream),
> + g_object_unref);
> + priv->eventRegistered = TRUE;
> + }
> + } else {
> + if (priv->eventRegistered) {
> + virStreamEventRemoveCallback(priv->handle);
> + priv->eventRegistered = FALSE;
> + }
> + }
> +}
> +
> +static gboolean gvir_stream_source_prepare(GSource *source,
> + gint *timeout)
> +{
> + GVirStreamSource *gsource = (GVirStreamSource*)source;
> + if (gsource->newCond) {
> + *timeout = 0;
> + return TRUE;
> + }
> + *timeout = -1;
> + return FALSE;
> +}
> +
> +static gboolean gvir_stream_source_check(GSource *source)
> +{
> + GVirStreamSource *gsource = (GVirStreamSource*)source;
> + if (gsource->newCond)
> + return TRUE;
> + return FALSE;
> +}
> +
> +static gboolean gvir_stream_source_dispatch(GSource *source,
> + GSourceFunc callback,
> + gpointer user_data)
> +{
> + GVirStreamSource *gsource = (GVirStreamSource*)source;
> + GVirStreamIOFunc func = (GVirStreamIOFunc)callback;
> + gboolean ret;
> + ret = func(gsource->stream, gsource->newCond, user_data);
> + gsource->newCond = 0;
> + return ret;
> +}
> +
> +static void gvir_stream_source_finalize(GSource *source)
> +{
> + GVirStreamSource *gsource = (GVirStreamSource*)source;
> + GVirStreamPrivate *priv = gsource->stream->priv;
> + GList *tmp, *prev = NULL;
> +
> + tmp = priv->sources;
> + while (tmp) {
> + if (tmp->data == source) {
> + if (prev) {
> + prev->next = tmp->next;
> + } else {
> + priv->sources = tmp->next;
> + }
> + tmp->next = NULL;
> + g_list_free(tmp);
> + break;
> + }
> +
> + prev = tmp;
> + tmp = tmp->next;
> + }
isn't it doing the same as g_list_remove?
> +
> + gvir_stream_update_events(gsource->stream);
> +}
> +
> +GSourceFuncs gvir_stream_source_funcs = {
> + .prepare = gvir_stream_source_prepare,
> + .check = gvir_stream_source_check,
> + .dispatch = gvir_stream_source_dispatch,
> + .finalize = gvir_stream_source_finalize,
> +};
> +
> +gint gvir_stream_add_watch(GVirStream *stream,
> + GVirStreamIOCondition cond,
> + GVirStreamIOFunc func,
> + gpointer opaque,
> + GDestroyNotify notify)
Dunno if it's worth having both gvir_stream_add_watch and
gvir_stream_add_watch_full to be consistent with most glib source functions
(g_timeout_add, g_idle_add, g_io_add_watch, ...). The notify argument would
only be in the _full variant.
> +{
> + GVirStreamPrivate *priv = stream->priv;
> + gint id;
> + GVirStreamSource *source = (GVirStreamSource*)g_source_new(&gvir_stream_source_funcs,
> + sizeof(GVirStreamSource));
> +
> + source->stream = stream;
> + source->cond = cond;
> +
> + priv->sources = g_list_append(priv->sources, source);
> +
> + gvir_stream_update_events(source->stream);
> +
> + g_source_set_callback((GSource*)source, (GSourceFunc)func, opaque, notify);
> + g_source_attach((GSource*)source, g_main_context_default());
> +
> + id = g_source_get_id((GSource*)source);
g_source_attach returns this id which is of type guint.
> + g_source_unref((GSource*)source);
> +
> + return id;
> +}
> diff --git a/libvirt-gobject/libvirt-gobject-stream.h b/libvirt-gobject/libvirt-gobject-stream.h
> index 5a1ee68..e0004b2 100644
> --- a/libvirt-gobject/libvirt-gobject-stream.h
> +++ b/libvirt-gobject/libvirt-gobject-stream.h
> @@ -93,6 +93,23 @@ typedef gint (* GVirStreamSourceFunc)(GVirStream *stream,
> GType gvir_stream_get_type(void);
> GType gvir_stream_handle_get_type(void);
>
> +typedef enum {
> + GVIR_STREAM_IO_CONDITION_READABLE = (1 << 0),
> + GVIR_STREAM_IO_CONDITION_WRITABLE = (1 << 1),
> + GVIR_STREAM_IO_CONDITION_HANGUP = (1 << 2),
> + GVIR_STREAM_IO_CONDITION_ERROR = (1 << 3),
> +} GVirStreamIOCondition;
> +
> +typedef gboolean (*GVirStreamIOFunc)(GVirStream *stream,
> + GVirStreamIOCondition cond,
> + gpointer opaque);
> +
> +gint gvir_stream_add_watch(GVirStream *stream,
> + GVirStreamIOCondition cond,
> + GVirStreamIOFunc func,
> + gpointer opaque,
> + GDestroyNotify notify);
> +
> gssize gvir_stream_receive_all(GVirStream *stream, GVirStreamSinkFunc func, gpointer user_data, GError **error);
> gssize gvir_stream_receive(GVirStream *stream, gchar *buffer, gsize size, GCancellable *cancellable, GError **error);
>
> diff --git a/libvirt-gobject/libvirt-gobject.sym b/libvirt-gobject/libvirt-gobject.sym
> index 78b3935..6261865 100644
> --- a/libvirt-gobject/libvirt-gobject.sym
> +++ b/libvirt-gobject/libvirt-gobject.sym
> @@ -126,6 +126,7 @@ LIBVIRT_GOBJECT_0.0.1 {
> gvir_stream_get_type;
> gvir_stream_receive_all;
> gvir_stream_handle_get_type;
> + gvir_stream_add_watch;
>
> local:
> *;
> --
> 1.7.6.4
>
> --
> libvir-list mailing list
> libvir-list at redhat.com
> https://www.redhat.com/mailman/listinfo/libvir-list
-------------- next part --------------
A non-text attachment was scrubbed...
Name: not available
Type: application/pgp-signature
Size: 198 bytes
Desc: not available
URL: <http://listman.redhat.com/archives/libvir-list/attachments/20111128/d021118a/attachment-0001.sig>
More information about the libvir-list
mailing list