[Libguestfs] [PATCH 2/3] protocol: Abstract out socket operations from protocol code.

Richard W.M. Jones rjones at redhat.com
Thu Mar 7 21:11:54 UTC 2013


From: "Richard W.M. Jones" <rjones at redhat.com>

This creates an abstract layer below the protocol code which handles
the socket operations.  This will make it easier to introduce libvirt
virStreamPtr operations in the future.

In the handle, g->conn contains the connection to the appliance.
g->conn is NULL when we're not connected.

poll(2) is used instead of select(2).

All error messages about launch failing or the appliance unexpectedly
dying are now produced by two common functions
(guestfs___launch_failed_error and guestfs___unexpected_close_error),
which give a better explanation of what to do.
---
 examples/guestfs-faq.pod       |   3 +
 po/POTFILES                    |   1 +
 src/Makefile.am                |   1 +
 src/conn-socket.c              | 449 ++++++++++++++++++++++
 src/guestfs-internal.h         |  51 ++-
 src/handle.c                   |  13 +-
 src/launch-appliance.c         |  61 ++-
 src/launch-libvirt.c           | 113 +++---
 src/launch-unix.c              |  31 +-
 src/launch.c                   |  17 +-
 src/proto.c                    | 822 +++++++++++++----------------------------
 tests/regressions/rhbz790721.c |   9 +-
 12 files changed, 882 insertions(+), 689 deletions(-)
 create mode 100644 src/conn-socket.c

diff --git a/examples/guestfs-faq.pod b/examples/guestfs-faq.pod
index cf6703f..a5977f9 100644
--- a/examples/guestfs-faq.pod
+++ b/examples/guestfs-faq.pod
@@ -142,6 +142,9 @@ L<https://bugzilla.redhat.com/show_bug.cgi?id=806106>.
 
 =head2 "child process died unexpectedly"
 
+[This error message was changed in libguestfs 1.21.18 to something
+more explanatory.]
+
 This error indicates that qemu failed or the host kernel could not boot.
 To get further information about the failure, you have to run:
 
diff --git a/po/POTFILES b/po/POTFILES
index d8a9d9c..6373fe2 100644
--- a/po/POTFILES
+++ b/po/POTFILES
@@ -241,6 +241,7 @@ src/canonical-name.c
 src/cleanup-structs.c
 src/cleanup.c
 src/command.c
+src/conn-socket.c
 src/dbdump.c
 src/errnostring-gperf.c
 src/errnostring.c
diff --git a/src/Makefile.am b/src/Makefile.am
index 6fdbe41..bf3314b 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -85,6 +85,7 @@ libguestfs_la_SOURCES = \
 	bindtests.c \
 	command.c \
 	canonical-name.c \
+	conn-socket.c \
 	dbdump.c \
 	errors.c \
 	event-string.c \
diff --git a/src/conn-socket.c b/src/conn-socket.c
new file mode 100644
index 0000000..c1ad31c
--- /dev/null
+++ b/src/conn-socket.c
@@ -0,0 +1,449 @@
+/* libguestfs
+ * Copyright (C) 2013 Red Hat Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+/* Connection module for regular POSIX sockets. */
+
+#include <config.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <poll.h>
+#include <sys/stat.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <assert.h>
+
+#include "guestfs.h"
+#include "guestfs-internal.h"
+
+/* Private state of a connection backed by POSIX sockets.
+ *
+ * Pointers to this struct are cast to and from 'struct connection'
+ * (see the casts in the functions below), so 'ops' must remain the
+ * first member.  A socket field holds -1 when that socket is not open
+ * (console_sock may be -1 for the whole lifetime if the appliance has
+ * no console).
+ */
+struct connection_socket {
+  const struct connection_ops *ops;
+
+  int console_sock;          /* Appliance console (for debug info). */
+  int daemon_sock;           /* Daemon communications socket. */
+
+  /* Socket for accepting a connection from the daemon.  Only used
+   * before and during accept_connection.
+   */
+  int daemon_accept_sock;
+};
+
+/* Forward declaration: defined below, called from the poll loops in
+ * accept_connection, read_data and write_data.
+ */
+static int handle_log_message (guestfs_h *g, struct connection_socket *conn);
+
+/* Wait for the daemon (via qemu) to connect back to the listening
+ * socket, delivering any console output that arrives meanwhile.
+ *
+ * On success the listening socket is closed and replaced by the
+ * accepted daemon socket, which is non-blocking and close-on-exec.
+ *
+ * Returns:
+ *   1 = accepted
+ *   0 = the appliance closed the console before connecting
+ *  -1 = error (already reported through g)
+ */
+static int
+accept_connection (guestfs_h *g, struct connection *connv)
+{
+  struct connection_socket *conn = (struct connection_socket *) connv;
+  int sock = -1;
+
+  if (conn->daemon_accept_sock == -1) {
+    error (g, _("accept_connection called twice"));
+    return -1;
+  }
+
+  while (sock == -1) {
+    struct pollfd fds[2];
+    nfds_t nfds = 1;
+    int r;
+
+    fds[0].fd = conn->daemon_accept_sock;
+    fds[0].events = POLLIN;
+    fds[0].revents = 0;
+
+    if (conn->console_sock >= 0) {
+      fds[1].fd = conn->console_sock;
+      fds[1].events = POLLIN;
+      fds[1].revents = 0;
+      nfds++;
+    }
+
+    r = poll (fds, nfds, -1);
+    if (r == -1) {
+      if (errno == EINTR || errno == EAGAIN)
+        continue;
+      perrorf (g, "accept_connection: poll");
+      return -1;
+    }
+
+    /* Log message, or console hangup/error?  poll always reports
+     * POLLHUP and POLLERR (even though we did not ask for them), and
+     * they may arrive without POLLIN.  Ignoring them would make poll
+     * return immediately on every iteration, i.e. a busy loop.  The
+     * read in handle_log_message reports the EOF or error instead.
+     */
+    if (nfds > 1 && (fds[1].revents & (POLLIN|POLLHUP|POLLERR)) != 0) {
+      r = handle_log_message (g, conn);
+      if (r <= 0)
+        return r;
+    }
+
+    /* Accept on socket?  Passing SOCK_NONBLOCK makes the new socket
+     * non-blocking atomically (on Linux an accepted socket does not
+     * inherit O_NONBLOCK from the listening socket).
+     */
+    if ((fds[0].revents & POLLIN) != 0) {
+      sock = accept4 (conn->daemon_accept_sock, NULL, NULL,
+                      SOCK_NONBLOCK|SOCK_CLOEXEC);
+      if (sock == -1) {
+        /* ECONNABORTED: the pending connection went away between
+         * poll and accept.  accept(2) recommends retrying as for EAGAIN.
+         */
+        if (errno == EINTR || errno == EAGAIN || errno == ECONNABORTED)
+          continue;
+        perrorf (g, "accept_connection: accept");
+        return -1;
+      }
+    }
+  }
+
+  /* Got a connection and accepted it, so update the connection's
+   * internal status.
+   */
+  close (conn->daemon_accept_sock);
+  conn->daemon_accept_sock = -1;
+  conn->daemon_sock = sock;
+
+  return 1;
+}
+
+/* Read exactly len bytes from the daemon socket into bufv, completing
+ * partial reads, and deliver any console output that arrives while
+ * waiting.
+ *
+ * Returns:
+ *   len = all bytes were read
+ *     0 = the appliance went away (daemon EOF or ECONNRESET, or the
+ *         console closed)
+ *    -1 = error (already reported through g)
+ *
+ * NB: a request with len == 0 also returns 0, which a caller cannot
+ * distinguish from "closed".  Callers presumably never pass 0.
+ */
+static ssize_t
+read_data (guestfs_h *g, struct connection *connv, void *bufv, size_t len)
+{
+  char *buf = bufv;
+  struct connection_socket *conn = (struct connection_socket *) connv;
+  const size_t original_len = len;
+
+  if (conn->daemon_sock == -1) {
+    error (g, _("read_data: socket not connected"));
+    return -1;
+  }
+
+  while (len > 0) {
+    struct pollfd fds[2];
+    nfds_t nfds = 1;
+    int r;
+
+    fds[0].fd = conn->daemon_sock;
+    fds[0].events = POLLIN;
+    fds[0].revents = 0;
+
+    if (conn->console_sock >= 0) {
+      fds[1].fd = conn->console_sock;
+      fds[1].events = POLLIN;
+      fds[1].revents = 0;
+      nfds++;
+    }
+
+    r = poll (fds, nfds, -1);
+    if (r == -1) {
+      if (errno == EINTR || errno == EAGAIN)
+        continue;
+      perrorf (g, "read_data: poll");
+      return -1;
+    }
+
+    /* Log message, or console hangup/error?  poll always reports
+     * POLLHUP and POLLERR, possibly without POLLIN.  If we ignored
+     * them, poll would return immediately on every iteration and we
+     * would spin.  The read in handle_log_message reports the EOF
+     * or error instead.
+     */
+    if (nfds > 1 && (fds[1].revents & (POLLIN|POLLHUP|POLLERR)) != 0) {
+      r = handle_log_message (g, conn);
+      if (r <= 0)
+        return r;
+    }
+
+    /* Read data on daemon socket?  As above, a hangup or error is
+     * surfaced by read(2) returning 0 or -1 rather than ignored.
+     */
+    if ((fds[0].revents & (POLLIN|POLLHUP|POLLERR)) != 0) {
+      ssize_t n = read (conn->daemon_sock, buf, len);
+      if (n == -1) {
+        if (errno == EINTR || errno == EAGAIN)
+          continue;
+        if (errno == ECONNRESET) /* essentially the same as EOF case */
+          return 0;
+        perrorf (g, "read_data: read");
+        return -1;
+      }
+      if (n == 0)
+        return 0;
+
+      buf += n;
+      len -= n;
+    }
+  }
+
+  return original_len;
+}
+
+/* Non-blocking check for pending input on the daemon socket.
+ *
+ * Returns 1 if the daemon socket is readable right now, 0 if it is
+ * not, or -1 on error (reported through g).
+ */
+static int
+can_read_data (guestfs_h *g, struct connection *connv)
+{
+  struct connection_socket *conn = (struct connection_socket *) connv;
+  struct pollfd pfd;
+  int ret;
+
+  if (conn->daemon_sock == -1) {
+    error (g, _("can_read_data: socket not connected"));
+    return -1;
+  }
+
+  pfd.fd = conn->daemon_sock;
+  pfd.events = POLLIN;
+  pfd.revents = 0;
+
+  /* Zero timeout, so poll never blocks; just retry transient failures. */
+  do {
+    ret = poll (&pfd, 1, 0);
+  } while (ret == -1 && (errno == EINTR || errno == EAGAIN));
+
+  if (ret == -1) {
+    perrorf (g, "can_read_data: poll");
+    return -1;
+  }
+
+  return (pfd.revents & POLLIN) ? 1 : 0;
+}
+
+/* Write all len bytes of bufv to the daemon socket, completing partial
+ * writes, and deliver any console output that arrives while waiting.
+ *
+ * Returns:
+ *   len = all bytes were written
+ *     0 = the appliance went away (EPIPE/ECONNRESET on the daemon
+ *         socket, or the console closed)
+ *    -1 = error (already reported through g)
+ *
+ * NB: a request with len == 0 also returns 0, which a caller cannot
+ * distinguish from "closed".  Callers presumably never pass 0.
+ */
+static ssize_t
+write_data (guestfs_h *g, struct connection *connv,
+            const void *bufv, size_t len)
+{
+  const char *buf = bufv;
+  struct connection_socket *conn = (struct connection_socket *) connv;
+  const size_t original_len = len;
+
+  if (conn->daemon_sock == -1) {
+    error (g, _("write_data: socket not connected"));
+    return -1;
+  }
+
+  while (len > 0) {
+    struct pollfd fds[2];
+    nfds_t nfds = 1;
+    int r;
+
+    fds[0].fd = conn->daemon_sock;
+    fds[0].events = POLLOUT;
+    fds[0].revents = 0;
+
+    if (conn->console_sock >= 0) {
+      fds[1].fd = conn->console_sock;
+      fds[1].events = POLLIN;
+      fds[1].revents = 0;
+      nfds++;
+    }
+
+    r = poll (fds, nfds, -1);
+    if (r == -1) {
+      if (errno == EINTR || errno == EAGAIN)
+        continue;
+      perrorf (g, "write_data: poll");
+      return -1;
+    }
+
+    /* Log message, or console hangup/error?  poll always reports
+     * POLLHUP and POLLERR, possibly without POLLIN.  If we ignored
+     * them, poll would return immediately on every iteration and we
+     * would spin.  The read in handle_log_message reports the EOF
+     * or error instead.
+     */
+    if (nfds > 1 && (fds[1].revents & (POLLIN|POLLHUP|POLLERR)) != 0) {
+      r = handle_log_message (g, conn);
+      if (r <= 0)
+        return r;
+    }
+
+    /* Can write data on daemon socket?  Also attempt the write on
+     * POLLHUP/POLLERR so that the failure is reported (EPIPE or the
+     * pending socket error) instead of spinning in poll.
+     */
+    if ((fds[0].revents & (POLLOUT|POLLHUP|POLLERR)) != 0) {
+      /* send(2) with MSG_NOSIGNAL rather than write(2): if the
+       * appliance has gone away we want EPIPE, not a SIGPIPE, whose
+       * default action terminates the calling program.
+       */
+      ssize_t n = send (conn->daemon_sock, buf, len, MSG_NOSIGNAL);
+      if (n == -1) {
+        if (errno == EINTR || errno == EAGAIN)
+          continue;
+        /* Disconnected from guest (RHBZ#508713).  ECONNRESET is
+         * treated the same way, as read_data does.
+         */
+        if (errno == EPIPE || errno == ECONNRESET)
+          return 0;
+        perrorf (g, "write_data: write");
+        return -1;
+      }
+
+      buf += n;
+      len -= n;
+    }
+  }
+
+  return original_len;
+}
+
+/* This is called when poll(2) reports activity on conn->console_sock
+ * while we are doing one of the connection operations above.  It
+ * reads whatever console output is available and deals with it.
+ *
+ * Returns:
+ *   1 = log message(s) were handled successfully (or the read was
+ *       interrupted / would block, which is not an error)
+ *   0 = connection to appliance closed (EOF or ECONNRESET)
+ *  -1 = error
+ */
+static int
+handle_log_message (guestfs_h *g,
+                    struct connection_socket *conn)
+{
+  char buf[BUFSIZ];
+  ssize_t n;
+
+  /* Carried over from ancient proto.c code.  The comment there was:
+   *
+   *   "QEMU's console emulates a 16550A serial port.  The real 16550A
+   *   device has a small FIFO buffer (16 bytes) which means here we
+   *   see lots of small reads of 1-16 bytes in length, usually single
+   *   bytes.  Sleeping here for a very brief period groups reads
+   *   together (so we usually get a few lines of output at once) and
+   *   improves overall throughput, as well as making the event
+   *   interface a bit more sane for callers.  With a virtio-serial
+   *   based console (not yet implemented) we may be able to remove
+   *   this.  XXX"
+   */
+  usleep (1000);
+
+  n = read (conn->console_sock, buf, sizeof buf);
+  if (n == 0)
+    return 0;
+
+  if (n == -1) {
+    if (errno == EINTR || errno == EAGAIN)
+      return 1; /* not an error */
+
+    /* A reset console means the appliance went away, which is the
+     * same as EOF (read_data treats ECONNRESET on the daemon socket
+     * the same way).
+     */
+    if (errno == ECONNRESET)
+      return 0;
+
+    perrorf (g, _("error reading console messages from the appliance"));
+    return -1;
+  }
+
+  /* It's an actual log message, send it upwards if anyone is listening. */
+  guestfs___call_callbacks_message (g, GUESTFS_EVENT_APPLIANCE, buf, n);
+
+  /* XXX This is a gross hack and massive layering violation.  See the
+   * comment above guestfs___launch_send_progress.
+   *
+   * Fix this gross hack by:
+   * (1) Have a guestfs___proto_log_message() function in proto.c.
+   * (2) Replace above guestfs___call_callbacks_message with (1).
+   * (3) Add the code below to guestfs___proto_log_message.
+   */
+  if (g->state == LAUNCHING) {
+    const char *sentinel;
+    size_t len;
+
+    sentinel = "Linux version"; /* kernel up */
+    len = strlen (sentinel);
+    if (memmem (buf, n, sentinel, len) != NULL)
+      guestfs___launch_send_progress (g, 6);
+
+    sentinel = "Starting /init script"; /* /init running */
+    len = strlen (sentinel);
+    if (memmem (buf, n, sentinel, len) != NULL)
+      guestfs___launch_send_progress (g, 9);
+  }
+
+  return 1;
+}
+
+/* Close whichever of the connection's sockets are still open (in the
+ * order console, daemon, listening), then free the connection struct.
+ * The handle argument is not used.
+ */
+static void
+free_conn_socket (guestfs_h *g, struct connection *connv)
+{
+  struct connection_socket *conn = (struct connection_socket *) connv;
+  const int socks[] = {
+    conn->console_sock, conn->daemon_sock, conn->daemon_accept_sock
+  };
+  size_t i;
+
+  for (i = 0; i < sizeof socks / sizeof socks[0]; ++i) {
+    if (socks[i] >= 0)
+      close (socks[i]);
+  }
+
+  free (conn);
+}
+
+/* Operations table for POSIX-socket connections.  It is never modified
+ * and connections only hold a pointer-to-const to it, so declare it
+ * const (allowing it to live in read-only storage).
+ */
+static const struct connection_ops ops = {
+  .free_connection = free_conn_socket,
+  .accept_connection = accept_connection,
+  .read_data = read_data,
+  .write_data = write_data,
+  .can_read_data = can_read_data,
+};
+
+/* Create a new socket connection, listening.
+ *
+ * Note that it's OK for console_sock to be passed as -1, meaning
+ * there's no console available for this appliance.
+ *
+ * On success, daemon_accept_sock (and console_sock, if >= 0) are owned
+ * by the connection.  They will be closed properly either in
+ * accept_connection (the listening socket) or in free_connection.
+ *
+ * On failure, NULL is returned, an error has been set on g, and the
+ * caller still owns (and must close) both sockets.
+ */
+struct connection *
+guestfs___new_conn_socket_listening (guestfs_h *g,
+                                     int daemon_accept_sock,
+                                     int console_sock)
+{
+  struct connection_socket *conn;
+
+  assert (daemon_accept_sock >= 0);
+
+  /* Do everything that can fail before allocating, so the error
+   * paths have nothing to free (previously conn leaked here).
+   */
+  if (fcntl (daemon_accept_sock, F_SETFL, O_NONBLOCK) == -1) {
+    perrorf (g, "new_conn_socket_listening: fcntl");
+    return NULL;
+  }
+
+  if (console_sock >= 0) {
+    if (fcntl (console_sock, F_SETFL, O_NONBLOCK) == -1) {
+      perrorf (g, "new_conn_socket_listening: fcntl");
+      return NULL;
+    }
+  }
+
+  conn = safe_malloc (g, sizeof *conn);
+
+  /* Set the operations. */
+  conn->ops = &ops;
+
+  /* Set the internal state. */
+  conn->console_sock = console_sock;
+  conn->daemon_sock = -1;
+  conn->daemon_accept_sock = daemon_accept_sock;
+
+  return (struct connection *) conn;
+}
+
+/* Create a new socket connection, connected.
+ *
+ * As above, but the caller passes us a connected daemon_sock
+ * and promises not to call accept_connection.
+ *
+ * On success, daemon_sock (and console_sock, if >= 0) are owned by the
+ * connection and closed in free_connection.  On failure, NULL is
+ * returned, an error has been set on g, and the caller still owns
+ * (and must close) both sockets.
+ */
+struct connection *
+guestfs___new_conn_socket_connected (guestfs_h *g,
+                                     int daemon_sock,
+                                     int console_sock)
+{
+  struct connection_socket *conn;
+
+  assert (daemon_sock >= 0);
+
+  /* Do everything that can fail before allocating, so the error
+   * paths have nothing to free (previously conn leaked here).
+   */
+  if (fcntl (daemon_sock, F_SETFL, O_NONBLOCK) == -1) {
+    perrorf (g, "new_conn_socket_connected: fcntl");
+    return NULL;
+  }
+
+  if (console_sock >= 0) {
+    if (fcntl (console_sock, F_SETFL, O_NONBLOCK) == -1) {
+      perrorf (g, "new_conn_socket_connected: fcntl");
+      return NULL;
+    }
+  }
+
+  conn = safe_malloc (g, sizeof *conn);
+
+  /* Set the operations. */
+  conn->ops = &ops;
+
+  /* Set the internal state. */
+  conn->console_sock = console_sock;
+  conn->daemon_sock = daemon_sock;
+  conn->daemon_accept_sock = -1;
+
+  return (struct connection *) conn;
+}
diff --git a/src/guestfs-internal.h b/src/guestfs-internal.h
index c63ea40..a6df003 100644
--- a/src/guestfs-internal.h
+++ b/src/guestfs-internal.h
@@ -149,6 +149,48 @@ extern struct attach_ops attach_ops_appliance;
 extern struct attach_ops attach_ops_libvirt;
 extern struct attach_ops attach_ops_unix;
 
+/* Connection module.  A 'connection' represents the appliance console
+ * connection plus the daemon connection.  It hides the underlying
+ * representation (POSIX sockets, virStreamPtr).
+ *
+ * The handle stores it in g->conn, which is NULL when not connected.
+ */
+struct connection {
+  /* Each connection module's private struct starts with this same
+   * member and is cast to and from 'struct connection', so 'ops' must
+   * remain the first member.
+   */
+  const struct connection_ops *ops;
+
+  /* In the real struct, private data used by each connection module
+   * follows here.
+   */
+};
+
+struct connection_ops {
+  /* Close everything and free the connection struct and any internal data. */
+  void (*free_connection) (guestfs_h *g, struct connection *);
+
+  /* Accept the connection (back to us) from the daemon.
+   *
+   * Call this at most once, and never on a connection that was
+   * created already connected.
+   *
+   * Returns: 1 = accepted, 0 = appliance closed connection, -1 = error
+   */
+  int (*accept_connection) (guestfs_h *g, struct connection *);
+
+  /* Read/write the given buffer from/to the daemon.  The whole buffer is
+   * read or written.  Partial reads/writes are automatically completed
+   * if possible, and if this wasn't possible it returns an error.
+   *
+   * These functions also monitor the console socket and deliver log
+   * messages up as events.  This is entirely transparent to the caller.
+   *
+   * Normal return is number of bytes read/written.  Both functions
+   * return 0 to mean that the appliance closed the connection or
+   * otherwise went away.  -1 means there's an error.
+   *
+   * NB(review): in the socket implementation a zero-length request
+   * also returns 0, which cannot be told apart from "closed".
+   * Presumably callers never pass len == 0.
+   */
+  ssize_t (*read_data) (guestfs_h *g, struct connection *, void *buf, size_t len);
+  ssize_t (*write_data) (guestfs_h *g, struct connection *, const void *buf, size_t len);
+
+  /* Test if data is available to read on the daemon socket, without blocking.
+   * Returns: 1 = yes, 0 = no, -1 = error
+   */
+  int (*can_read_data) (guestfs_h *g, struct connection *);
+};
+
 /* Stack of old error handlers. */
 struct error_cb_stack {
   struct error_cb_stack   *next;
@@ -263,8 +305,7 @@ struct guestfs_h
   int unique;
 
   /*** Protocol. ***/
-  int console_sock;          /* Appliance console (for debug info). */
-  int daemon_sock;           /* Daemon communications socket. */
+  struct connection *conn;              /* Connection to appliance. */
   int msg_next_serial;
 
 #if HAVE_FUSE
@@ -486,9 +527,12 @@ extern int guestfs___recv_discard (guestfs_h *g, const char *fn);
 extern int guestfs___send_file (guestfs_h *g, const char *filename);
 extern int guestfs___recv_file (guestfs_h *g, const char *filename);
 extern int guestfs___recv_from_daemon (guestfs_h *g, uint32_t *size_rtn, void **buf_rtn);
-extern int guestfs___accept_from_daemon (guestfs_h *g);
 extern void guestfs___progress_message_callback (guestfs_h *g, const struct guestfs_progress *message);
 
+/* conn-socket.c */
+extern struct connection *guestfs___new_conn_socket_listening (guestfs_h *g, int daemon_accept_sock, int console_sock);
+extern struct connection *guestfs___new_conn_socket_connected (guestfs_h *g, int daemon_sock, int console_sock);
+
 /* events.c */
 extern void guestfs___call_callbacks_void (guestfs_h *g, uint64_t event);
 extern void guestfs___call_callbacks_message (guestfs_h *g, uint64_t event, const char *buf, size_t buf_len);
@@ -510,6 +554,7 @@ extern void guestfs___launch_send_progress (guestfs_h *g, int perdozen);
 extern size_t guestfs___checkpoint_drives (guestfs_h *g);
 extern void guestfs___rollback_drives (guestfs_h *g, size_t);
 extern void guestfs___launch_failed_error (guestfs_h *g);
+extern void guestfs___unexpected_close_error (guestfs_h *g);
 extern void guestfs___add_dummy_appliance_drive (guestfs_h *g);
 extern void guestfs___free_drives (guestfs_h *g);
 extern char *guestfs___appliance_command_line (guestfs_h *g, const char *appliance_dev, int flags);
diff --git a/src/handle.c b/src/handle.c
index 47a2246..1c0e0d5 100644
--- a/src/handle.c
+++ b/src/handle.c
@@ -91,8 +91,7 @@ guestfs_create_flags (unsigned flags, ...)
 
   g->state = CONFIG;
 
-  g->console_sock = -1;
-  g->daemon_sock = -1;
+  g->conn = NULL;
 
   guestfs___init_error_handler (g);
   g->abort_cb = abort;
@@ -366,12 +365,10 @@ shutdown_backend (guestfs_h *g, int check_for_errors)
   }
 
   /* Close sockets. */
-  if (g->console_sock >= 0)
-    close (g->console_sock);
-  if (g->daemon_sock >= 0)
-    close (g->daemon_sock);
-  g->console_sock = -1;
-  g->daemon_sock = -1;
+  if (g->conn) {
+    g->conn->ops->free_connection (g, g->conn);
+    g->conn = NULL;
+  }
 
   if (g->attach_ops->shutdown (g, check_for_errors) == -1)
     ret = -1;
diff --git a/src/launch-appliance.c b/src/launch-appliance.c
index 99bc541..d8d8e03 100644
--- a/src/launch-appliance.c
+++ b/src/launch-appliance.c
@@ -167,6 +167,7 @@ add_cmdline_shell_unquoted (guestfs_h *g, const char *options)
 static int
 launch_appliance (guestfs_h *g, const char *arg)
 {
+  int daemon_accept_sock = -1, console_sock = -1;
   int r;
   int sv[2];
   char guestfsd_sock[256];
@@ -210,27 +211,22 @@ launch_appliance (guestfs_h *g, const char *arg)
   snprintf (guestfsd_sock, sizeof guestfsd_sock, "%s/guestfsd.sock", g->tmpdir);
   unlink (guestfsd_sock);
 
-  g->daemon_sock = socket (AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0);
-  if (g->daemon_sock == -1) {
+  daemon_accept_sock = socket (AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0);
+  if (daemon_accept_sock == -1) {
     perrorf (g, "socket");
     goto cleanup0;
   }
 
-  if (fcntl (g->daemon_sock, F_SETFL, O_NONBLOCK) == -1) {
-    perrorf (g, "fcntl");
-    goto cleanup0;
-  }
-
   addr.sun_family = AF_UNIX;
   strncpy (addr.sun_path, guestfsd_sock, UNIX_PATH_MAX);
   addr.sun_path[UNIX_PATH_MAX-1] = '\0';
 
-  if (bind (g->daemon_sock, &addr, sizeof addr) == -1) {
+  if (bind (daemon_accept_sock, &addr, sizeof addr) == -1) {
     perrorf (g, "bind");
     goto cleanup0;
   }
 
-  if (listen (g->daemon_sock, 1) == -1) {
+  if (listen (daemon_accept_sock, 1) == -1) {
     perrorf (g, "listen");
     goto cleanup0;
   }
@@ -606,12 +602,7 @@ launch_appliance (guestfs_h *g, const char *arg)
     /* Close the other end of the socketpair. */
     close (sv[1]);
 
-    if (fcntl (sv[0], F_SETFL, O_NONBLOCK) == -1) {
-      perrorf (g, "fcntl");
-      goto cleanup1;
-    }
-
-    g->console_sock = sv[0];    /* stdin of child */
+    console_sock = sv[0];       /* stdin of child */
     sv[0] = -1;
   }
 
@@ -620,9 +611,21 @@ launch_appliance (guestfs_h *g, const char *arg)
   /* Wait for qemu to start and to connect back to us via
    * virtio-serial and send the GUESTFS_LAUNCH_FLAG message.
    */
-  r = guestfs___accept_from_daemon (g);
+  g->conn =
+    guestfs___new_conn_socket_listening (g, daemon_accept_sock, console_sock);
+  if (!g->conn)
+    goto cleanup1;
+
+  /* g->conn now owns these sockets. */
+  daemon_accept_sock = console_sock = -1;
+
+  r = g->conn->ops->accept_connection (g, g->conn);
   if (r == -1)
     goto cleanup1;
+  if (r == 0) {
+    guestfs___launch_failed_error (g);
+    goto cleanup1;
+  }
 
   /* NB: We reach here just because qemu has opened the socket.  It
    * does not mean the daemon is up until we read the
@@ -631,20 +634,6 @@ launch_appliance (guestfs_h *g, const char *arg)
    * able to open a drive.
    */
 
-  /* Close the listening socket. */
-  if (close (g->daemon_sock) != 0) {
-    perrorf (g, "close: listening socket");
-    close (r);
-    g->daemon_sock = -1;
-    goto cleanup1;
-  }
-  g->daemon_sock = r; /* This is the accepted data socket. */
-
-  if (fcntl (g->daemon_sock, F_SETFL, O_NONBLOCK) == -1) {
-    perrorf (g, "fcntl");
-    goto cleanup1;
-  }
-
   r = guestfs___recv_from_daemon (g, &size, &buf);
 
   if (r == -1) {
@@ -686,16 +675,18 @@ launch_appliance (guestfs_h *g, const char *arg)
   if (g->app.recoverypid > 0) kill (g->app.recoverypid, 9);
   if (g->app.pid > 0) waitpid (g->app.pid, NULL, 0);
   if (g->app.recoverypid > 0) waitpid (g->app.recoverypid, NULL, 0);
-  if (g->console_sock >= 0) close (g->console_sock);
-  g->console_sock = -1;
   g->app.pid = 0;
   g->app.recoverypid = 0;
   memset (&g->launch_t, 0, sizeof g->launch_t);
 
  cleanup0:
-  if (g->daemon_sock >= 0) {
-    close (g->daemon_sock);
-    g->daemon_sock = -1;
+  if (daemon_accept_sock >= 0)
+    close (daemon_accept_sock);
+  if (console_sock >= 0)
+    close (console_sock);
+  if (g->conn) {
+    g->conn->ops->free_connection (g, g->conn);
+    g->conn = NULL;
   }
   g->state = CONFIG;
   return -1;
diff --git a/src/launch-libvirt.c b/src/launch-libvirt.c
index d771392..10f1281 100644
--- a/src/launch-libvirt.c
+++ b/src/launch-libvirt.c
@@ -119,8 +119,8 @@ struct libvirt_xml_params {
   char *appliance_overlay;      /* path to qcow2 overlay backed by appliance */
   char appliance_dev[64];       /* appliance device name */
   size_t appliance_index;       /* index of appliance */
-  char guestfsd_sock[UNIX_PATH_MAX]; /* paths to sockets */
-  char console_sock[UNIX_PATH_MAX];
+  char guestfsd_path[UNIX_PATH_MAX]; /* paths to sockets */
+  char console_path[UNIX_PATH_MAX];
   bool enable_svirt;            /* false if we decided to disable sVirt */
   bool is_kvm;                  /* false = qemu, true = kvm */
   bool current_proc_is_root;    /* true = euid is root */
@@ -145,6 +145,7 @@ static void selinux_warning (guestfs_h *g, const char *func, const char *selinux
 static int
 launch_libvirt (guestfs_h *g, const char *libvirt_uri)
 {
+  int daemon_accept_sock = -1, console_sock = -1;
   unsigned long version;
   virConnectPtr conn = NULL;
   virDomainPtr dom = NULL;
@@ -157,7 +158,7 @@ launch_libvirt (guestfs_h *g, const char *libvirt_uri)
   CLEANUP_FREE xmlChar *xml = NULL;
   CLEANUP_FREE char *appliance = NULL;
   struct sockaddr_un addr;
-  int console = -1, r;
+  int r;
   uint32_t size;
   CLEANUP_FREE void *buf = NULL;
   struct drive *drv;
@@ -252,56 +253,51 @@ launch_libvirt (guestfs_h *g, const char *libvirt_uri)
   /* Using virtio-serial, we need to create a local Unix domain socket
    * for qemu to connect to.
    */
-  snprintf (params.guestfsd_sock, sizeof params.guestfsd_sock,
+  snprintf (params.guestfsd_path, sizeof params.guestfsd_path,
             "%s/guestfsd.sock", g->tmpdir);
-  unlink (params.guestfsd_sock);
+  unlink (params.guestfsd_path);
 
   set_socket_create_context (g);
 
-  g->daemon_sock = socket (AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0);
-  if (g->daemon_sock == -1) {
+  daemon_accept_sock = socket (AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0);
+  if (daemon_accept_sock == -1) {
     perrorf (g, "socket");
     goto cleanup;
   }
 
-  if (fcntl (g->daemon_sock, F_SETFL, O_NONBLOCK) == -1) {
-    perrorf (g, "fcntl");
-    goto cleanup;
-  }
-
   addr.sun_family = AF_UNIX;
-  memcpy (addr.sun_path, params.guestfsd_sock, UNIX_PATH_MAX);
+  memcpy (addr.sun_path, params.guestfsd_path, UNIX_PATH_MAX);
 
-  if (bind (g->daemon_sock, &addr, sizeof addr) == -1) {
+  if (bind (daemon_accept_sock, &addr, sizeof addr) == -1) {
     perrorf (g, "bind");
     goto cleanup;
   }
 
-  if (listen (g->daemon_sock, 1) == -1) {
+  if (listen (daemon_accept_sock, 1) == -1) {
     perrorf (g, "listen");
     goto cleanup;
   }
 
   /* For the serial console. */
-  snprintf (params.console_sock, sizeof params.console_sock,
+  snprintf (params.console_path, sizeof params.console_path,
             "%s/console.sock", g->tmpdir);
-  unlink (params.console_sock);
+  unlink (params.console_path);
 
-  console = socket (AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0);
-  if (console == -1) {
+  console_sock = socket (AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0);
+  if (console_sock == -1) {
     perrorf (g, "socket");
     goto cleanup;
   }
 
   addr.sun_family = AF_UNIX;
-  memcpy (addr.sun_path, params.console_sock, UNIX_PATH_MAX);
+  memcpy (addr.sun_path, params.console_path, UNIX_PATH_MAX);
 
-  if (bind (console, &addr, sizeof addr) == -1) {
+  if (bind (console_sock, &addr, sizeof addr) == -1) {
     perrorf (g, "bind");
     goto cleanup;
   }
 
-  if (listen (console, 1) == -1) {
+  if (listen (console_sock, 1) == -1) {
     perrorf (g, "listen");
     goto cleanup;
   }
@@ -331,24 +327,24 @@ launch_libvirt (guestfs_h *g, const char *libvirt_uri)
      */
     struct group *grp;
 
-    if (chmod (params.guestfsd_sock, 0660) == -1) {
-      perrorf (g, "chmod: %s", params.guestfsd_sock);
+    if (chmod (params.guestfsd_path, 0660) == -1) {
+      perrorf (g, "chmod: %s", params.guestfsd_path);
       goto cleanup;
     }
 
-    if (chmod (params.console_sock, 0660) == -1) {
-      perrorf (g, "chmod: %s", params.console_sock);
+    if (chmod (params.console_path, 0660) == -1) {
+      perrorf (g, "chmod: %s", params.console_path);
       goto cleanup;
     }
 
     grp = getgrnam ("qemu");
     if (grp != NULL) {
-      if (chown (params.guestfsd_sock, 0, grp->gr_gid) == -1) {
-        perrorf (g, "chown: %s", params.guestfsd_sock);
+      if (chown (params.guestfsd_path, 0, grp->gr_gid) == -1) {
+        perrorf (g, "chown: %s", params.guestfsd_path);
         goto cleanup;
       }
-      if (chown (params.console_sock, 0, grp->gr_gid) == -1) {
-        perrorf (g, "chown: %s", params.console_sock);
+      if (chown (params.console_path, 0, grp->gr_gid) == -1) {
+        perrorf (g, "chown: %s", params.console_path);
         goto cleanup;
       }
     } else
@@ -387,26 +383,37 @@ launch_libvirt (guestfs_h *g, const char *libvirt_uri)
   g->state = LAUNCHING;
 
   /* Wait for console socket to open. */
-  r = accept4 (console, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
+  r = accept4 (console_sock, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
   if (r == -1) {
     perrorf (g, "accept");
     goto cleanup;
   }
-  if (close (console) == -1) {
+  if (close (console_sock) == -1) {
     perrorf (g, "close: console socket");
-    console = -1;
+    console_sock = -1;
     close (r);
     goto cleanup;
   }
-  console = -1;
-  g->console_sock = r;      /* This is the accepted console socket. */
+  console_sock = r;         /* This is the accepted console socket. */
 
   /* Wait for libvirt domain to start and to connect back to us via
    * virtio-serial and send the GUESTFS_LAUNCH_FLAG message.
    */
-  r = guestfs___accept_from_daemon (g);
+  g->conn =
+    guestfs___new_conn_socket_listening (g, daemon_accept_sock, console_sock);
+  if (!g->conn)
+    goto cleanup;
+
+  /* g->conn now owns these sockets. */
+  daemon_accept_sock = console_sock = -1;
+
+  r = g->conn->ops->accept_connection (g, g->conn);
   if (r == -1)
     goto cleanup;
+  if (r == 0) {
+    guestfs___launch_failed_error (g);
+    goto cleanup;
+  }
 
   /* NB: We reach here just because qemu has opened the socket.  It
    * does not mean the daemon is up until we read the
@@ -415,20 +422,6 @@ launch_libvirt (guestfs_h *g, const char *libvirt_uri)
    * able to open a drive.
    */
 
-  /* Close the listening socket. */
-  if (close (g->daemon_sock) == -1) {
-    perrorf (g, "close: listening socket");
-    close (r);
-    g->daemon_sock = -1;
-    goto cleanup;
-  }
-  g->daemon_sock = r; /* This is the accepted data socket. */
-
-  if (fcntl (g->daemon_sock, F_SETFL, O_NONBLOCK) == -1) {
-    perrorf (g, "fcntl");
-    goto cleanup;
-  }
-
   r = guestfs___recv_from_daemon (g, &size, &buf);
 
   if (r == -1) {
@@ -473,15 +466,13 @@ launch_libvirt (guestfs_h *g, const char *libvirt_uri)
  cleanup:
   clear_socket_create_context (g);
 
-  if (console >= 0)
-    close (console);
-  if (g->console_sock >= 0) {
-    close (g->console_sock);
-    g->console_sock = -1;
-  }
-  if (g->daemon_sock >= 0) {
-    close (g->daemon_sock);
-    g->daemon_sock = -1;
+  if (console_sock >= 0)
+    close (console_sock);
+  if (daemon_accept_sock >= 0)
+    close (daemon_accept_sock);
+  if (g->conn) {
+    g->conn->ops->free_connection (g, g->conn);
+    g->conn = NULL;
   }
 
   if (dom) {
@@ -1006,7 +997,7 @@ construct_libvirt_xml_devices (guestfs_h *g,
                                          BAD_CAST "connect"));
   XMLERROR (-1,
             xmlTextWriterWriteAttribute (xo, BAD_CAST "path",
-                                         BAD_CAST params->console_sock));
+                                         BAD_CAST params->console_path));
   XMLERROR (-1, xmlTextWriterEndElement (xo));
   XMLERROR (-1, xmlTextWriterStartElement (xo, BAD_CAST "target"));
   XMLERROR (-1,
@@ -1026,7 +1017,7 @@ construct_libvirt_xml_devices (guestfs_h *g,
                                          BAD_CAST "connect"));
   XMLERROR (-1,
             xmlTextWriterWriteAttribute (xo, BAD_CAST "path",
-                                         BAD_CAST params->guestfsd_sock));
+                                         BAD_CAST params->guestfsd_path));
   XMLERROR (-1, xmlTextWriterEndElement (xo));
   XMLERROR (-1, xmlTextWriterStartElement (xo, BAD_CAST "target"));
   XMLERROR (-1,
diff --git a/src/launch-unix.c b/src/launch-unix.c
index db49a3c..0bbce5f 100644
--- a/src/launch-unix.c
+++ b/src/launch-unix.c
@@ -36,7 +36,7 @@
 static int
 launch_unix (guestfs_h *g, const char *sockpath)
 {
-  int r;
+  int r, daemon_sock = -1;
   struct sockaddr_un addr;
   uint32_t size;
   void *buf = NULL;
@@ -46,16 +46,11 @@ launch_unix (guestfs_h *g, const char *sockpath)
     return -1;
   }
 
-  /* Set this to nothing so we don't try to read from a random file
-   * descriptor.
-   */
-  g->console_sock = -1;
-
   if (g->verbose)
     guestfs___print_timestamped_message (g, "connecting to %s", sockpath);
 
-  g->daemon_sock = socket (AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0);
-  if (g->daemon_sock == -1) {
+  daemon_sock = socket (AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0);
+  if (daemon_sock == -1) {
     perrorf (g, "socket");
     return -1;
   }
@@ -66,20 +61,22 @@ launch_unix (guestfs_h *g, const char *sockpath)
 
   g->state = LAUNCHING;
 
-  if (connect (g->daemon_sock, &addr, sizeof addr) == -1) {
+  if (connect (daemon_sock, &addr, sizeof addr) == -1) {
     perrorf (g, "bind");
     goto cleanup;
   }
 
-  if (fcntl (g->daemon_sock, F_SETFL, O_NONBLOCK) == -1) {
-    perrorf (g, "fcntl");
+  g->conn = guestfs___new_conn_socket_connected (g, daemon_sock, -1);
+  if (!g->conn)
     goto cleanup;
-  }
+
+  /* g->conn now owns this socket. */
+  daemon_sock = -1;
 
   r = guestfs___recv_from_daemon (g, &size, &buf);
   free (buf);
 
-  if (r == -1) return -1;
+  if (r == -1) goto cleanup;
 
   if (size != GUESTFS_LAUNCH_FLAG) {
     error (g, _("guestfs_launch failed, unexpected initial message from guestfsd"));
@@ -97,8 +94,12 @@ launch_unix (guestfs_h *g, const char *sockpath)
   return 0;
 
  cleanup:
-  close (g->daemon_sock);
-  g->daemon_sock = -1;
+  if (daemon_sock >= 0)
+    close (daemon_sock);
+  if (g->conn) {
+    g->conn->ops->free_connection (g, g->conn);
+    g->conn = NULL;
+  }
   return -1;
 }
 
diff --git a/src/launch.c b/src/launch.c
index 7c37667..0090b28 100644
--- a/src/launch.c
+++ b/src/launch.c
@@ -698,8 +698,23 @@ guestfs___launch_failed_error (guestfs_h *g)
     error (g, _("guestfs_launch failed, see earlier error messages"));
   else
     error (g, _("guestfs_launch failed.\n"
+                "This usually means the libguestfs appliance failed to start or crashed.\n"
                 "See http://libguestfs.org/guestfs-faq.1.html#debugging-libguestfs\n"
-                "and/or run 'libguestfs-test-tool'."));
+                "or run 'libguestfs-test-tool' and post the *complete* output into a\n"
+                "bug report or message to the libguestfs mailing list."));
+}
+
+/* Like guestfs___launch_failed_error, but for appliance crashes after launch. */
+void
+guestfs___unexpected_close_error (guestfs_h *g)
+{
+  if (g->verbose)
+    error (g, _("appliance closed the connection unexpectedly, see earlier error messages"));
+  else
+    error (g, _("appliance closed the connection unexpectedly.\n"
+                "This usually means the libguestfs appliance crashed.\n"
+                "See http://libguestfs.org/guestfs-faq.1.html#debugging-libguestfs\n"
+                "for information about how to debug libguestfs and report bugs."));
 }
 
 int
diff --git a/src/proto.c b/src/proto.c
index 45ad4fd..7d0a0d3 100644
--- a/src/proto.c
+++ b/src/proto.c
@@ -30,10 +30,7 @@
 #include <time.h>
 #include <errno.h>
 #include <sys/stat.h>
-#include <sys/select.h>
-#include <sys/socket.h>
 #include <sys/types.h>
-#include <sys/un.h>
 #include <sys/wait.h>
 #include <assert.h>
 
@@ -86,12 +83,10 @@
  * this in the current API, but they would be implemented as a
  * combination of cases (3) and (4).
  *
- * During all writes and reads, we also select(2) on qemu stdout
- * looking for messages (guestfsd stderr and guest kernel dmesg), and
- * anything received is passed up through the log_message_cb.  This is
- * also the reason why all the sockets are non-blocking.  We also have
- * to check for EOF (qemu died).  All of this is handled by the
- * functions send_to_daemon and recv_from_daemon.
+ * All read/write/etc operations are performed using the current
+ * connection module (g->conn).  During operations the connection
+ * module transparently handles log messages that appear on the
+ * console.
  */
 
 /* This is called if we detect EOF, ie. qemu died. */
@@ -101,125 +96,16 @@ child_cleanup (guestfs_h *g)
   debug (g, "child_cleanup: %p: child process died", g);
 
   g->attach_ops->shutdown (g, 0);
-  if (g->console_sock >= 0) close (g->console_sock);
-  close (g->daemon_sock);
-  g->console_sock = -1;
-  g->daemon_sock = -1;
+  if (g->conn) {
+    g->conn->ops->free_connection (g, g->conn);
+    g->conn = NULL;
+  }
   memset (&g->launch_t, 0, sizeof g->launch_t);
   guestfs___free_drives (g);
   g->state = CONFIG;
   guestfs___call_callbacks_void (g, GUESTFS_EVENT_SUBPROCESS_QUIT);
 }
 
-static int
-read_log_message_or_eof (guestfs_h *g, int fd, int error_if_eof)
-{
-  char buf[BUFSIZ];
-  ssize_t n;
-
-  /* QEMU's console emulates a 16550A serial port.  The real 16550A
-   * device has a small FIFO buffer (16 bytes) which means here we see
-   * lots of small reads of 1-16 bytes in length, usually single
-   * bytes.  Sleeping here for a very brief period groups reads
-   * together (so we usually get a few lines of output at once) and
-   * improves overall throughput, as well as making the event
-   * interface a bit more sane for callers.  With a virtio-serial
-   * based console (not yet implemented) we may be able to remove
-   * this.  XXX
-   */
-  usleep (1000);
-
-  n = read (fd, buf, sizeof buf);
-  if (n == 0) {
-    /* Hopefully this indicates the qemu child process has died. */
-    child_cleanup (g);
-
-    if (error_if_eof) {
-      /* We weren't expecting eof here (called from launch) so place
-       * something in the error buffer.  RHBZ#588851.
-       */
-      error (g, "child process died unexpectedly");
-    }
-    return -1;
-  }
-
-  if (n == -1) {
-    if (errno == EINTR || errno == EAGAIN)
-      return 0;
-
-    perrorf (g, "read");
-    return -1;
-  }
-
-  /* It's an actual log message, send it upwards if anyone is listening. */
-  guestfs___call_callbacks_message (g, GUESTFS_EVENT_APPLIANCE, buf, n);
-
-  /* This is a gross hack.  See the comment above
-   * guestfs___launch_send_progress.
-   */
-  if (g->state == LAUNCHING) {
-    const char *sentinel;
-    size_t len;
-
-    sentinel = "Linux version"; /* kernel up */
-    len = strlen (sentinel);
-    if (memmem (buf, n, sentinel, len) != NULL)
-      guestfs___launch_send_progress (g, 6);
-
-    sentinel = "Starting /init script"; /* /init running */
-    len = strlen (sentinel);
-    if (memmem (buf, n, sentinel, len) != NULL)
-      guestfs___launch_send_progress (g, 9);
-  }
-
-  return 0;
-}
-
-/* Read 'n' bytes, setting the socket to blocking temporarily so
- * that we really read the number of bytes requested.
- * Returns:  0 == EOF while reading
- *          -1 == error, error() function has been called
- *           n == read 'n' bytes in full
- */
-static ssize_t
-really_read_from_socket (guestfs_h *g, int sock, char *buf, size_t n)
-{
-  long flags;
-  ssize_t r;
-  size_t got;
-
-  /* Set socket to blocking. */
-  flags = fcntl (sock, F_GETFL);
-  if (flags == -1) {
-    perrorf (g, "fcntl");
-    return -1;
-  }
-  if (fcntl (sock, F_SETFL, flags & ~O_NONBLOCK) == -1) {
-    perrorf (g, "fcntl");
-    return -1;
-  }
-
-  got = 0;
-  while (got < n) {
-    r = read (sock, &buf[got], n-got);
-    if (r == -1) {
-      perrorf (g, "read");
-      return -1;
-    }
-    if (r == 0)
-      return 0; /* EOF */
-    got += r;
-  }
-
-  /* Restore original socket flags. */
-  if (fcntl (sock, F_SETFL, flags) == -1) {
-    perrorf (g, "fcntl");
-    return -1;
-  }
-
-  return (ssize_t) got;
-}
-
 /* Convenient wrapper to generate a progress message callback. */
 void
 guestfs___progress_message_callback (guestfs_h *g,
@@ -236,22 +122,32 @@ guestfs___progress_message_callback (guestfs_h *g,
                                   array, sizeof array / sizeof array[0]);
 }
 
-static int
-check_for_daemon_cancellation_or_eof (guestfs_h *g, int fd)
+/* Before writing to the daemon socket, check the read side of the
+ * daemon socket for any of these conditions:
+ *
+ *   - error:                       return -1
+ *   - daemon cancellation message: return -2
+ *   - progress message:            handle it here
+ *   - end of input / appliance exited unexpectedly: return 0
+ *   - anything else:               return 1
+ */
+static ssize_t
+check_daemon_socket (guestfs_h *g)
 {
   char buf[4];
   ssize_t n;
   uint32_t flag;
   XDR xdr;
 
-  n = really_read_from_socket (g, fd, buf, 4);
-  if (n == -1)
-    return -1;
-  if (n == 0) {
-    /* Hopefully this indicates the qemu child process has died. */
-    child_cleanup (g);
-    return -1;
-  }
+  assert (g->conn); /* callers must check this */
+
+ again:
+  if (! g->conn->ops->can_read_data (g, g->conn))
+    return 1;
+
+  n = g->conn->ops->read_data (g, g->conn, buf, 4);
+  if (n <= 0) /* 0 or -1 */
+    return n;
 
   xdrmem_create (&xdr, buf, 4, XDR_DECODE);
   xdr_uint32_t (&xdr, &flag);
@@ -262,13 +158,9 @@ check_for_daemon_cancellation_or_eof (guestfs_h *g, int fd)
     char buf[PROGRESS_MESSAGE_SIZE];
     guestfs_progress message;
 
-    n = really_read_from_socket (g, fd, buf, PROGRESS_MESSAGE_SIZE);
-    if (n == -1)
-      return -1;
-    if (n == 0) {
-      child_cleanup (g);
-      return -1;
-    }
+    n = g->conn->ops->read_data (g, g->conn, buf, PROGRESS_MESSAGE_SIZE);
+    if (n <= 0) /* 0 or -1 */
+      return n;
 
     xdrmem_create (&xdr, buf, PROGRESS_MESSAGE_SIZE, XDR_DECODE);
     xdr_guestfs_progress (&xdr, &message);
@@ -276,11 +168,11 @@ check_for_daemon_cancellation_or_eof (guestfs_h *g, int fd)
 
     guestfs___progress_message_callback (g, &message);
 
-    return 0;
+    goto again;
   }
 
   if (flag != GUESTFS_CANCEL_FLAG) {
-    error (g, _("check_for_daemon_cancellation_or_eof: read 0x%x from daemon, expected 0x%x\n"),
+    error (g, _("check_daemon_socket: read 0x%x from daemon, expected 0x%x.  Lost protocol synchronization (bad!)\n"),
            flag, GUESTFS_CANCEL_FLAG);
     return -1;
   }
@@ -288,417 +180,6 @@ check_for_daemon_cancellation_or_eof (guestfs_h *g, int fd)
   return -2;
 }
 
-/* This writes the whole N bytes of BUF to the daemon socket.
- *
- * If the whole write is successful, it returns 0.
- * If there was an error, it returns -1.
- * If the daemon sent a cancellation message, it returns -2.
- *
- * It also checks qemu stdout for log messages and passes those up
- * through log_message_cb.
- *
- * It also checks for EOF (qemu died) and passes that up through the
- * child_cleanup function above.
- */
-static int
-send_to_daemon (guestfs_h *g, const void *v_buf, size_t n)
-{
-  const char *buf = v_buf;
-  fd_set rset, rset2;
-  fd_set wset, wset2;
-
-  FD_ZERO (&rset);
-  FD_ZERO (&wset);
-
-  if (g->console_sock >= 0) /* Read qemu stdout for log messages & EOF. */
-    FD_SET (g->console_sock, &rset);
-  FD_SET (g->daemon_sock, &rset); /* Read socket for cancellation & EOF. */
-  FD_SET (g->daemon_sock, &wset); /* Write to socket to send the data. */
-
-  int max_fd = MAX (g->daemon_sock, g->console_sock);
-
-  while (n > 0) {
-    rset2 = rset;
-    wset2 = wset;
-    int r = select (max_fd+1, &rset2, &wset2, NULL, NULL);
-    if (r == -1) {
-      if (errno == EINTR || errno == EAGAIN)
-        continue;
-      perrorf (g, "select");
-      return -1;
-    }
-
-    if (g->console_sock >= 0 && FD_ISSET (g->console_sock, &rset2)) {
-      if (read_log_message_or_eof (g, g->console_sock, 0) == -1)
-        return -1;
-    }
-    if (FD_ISSET (g->daemon_sock, &rset2)) {
-      r = check_for_daemon_cancellation_or_eof (g, g->daemon_sock);
-      if (r == -1)
-	return r;
-      if (r == -2) {
-	/* Daemon sent cancel message.  But to maintain
-	 * synchronization we must write out the remainder of the
-	 * write buffer before we return (RHBZ#576879).
-	 */
-	if (xwrite (g->daemon_sock, buf, n) == -1) {
-	  perrorf (g, "write");
-	  return -1;
-	}
-	return -2; /* cancelled */
-      }
-    }
-    if (FD_ISSET (g->daemon_sock, &wset2)) {
-      r = write (g->daemon_sock, buf, n);
-      if (r == -1) {
-        if (errno == EINTR || errno == EAGAIN)
-          continue;
-        perrorf (g, "write");
-        if (errno == EPIPE) /* Disconnected from guest (RHBZ#508713). */
-          child_cleanup (g);
-        return -1;
-      }
-      buf += r;
-      n -= r;
-    }
-  }
-
-  return 0;
-}
-
-/* This reads a single message, file chunk, launch flag or
- * cancellation flag from the daemon.  If something was read, it
- * returns 0, otherwise -1.
- *
- * Both size_rtn and buf_rtn must be passed by the caller as non-NULL.
- *
- * *size_rtn returns the size of the returned message or it may be
- * GUESTFS_LAUNCH_FLAG or GUESTFS_CANCEL_FLAG.
- *
- * *buf_rtn is returned containing the message (if any) or will be set
- * to NULL.  *buf_rtn must be freed by the caller.
- *
- * It also checks qemu stdout for log messages and passes those up
- * through log_message_cb.
- *
- * It also checks for EOF (qemu died) and passes that up through the
- * child_cleanup function above.
- *
- * Progress notifications are handled transparently by this function.
- * If the callback exists, it is called.  The caller of this function
- * will not see GUESTFS_PROGRESS_FLAG.
- */
-
-static inline void
-unexpected_end_of_file_from_daemon_error (guestfs_h *g)
-{
-#define UNEXPEOF_ERROR "unexpected end of file when reading from daemon.\n"
-#define UNEXPEOF_TEST_TOOL \
-  "Or you can run 'libguestfs-test-tool' and post the complete output into\n" \
-  "a bug report or message to the libguestfs mailing list."
-  if (!g->verbose)
-    error (g, _(UNEXPEOF_ERROR
-"This usually means the libguestfs appliance failed to start up.  Please\n"
-"enable debugging (LIBGUESTFS_DEBUG=1) and rerun the command, then look at\n"
-"the debug messages output prior to this error.\n"
-UNEXPEOF_TEST_TOOL));
-  else
-    error (g, _(UNEXPEOF_ERROR
-"See earlier debug messages.\n"
-UNEXPEOF_TEST_TOOL));
-}
-
-static inline void
-unexpected_closed_connection_from_daemon_error (guestfs_h *g)
-{
-#define UNEXPCLO_ERROR "connection to daemon was closed unexpectedly.\n"
-  if (!g->verbose)
-    error (g, _(UNEXPCLO_ERROR
-"This usually means the libguestfs appliance crashed.  Please enable\n"
-"debugging (LIBGUESTFS_DEBUG=1) and rerun the command, then look at the\n"
-"debug messages output prior to this error."));
-  else
-    error (g, _(UNEXPCLO_ERROR
-"See earlier debug messages."));
-}
-
-static int
-recv_from_daemon (guestfs_h *g, uint32_t *size_rtn, void **buf_rtn)
-{
-  fd_set rset, rset2;
-  int max_fd;
-  char lenbuf[4];
-  ssize_t nr;
-
-  /* RHBZ#914931: Along some (rare) paths, we might have closed the
-   * socket connection just before this function is called, so just
-   * return an error if this happens.
-   */
-  if (g->daemon_sock == -1) {
-    unexpected_closed_connection_from_daemon_error (g);
-    return -1;
-  }
-
-  FD_ZERO (&rset);
-
-  if (g->console_sock >= 0) /* Read qemu stdout for log messages & EOF. */
-    FD_SET (g->console_sock, &rset);
-  FD_SET (g->daemon_sock, &rset); /* Read socket for data & EOF. */
-
-  max_fd = MAX (g->daemon_sock, g->console_sock);
-
-  *size_rtn = 0;
-  *buf_rtn = NULL;
-
-  /* nr is the size of the message, but we prime it as -4 because we
-   * have to read the message length word first.
-   */
-  nr = -4;
-
-  for (;;) {
-    ssize_t message_size =
-      *size_rtn != GUESTFS_PROGRESS_FLAG ?
-      *size_rtn : PROGRESS_MESSAGE_SIZE;
-    if (nr >= message_size)
-      break;
-
-    rset2 = rset;
-    int r = select (max_fd+1, &rset2, NULL, NULL, NULL);
-    if (r == -1) {
-      if (errno == EINTR || errno == EAGAIN)
-        continue;
-      perrorf (g, "select");
-      free (*buf_rtn);
-      *buf_rtn = NULL;
-      return -1;
-    }
-
-    if (g->console_sock >= 0 && FD_ISSET (g->console_sock, &rset2)) {
-      if (read_log_message_or_eof (g, g->console_sock, 0) == -1) {
-        free (*buf_rtn);
-        *buf_rtn = NULL;
-        return -1;
-      }
-    }
-    if (FD_ISSET (g->daemon_sock, &rset2)) {
-      if (nr < 0) {    /* Have we read the message length word yet? */
-        r = read (g->daemon_sock, lenbuf+nr+4, -nr);
-        if (r == -1) {
-          if (errno == EINTR || errno == EAGAIN)
-            continue;
-          int err = errno;
-          perrorf (g, "read");
-          /* Under some circumstances we see "Connection reset by peer"
-           * here when the child dies suddenly.  Catch this and call
-           * the cleanup function, same as for EOF.
-           */
-          if (err == ECONNRESET)
-            child_cleanup (g);
-          return -1;
-        }
-        if (r == 0) {
-          unexpected_end_of_file_from_daemon_error (g);
-          child_cleanup (g);
-          return -1;
-        }
-        nr += r;
-
-        if (nr < 0)         /* Still not got the whole length word. */
-          continue;
-
-        XDR xdr;
-        xdrmem_create (&xdr, lenbuf, 4, XDR_DECODE);
-        xdr_uint32_t (&xdr, size_rtn);
-        xdr_destroy (&xdr);
-
-        /* *size_rtn changed, recalculate message_size */
-        message_size =
-          *size_rtn != GUESTFS_PROGRESS_FLAG ?
-          *size_rtn : PROGRESS_MESSAGE_SIZE;
-
-        if (*size_rtn == GUESTFS_LAUNCH_FLAG) {
-          if (g->state != LAUNCHING)
-            error (g, _("received magic signature from guestfsd, but in state %d"),
-                   g->state);
-          else {
-            g->state = READY;
-            guestfs___call_callbacks_void (g, GUESTFS_EVENT_LAUNCH_DONE);
-          }
-          debug (g, "recv_from_daemon: received GUESTFS_LAUNCH_FLAG");
-          return 0;
-        }
-        else if (*size_rtn == GUESTFS_CANCEL_FLAG) {
-          debug (g, "recv_from_daemon: received GUESTFS_CANCEL_FLAG");
-          return 0;
-        }
-        else if (*size_rtn == GUESTFS_PROGRESS_FLAG)
-          /*FALLTHROUGH*/;
-        /* If this happens, it's pretty bad and we've probably lost
-         * synchronization.
-         */
-        else if (*size_rtn > GUESTFS_MESSAGE_MAX) {
-          error (g, _("message length (%u) > maximum possible size (%d)"),
-                 (unsigned) *size_rtn, GUESTFS_MESSAGE_MAX);
-          return -1;
-        }
-
-        /* Allocate the complete buffer, size now known. */
-        *buf_rtn = safe_malloc (g, message_size);
-        /*FALLTHROUGH*/
-      }
-
-      size_t sizetoread = message_size - nr;
-      if (sizetoread > BUFSIZ) sizetoread = BUFSIZ;
-
-      r = read (g->daemon_sock, (char *) (*buf_rtn) + nr, sizetoread);
-      if (r == -1) {
-        if (errno == EINTR || errno == EAGAIN)
-          continue;
-        perrorf (g, "read");
-        free (*buf_rtn);
-        *buf_rtn = NULL;
-        return -1;
-      }
-      if (r == 0) {
-        unexpected_end_of_file_from_daemon_error (g);
-        child_cleanup (g);
-        free (*buf_rtn);
-        *buf_rtn = NULL;
-        return -1;
-      }
-      nr += r;
-    }
-  }
-
-  /* Got the full message, caller can start processing it. */
-#ifdef ENABLE_PACKET_DUMP
-  if (g->verbose) {
-    ssize_t i, j;
-
-    for (i = 0; i < nr; i += 16) {
-      printf ("%04zx: ", i);
-      for (j = i; j < MIN (i+16, nr); ++j)
-        printf ("%02x ", (*(unsigned char **)buf_rtn)[j]);
-      for (; j < i+16; ++j)
-        printf ("   ");
-      printf ("|");
-      for (j = i; j < MIN (i+16, nr); ++j)
-        if (c_isprint ((*(char **)buf_rtn)[j]))
-          printf ("%c", (*(char **)buf_rtn)[j]);
-        else
-          printf (".");
-      for (; j < i+16; ++j)
-        printf (" ");
-      printf ("|\n");
-    }
-  }
-#endif
-
-  return 0;
-}
-
-int
-guestfs___recv_from_daemon (guestfs_h *g, uint32_t *size_rtn, void **buf_rtn)
-{
-  int r;
-
- again:
-  r = recv_from_daemon (g, size_rtn, buf_rtn);
-  if (r == -1)
-    return -1;
-
-  if (*size_rtn == GUESTFS_PROGRESS_FLAG) {
-    guestfs_progress message;
-    XDR xdr;
-
-    xdrmem_create (&xdr, *buf_rtn, PROGRESS_MESSAGE_SIZE, XDR_DECODE);
-    xdr_guestfs_progress (&xdr, &message);
-    xdr_destroy (&xdr);
-
-    guestfs___progress_message_callback (g, &message);
-
-    free (*buf_rtn);
-    *buf_rtn = NULL;
-
-    /* Process next message. */
-    goto again;
-  }
-
-  if (*size_rtn == GUESTFS_LAUNCH_FLAG || *size_rtn == GUESTFS_CANCEL_FLAG)
-    return 0;
-
-  /* ... it's a normal message (not progress/launch/cancel) so display
-   * it if we're debugging.
-   */
-  assert (*buf_rtn != NULL);
-
-  return 0;
-}
-
-/* This is very much like recv_from_daemon above, but g->daemon_sock is
- * a listening socket and we are accepting a new connection on
- * that socket instead of reading anything.  Returns the newly
- * accepted socket.
- */
-int
-guestfs___accept_from_daemon (guestfs_h *g)
-{
-  fd_set rset, rset2;
-
-  debug (g, "accept_from_daemon: %p g->state = %d", g, g->state);
-
-  FD_ZERO (&rset);
-
-  if (g->console_sock >= 0) /* Read qemu stdout for log messages & EOF. */
-    FD_SET (g->console_sock, &rset);
-  FD_SET (g->daemon_sock, &rset); /* Read socket for accept. */
-
-  int max_fd = MAX (g->daemon_sock, g->console_sock);
-  int sock = -1;
-
-  while (sock == -1) {
-#if 0
-    /* RWMJ: Temporarily disable this.  It *should* happen that the
-     * zombie is cleaned up now on the usual close path, but we
-     * might need to reenable this if that is not the case.
-     * 2012-07-20.
-     */
-    /* If the qemu process has died, clean up the zombie (RHBZ#579155).
-     * By partially polling in the select below we ensure that this
-     * function will be called eventually.
-     */
-    waitpid (g->pid, NULL, WNOHANG);
-#endif
-
-    rset2 = rset;
-
-    struct timeval tv = { .tv_sec = 1, .tv_usec = 0 };
-    int r = select (max_fd+1, &rset2, NULL, NULL, &tv);
-    if (r == -1) {
-      if (errno == EINTR || errno == EAGAIN)
-        continue;
-      perrorf (g, "select");
-      return -1;
-    }
-
-    if (g->console_sock >= 0 && FD_ISSET (g->console_sock, &rset2)) {
-      if (read_log_message_or_eof (g, g->console_sock, 1) == -1)
-        return -1;
-    }
-    if (FD_ISSET (g->daemon_sock, &rset2)) {
-      sock = accept4 (g->daemon_sock, NULL, NULL, SOCK_CLOEXEC);
-      if (sock == -1) {
-        if (errno == EINTR || errno == EAGAIN)
-          continue;
-        perrorf (g, "accept");
-        return -1;
-      }
-    }
-  }
-
-  return sock;
-}
-
 int
 guestfs___send (guestfs_h *g, int proc_nr,
                 uint64_t progress_hint, uint64_t optargs_bitmask,
@@ -708,10 +189,15 @@ guestfs___send (guestfs_h *g, int proc_nr,
   XDR xdr;
   u_int32_t len;
   int serial = g->msg_next_serial++;
-  int r;
+  ssize_t r;
   CLEANUP_FREE char *msg_out = NULL;
   size_t msg_out_size;
 
+  if (!g->conn) {
+    guestfs___unexpected_close_error (g);
+    return -1;
+  }
+
   /* We have to allocate this message buffer on the heap because
    * it is quite large (although will be mostly unused).  We
    * can't allocate it on the stack because in some environments
@@ -758,12 +244,28 @@ guestfs___send (guestfs_h *g, int proc_nr,
   xdrmem_create (&xdr, msg_out, 4, XDR_ENCODE);
   xdr_uint32_t (&xdr, &len);
 
- again:
-  r = send_to_daemon (g, msg_out, msg_out_size);
-  if (r == -2)                  /* Ignore stray daemon cancellations. */
-    goto again;
+  /* Look for stray daemon cancellation messages from earlier calls
+   * and ignore them.
+   */
+  r = check_daemon_socket (g);
+  /* r == -2 (cancellation) is ignored */
+  if (r == -1)
+    return -1;
+  if (r == 0) {
+    guestfs___unexpected_close_error (g);
+    child_cleanup (g);
+    return -1;
+  }
+
+  /* Send the message. */
+  r = g->conn->ops->write_data (g, g->conn, msg_out, msg_out_size);
   if (r == -1)
     return -1;
+  if (r == 0) {
+    guestfs___unexpected_close_error (g);
+    child_cleanup (g);
+    return -1;
+  }
 
   return serial;
 }
@@ -877,7 +379,7 @@ static int
 send_file_chunk (guestfs_h *g, int cancel, const char *buf, size_t buflen)
 {
   u_int32_t len;
-  int r;
+  ssize_t r;
   guestfs_chunk chunk;
   XDR xdr;
   CLEANUP_FREE char *msg_out = NULL;
@@ -911,17 +413,195 @@ send_file_chunk (guestfs_h *g, int cancel, const char *buf, size_t buflen)
   xdrmem_create (&xdr, msg_out, 4, XDR_ENCODE);
   xdr_uint32_t (&xdr, &len);
 
-  r = send_to_daemon (g, msg_out, msg_out_size);
-
   /* Did the daemon send a cancellation message? */
+  r = check_daemon_socket (g);
   if (r == -2) {
     debug (g, "got daemon cancellation");
     return -2;
   }
+  if (r == -1)
+    return -1;
+  if (r == 0) {
+    guestfs___unexpected_close_error (g);
+    child_cleanup (g);
+    return -1;
+  }
+
+  /* Send the chunk. */
+  r = g->conn->ops->write_data (g, g->conn, msg_out, msg_out_size);
+  if (r == -1)
+    return -1;
+  if (r == 0) {
+    guestfs___unexpected_close_error (g);
+    child_cleanup (g);
+    return -1;
+  }
+
+  return 0;
+}
+
+/* recv_from_daemon: This reads a single message, file chunk, launch
+ * flag, cancellation flag or progress message from the daemon.  If
+ * something was read, it returns 0, otherwise -1.
+ *
+ * Both size_rtn and buf_rtn must be passed by the caller as non-NULL.
+ *
+ * *size_rtn returns the size of the returned message or it may be
+ * GUESTFS_LAUNCH_FLAG, GUESTFS_CANCEL_FLAG or GUESTFS_PROGRESS_FLAG.
+ *
+ * *buf_rtn is returned containing the message (if any) or will be set
+ * to NULL.  *buf_rtn must be freed by the caller.
+ *
+ * This checks for EOF (appliance died) and passes that up through the
+ * child_cleanup function above.  Log messages are handled by the
+ * connection module; progress messages are decoded and consumed by
+ * the guestfs___recv_from_daemon wrapper below.
+ */
+
+static int
+recv_from_daemon (guestfs_h *g, uint32_t *size_rtn, void **buf_rtn)
+{
+  char lenbuf[4];
+  ssize_t n;
+  XDR xdr;
+  size_t message_size;
+
+  *size_rtn = 0;
+  *buf_rtn = NULL;
+
+  /* RHBZ#914931: Along some (rare) paths, we might have closed the
+   * socket connection just before this function is called, so just
+   * return an error if this happens.
+   */
+  if (!g->conn) {
+    guestfs___unexpected_close_error (g);
+    return -1;
+  }
+
+  /* Read the 4 byte size / flag. */
+  n = g->conn->ops->read_data (g, g->conn, lenbuf, 4);
+  if (n == -1)
+    return -1;
+  if (n == 0) {
+    guestfs___unexpected_close_error (g);
+    child_cleanup (g);
+    return -1;
+  }
+
+  xdrmem_create (&xdr, lenbuf, 4, XDR_DECODE);
+  xdr_uint32_t (&xdr, size_rtn);
+  xdr_destroy (&xdr);
+
+  if (*size_rtn == GUESTFS_LAUNCH_FLAG) {
+    if (g->state != LAUNCHING)
+      error (g, _("received magic signature from guestfsd, but in state %d"),
+             g->state);
+    else {
+      g->state = READY;
+      guestfs___call_callbacks_void (g, GUESTFS_EVENT_LAUNCH_DONE);
+    }
+    debug (g, "recv_from_daemon: received GUESTFS_LAUNCH_FLAG");
+    return 0;
+  }
+  else if (*size_rtn == GUESTFS_CANCEL_FLAG) {
+    debug (g, "recv_from_daemon: received GUESTFS_CANCEL_FLAG");
+    return 0;
+  }
+  else if (*size_rtn == GUESTFS_PROGRESS_FLAG)
+    /*FALLTHROUGH*/;
+  else if (*size_rtn > GUESTFS_MESSAGE_MAX) {
+    /* If this happens, it's pretty bad and we've probably lost
+     * synchronization.
+     */
+    error (g, _("message length (%u) > maximum possible size (%d)"),
+           (unsigned) *size_rtn, GUESTFS_MESSAGE_MAX);
+    return -1;
+  }
+
+  /* Calculate the message size. */
+  message_size =
+    *size_rtn != GUESTFS_PROGRESS_FLAG ? *size_rtn : PROGRESS_MESSAGE_SIZE;
+
+  /* Allocate the complete buffer, size now known. */
+  *buf_rtn = safe_malloc (g, message_size);
+
+  /* Read the message. */
+  n = g->conn->ops->read_data (g, g->conn, *buf_rtn, message_size);
+  if (n == -1) {
+    free (*buf_rtn);
+    *buf_rtn = NULL;
+    return -1;
+  }
+  if (n == 0) {
+    guestfs___unexpected_close_error (g);
+    child_cleanup (g);
+    free (*buf_rtn);
+    *buf_rtn = NULL;
+    return -1;
+  }
+
+  /* Got the full message (a normal reply, file chunk or progress
+   * message); hex-dump the n bytes read if packet dumping is built in.
+   */
+#ifdef ENABLE_PACKET_DUMP
+  if (g->verbose) {
+    ssize_t i, j;
+
+    for (i = 0; i < n; i += 16) {
+      printf ("%04zx: ", i);
+      for (j = i; j < MIN (i+16, n); ++j)
+        printf ("%02x ", (*(unsigned char **)buf_rtn)[j]);
+      for (; j < i+16; ++j)
+        printf ("   ");
+      printf ("|");
+      for (j = i; j < MIN (i+16, n); ++j)
+        if (c_isprint ((*(char **)buf_rtn)[j]))
+          printf ("%c", (*(char **)buf_rtn)[j]);
+        else
+          printf (".");
+      for (; j < i+16; ++j)
+        printf (" ");
+      printf ("|\n");
+    }
+  }
+#endif
+
+  return 0;
+}
+
+int
+guestfs___recv_from_daemon (guestfs_h *g, uint32_t *size_rtn, void **buf_rtn)
+{
+  int r;
 
+ again:
+  r = recv_from_daemon (g, size_rtn, buf_rtn);
   if (r == -1)
     return -1;
 
+  if (*size_rtn == GUESTFS_PROGRESS_FLAG) {
+    guestfs_progress message;
+    XDR xdr;
+
+    xdrmem_create (&xdr, *buf_rtn, PROGRESS_MESSAGE_SIZE, XDR_DECODE);
+    xdr_guestfs_progress (&xdr, &message);
+    xdr_destroy (&xdr);
+
+    guestfs___progress_message_callback (g, &message);
+
+    free (*buf_rtn);
+    *buf_rtn = NULL;
+
+    /* Progress messages never reach the caller: read the next message. */
+    goto again;
+  }
+
+  if (*size_rtn == GUESTFS_LAUNCH_FLAG || *size_rtn == GUESTFS_CANCEL_FLAG)
+    return 0;
+
+  /* Got the full message, caller can start processing it. */
+  assert (*buf_rtn != NULL);
+
   return 0;
 }
 
@@ -1014,9 +694,27 @@ guestfs___recv_discard (guestfs_h *g, const char *fn)
 
 /* Receive a file. */
 
-/* Returns -1 = error, 0 = EOF, > 0 = more data */
+/* Write all LEN bytes of V_BUF to FD, retrying short writes and EINTR. */
+static int
+xwrite (int fd, const void *v_buf, size_t len)
+{
+  const char *buf = v_buf;
+  ssize_t r;               /* write(2) returns ssize_t, not int */
+  while (len > 0) {
+    r = write (fd, buf, len);
+    if (r == -1 && errno == EINTR)
+      continue;            /* interrupted by a signal before writing: retry */
+    if (r == -1)
+      return -1;           /* errno is left set for the caller's perrorf */
+    buf += r;
+    len -= r;
+  }
+  return 0;
+}
+
 static ssize_t receive_file_data (guestfs_h *g, void **buf);
 
+/* Returns -1 = error, 0 = EOF, > 0 = more data */
 int
 guestfs___recv_file (guestfs_h *g, const char *filename)
 {
@@ -1077,7 +775,7 @@ guestfs___recv_file (guestfs_h *g, const char *filename)
   xdr_uint32_t (&xdr, &flag);
   xdr_destroy (&xdr);
 
-  if (xwrite (g->daemon_sock, fbuf, sizeof fbuf) == -1) {
+  if (g->conn->ops->write_data (g, g->conn, fbuf, sizeof fbuf) == -1) {
     perrorf (g, _("write to daemon socket"));
     return -1;
   }
diff --git a/tests/regressions/rhbz790721.c b/tests/regressions/rhbz790721.c
index a1661ce..71ea105 100644
--- a/tests/regressions/rhbz790721.c
+++ b/tests/regressions/rhbz790721.c
@@ -176,11 +176,12 @@ start_thread (void *vi)
     pthread_exit (vi);
   }
 
-  /* If this happens, it indicates a bug/race in the appliance
-   * building code which is what this regression test is designed to
-   * spot.
+  /* The error message should match the one printed by
+   * guestfs___launch_failed_error.  If not, it indicates a bug/race
+   * in the appliance building code which is what this regression test
+   * is designed to spot.
    */
-  if (STRNEQ (error, "child process died unexpectedly")) {
+  if (strstr (error, "guestfs_launch failed") == NULL) {
     fprintf (stderr, "rhbz790721: [thread %d]: error: %s\n", thread_id, error);
     *(int *)vi = -1;
     pthread_exit (vi);
-- 
1.8.1.4




More information about the Libguestfs mailing list