[libvirt] [PATCH libvirt-glib 1/5] Add support for writing to streams
Daniel P. Berrange
berrange at redhat.com
Tue Nov 22 12:39:28 UTC 2011
From: "Daniel P. Berrange" <berrange at redhat.com>
---
libvirt-gobject/Makefile.am | 8 +-
libvirt-gobject/libvirt-gobject-output-stream.c | 240 +++++++++++++++++++++++
libvirt-gobject/libvirt-gobject-output-stream.h | 68 +++++++
libvirt-gobject/libvirt-gobject-stream.c | 115 +++++++++++-
libvirt-gobject/libvirt-gobject-stream.h | 27 ++-
5 files changed, 447 insertions(+), 11 deletions(-)
create mode 100644 libvirt-gobject/libvirt-gobject-output-stream.c
create mode 100644 libvirt-gobject/libvirt-gobject-output-stream.h
diff --git a/libvirt-gobject/Makefile.am b/libvirt-gobject/Makefile.am
index 0eef9c8..ec7b454 100644
--- a/libvirt-gobject/Makefile.am
+++ b/libvirt-gobject/Makefile.am
@@ -45,8 +45,7 @@ GOBJECT_GENERATED_FILES = \
libvirt_gobject_1_0_ladir = $(includedir)/libvirt-gobject-1.0/libvirt-gobject
libvirt_gobject_1_0_la_HEADERS = \
- $(GOBJECT_HEADER_FILES) \
- libvirt-gobject-input-stream.h
+ $(GOBJECT_HEADER_FILES)
nodist_libvirt_gobject_1_0_la_HEADERS = \
libvirt-gobject-enums.h
libvirt_gobject_1_0_la_SOURCES = \
@@ -54,7 +53,10 @@ libvirt_gobject_1_0_la_SOURCES = \
$(GOBJECT_SOURCE_FILES) \
libvirt-gobject-domain-device-private.h \
libvirt-gobject-compat.h \
- libvirt-gobject-input-stream.c
+ libvirt-gobject-input-stream.h \
+ libvirt-gobject-input-stream.c \
+ libvirt-gobject-output-stream.h \
+ libvirt-gobject-output-stream.c
nodist_libvirt_gobject_1_0_la_SOURCES = \
$(GOBJECT_GENERATED_FILES)
libvirt_gobject_1_0_la_CFLAGS = \
diff --git a/libvirt-gobject/libvirt-gobject-output-stream.c b/libvirt-gobject/libvirt-gobject-output-stream.c
new file mode 100644
index 0000000..30ee519
--- /dev/null
+++ b/libvirt-gobject/libvirt-gobject-output-stream.c
@@ -0,0 +1,240 @@
+/*
+ * libvirt-gobject-output-stream.h: libvirt gobject integration
+ *
+ * Copyright (C) 2011 Red Hat
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Authors: Daniel P. Berrange <berrange at redhat.com>
+ * Marc-André Lureau <marcandre.lureau at redhat.com>
+ */
+
+#include <config.h>
+
+#include <libvirt/virterror.h>
+#include <string.h>
+
+#include "libvirt-glib/libvirt-glib.h"
+#include "libvirt-gobject/libvirt-gobject.h"
+#include "libvirt-gobject-output-stream.h"
+
+extern gboolean debugFlag;
+
+#define DEBUG(fmt, ...) do { if (G_UNLIKELY(debugFlag)) g_debug(fmt, ## __VA_ARGS__); } while (0)
+
+#define gvir_output_stream_get_type _gvir_output_stream_get_type
+G_DEFINE_TYPE(GVirOutputStream, gvir_output_stream, G_TYPE_OUTPUT_STREAM);
+
+enum
+{
+ PROP_0,
+ PROP_STREAM
+};
+
+struct _GVirOutputStreamPrivate
+{
+ GVirStream *stream;
+
+ /* pending operation metadata */
+ GSimpleAsyncResult *result;
+ GCancellable *cancellable;
+ const void * buffer;
+ gsize count;
+};
+
+static void gvir_output_stream_get_property(GObject *object,
+ guint prop_id,
+ GValue *value,
+ GParamSpec *pspec)
+{
+ GVirOutputStream *stream = GVIR_OUTPUT_STREAM(object);
+
+ switch (prop_id) {
+ case PROP_STREAM:
+ g_value_set_object(value, stream->priv->stream);
+ break;
+
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
+ }
+}
+
+static void gvir_output_stream_set_property(GObject *object,
+ guint prop_id,
+ const GValue *value,
+ GParamSpec *pspec)
+{
+ GVirOutputStream *stream = GVIR_OUTPUT_STREAM(object);
+
+ switch (prop_id) {
+ case PROP_STREAM:
+ stream->priv->stream = g_value_get_object(value);
+ break;
+
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
+ }
+}
+
+static void gvir_output_stream_finalize(GObject *object)
+{
+ GVirOutputStream *stream = GVIR_OUTPUT_STREAM(object);
+
+ DEBUG("Finalize output stream GVirStream=%p", stream->priv->stream);
+ stream->priv->stream = NULL; // unowned
+
+ if (G_OBJECT_CLASS(gvir_output_stream_parent_class)->finalize)
+ (*G_OBJECT_CLASS(gvir_output_stream_parent_class)->finalize)(object);
+}
+
+static void
+gvir_output_stream_write_ready(virStreamPtr st G_GNUC_UNUSED,
+ int events,
+ void *opaque)
+{
+ GVirOutputStream *stream = GVIR_OUTPUT_STREAM(opaque);
+ GVirOutputStreamPrivate *priv = stream->priv;
+ GSimpleAsyncResult *simple;
+ GError *error = NULL;
+ gssize result;
+
+ g_return_if_fail(events & VIR_STREAM_EVENT_WRITABLE);
+
+ result = gvir_stream_send(priv->stream, priv->buffer, priv->count,
+ priv->cancellable, &error);
+
+ if (g_error_matches(error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+ g_warn_if_reached();
+ return;
+ }
+
+ simple = stream->priv->result;
+ stream->priv->result = NULL;
+
+ if (result >= 0)
+ g_simple_async_result_set_op_res_gssize(simple, result);
+
+ if (error)
+ g_simple_async_result_take_error(simple, error);
+
+ if (priv->cancellable) {
+ g_object_unref(stream->priv->cancellable);
+ priv->cancellable = NULL;
+ }
+
+ g_simple_async_result_complete(simple);
+ g_object_unref(simple);
+
+ return;
+}
+
+static void gvir_output_stream_write_async(GOutputStream *stream,
+ const void *buffer,
+ gsize count,
+ int io_priority G_GNUC_UNUSED,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ GVirOutputStream *output_stream = GVIR_OUTPUT_STREAM(stream);
+ virStreamPtr handle;
+
+ g_return_if_fail(GVIR_IS_OUTPUT_STREAM(stream));
+ g_return_if_fail(output_stream->priv->result == NULL);
+
+ g_object_get(output_stream->priv->stream, "handle", &handle, NULL);
+
+ if (virStreamEventAddCallback(handle, VIR_STREAM_EVENT_WRITABLE,
+ gvir_output_stream_write_ready, stream, NULL) < 0) {
+ g_simple_async_report_error_in_idle(G_OBJECT(stream),
+ callback,
+ user_data,
+ G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
+ "Couldn't add event callback %s",
+ G_STRFUNC);
+ goto end;
+ }
+
+ output_stream->priv->result =
+ g_simple_async_result_new(G_OBJECT(stream), callback, user_data,
+ gvir_output_stream_write_async);
+ if (cancellable)
+ g_object_ref(cancellable);
+ output_stream->priv->cancellable = cancellable;
+ output_stream->priv->buffer = buffer;
+ output_stream->priv->count = count;
+
+end:
+ virStreamFree(handle);
+}
+
+
+static gssize gvir_output_stream_write_finish(GOutputStream *stream,
+ GAsyncResult *result,
+ GError **error G_GNUC_UNUSED)
+{
+ GVirOutputStream *output_stream = GVIR_OUTPUT_STREAM(stream);
+ GSimpleAsyncResult *simple;
+ virStreamPtr handle;
+ gssize count;
+
+ g_return_val_if_fail(GVIR_IS_OUTPUT_STREAM(stream), -1);
+ g_object_get(output_stream->priv->stream, "handle", &handle, NULL);
+
+ simple = G_SIMPLE_ASYNC_RESULT(result);
+
+ g_warn_if_fail(g_simple_async_result_get_source_tag(simple) == gvir_output_stream_write_async);
+
+ count = g_simple_async_result_get_op_res_gssize(simple);
+
+ virStreamEventRemoveCallback(handle);
+ virStreamFree(handle);
+
+ return count;
+}
+
+
+static void gvir_output_stream_class_init(GVirOutputStreamClass *klass)
+{
+ GObjectClass *gobject_class = G_OBJECT_CLASS(klass);
+ GOutputStreamClass *goutputstream_class = G_OUTPUT_STREAM_CLASS(klass);
+
+ g_type_class_add_private(klass, sizeof(GVirOutputStreamPrivate));
+
+ gobject_class->finalize = gvir_output_stream_finalize;
+ gobject_class->get_property = gvir_output_stream_get_property;
+ gobject_class->set_property = gvir_output_stream_set_property;
+
+ goutputstream_class->write_fn = NULL;
+ goutputstream_class->write_async = gvir_output_stream_write_async;
+ goutputstream_class->write_finish = gvir_output_stream_write_finish;
+
+ g_object_class_install_property(gobject_class, PROP_STREAM,
+ g_param_spec_object("stream",
+ "stream",
+ "GVirStream",
+ GVIR_TYPE_STREAM, G_PARAM_CONSTRUCT_ONLY |
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+}
+
+static void gvir_output_stream_init(GVirOutputStream *stream)
+{
+ stream->priv = G_TYPE_INSTANCE_GET_PRIVATE(stream, GVIR_TYPE_OUTPUT_STREAM, GVirOutputStreamPrivate);
+}
+
+GVirOutputStream* _gvir_output_stream_new(GVirStream *stream)
+{
+ return GVIR_OUTPUT_STREAM(g_object_new(GVIR_TYPE_OUTPUT_STREAM, "stream", stream, NULL));
+}
diff --git a/libvirt-gobject/libvirt-gobject-output-stream.h b/libvirt-gobject/libvirt-gobject-output-stream.h
new file mode 100644
index 0000000..0ca0053
--- /dev/null
+++ b/libvirt-gobject/libvirt-gobject-output-stream.h
@@ -0,0 +1,68 @@
+/*
+ * libvirt-gobject-output-stream.h: libvirt gobject integration
+ *
+ * Copyright (C) 2011 Red Hat
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Authors: Daniel P. Berrange <berrange at redhat.com>
+ * Marc-André Lureau <marcandre.lureau at redhat.com>
+ */
+
+#if !defined(__LIBVIRT_GOBJECT_H__) && !defined(LIBVIRT_GOBJECT_BUILD)
+#error "Only <libvirt-gobject/libvirt-gobject.h> can be included directly."
+#endif
+
+#ifndef __LIBVIRT_GOBJECT_OUTPUT_STREAM_H__
+#define __LIBVIRT_GOBJECT_OUTPUT_STREAM_H__
+
+#include <gio/gio.h>
+#include "libvirt-gobject-stream.h"
+
+G_BEGIN_DECLS
+
+#define GVIR_TYPE_OUTPUT_STREAM (_gvir_output_stream_get_type ())
+#define GVIR_OUTPUT_STREAM(inst) (G_TYPE_CHECK_INSTANCE_CAST ((inst), \
+ GVIR_TYPE_OUTPUT_STREAM, GVirOutputStream))
+#define GVIR_OUTPUT_STREAM_CLASS(class) (G_TYPE_CHECK_CLASS_CAST ((class), \
+ GVIR_TYPE_OUTPUT_STREAM, GVirOutputStreamClass))
+#define GVIR_IS_OUTPUT_STREAM(inst) (G_TYPE_CHECK_INSTANCE_TYPE ((inst), \
+ GVIR_TYPE_OUTPUT_STREAM))
+#define GVIR_IS_OUTPUT_STREAM_CLASS(class) (G_TYPE_CHECK_CLASS_TYPE ((class), \
+ GVIR_TYPE_OUTPUT_STREAM))
+#define GVIR_OUTPUT_STREAM_GET_CLASS(inst) (G_TYPE_INSTANCE_GET_CLASS ((inst), \
+ GVIR_TYPE_OUTPUT_STREAM, GVirOutputStreamClass))
+
+typedef struct _GVirOutputStreamPrivate GVirOutputStreamPrivate;
+typedef struct _GVirOutputStreamClass GVirOutputStreamClass;
+typedef struct _GVirOutputStream GVirOutputStream;
+
+struct _GVirOutputStreamClass
+{
+ GOutputStreamClass parent_class;
+};
+
+struct _GVirOutputStream
+{
+ GOutputStream parent_instance;
+ GVirOutputStreamPrivate *priv;
+};
+
+GType _gvir_output_stream_get_type (void) G_GNUC_CONST;
+GVirOutputStream * _gvir_output_stream_new (GVirStream *stream);
+
+G_END_DECLS
+
+#endif /* __LIBVIRT_GOBJECT_OUTPUT_STREAM_H__ */
diff --git a/libvirt-gobject/libvirt-gobject-stream.c b/libvirt-gobject/libvirt-gobject-stream.c
index 30673aa..0d1c2d1 100644
--- a/libvirt-gobject/libvirt-gobject-stream.c
+++ b/libvirt-gobject/libvirt-gobject-stream.c
@@ -32,6 +32,7 @@
#include "libvirt-gobject-compat.h"
#include "libvirt-gobject/libvirt-gobject-input-stream.h"
+#include "libvirt-gobject/libvirt-gobject-output-stream.h"
extern gboolean debugFlag;
@@ -44,7 +45,7 @@ struct _GVirStreamPrivate
{
virStreamPtr handle;
GInputStream *input_stream;
- gboolean in_dispose;
+ GOutputStream *output_stream;
};
G_DEFINE_TYPE(GVirStream, gvir_stream, G_TYPE_IO_STREAM);
@@ -77,6 +78,17 @@ static GInputStream* gvir_stream_get_input_stream(GIOStream *io_stream)
}
+static GOutputStream* gvir_stream_get_output_stream(GIOStream *io_stream)
+{
+ GVirStream *self = GVIR_STREAM(io_stream);
+
+ if (self->priv->output_stream == NULL)
+ self->priv->output_stream = (GOutputStream *)_gvir_output_stream_new(self);
+
+ return self->priv->output_stream;
+}
+
+
static gboolean gvir_stream_close(GIOStream *io_stream,
GCancellable *cancellable, G_GNUC_UNUSED GError **error)
{
@@ -85,8 +97,8 @@ static gboolean gvir_stream_close(GIOStream *io_stream,
if (self->priv->input_stream)
g_input_stream_close(self->priv->input_stream, cancellable, NULL);
- if (self->priv->in_dispose)
- return TRUE;
+ if (self->priv->output_stream)
+ g_output_stream_close(self->priv->output_stream, cancellable, NULL);
return TRUE; /* FIXME: really close the stream? */
}
@@ -201,6 +213,7 @@ static void gvir_stream_class_init(GVirStreamClass *klass)
object_class->set_property = gvir_stream_set_property;
stream_class->get_input_stream = gvir_stream_get_input_stream;
+ stream_class->get_output_stream = gvir_stream_get_output_stream;
stream_class->close_fn = gvir_stream_close;
stream_class->close_async = gvir_stream_close_async;
stream_class->close_finish = gvir_stream_close_finish;
@@ -339,3 +352,99 @@ gvir_stream_receive_all(GVirStream *self, GVirStreamSinkFunc func, gpointer user
return r;
}
+
+
+/**
+ * gvir_stream_send:
+ * @stream: the stream
+ * @buffer: a buffer to write data from (which should be at least @size
+ * bytes long).
+ * @size: the number of bytes you want to write to the stream
+ * @cancellable: (allow-none): a %GCancellable or %NULL
+ * @error: #GError for error reporting, or %NULL to ignore.
+ *
+ * Send data (up to @size bytes) from a stream.
+ * On error -1 is returned and @error is set accordingly.
+ *
+ * gvir_stream_send() can return any number of bytes, up to
+ * @size. If more than @size bytes have been sendd, the additional
+ * data will be returned in future calls to gvir_stream_send().
+ *
+ * If there is no data available, a %G_IO_ERROR_WOULD_BLOCK error will be
+ * returned.
+ *
+ * Returns: Number of bytes read, or 0 if the end of stream reached,
+ * or -1 on error.
+ */
+gssize gvir_stream_send(GVirStream *self, const gchar *buffer, gsize size,
+ GCancellable *cancellable, GError **error)
+{
+ int got;
+
+ g_return_val_if_fail(GVIR_IS_STREAM(self), -1);
+ g_return_val_if_fail(buffer != NULL, -1);
+
+ if (g_cancellable_set_error_if_cancelled (cancellable, error))
+ return -1;
+
+ got = virStreamSend(self->priv->handle, buffer, size);
+
+ if (got == -2) { /* blocking */
+ g_set_error(error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, NULL);
+ } else if (got < 0) {
+ g_set_error(error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
+ "Got virStreamRecv error in %s", G_STRFUNC);
+ }
+
+ return got;
+}
+
+struct stream_source_helper {
+ GVirStream *self;
+ GVirStreamSourceFunc func;
+ gpointer user_data;
+};
+
+static int
+stream_source(virStreamPtr st G_GNUC_UNUSED,
+ char *bytes, size_t nbytes, void *opaque)
+{
+ struct stream_source_helper *helper = opaque;
+
+ return helper->func(helper->self, bytes, nbytes, helper->user_data);
+}
+
+/**
+ * gvir_stream_send_all:
+ * @stream: the stream
+ * @func: (scope notified): the callback for writing data to application
+ * @user_data: (closure): data to be passed to @callback
+ * Returns: the number of bytes consumed or -1 upon error
+ *
+ * Send the entire data stream, sending the data to the
+ * requested data source. This is simply a convenient alternative
+ * to virStreamRecv, for apps that do blocking-I/o.
+ */
+gssize
+gvir_stream_send_all(GVirStream *self, GVirStreamSourceFunc func, gpointer user_data, GError **err)
+{
+ struct stream_source_helper helper = {
+ .self = self,
+ .func = func,
+ .user_data = user_data
+ };
+ int r;
+
+ g_return_val_if_fail(GVIR_IS_STREAM(self), -1);
+ g_return_val_if_fail(func != NULL, -1);
+
+ r = virStreamSendAll(self->priv->handle, stream_source, &helper);
+ if (r < 0) {
+ if (err != NULL)
+ *err = gvir_error_new_literal(GVIR_STREAM_ERROR,
+ 0,
+ "Unable to perform SendAll");
+ }
+
+ return r;
+}
diff --git a/libvirt-gobject/libvirt-gobject-stream.h b/libvirt-gobject/libvirt-gobject-stream.h
index 35526db..5a1ee68 100644
--- a/libvirt-gobject/libvirt-gobject-stream.h
+++ b/libvirt-gobject/libvirt-gobject-stream.h
@@ -71,17 +71,34 @@ struct _GVirStreamClass
* Returns: the number of bytes filled, 0 upon end
* of file, or -1 upon error
*/
-typedef gint (* GVirStreamSinkFunc) (GVirStream *stream,
- const gchar *buf,
- gsize nbytes,
- gpointer user_data);
+typedef gint (* GVirStreamSinkFunc)(GVirStream *stream,
+ const gchar *buf,
+ gsize nbytes,
+ gpointer user_data);
+
+/**
+ * GVirStreamSourceFunc:
+ * @stream: a #GVirStream
+ * @buf: (out) (array length=nbytes) (transfer none): data pointer
+ * @nbytes: data size
+ * @user_data: user data passed to the function
+ * Returns: the number of bytes filled, 0 upon end
+ * of file, or -1 upon error
+ */
+typedef gint (* GVirStreamSourceFunc)(GVirStream *stream,
+ gchar *buf,
+ gsize nbytes,
+ gpointer user_data);
GType gvir_stream_get_type(void);
GType gvir_stream_handle_get_type(void);
-gssize gvir_stream_receive_all(GVirStream *stream, GVirStreamSinkFunc func, gpointer user_data, GError **err);
+gssize gvir_stream_receive_all(GVirStream *stream, GVirStreamSinkFunc func, gpointer user_data, GError **error);
gssize gvir_stream_receive(GVirStream *stream, gchar *buffer, gsize size, GCancellable *cancellable, GError **error);
+gssize gvir_stream_send_all(GVirStream *stream, GVirStreamSourceFunc func, gpointer user_data, GError **error);
+gssize gvir_stream_send(GVirStream *stream, const gchar *buffer, gsize size, GCancellable *cancellable, GError **error);
+
G_END_DECLS
#endif /* __LIBVIRT_GOBJECT_STREAM_H__ */
--
1.7.6.4
More information about the libvir-list
mailing list