[fedora-virt] [PATCH] Remove receive callbacks
Richard W.M. Jones
rjones at redhat.com
Fri Jul 3 14:37:11 UTC 2009
On Fri, Jul 03, 2009 at 01:50:33PM +0100, Matthew Booth wrote:
> This is a repost of my previous patch, rebased against the current head.
>
> Matt
> --
> Matthew Booth, RHCA, RHCSS
> Red Hat Engineering, Virtualisation Team
>
> M: +44 (0)7977 267231
> GPG ID: D33C3490
> GPG FPR: 3733 612D 2D05 5458 8A8A 1600 3441 EA19 D33C 3490
> From 5f57439a6139fa03560cb3a5351eedc2ebe42e19 Mon Sep 17 00:00:00 2001
> From: Matthew Booth <mbooth at redhat.com>
> Date: Sat, 27 Jun 2009 22:05:48 +0100
> Subject: [PATCH] Remove receive callbacks
>
> This patch fixes a class of race conditions characterised by the
> following sequence of events:
>
> LIBRARY                           DAEMON
> send download request
>                                   receive download request
>                                   respond with download response
>                                   start sending file chunks
> set reply callback to 'download'
> run main loop
>
> At this stage the download reply callback receives both the download
> reply and some file chunks. The current architecture doesn't provide a clean way
> to prevent this from happening.
>
> This patch fixes the above problem by changing the socket receive
> handler to do nothing but buffering, and provides 2 new apis:
>
> guestfs_get_reply
> guestfs_free_reply
>
> These will always de-queue exactly 1 message, which is always what is
> wanted.
> ---
> src/generator.ml | 211 ++++++++----------
> src/guestfs.c | 672 +++++++++++++++++++++++++-----------------------------
> src/guestfs.h | 13 +-
> 3 files changed, 417 insertions(+), 479 deletions(-)
>
> diff --git a/src/generator.ml b/src/generator.ml
> index c65e717..c64b8c7 100755
> --- a/src/generator.ml
> +++ b/src/generator.ml
> @@ -3734,83 +3734,6 @@ check_state (guestfs_h *g, const char *caller)
> List.iter (
> fun (shortname, style, _, _, _, _, _) ->
> let name = "guestfs_" ^ shortname in
> -
> - (* Generate the context struct which stores the high-level
> - * state between callback functions.
> - *)
> - pr "struct %s_ctx {\n" shortname;
> - pr " /* This flag is set by the callbacks, so we know we've done\n";
> - pr " * the callbacks as expected, and in the right sequence.\n";
> - pr " * 0 = not called, 1 = reply_cb called.\n";
> - pr " */\n";
> - pr " int cb_sequence;\n";
> - pr " struct guestfs_message_header hdr;\n";
> - pr " struct guestfs_message_error err;\n";
> - (match fst style with
> - | RErr -> ()
> - | RConstString _ ->
> - failwithf "RConstString cannot be returned from a daemon function"
> - | RInt _ | RInt64 _
> - | RBool _ | RString _ | RStringList _
> - | RIntBool _
> - | RPVList _ | RVGList _ | RLVList _
> - | RStat _ | RStatVFS _
> - | RHashtable _
> - | RDirentList _ ->
> - pr " struct %s_ret ret;\n" name
> - );
> - pr "};\n";
> - pr "\n";
> -
> - (* Generate the reply callback function. *)
> - pr "static void %s_reply_cb (guestfs_h *g, void *data, XDR *xdr)\n" shortname;
> - pr "{\n";
> - pr " guestfs_main_loop *ml = guestfs_get_main_loop (g);\n";
> - pr " struct %s_ctx *ctx = (struct %s_ctx *) data;\n" shortname shortname;
> - pr "\n";
> - pr " /* This should definitely not happen. */\n";
> - pr " if (ctx->cb_sequence != 0) {\n";
> - pr " ctx->cb_sequence = 9999;\n";
> - pr " error (g, \"%%s: internal error: reply callback called twice\", \"%s\");\n" name;
> - pr " return;\n";
> - pr " }\n";
> - pr "\n";
> - pr " ml->main_loop_quit (ml, g);\n";
> - pr "\n";
> - pr " if (!xdr_guestfs_message_header (xdr, &ctx->hdr)) {\n";
> - pr " error (g, \"%%s: failed to parse reply header\", \"%s\");\n" name;
> - pr " return;\n";
> - pr " }\n";
> - pr " if (ctx->hdr.status == GUESTFS_STATUS_ERROR) {\n";
> - pr " if (!xdr_guestfs_message_error (xdr, &ctx->err)) {\n";
> - pr " error (g, \"%%s: failed to parse reply error\", \"%s\");\n"
> - name;
> - pr " return;\n";
> - pr " }\n";
> - pr " goto done;\n";
> - pr " }\n";
> -
> - (match fst style with
> - | RErr -> ()
> - | RConstString _ ->
> - failwithf "RConstString cannot be returned from a daemon function"
> - | RInt _ | RInt64 _
> - | RBool _ | RString _ | RStringList _
> - | RIntBool _
> - | RPVList _ | RVGList _ | RLVList _
> - | RStat _ | RStatVFS _
> - | RHashtable _
> - | RDirentList _ ->
> - pr " if (!xdr_%s_ret (xdr, &ctx->ret)) {\n" name;
> - pr " error (g, \"%%s: failed to parse reply\", \"%s\");\n" name;
> - pr " return;\n";
> - pr " }\n";
> - );
> -
> - pr " done:\n";
> - pr " ctx->cb_sequence = 1;\n";
> - pr "}\n\n";
> -
> (* Generate the action stub. *)
> generate_prototype ~extern:false ~semicolon:false ~newline:true
> ~handle:"g" name style;
> @@ -3834,15 +3757,27 @@ check_state (guestfs_h *g, const char *caller)
> | _ -> pr " struct %s_args args;\n" name
> );
>
> - pr " struct %s_ctx ctx;\n" shortname;
> - pr " guestfs_main_loop *ml = guestfs_get_main_loop (g);\n";
> + pr " struct guestfs_message_header hdr = {};\n";
> + pr " struct guestfs_message_error err = {};\n";
> + (match fst style with
> + | RErr -> ()
> + | RConstString _ ->
> + failwithf "RConstString cannot be returned from a daemon function"
> + | RInt _ | RInt64 _
> + | RBool _ | RString _ | RStringList _
> + | RIntBool _
> + | RPVList _ | RVGList _ | RLVList _
> + | RStat _ | RStatVFS _
> + | RHashtable _
> + | RDirentList _ ->
> + pr " struct %s_ret ret = {};\n" name
> + );
> +
> pr " int serial;\n";
> pr "\n";
> pr " if (check_state (g, \"%s\") == -1) return %s;\n" name error_code;
> pr " guestfs_set_busy (g);\n";
> pr "\n";
> - pr " memset (&ctx, 0, sizeof ctx);\n";
> - pr "\n";
>
> (* Send the main header and arguments. *)
> (match snd style with
> @@ -3877,7 +3812,6 @@ check_state (guestfs_h *g, const char *caller)
> pr "\n";
>
> (* Send any additional files (FileIn) requested. *)
> - let need_read_reply_label = ref false in
> List.iter (
> function
> | FileIn n ->
> @@ -3889,83 +3823,130 @@ check_state (guestfs_h *g, const char *caller)
> pr " guestfs_end_busy (g);\n";
> pr " return %s;\n" error_code;
> pr " }\n";
> - pr " if (r == -2) /* daemon cancelled */\n";
> - pr " goto read_reply;\n";
> - need_read_reply_label := true;
> pr " }\n";
> pr "\n";
> | _ -> ()
> ) (snd style);
>
> (* Wait for the reply from the remote end. *)
> - if !need_read_reply_label then pr " read_reply:\n";
> - pr " guestfs__switch_to_receiving (g);\n";
> - pr " ctx.cb_sequence = 0;\n";
> - pr " guestfs_set_reply_callback (g, %s_reply_cb, &ctx);\n" shortname;
> - pr " (void) ml->main_loop_run (ml, g);\n";
> - pr " guestfs_set_reply_callback (g, NULL, NULL);\n";
> - pr " if (ctx.cb_sequence != 1) {\n";
> - pr " error (g, \"%%s reply failed, see earlier error messages\", \"%s\");\n" name;
> - pr " guestfs_end_busy (g);\n";
> - pr " return %s;\n" error_code;
> + pr " guestfs_reply_t reply;\n";
> + pr "\n";
> + pr " for (;;) {\n";
> + pr " guestfs_get_reply (g, &reply, 1);\n";
> + pr "\n";
> + pr " if (GUESTFS_CANCEL_FLAG == reply.len) {\n";
> + pr " /* This message was delayed from a previous file transaction. */\n";
> + pr " continue;\n";
> + pr " }\n";
> + pr "\n";
> + pr " if (GUESTFS_LAUNCH_FLAG == reply.len) {\n";
> + pr " error (g, \"%%s reply failed, received unexpected launch message\",\n";
> + pr " \"%s\");\n" name;
> + pr " guestfs_end_busy (g);\n";
> + pr " return %s;\n" error_code;
> + pr " }\n";
> + pr "\n";
> + pr " if (0 == reply.len) {\n";
> + pr " error (g, \"%%s reply failed, see earlier error messages\", \"%s\");\n" name;
> + pr " guestfs_end_busy (g);\n";
> + pr " return %s;\n" error_code;
> + pr " }\n";
> + pr "\n";
> + pr " break;\n";
> pr " }\n";
> pr "\n";
>
> - pr " if (check_reply_header (g, &ctx.hdr, GUESTFS_PROC_%s, serial) == -1) {\n"
> + pr " if (!xdr_guestfs_message_header (&reply.xdr, &hdr)) {\n";
> + pr " error (g, \"%%s: failed to parse reply header\", \"%s\");\n" name;
> + pr " goto recv_error;\n";
> + pr " }\n";
> + pr "\n";
> + pr " if (hdr.status == GUESTFS_STATUS_ERROR) {\n";
> + pr " if (!xdr_guestfs_message_error (&reply.xdr, &err)) {\n";
> + pr " error (g, \"%%s: failed to parse reply error\", \"%s\");\n"
> + name;
> + pr " goto recv_error;\n";
> + pr " }\n";
> + pr " }\n";
> +
> + (match fst style with
> + | RErr -> ()
> + | RConstString _ ->
> + failwithf "RConstString cannot be returned from a daemon function"
> + | RInt _ | RInt64 _
> + | RBool _ | RString _ | RStringList _
> + | RIntBool _
> + | RPVList _ | RVGList _ | RLVList _
> + | RStat _ | RStatVFS _
> + | RHashtable _
> + | RDirentList _ ->
> + pr " else if (!xdr_%s_ret (&reply.xdr, &ret)) {\n" name;
> + pr " error (g, \"%%s: failed to parse reply\", \"%s\");\n" name;
> + pr " goto recv_error;\n";
> + pr " }\n";
> + );
> +
> + pr " if (check_reply_header (g, &hdr, GUESTFS_PROC_%s, serial) == -1) {\n"
> (String.uppercase shortname);
> - pr " guestfs_end_busy (g);\n";
> - pr " return %s;\n" error_code;
> + pr " goto recv_error;\n";
> pr " }\n";
> pr "\n";
>
> - pr " if (ctx.hdr.status == GUESTFS_STATUS_ERROR) {\n";
> - pr " error (g, \"%%s\", ctx.err.error_message);\n";
> - pr " free (ctx.err.error_message);\n";
> - pr " guestfs_end_busy (g);\n";
> - pr " return %s;\n" error_code;
> + pr " if (hdr.status == GUESTFS_STATUS_ERROR) {\n";
> + pr " error (g, \"%%s\", err.error_message);\n";
> + pr " free (err.error_message);\n";
> + pr " goto recv_error;\n";
> pr " }\n";
> pr "\n";
>
> + pr " guestfs_free_reply (g, &reply);\n\n";
> +
> (* Expecting to receive further files (FileOut)? *)
> List.iter (
> function
> | FileOut n ->
> pr " if (guestfs__receive_file_sync (g, %s) == -1) {\n" n;
> - pr " guestfs_end_busy (g);\n";
> - pr " return %s;\n" error_code;
> + pr " guestfs_end_busy (g);\n";
> + pr " return %s;\n" error_code;
> pr " }\n";
> pr "\n";
> | _ -> ()
> ) (snd style);
>
> - pr " guestfs_end_busy (g);\n";
> + pr " guestfs_end_busy (g);\n\n";
>
> (match fst style with
> | RErr -> pr " return 0;\n"
> | RInt n | RInt64 n | RBool n ->
> - pr " return ctx.ret.%s;\n" n
> + pr " return ret.%s;\n" n
> | RConstString _ ->
> failwithf "RConstString cannot be returned from a daemon function"
> | RString n ->
> - pr " return ctx.ret.%s; /* caller will free */\n" n
> + pr " return ret.%s; /* caller will free */\n" n
> | RStringList n | RHashtable n ->
> pr " /* caller will free this, but we need to add a NULL entry */\n";
> - pr " ctx.ret.%s.%s_val =\n" n n;
> - pr " safe_realloc (g, ctx.ret.%s.%s_val,\n" n n;
> - pr " sizeof (char *) * (ctx.ret.%s.%s_len + 1));\n"
> + pr " ret.%s.%s_val =\n" n n;
> + pr " safe_realloc (g, ret.%s.%s_val,\n" n n;
> + pr " sizeof (char *) * (ret.%s.%s_len + 1));\n"
> n n;
> - pr " ctx.ret.%s.%s_val[ctx.ret.%s.%s_len] = NULL;\n" n n n n;
> - pr " return ctx.ret.%s.%s_val;\n" n n
> + pr " ret.%s.%s_val[ret.%s.%s_len] = NULL;\n" n n n n;
> + pr " return ret.%s.%s_val;\n" n n
> | RIntBool _ ->
> pr " /* caller with free this */\n";
> - pr " return safe_memdup (g, &ctx.ret, sizeof (ctx.ret));\n"
> + pr " return safe_memdup (g, &ret, sizeof (ret));\n"
> | RPVList n | RVGList n | RLVList n
> | RStat n | RStatVFS n
> | RDirentList n ->
> pr " /* caller will free this */\n";
> - pr " return safe_memdup (g, &ctx.ret.%s, sizeof (ctx.ret.%s));\n" n n
> + pr " return safe_memdup (g, &ret.%s, sizeof (ret.%s));\n" n n
> );
>
> + pr "\n";
> + pr " recv_error:\n";
> + pr " guestfs_free_reply (g, &reply);\n";
> + pr " guestfs_end_busy (g);\n";
> + pr " return %s;\n" error_code;
> +
> pr "}\n\n"
> ) daemon_functions
>
> diff --git a/src/guestfs.c b/src/guestfs.c
> index 350d848..79251ca 100644
> --- a/src/guestfs.c
> +++ b/src/guestfs.c
> @@ -21,6 +21,7 @@
> #define _BSD_SOURCE /* for mkdtemp, usleep */
> #define _GNU_SOURCE /* for vasprintf, GNU strerror_r, strchrnul */
>
> +#include <assert.h>
> #include <stdio.h>
> #include <stdlib.h>
> #include <stdarg.h>
> @@ -78,8 +79,10 @@
>
> static void default_error_cb (guestfs_h *g, void *data, const char *msg);
> static void stdout_event (struct guestfs_main_loop *ml, guestfs_h *g, void *data, int watch, int fd, int events);
> -static void sock_read_event (struct guestfs_main_loop *ml, guestfs_h *g, void *data, int watch, int fd, int events);
> -static void sock_write_event (struct guestfs_main_loop *ml, guestfs_h *g, void *data, int watch, int fd, int events);
> +static void sock_event (struct guestfs_main_loop *ml, guestfs_h *g, void *data, int watch, int fd, int events);
> +static void sock_read (guestfs_h *g);
> +static void sock_write (guestfs_h *g);
> +static int sock_update_events (guestfs_h *g);
>
> static void close_handles (void);
>
> @@ -161,6 +164,9 @@ struct guestfs_h
> int stdout_watch; /* Watches qemu stdout for log messages. */
> int sock_watch; /* Watches daemon comm socket. */
>
> + int sock_events; /* events we're listening for on the comm
> + socket */
> +
> char *tmpdir; /* Temporary directory containing socket. */
>
> char *qemu_help, *qemu_version; /* Output of qemu -help, qemu -version. */
> @@ -185,21 +191,18 @@ struct guestfs_h
> void * error_cb_data;
> guestfs_send_cb send_cb;
> void * send_cb_data;
> - guestfs_reply_cb reply_cb;
> - void * reply_cb_data;
> guestfs_log_message_cb log_message_cb;
> void * log_message_cb_data;
> guestfs_subprocess_quit_cb subprocess_quit_cb;
> void * subprocess_quit_cb_data;
> - guestfs_launch_done_cb launch_done_cb;
> - void * launch_done_cb_data;
>
> /* Main loop used by this handle. */
> guestfs_main_loop *main_loop;
>
> /* Messages sent and received from the daemon. */
> char *msg_in;
> - int msg_in_size, msg_in_allocated;
> + size_t msg_in_size, msg_in_pos, msg_in_consumed, msg_in_len;
> +
> char *msg_out;
> int msg_out_size, msg_out_pos;
>
> @@ -228,6 +231,8 @@ guestfs_create (void)
> g->stdout_watch = -1;
> g->sock_watch = -1;
>
> + g->sock_events = 0;
> +
> g->abort_cb = abort;
> g->error_cb = default_error_cb;
> g->error_cb_data = NULL;
> @@ -265,6 +270,11 @@ guestfs_create (void)
> } else
> g->memsize = 500;
>
> + /* Initialise the message receive buffer */
> + g->msg_in_size = GUESTFS_MESSAGE_MAX + sizeof (g->msg_in_len);
As in my comment on the previous revision of this patch, why
is this not '+ 4'?
> + g->msg_in = safe_malloc (g, g->msg_in_size);
> + g->msg_in_pos = g->msg_in_consumed = 0;
> +
> g->main_loop = guestfs_get_default_main_loop ();
>
> /* Start with large serial numbers so they are easy to spot
> @@ -290,9 +300,10 @@ guestfs_create (void)
> return g;
>
> error:
> - free (g->path);
> - free (g->qemu);
> - free (g->append);
> + if (g->msg_in) free (g->msg_in);
> + if (g->path) free (g->path);
> + if (g->qemu) free (g->qemu);
> + if (g->append) free (g->append);
As in my previous comment, this is wrong. free (NULL) is fine,
so you don't need the if statements.
Anyway, -1, but I will try out this patch myself next week.
Rich.
--
Richard Jones, Emerging Technologies, Red Hat http://et.redhat.com/~rjones
virt-p2v converts physical machines to virtual machines. Boot with a
live CD or over the network (PXE) and turn machines into Xen guests.
http://et.redhat.com/~rjones/virt-p2v
More information about the Fedora-virt mailing list