[Cluster-devel] cluster/rgmanager/src clulib/fdops.c clulib/lo ...

lhh at sourceware.org lhh at sourceware.org
Tue Jun 13 19:22:39 UTC 2006


CVSROOT:	/cvs/cluster
Module name:	cluster
Changes by:	lhh at sourceware.org	2006-06-13 19:22:38

Added files:
	rgmanager/src/clulib: fdops.c lock.c lockspace.c members.c 
	                      message.c 
Removed files:
	rgmanager/src/daemons: members.c 

Log message:
	Include missing .c files in src/clulib; remove defunct src/daemons/members.c

Patches:
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/fdops.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/lock.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/lockspace.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/members.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/message.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/members.c.diff?cvsroot=cluster&r1=1.4&r2=NONE

/cvs/cluster/cluster/rgmanager/src/clulib/fdops.c,v  -->  standard output
revision 1.1
--- cluster/rgmanager/src/clulib/fdops.c
+++ -	2006-06-13 19:22:38.586032000 +0000
@@ -0,0 +1,193 @@
+/*
+  Copyright Red Hat, Inc. 2002-2003
+
+  This program is free software; you can redistribute it and/or modify it
+  under the terms of the GNU General Public License as published by the
+  Free Software Foundation; either version 2, or (at your option) any
+  later version.
+
+  This program is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  General Public License for more details.
+
+  You should have received a copy of the GNU General Public License
+  along with this program; see the file COPYING.  If not, write to the
+  Free Software Foundation, Inc.,  675 Mass Ave, Cambridge,
+  MA 02139, USA.
+*/
+/** @file
+ * Wrapper functions around read/write/select to retry in the event
+ * of interrupts.
+ */
+#include <unistd.h>
+#include <sys/types.h>
+#include <fcntl.h>
+#include <errno.h>
+
+/**
+ * This is a wrapper around select which will retry in the case we receive
+ * EINTR.  This is necessary for _read_retry, since it wouldn't make sense
+ * to have _read_retry terminate if and only if two EINTRs were received
+ * in a row - one during the read() call, one during the select call...
+ *
+ * See select(2) for description of parameters.
+ */
+int
+_select_retry(int fdmax, fd_set * rfds, fd_set * wfds, fd_set * xfds,
+	       struct timeval *timeout)
+{
+	int rv;
+
+	while (1) {
+		rv = select(fdmax, rfds, wfds, xfds, timeout);
+		if ((rv == -1) && (errno == EINTR))
+			/* return on EBADF/EINVAL/ENOMEM; continue on EINTR */
+			continue;
+		return rv;
+	}
+}
+
+/**
+ * Retries a write in the event of a non-blocked interrupt signal.
+ *
+ * @param fd		File descriptor to which we are writing.
+ * @param buf		Data buffer to send.
+ * @param count		Number of bytes in buf to send.
+ * @param timeout	(struct timeval) telling us how long we should retry.
+ * @return		The number of bytes written to the file descriptor,
+ * 			or -1 on error (with errno set appropriately).
+ */
+ssize_t
+_write_retry(int fd, void *buf, int count, struct timeval * timeout)
+{
+	int n, total = 0, remain = count, rv = 0;
+	fd_set wfds, xfds;
+
+	while (total < count) {
+
+		/* Create the write FD set of 1... */
+		FD_ZERO(&wfds);
+		FD_SET(fd, &wfds);
+		FD_ZERO(&xfds);
+		FD_SET(fd, &xfds);
+
+		/* wait for the fd to be available for writing */
+		rv = _select_retry(fd + 1, NULL, &wfds, &xfds, timeout);
+		if (rv == -1)
+			return -1;
+		else if (rv == 0) {
+			errno = ETIMEDOUT;
+			return -1;
+		}
+
+		if (FD_ISSET(fd, &xfds)) {
+			errno = EPIPE;
+			return -1;
+		}
+
+		/* 
+		 * Attempt to write to fd
+		 */
+		n = write(fd, buf + (off_t) total, remain);
+
+		/*
+		 * When we know our fd was select()ed and we receive 0 bytes
+		 * when we write, the fd was closed.
+		 */
+		if ((n == 0) && (rv == 1)) {
+			errno = EPIPE;
+			return -1;
+		}
+
+		if (n == -1) {
+			if ((errno == EAGAIN) || (errno == EINTR)) {
+				/* 
+				 * Not ready?
+				 */
+				continue;
+			}
+
+			/* Other errors: EIO, EINVAL, etc */
+			return -1;
+		}
+
+		total += n;
+		remain -= n;
+	}
+
+	return total;
+}
+
+/**
+ * Retry reads until we (a) time out or (b) get our data.  Of course, if
+ * timeout is NULL, it'll wait forever.
+ *
+ * @param sockfd	File descriptor we want to read from.
+ * @param buf		Preallocated buffer into which we will read data.
+ * @param count		Number of bytes to read.
+ * @param timeout	(struct timeval) describing how long we should retry.
+ * @return 		The number of bytes read on success, or -1 on failure.
+ 			Note that we will always return (count) or (-1).
+ */
+ssize_t
+_read_retry(int sockfd, void *buf, int count, struct timeval * timeout)
+{
+	int n, total = 0, remain = count, rv = 0;
+	fd_set rfds, xfds;
+
+	while (total < count) {
+		FD_ZERO(&rfds);
+		FD_SET(sockfd, &rfds);
+		FD_ZERO(&xfds);
+		FD_SET(sockfd, &xfds);
+		
+		/*
+		 * Select on the socket, in case it closes while we're not
+		 * looking...
+		 */
+		rv = _select_retry(sockfd + 1, &rfds, NULL, &xfds, timeout);
+		if (rv == -1)
+			return -1;
+		else if (rv == 0) {
+			errno = ETIMEDOUT;
+			return -1;
+		}
+
+		if (FD_ISSET(sockfd, &xfds)) {
+			errno = EPIPE;
+			return -1;
+		}
+
+		/* 
+		 * Attempt to read off the socket 
+		 */
+		n = read(sockfd, buf + (off_t) total, remain);
+
+		/*
+		 * When we know our socket was select()ed and we receive 0 bytes
+		 * when we read, the socket was closed.
+		 */
+		if ((n == 0) && (rv == 1)) {
+			errno = EPIPE;
+			return -1;
+		}
+
+		if (n == -1) {
+			if ((errno == EAGAIN) || (errno == EINTR)) {
+				/* 
+				 * Not ready? Wait for data to become available
+				 */
+				continue;
+			}
+
+			/* Other errors: EPIPE, EINVAL, etc */
+			return -1;
+		}
+
+		total += n;
+		remain -= n;
+	}
+
+	return total;
+}
/cvs/cluster/cluster/rgmanager/src/clulib/lock.c,v  -->  standard output
revision 1.1
--- cluster/rgmanager/src/clulib/lock.c
+++ -	2006-06-13 19:22:38.669122000 +0000
@@ -0,0 +1,153 @@
+/*
+  Copyright Red Hat, Inc. 2004-2006
+
+  The Magma Cluster API Library is free software; you can redistribute
+  it and/or modify it under the terms of the GNU Lesser General Public
+  License as published by the Free Software Foundation; either version
+  2.1 of the License, or (at your option) any later version.
+
+  The Magma Cluster API Library is distributed in the hope that it will
+  be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+  of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  Lesser General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public
+  License along with this library; if not, write to the Free Software
+  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+  USA.
+ */
+/** @file
+ * Locking.
+ */
+#include <errno.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <stdio.h>
+#include <assert.h>
+#include <sys/ioctl.h>
+#include <lock.h>
+#include <sys/types.h>
+#include <sys/select.h>
+#include <pthread.h>
+
+
+static void
+ast_function(void * __attribute__ ((unused)) arg)
+{
+}
+
+
+static int
+wait_for_dlm_event(dlm_lshandle_t *ls)
+{
+	fd_set rfds;
+	int fd = dlm_ls_get_fd(ls);
+
+	FD_ZERO(&rfds);
+	FD_SET(fd, &rfds);
+
+	if (select(fd + 1, &rfds, NULL, NULL, NULL) == 1)
+		return dlm_dispatch(fd);
+
+	return -1;
+}
+
+
+int
+clu_lock(dlm_lshandle_t ls,
+	 int mode,
+	 struct dlm_lksb *lksb,
+	 int options,
+         char *resource)
+{
+        int ret;
+
+	if (!ls || !lksb || !resource || !strlen(resource)) {
+		errno = EINVAL;
+		return -1;
+	}
+
+        ret = dlm_ls_lock(ls, mode, lksb, options, resource,
+                          strlen(resource), 0, ast_function, lksb,
+                          NULL, NULL);
+
+        if (ret < 0) {
+                if (errno == ENOENT)
+                        assert(0);
+
+                return -1;
+        }
+
+        if ((ret = (wait_for_dlm_event(ls) < 0))) {
+                fprintf(stderr, "wait_for_dlm_event: %d / %d\n",
+                        ret, errno);
+                return -1;
+        }
+
+        return 0;
+}
+
+
+
+dlm_lshandle_t
+clu_acquire_lockspace(const char *lsname)
+{
+        dlm_lshandle_t ls = NULL;
+
+        while (!ls) {
+                ls = dlm_open_lockspace(lsname);
+                if (ls)
+                        break;
+
+                ls = dlm_create_lockspace(lsname, 0644);
+                if (ls)
+                        break;
+
+                /* Work around race: Someone was closing lockspace as
+                   we were trying to open it.  Retry. */
+                if (errno == ENOENT)
+                        continue;
+
+                fprintf(stderr, "failed acquiring lockspace: %s\n",
+                        strerror(errno));
+
+                return NULL;
+        }
+
+        return ls;
+}
+
+
+
+int
+clu_unlock(dlm_lshandle_t ls, struct dlm_lksb *lksb)
+{
+        int ret;
+
+	if (!ls || !lksb) {
+		errno = EINVAL;
+		return -1;
+	}
+
+        ret = dlm_ls_unlock(ls, lksb->sb_lkid, 0, lksb, NULL);
+
+        if (ret != 0)
+                return ret;
+
+        /* lksb->sb_status should be EINPROG at this point */
+
+        if (wait_for_dlm_event(ls) < 0) {
+                errno = lksb->sb_status;
+                return -1;
+        }
+
+        return 0;
+}
+
+
+int
+clu_release_lockspace(dlm_lshandle_t ls, char *name)
+{
+        return dlm_release_lockspace(name, ls, 0);
+}
/cvs/cluster/cluster/rgmanager/src/clulib/lockspace.c,v  -->  standard output
revision 1.1
--- cluster/rgmanager/src/clulib/lockspace.c
+++ -	2006-06-13 19:22:38.753795000 +0000
@@ -0,0 +1,75 @@
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <stdio.h>
+#include <assert.h>
+#include <sys/ioctl.h>
+#include <sys/select.h>
+#include <lock.h>
+#include <pthread.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <clulog.h>
+#include <signal.h>
+#include <gettid.h>
+#include <libdlm.h>
+#include <errno.h>
+
+
+#define RGMGR_LOCKSPACE "rgmanager"
+
+static pthread_mutex_t _default_lock = PTHREAD_MUTEX_INITIALIZER;
+static char _init = 0;
+static pid_t _holder_id = 0;
+static dlm_lshandle_t _default_ls;
+
+
+static int
+_init_lockspace(void)
+{
+	_default_ls = clu_acquire_lockspace(RGMGR_LOCKSPACE);
+	if (!_default_ls) {
+		return -1;
+	}
+	_init = 1;
+	return 0;
+}
+
+
+dlm_lshandle_t 
+ls_hold_default(void)
+{
+	pthread_mutex_lock(&_default_lock);
+	if (!_init && (_init_lockspace() < 0)) {
+		pthread_mutex_unlock(&_default_lock);
+		errno = ENOLCK;
+		return NULL;
+	}
+
+	if (_holder_id != 0) {
+		pthread_mutex_unlock(&_default_lock);
+		errno = EAGAIN;
+		return NULL;
+	}
+
+	_holder_id = gettid();
+	pthread_mutex_unlock(&_default_lock);
+	return _default_ls;
+}
+
+
+void
+ls_release_default(void)
+{
+	pthread_mutex_lock(&_default_lock);
+	if (_holder_id != gettid()) {
+		clulog(LOG_ERR, "Attempt to release lockspace when I am not"
+		       "the holder!\n");
+		raise(SIGSTOP);
+	}
+
+	_holder_id = 0;
+	pthread_mutex_unlock(&_default_lock);
+}
+
+
/cvs/cluster/cluster/rgmanager/src/clulib/members.c,v  -->  standard output
revision 1.1
--- cluster/rgmanager/src/clulib/members.c
+++ -	2006-06-13 19:22:38.840838000 +0000
@@ -0,0 +1,397 @@
+#include <sys/types.h>
+#include <arpa/inet.h>
+#include <stdint.h>
+#include <malloc.h>
+#include <libcman.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <members.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <rg_types.h>
+#include <pthread.h>
+
+static int my_node_id = -1;
+static pthread_rwlock_t memblock = PTHREAD_RWLOCK_INITIALIZER;
+static cluster_member_list_t *membership = NULL;
+
+
+/**
+  Return the stored node ID.  Since this should never
+  change during the duration of running rgmanager, it is
+  not protected by a lock.
+ */
+int
+my_id(void)
+{
+	return my_node_id;
+}
+
+
+int
+set_my_id(int id)
+{
+	my_node_id = id;
+	return 0;
+}
+
+
+/**
+  Determine and store the local node ID.  This should
+  only ever be called once during initialization.
+ */
+int
+get_my_nodeid(cman_handle_t h)
+{
+	cman_node_t node;
+
+	if (cman_get_node(h, CMAN_NODEID_US, &node) != 0)
+		return -1;
+
+	return node.cn_nodeid;
+}
+
+
+
+/**
+  Generate and return a list of members which are now online in a new
+  membership list, given the old membership list.  User must call
+  @ref free_member_list
+  to free the returned
+  @ref cluster_member_list_t
+  structure.
+
+  @param old		Old membership list
+  @param new		New membership list
+  @return		NULL if no members were gained, or a newly 
+  			allocated cluster_member_list_t structure.
+ */
+cluster_member_list_t *
+memb_gained(cluster_member_list_t *old, cluster_member_list_t *new)
+{
+	int count, x, y;
+	char in_old = 0;
+	cluster_member_list_t *gained;
+
+	/* No nodes in new?  Then nothing could have been gained */
+	if (!new || !new->cml_count)
+		return NULL;
+
+	/* Nothing in old?  Duplicate 'new' and return it. */
+	if (!old || !old->cml_count) {
+		gained = cml_alloc(new->cml_count);
+		if (!gained)
+			return NULL;
+		memcpy(gained, new, cml_size(new->cml_count));
+		return gained;
+	}
+
+	/* Use greatest possible count */
+	count = (old->cml_count > new->cml_count ?
+		 cml_size(old->cml_count) : cml_size(new->cml_count));
+
+	gained = malloc(count);
+	if (!gained)
+		return NULL;
+	memset(gained, 0, count);
+
+	for (x = 0; x < new->cml_count; x++) {
+
+		/* This one isn't active at the moment; it could not have
+		   been gained. */
+		if (!new->cml_members[x].cn_member)
+			continue;
+
+		in_old = 0;
+		for (y = 0; y < old->cml_count; y++) {
+			if ((new->cml_members[x].cn_nodeid !=
+			     old->cml_members[y].cn_nodeid) ||
+			     !old->cml_members[y].cn_member)
+				continue;
+			in_old = 1;
+			break;
+		}
+
+		if (in_old)
+			continue;
+		memcpy(&gained->cml_members[gained->cml_count],
+		       &new->cml_members[x], sizeof(cman_node_t));
+	}
+
+	if (gained->cml_count == 0) {
+		free(gained);
+		gained = NULL;
+	}
+
+	return gained;
+}
+
+
+/**
+  Generate and return a list of members which are lost or no longer online
+  in a new membership list, given the old membership list.  User must call
+  @ref free_member_list
+  to free the returned
+  @ref cluster_member_list_t
+  structure.
+
+  @param old		Old membership list
+  @param new		New membership list
+  @return		NULL if no members were lost, or a newly 
+  			allocated cluster_member_list_t structure.
+ */
+cluster_member_list_t *
+memb_lost(cluster_member_list_t *old, cluster_member_list_t *new)
+{
+	cluster_member_list_t *ret;
+	int x;
+
+	/* Reverse. ;) */
+	ret = memb_gained(new, old);
+	if (!ret)
+		return NULL;
+
+	for (x = 0; x < ret->cml_count; x++) {
+		ret->cml_members[x].cn_member = 0;
+	}
+
+	return ret;
+}
+
+
+
+void
+member_list_update(cluster_member_list_t *new_ml)
+{
+	pthread_rwlock_wrlock(&memblock);
+	if (membership)
+		free_member_list(membership);
+	if (new_ml)
+		membership = member_list_dup(new_ml);
+	else
+		membership = NULL;
+	pthread_rwlock_unlock(&memblock);
+}
+
+
+cluster_member_list_t *
+member_list(void)
+{
+	cluster_member_list_t *ret = NULL;
+	pthread_rwlock_rdlock(&memblock);
+	if (membership) 
+		ret = member_list_dup(membership);
+	pthread_rwlock_unlock(&memblock);
+	return ret;
+}
+
+
+char *
+member_name(uint64_t id, char *buf, int buflen)
+{
+	char *n;
+
+	if (!buf || !buflen)
+		return NULL;
+
+	pthread_rwlock_rdlock(&memblock);
+	n = memb_id_to_name(membership, id);
+	if (n) {
+		strncpy(buf, n, buflen);
+	} else {
+		buf[0] = 0;
+	}
+	pthread_rwlock_unlock(&memblock);
+	return buf;
+}
+
+
+
+cluster_member_list_t *
+get_member_list(cman_handle_t h)
+{
+	int c;
+	cluster_member_list_t *ml = NULL;
+	cman_node_t *nodes = NULL;
+
+	do {
+		if (nodes)
+			free(nodes);
+
+		c = cman_get_node_count(h);
+		if (c <= 0)
+			return NULL;
+
+		if (!ml)
+			ml = malloc(sizeof(*ml));
+		if (!ml)
+			return NULL;
+
+		nodes = malloc(sizeof(*nodes) * c);
+		if (!nodes) {
+			free(ml);
+			return NULL;
+		}
+
+		memset(ml, 0, sizeof(*ml));
+		memset(nodes, 0, sizeof(*nodes)*c);
+
+		cman_get_nodes(h, c, &ml->cml_count, nodes);
+
+	} while (ml->cml_count != c);
+
+	ml->cml_members = nodes;
+	ml->cml_count = c;
+	return ml;
+}
+
+
+void
+free_member_list(cluster_member_list_t *ml)
+{
+	if (ml) {
+		if (ml->cml_members)
+			free(ml->cml_members);
+		free(ml);
+	}
+}
+
+
+int
+memb_online(cluster_member_list_t *ml, int nodeid)
+{
+	int x = 0;
+
+	for (x = 0; x < ml->cml_count; x++) {
+		if (ml->cml_members[x].cn_nodeid == nodeid)
+			return ml->cml_members[x].cn_member;
+	}
+
+	return 0;
+}
+
+
+int
+memb_count(cluster_member_list_t *ml)
+{
+	int x = 0, count = 0;
+
+	for (x = 0; x < ml->cml_count; x++) {
+		if (ml->cml_members[x].cn_member)
+			++count;
+	}
+
+	return count;
+}
+
+
+int
+memb_mark_down(cluster_member_list_t *ml, int nodeid)
+{
+	int x = 0;
+
+	for (x = 0; x < ml->cml_count; x++) {
+		if (ml->cml_members[x].cn_nodeid == nodeid)
+			ml->cml_members[x].cn_member = 0;
+	}
+
+	return 0;
+}
+
+
+
+char *
+memb_id_to_name(cluster_member_list_t *ml, int nodeid)
+{
+	int x = 0;
+
+	for (x = 0; x < ml->cml_count; x++) {
+		if (ml->cml_members[x].cn_nodeid == nodeid)
+			return ml->cml_members[x].cn_name;
+	}
+
+	return 0;
+}
+
+
+cman_node_t *
+memb_id_to_p(cluster_member_list_t *ml, int nodeid)
+{
+	int x = 0;
+
+	for (x = 0; x < ml->cml_count; x++) {
+		if (ml->cml_members[x].cn_nodeid == nodeid)
+			return &ml->cml_members[x];
+	}
+
+	return 0;
+}
+
+
+int
+memb_online_name(cluster_member_list_t *ml, char *name)
+{
+	int x = 0;
+
+	for (x = 0; x < ml->cml_count; x++) {
+		if (!strcasecmp(ml->cml_members[x].cn_name, name))
+			return ml->cml_members[x].cn_member;
+	}
+
+	return 0;
+}
+
+
+int
+memb_name_to_id(cluster_member_list_t *ml, char *name)
+{
+	int x = 0;
+
+	for (x = 0; x < ml->cml_count; x++) {
+		if (!strcasecmp(ml->cml_members[x].cn_name, name))
+			return ml->cml_members[x].cn_nodeid;
+	}
+
+	return 0;
+}
+
+
+cman_node_t *
+memb_name_to_p(cluster_member_list_t *ml, char *name)
+{
+	int x = 0;
+
+	for (x = 0; x < ml->cml_count; x++) {
+		if (!strcasecmp(ml->cml_members[x].cn_name, name))
+			return &ml->cml_members[x];
+	}
+
+	return 0;
+}
+
+/**
+  Duplicate and return a cluster member list structure, sans the DNS resolution
+  information.
+
+  @param orig		List to duplicate.
+  @return		NULL if there is nothing to duplicate or duplication
+  			fails, or a newly allocated cluster_member_list_t
+			structure.
+ */
+cluster_member_list_t *
+member_list_dup(cluster_member_list_t *orig)
+{
+	cluster_member_list_t *ret = NULL;
+
+	if (!orig)
+		return NULL;
+
+	ret = malloc(cml_size(orig->cml_count));
+	memset(ret, 0, cml_size(orig->cml_count));
+	memcpy(ret, orig, cml_size(orig->cml_count));
+
+	return ret;
+}
+
/cvs/cluster/cluster/rgmanager/src/clulib/message.c,v  -->  standard output
revision 1.1
--- cluster/rgmanager/src/clulib/message.c
+++ -	2006-06-13 19:22:38.921541000 +0000
@@ -0,0 +1,1357 @@
+#define _MESSAGE_BUILD
+#include <message.h>
+#include <stdio.h>
+#include <pthread.h>
+#include <libcman.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <sys/types.h>
+#include <stdlib.h>
+#include <rg_types.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/un.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <errno.h>
+#include <sys/time.h>
+#include <fdops.h>
+#include <resgroup.h>
+
+
+
+/* Ripped from ccsd's setup_local_socket */
+#define RGMGR_SOCK "/var/run/cluster/rgmanager.sk"
+#define MAX_CONTEXTS 32  /* Testing; production should be 1024-ish */
+
+/* Context 0 is reserved for control messages */
+
+/* Local-ish contexts */
+static pthread_mutex_t context_lock = PTHREAD_MUTEX_INITIALIZER;
+static msgctx_t *contexts[MAX_CONTEXTS];
+static uint32_t context_index = 1;
+static chandle_t *gch;
+pthread_t comms_thread;
+int thread_running;
+
+
+#define is_established(ctx) \
+	(((ctx->type == MSG_CLUSTER) && \
+ 	  (ctx->u.cluster_info.remote_ctx && ctx->u.cluster_info.local_ctx)) || \
+	 ((ctx->type == MSG_SOCKET) && \
+	  (ctx->u.local_info.sockfd != -1)))
+
+
+static int
+local_connect(void)
+{
+	struct sockaddr_un sun;
+	int sock = -1, error = 0;
+
+	memset(&sun, 0, sizeof(sun));
+	sun.sun_family = PF_LOCAL;
+	snprintf(sun.sun_path, sizeof(sun.sun_path), RGMGR_SOCK);
+
+	sock = socket(PF_LOCAL, SOCK_STREAM, 0);
+	if (sock < 0) {
+		error = errno;
+		goto fail;
+	}
+
+	error = connect(sock, (struct sockaddr *)&sun, sizeof(sun));
+	if (error < 0) {
+		error = errno;
+		goto fail;
+	}
+
+	sock = error;
+fail:
+
+	return sock;
+}
+
+
+static int
+send_cluster_message(msgctx_t *ctx, void *msg, size_t len)
+{
+	char buf[4096];
+	cluster_msg_hdr_t *h = (void *)buf;
+	int ret;
+	char *msgptr = (buf + sizeof(*h));
+
+	if ((len + sizeof(*h)) > sizeof(buf)) {
+		errno = E2BIG;
+		return -1;
+	}
+
+	h->msg_control = M_DATA;
+	h->src_ctx = ctx->u.cluster_info.local_ctx;
+	h->dest_ctx = ctx->u.cluster_info.remote_ctx;
+	h->msg_port = ctx->u.cluster_info.port;
+	memcpy(msgptr, msg, len);
+
+	/*
+	printf("sending cluster message, length = %d to nodeid %d port %d\n",
+	       len + sizeof(*h), ctx->u.cluster_info.nodeid, ctx->u.cluster_info.port);
+	 */
+
+	pthread_mutex_lock(&gch->c_lock);
+	h->src_nodeid = gch->c_nodeid;
+
+	swab_cluster_msg_hdr_t(h);
+
+	ret = cman_send_data(gch->c_cluster, (void *)h, len + sizeof(*h),
+			       ctx->u.cluster_info.nodeid,
+			       ctx->u.cluster_info.port, 0);
+
+	pthread_mutex_unlock(&gch->c_lock);
+
+	return len + sizeof(h);
+}
+
+
+/**
+  Wrapper around write(2)
+ */
+static int
+send_socket_message(msgctx_t *ctx, void *msg, size_t len)
+{
+	char buf[4096];
+	local_msg_hdr_t *h = (local_msg_hdr_t *)buf;
+	char *msgptr = (buf + sizeof(*h));
+
+	/* encapsulate ... ? */
+	if ((len + sizeof(*h)) > sizeof(buf)) {
+		errno = E2BIG;
+		return -1;
+	}
+
+	h->msg_control = M_DATA;
+	h->msg_len = len;
+	memcpy(msgptr, msg, len);
+
+	return _write_retry(ctx->u.local_info.sockfd, msg, len + sizeof(*h), NULL);
+}
+
+
+/**
+  Message sending API.  Sends to the cluster or a socket, depending on
+  the context.
+ */
+int
+msg_send(msgctx_t *ctx, void *msg, size_t len)
+{
+	if (!ctx || !msg || !len) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	switch(ctx->type) {
+	case MSG_CLUSTER:
+		return send_cluster_message(ctx, msg, len);
+	case MSG_SOCKET:
+		return send_socket_message(ctx, msg, len);
+	default:
+		break;
+	}
+
+	errno = EINVAL;
+	return -1;
+}
+
+
+/**
+  Assign a (free) cluster context ID
+ */
+static int
+assign_ctx(msgctx_t *ctx)
+{
+	int start;
+
+	/* Assign context index */
+	ctx->type = MSG_CLUSTER;
+
+	pthread_mutex_lock(&context_lock);
+	start = context_index++;
+	if (context_index >= MAX_CONTEXTS || context_index <= 0)
+		context_index = 1;
+	do {
+		if (contexts[context_index]) {
+			++context_index;
+			if (context_index >= MAX_CONTEXTS)
+				context_index = 1;
+
+			if (context_index == start) {
+				pthread_mutex_unlock(&context_lock);
+				errno = EAGAIN;
+				return -1;
+			}
+
+			continue;
+		}
+
+		contexts[context_index] = ctx;
+		ctx->u.cluster_info.local_ctx = context_index;
+
+	} while (0);
+	pthread_mutex_unlock(&context_lock);
+
+	return 0;
+}
+
+
+/* See if anything's on the cluster socket.  If so, dispatch it
+   on to the requisite queues
+   XXX should be passed a connection arg! */
+static int
+poll_cluster_messages(int timeout)
+{
+	int ret = -1;
+	fd_set rfds;
+	int fd;
+	struct timeval tv;
+	struct timeval *p = NULL;
+
+	if (timeout >= 0) {
+		p = &tv;
+		tv.tv_sec = tv.tv_usec = timeout;
+	}
+	printf("%s\n", __FUNCTION__);
+
+	FD_ZERO(&rfds);
+
+	//pthread_mutex_lock(&gch->c_lock);
+	fd = cman_get_fd(gch->c_cluster);
+	FD_SET(fd, &rfds);
+
+	if (select(fd + 1, &rfds, NULL, NULL, p) == 1) {
+		cman_dispatch(gch->c_cluster, 0);
+		ret = 0;
+	}
+	//pthread_mutex_unlock(&gch->c_lock);
+
+	return ret;
+}
+
+
+/**
+  This is used to establish and tear down pseudo-private
+  contexts which are shared with the cluster context.
+ */
+static int
+cluster_send_control_msg(msgctx_t *ctx, int type)
+{
+	cluster_msg_hdr_t cm;
+
+	cm.msg_control = (uint8_t)type;
+	cm.src_nodeid = gch->c_nodeid;
+	cm.dest_ctx = ctx->u.cluster_info.remote_ctx;
+	cm.src_ctx = ctx->u.cluster_info.local_ctx;
+	cm.msg_port = ctx->u.cluster_info.port;
+
+	swab_cluster_msg_hdr_t(&cm);
+
+	return (cman_send_data(gch->c_cluster, (void *)&cm, sizeof(cm),
+			       ctx->u.cluster_info.nodeid,
+			       ctx->u.cluster_info.port, 0));
+}
+
+
+/**
+  Wait for a message on a context.
+ */
+static int
+cluster_msg_wait(msgctx_t *ctx, int timeout)
+{
+	struct timespec ts = {0, 0};
+	int req = M_NONE;
+	struct timeval start;
+	struct timeval now;
+	
+
+	if (timeout > 0)
+		gettimeofday(&start, NULL);
+
+	ts.tv_sec = !!timeout;
+
+	pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+	while (1) {
+		/* See if we dispatched any messages on to our queue */
+		if (ctx->u.cluster_info.queue) {
+			req = ctx->u.cluster_info.queue->message->msg_control;
+			/*printf("Queue not empty CTX%d : %d\n",
+			  	 ctx->u.cluster_info.local_ctx, req);*/
+			break;
+		}
+
+		if (timeout == 0)
+			break;
+
+		/* Ok, someone else has the mutex on our FD.  Go to
+	   	   sleep on a cond; maybe they'll wake us up */
+		if (pthread_cond_timedwait(&ctx->u.cluster_info.cond,
+		    			   &ctx->u.cluster_info.mutex,
+		   			   &ts) < 0) {
+
+			/* Mutex held */
+			if (errno == ETIMEDOUT) {
+				if (timeout < 0) {
+					ts.tv_sec = 1;
+					ts.tv_nsec = 0;
+					continue;
+				} 
+
+				ts.tv_sec = !!timeout;
+
+				/* Done */
+				break;
+			}
+		}
+
+		if (timeout > 0) {
+			gettimeofday(&now, NULL);
+			/* XXX imprecise */
+			if (now.tv_sec - start.tv_sec > timeout)
+				break;
+		}
+	}
+	pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+
+	return req;
+}
+
+
+static int
+peekypeeky(int fd)
+{
+	local_msg_hdr_t h;
+	int ret;
+
+	while ((ret = recv(fd, (void *)&h, sizeof(h), MSG_PEEK)) < 0) {
+		if (errno == EINTR)
+			continue;
+		return -1;
+	}
+
+	if (ret == sizeof(h))
+		return h.msg_control;
+
+	if (ret == 0)
+		/* Socket closed? */
+		return M_CLOSE;
+
+	/* XXX */
+	printf("PROTOCOL ERROR: Invalid message\n");
+	return M_CLOSE;
+}
+
+
+static int
+local_msg_wait(msgctx_t *ctx, int timeout)
+{
+	fd_set rfds;
+	struct timeval tv = {0, 0};
+	struct timeval *p = NULL;
+
+	if (timeout >= 0) {
+		tv.tv_sec = timeout;
+		p = &tv;
+	}
+
+	FD_ZERO(&rfds);
+	FD_SET(ctx->u.local_info.sockfd, &rfds);
+
+	if (_select_retry(ctx->u.local_info.sockfd + 1, &rfds,
+			  NULL, NULL, p) == 1) {
+		return peekypeeky(ctx->u.local_info.sockfd);
+	}
+
+	return M_NONE;
+}
+
+
+int
+msg_get_nodeid(msgctx_t *ctx)
+{
+	switch(ctx->type) {
+	case MSG_CLUSTER:
+		return ctx->u.cluster_info.nodeid;
+	case MSG_SOCKET:
+		return 0;
+	default:
+		break;
+	}
+
+	return -1;
+}
+
+
+int
+msg_fd_set(msgctx_t *ctx, fd_set *fds, int *max)
+{
+	int e;
+	switch(ctx->type) {
+	case MSG_CLUSTER:
+		pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+		if (ctx->u.cluster_info.select_pipe[0] < 0) {
+			if (pipe(ctx->u.cluster_info.select_pipe) < 0) {
+				e = errno;
+				pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+				errno = e;
+				return -1;
+			}
+
+			printf("%s: Created cluster CTX select pipe "
+			       "rd=%d wr=%d\n", __FUNCTION__,
+			       ctx->u.cluster_info.select_pipe[0],
+			       ctx->u.cluster_info.select_pipe[1]);
+
+		}
+
+		e = ctx->u.cluster_info.select_pipe[0];
+		printf("%s: cluster %d\n", __FUNCTION__,  e);
+		FD_SET(e, fds);
+		pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+
+		if (e > *max)
+			*max = e;
+		return 0;
+
+	case MSG_SOCKET:
+		if (ctx->u.local_info.sockfd >= 0) {
+			printf("%s: local %d\n", __FUNCTION__,
+			       ctx->u.local_info.sockfd);
+			FD_SET(ctx->u.local_info.sockfd, fds);
+
+			if (ctx->u.local_info.sockfd > *max)
+				*max = ctx->u.local_info.sockfd;
+			return 0;
+		}
+		return -1;
+	default:
+		break;
+	}
+
+	return -1;
+}
+
+
+int
+msg_fd_isset(msgctx_t *ctx, fd_set *fds)
+{
+	errno = EINVAL;
+
+	if (!fds || !ctx)
+		return -1;
+
+	switch(ctx->type) {
+	case MSG_CLUSTER:
+		pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+		if (ctx->u.cluster_info.select_pipe[0] >= 0 &&
+		    FD_ISSET(ctx->u.cluster_info.select_pipe[0], fds)) {
+			pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+			return 1;
+		}
+		pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+		return 0;
+	case MSG_SOCKET:
+		if (ctx->u.local_info.sockfd >= 0 &&
+		    FD_ISSET(ctx->u.local_info.sockfd, fds)) {
+			return 1;
+		}
+		return 0;
+	default:
+		break;
+	}
+
+	return -1;
+}
+
+
+int
+msg_fd_clr(msgctx_t *ctx, fd_set *fds)
+{
+	errno = EINVAL;
+
+	if (!fds || !ctx)
+		return -1;
+
+	switch(ctx->type) {
+	case MSG_CLUSTER:
+		pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+		if (ctx->u.cluster_info.select_pipe[0] >= 0) {
+		    	FD_CLR(ctx->u.cluster_info.select_pipe[0], fds);
+			pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+			return 1;
+		}
+		pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+		return 0;
+	case MSG_SOCKET:
+		if (ctx->u.local_info.sockfd >= 0) {
+		    	FD_CLR(ctx->u.local_info.sockfd, fds);
+			return 1;
+		}
+		return 0;
+	default:
+		break;
+	}
+
+	return -1;
+}
+
+
+/**
+  This polls the context for 'timeout' seconds waiting for data
+  to become available.  Return codes are M_DATA, M_CLOSE, and M_OPEN
+
+  M_DATA - data available
+  M_OPEN - needs msg_accept(
+  M_CLOSE - context / socket closed by remote host
+  M_NONE - nothing available
+
+  For the cluster connection, the return code could also map to one of
+  the CMAN return codes
+
+  M_STATECHANGE - node has changed state
+
+ */
+int
+msg_wait(msgctx_t *ctx, int timeout)
+{
+
+	if (!ctx) {
+		errno = EINVAL;
+		return -1;
+	}
+		
+	switch(ctx->type) {
+	case MSG_CLUSTER:
+		return cluster_msg_wait(ctx, timeout);
+	case MSG_SOCKET:
+		return local_msg_wait(ctx, timeout);
+	default:
+		break;
+	}
+
+	errno = EINVAL;
+	return -1;
+}
+
+
+int
+_cluster_msg_receive(msgctx_t *ctx, void **msg, size_t *len)
+{
+	cluster_msg_hdr_t *m;
+	msg_q_t *n;
+	int ret = 0;
+
+	if (msg)
+		*msg = NULL;
+	if (len)
+		*len = 0;
+
+	if (ctx->u.cluster_info.local_ctx < 0 ||
+	    ctx->u.cluster_info.local_ctx >= MAX_CONTEXTS) {
+		errno = EBADF;
+		return -1;
+	}
+
+	/* trigger receive here */
+	pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+
+	n = ctx->u.cluster_info.queue;
+	if (n == NULL) {
+		pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+		errno = EAGAIN;
+		return -1;
+	}
+
+	list_remove(&ctx->u.cluster_info.queue, n);
+
+	pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+
+	m = n->message;
+	switch(m->msg_control) {
+	case M_CLOSE:
+		ctx->u.cluster_info.remote_ctx = 0;
+		break;
+	case M_OPEN_ACK:
+		/* Response to new connection */
+		ctx->u.cluster_info.remote_ctx = m->src_ctx;
+		break;
+	case M_DATA:
+		/* Kill the message control structure */
+		memmove(m, &m[1], n->len - sizeof(*m));
+		if (msg)
+			*msg = (void *)m;
+		else {
+			printf("Warning: dropping data message\n");
+			free(m);
+		}
+		if (len)
+			*len = (n->len - sizeof(*m));
+		ret = (n->len - sizeof(*m));
+		free(n);
+
+		printf("Message received\n");
+		return ret;
+	case M_OPEN:
+		/* Someone is trying to open a connection */
+	default:
+		/* ?!! */
+		ret = -1;
+		break;
+	}
+
+	free(m);
+	free(n);
+
+	return ret;
+}
+
+
+/**
+  Receive a message from a cluster-context.  This copies out the contents
+  into the user-specified buffer, and does random other things.
+ */
+static int
+cluster_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
+{
+	int req;
+	void *priv_msg;
+	size_t priv_len;
+
+	if (!msg || !maxlen) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	req = cluster_msg_wait(ctx, timeout);
+
+	switch (req) {
+	case M_DATA:
+		/* Copy out. */
+		req = _cluster_msg_receive(ctx, &priv_msg, &priv_len);
+		if (req < 0) {
+			printf("Ruh roh!\n");
+			return -1;
+		}
+
+		priv_len = (priv_len < maxlen ? priv_len : maxlen);
+
+		memcpy(msg, priv_msg, priv_len);
+		free(priv_msg);
+		return req;
+	case M_CLOSE:
+		errno = ECONNRESET;
+		return -1;
+	case 0:
+		/*printf("Nothing on queue\n");*/
+		return 0;
+	default:
+		printf("PROTOCOL ERROR: Received %d\n", req);
+		return -1;
+	}
+
+	printf("%s: CODE PATH ERROR\n", __FUNCTION__);
+	return -1;
+}
+
+
+static int
+_local_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
+{
+	struct timeval tv = {0, 0};
+	struct timeval *p = NULL;
+	local_msg_hdr_t h;
+
+	if (timeout >= 0) {
+		tv.tv_sec = timeout;
+		p = &tv;
+	}
+
+	if (_read_retry(ctx->u.local_info.sockfd, &h, sizeof(h), p) < 0)
+		return -1;
+
+	if (maxlen < h.msg_len) {
+		printf("WARNING: Buffer too small for message!\n");
+		h.msg_len = maxlen;
+	}
+
+	return _read_retry(ctx->u.local_info.sockfd, msg, h.msg_len, p);
+}
+
+
+/**
+  Receive a message from a cluster-context.  This copies out the contents
+  into the user-specified buffer, and does random other things.
+ */
+static int
+local_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
+{
+	int req;
+	char priv_msg[4096];
+	size_t priv_len;
+
+	if (!msg || !maxlen) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	switch (req) {
+	case M_DATA:
+		/* Copy out. */
+		req = _local_msg_receive(ctx, priv_msg, priv_len, timeout);
+		if (req <= 0)
+			return -1;
+
+		priv_len = (priv_len < maxlen ? priv_len : maxlen);
+
+		memcpy(msg, priv_msg, priv_len);
+		free(msg);
+		return req;
+	case M_CLOSE:
+		errno = ECONNRESET;
+		return -1;
+	case 0:
+		/*printf("Nothing on queue\n");*/
+		return 0;
+	default:
+		printf("PROTOCOL ERROR: Received %d\n", req);
+		return -1;
+	}
+
+	printf("%s: CODE PATH ERROR\n", __FUNCTION__);
+	return -1;
+}
+
+
+int
+msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
+{
+	if (!ctx || !msg || !maxlen) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	switch(ctx->type) {
+	case MSG_CLUSTER:
+		return cluster_msg_receive(ctx, msg, maxlen, timeout);
+	case MSG_SOCKET:
+		return local_msg_receive(ctx, msg, maxlen, timeout);
+	default:
+		break;
+	}
+
+	errno = EINVAL;
+	return -1;
+}
+
+
+/**
+  Open a connection to the specified node ID.
+  If the speficied node is 0, this connects via the socket in
+  /var/run/cluster...
+ */
+int
+msg_open(int nodeid, int port, msgctx_t *ctx, int timeout)
+{
+	int t = 0;
+
+	errno = EINVAL;
+	if (!ctx)
+		return -1;
+
+
+	/*printf("Opening pseudo channel to node %d\n", nodeid);*/
+
+	memset(ctx, 0, sizeof(*ctx));
+	if (nodeid == CMAN_NODEID_US) {
+		if ((ctx->u.local_info.sockfd = local_connect()) < 0) {
+			return -1;
+		}
+		ctx->type = MSG_SOCKET;
+		return 0;
+	}
+
+	ctx->type = MSG_CLUSTER;
+	ctx->u.cluster_info.nodeid = nodeid;
+	ctx->u.cluster_info.port = port;
+	ctx->u.cluster_info.local_ctx = -1;
+	ctx->u.cluster_info.remote_ctx = 0;
+	ctx->u.cluster_info.queue = NULL;
+	ctx->u.cluster_info.select_pipe[0] = -1;
+	ctx->u.cluster_info.select_pipe[1] = -1;
+	pthread_mutex_init(&ctx->u.cluster_info.mutex, NULL);
+	pthread_cond_init(&ctx->u.cluster_info.cond, NULL);
+
+	/* Assign context index */
+	if (assign_ctx(ctx) < 0)
+		return -1;
+
+	//printf("  Local CTX: %d\n", ctx->u.cluster_info.local_ctx);
+
+	/* Send open */
+	
+	//printf("  Sending control message M_OPEN\n");
+	if (cluster_send_control_msg(ctx, M_OPEN) < 0) {
+		return -1;
+	}
+
+	/* Ok, wait for a response */
+	while (!is_established(ctx)) {
+		++t;
+		if (t > timeout) {
+			msg_close(ctx);
+			errno = ETIMEDOUT;
+			return -1;
+		}
+			
+		switch(msg_wait(ctx, 1)) {
+		case M_OPEN_ACK:
+			_cluster_msg_receive(ctx, NULL, NULL);
+			break;
+		case M_NONE:
+			continue;
+		default: 
+			printf("PROTO ERROR: M_OPEN_ACK not received \n");
+		}
+	}
+
+	/*	
+	printf("  Remote CTX: %d\n",
+	       ctx->u.cluster_info.remote_ctx);
+	printf("  Pseudo channel established!\n");
+	*/
+	
+
+	return 0;
+}
+
+
+/**
+  Close a connection context (cluster or socket; it doesn't matter)
+  In the case of a cluster context, we need to clear out the 
+  receive queue and what-not.  This isn't a big deal.  Also, we
+  need to tell the other end that we're done -- just in case it does
+  not know yet ;)
+
+  With a socket, the O/S cleans up the buffers for us.
+ */
+int
+msg_close(msgctx_t *ctx)
+{
+	msg_q_t *n = NULL;
+
+	if (!ctx) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	switch (ctx->type) {
+	case MSG_CLUSTER:
+		if (ctx->u.cluster_info.local_ctx >= MAX_CONTEXTS) {
+			errno = EINVAL;
+			return -1;
+		}
+		pthread_mutex_lock(&context_lock);
+		/* Other threads should not be able to see this again */
+		if (contexts[ctx->u.cluster_info.local_ctx])
+			contexts[ctx->u.cluster_info.local_ctx] = NULL;
+		pthread_mutex_unlock(&context_lock);
+		/* Clear receive queue */
+		while ((n = ctx->u.cluster_info.queue) != NULL) {
+			list_remove(&ctx->u.cluster_info.queue, n);
+			free(n->message);
+			free(n);
+		}
+		/* Send close message */
+		if (ctx->u.cluster_info.remote_ctx != 0) {
+			cluster_send_control_msg(ctx, M_CLOSE);
+		}
+
+		/* Close pipe if it's open */
+		if (ctx->u.cluster_info.select_pipe[0] >= 0) {
+			close(ctx->u.cluster_info.select_pipe[0]);
+			ctx->u.cluster_info.select_pipe[0] = -1;
+		}
+		if (ctx->u.cluster_info.select_pipe[1] >= 0) {
+			close(ctx->u.cluster_info.select_pipe[1]);
+			ctx->u.cluster_info.select_pipe[1] = -1;
+		}
+		return 0;
+	case MSG_SOCKET:
+		close(ctx->u.local_info.sockfd);
+		ctx->u.local_info.sockfd = -1;
+		return 0;
+	default:
+		break;
+	}
+
+	errno = EINVAL;
+	return -1;
+}
+
+
+/**
+  Called by cman_dispatch to deal with messages coming across the
+  cluster socket.  This function deals with fanning out the requests
+  and putting them on the per-context queues.  We don't have
+  the benefits of pre-configured buffers, so we need this.
+ */
+static void
+process_cman_msg(cman_handle_t h, void *priv, char *buf, int len,
+	    uint8_t port, int nodeid)
+{
+	cluster_msg_hdr_t *m = (cluster_msg_hdr_t *)buf;
+	msg_q_t *node;
+	msgctx_t *ctx;
+
+	if (len < sizeof(*m)) {
+		printf("Message too short.\n");
+		return;
+	}
+
+	swab_cluster_msg_hdr_t(m);
+
+#if 0
+	printf("Processing ");
+	switch(m->msg_control) {
+	case M_NONE: 
+		printf("M_NONE\n");
+		break;
+	case M_OPEN:
+		printf("M_OPEN\n");
+		break;
+	case M_OPEN_ACK: 
+		printf("M_OPEN_ACK\n");
+		break;
+	case M_DATA: 
+		printf("M_DATA\n");
+		break;
+	case M_CLOSE: 
+		printf("M_CLOSE\n");
+		break;
+	}
+
+	printf("  Node ID: %d %d\n", m->src_nodeid, nodeid);
+	printf("  Remote CTX: %d  Local CTX: %d\n", m->src_ctx, m->dest_ctx);
+#endif
+
+	if (m->dest_ctx >= MAX_CONTEXTS) {
+		printf("Context invalid; ignoring\n");
+		return;
+	}
+
+	while ((node = malloc(sizeof(*node))) == NULL) {
+		sleep(1);
+	}
+	memset(node, 0, sizeof(*node));
+	while ((node->message = malloc(len)) == NULL) {
+		sleep(1);
+	}
+	memcpy(node->message, buf, len);
+	node->len = len;
+
+	pthread_mutex_lock(&context_lock);
+	ctx = contexts[m->dest_ctx];
+	if (!ctx) {
+		/* We received a close for something we've already
+		   detached from our list.  No big deal, just
+		   ignore. */
+		free(node->message);
+		free(node);
+		pthread_mutex_unlock(&context_lock);
+		return;
+	}
+
+	pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+	list_insert(&ctx->u.cluster_info.queue, node);
+	pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+	/* If a select pipe was set up, wake it up */
+	if (ctx->u.cluster_info.select_pipe[1] >= 0)
+		write(ctx->u.cluster_info.select_pipe[1], "", 1);
+	pthread_mutex_unlock(&context_lock);
+
+	pthread_cond_signal(&ctx->u.cluster_info.cond);
+}
+
+
+/**
+  Accept a new pseudo-private connection coming in over the
+  cluster socket.
+ */
+static int
+cluster_msg_accept(msgctx_t *listenctx, msgctx_t *acceptctx)
+{
+	errno = EINVAL;
+	cluster_msg_hdr_t *m;
+	msg_q_t *n;
+	char done = 0;
+	char foo;
+
+	if (!listenctx || !acceptctx)
+		return -1;
+	if (listenctx->u.cluster_info.local_ctx != 0)
+		return -1;
+
+	pthread_mutex_lock(&listenctx->u.cluster_info.mutex);
+
+	n = listenctx->u.cluster_info.queue;
+	if (n == NULL) {
+		pthread_mutex_unlock(&listenctx->u.cluster_info.mutex);
+		errno = EAGAIN;
+		return -1;
+	}
+
+	/* the OPEN should be the first thing on the list; this loop
+	   is probably not necessary */
+	list_do(&listenctx->u.cluster_info.queue, n) {
+
+		m = n->message;
+		switch(m->msg_control) {
+		case M_OPEN:
+			list_remove(&listenctx->u.cluster_info.queue, n);
+			/*printf("Accepting connection from %d %d\n",
+			  	 m->src_nodeid, m->src_ctx);*/
+
+			/* New connection */
+			pthread_mutex_init(&acceptctx->u.cluster_info.mutex,
+					   NULL);
+			pthread_cond_init(&acceptctx->u.cluster_info.cond,
+					  NULL);
+			acceptctx->u.cluster_info.queue = NULL;
+			acceptctx->u.cluster_info.remote_ctx = m->src_ctx;
+			acceptctx->u.cluster_info.nodeid = m->src_nodeid;
+			acceptctx->u.cluster_info.port = m->msg_port;
+
+			assign_ctx(acceptctx);
+			cluster_send_control_msg(acceptctx, M_OPEN_ACK);
+
+			if (listenctx->u.cluster_info.select_pipe[0] >= 0) {
+				read(listenctx->u.cluster_info.select_pipe[0],
+				     &foo, 1);
+			}
+
+			done = 1;
+			free(m);
+			free(n);
+
+			break;
+		case M_DATA:
+			/* Data messages (i.e. from broadcast msgs) are
+			   okay too!...  but we don't handle them here */
+			break;
+		default:
+			/* Other message?! */
+			printf("Odd... %d\n", m->msg_control);
+			break;
+		}
+
+		if (done)
+			break;
+
+	} while (!list_done(&listenctx->u.cluster_info.queue, n));
+
+	pthread_mutex_unlock(&listenctx->u.cluster_info.mutex);
+
+	return 0;
+}
+
+
+/* XXX INCOMPLETE */
+int
+msg_accept(msgctx_t *listenctx, msgctx_t *acceptctx)
+{
+	switch(listenctx->type) {
+	case MSG_CLUSTER:
+		return cluster_msg_accept(listenctx, acceptctx);
+	case MSG_SOCKET:
+		return 0;
+	default:
+		break;
+	}
+
+	return -1;
+}
+
+
+static int
+local_listener_sk(void)
+{
+	int sock;
+	struct sockaddr_un su;
+	mode_t om;
+
+	sock = socket(PF_LOCAL, SOCK_STREAM, 0);
+	if (sock < 0)
+		return -1;
+
+	unlink(RGMGR_SOCK);
+	om = umask(077);
+	su.sun_family = PF_LOCAL;
+	snprintf(su.sun_path, sizeof(su.sun_path), RGMGR_SOCK);
+
+	if (bind(sock, &su, sizeof(su)) < 0) {
+		umask(om);
+		goto fail;
+	}
+	umask(om);
+
+	if (listen(sock, SOMAXCONN) < 0)
+		goto fail;
+
+	return sock;
+fail:
+	if (sock >= 0)
+		close(sock);
+	return -1;
+}
+
+
+/**
+  This waits for events on the cluster comms FD and
+  dispatches them using cman_dispatch.  Initially,
+  the design had no permanent threads, but that model
+  proved difficult to implement correctly.
+ */
+static void *
+cluster_comms_thread(void *arg)
+{
+	while (thread_running) {
+		poll_cluster_messages(2);
+	}
+
+	return NULL;
+}
+
+
+/*
+   Transliterates a CMAN event to a control message
+ */
+static void
+process_cman_event(cman_handle_t handle, void *private, int reason, int arg)
+{
+	cluster_msg_hdr_t *msg;
+	int *argp;
+	msg_q_t *node;
+	msgctx_t *ctx;
+
+	/* Allocate queue node */
+	while ((node = malloc(sizeof(*node))) == NULL) {
+		sleep(1);
+	}
+	memset(node, 0, sizeof(*node));
+
+	/* Allocate message: header + int (for arg) */
+	while ((msg = malloc(sizeof(int) +
+			     sizeof(cluster_msg_hdr_t))) == NULL) {
+		sleep(1);
+	}
+	memset(msg, 0, sizeof(int)+sizeof(cluster_msg_hdr_t));
+
+
+	switch(reason) {
+#if defined(LIBCMAN_VERSION) && LIBCMAN_VERSION >= 2
+	case CMAN_REASON_PORTOPENED:
+		msg->msg_control = M_PORTOPENED;
+		break;
+	case CMAN_REASON_TRY_SHUTDOWN:
+		msg->msg_control = M_TRY_SHUTDOWN;
+		break;
+#endif
+	case CMAN_REASON_PORTCLOSED:
+		msg->msg_control = M_PORTCLOSED;
+		break;
+	case CMAN_REASON_STATECHANGE:
+		msg->msg_control = M_STATECHANGE;
+		break;
+	}
+
+	argp = ((void *)msg + sizeof(cluster_msg_hdr_t));
+	*argp = arg;
+
+	node->len = sizeof(cluster_msg_hdr_t) + sizeof(int);
+	node->message = msg;
+
+	pthread_mutex_lock(&context_lock);
+	ctx = contexts[0]; /* This is the cluster context... */
+	if (!ctx) {
+		/* We received a close for something we've already
+		   detached from our list.  No big deal, just
+		   ignore. */
+		free(node->message);
+		free(node);
+		pthread_mutex_unlock(&context_lock);
+		return;
+	}
+
+	pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+	list_insert(&ctx->u.cluster_info.queue, node);
+	pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+	/* If a select pipe was set up, wake it up */
+	if (ctx->u.cluster_info.select_pipe[1] >= 0)
+		write(ctx->u.cluster_info.select_pipe[1], "", 1);
+	pthread_mutex_unlock(&context_lock);
+
+	pthread_cond_signal(&ctx->u.cluster_info.cond);
+}
+
+
+/* XXX INCOMPLETE */
+int
+msg_init(chandle_t *ch)
+{
+	int e;
+	pthread_attr_t attrs;
+	msgctx_t *ctx;
+
+	pthread_mutex_lock(&ch->c_lock);
+
+	/* Set up local context */
+
+	ctx = msg_new_ctx();
+	if (!ctx) {
+		pthread_mutex_unlock(&ch->c_lock);
+		return -1;
+	}
+
+	ctx->type = MSG_SOCKET;
+	ctx->u.local_info.sockfd = local_listener_sk();
+	ctx->u.local_info.flags = SKF_LISTEN;
+
+	ch->local_ctx = ctx;
+
+	ctx = msg_new_ctx();
+
+	if (!ctx) {
+		pthread_mutex_unlock(&ch->c_lock);
+		msg_free_ctx((msgctx_t *)ch->local_ctx);
+		return -1;
+	}
+
+	gch = ch;
+
+	if (cman_start_recv_data(ch->c_cluster, process_cman_msg,
+				 RG_PORT) != 0) {
+		e = errno;
+		msg_close(ch->local_ctx);
+		pthread_mutex_unlock(&ch->c_lock);
+		msg_free_ctx((msgctx_t *)ch->local_ctx);
+		msg_free_ctx((msgctx_t *)ch->cluster_ctx);
+		errno = e;
+		return -1;
+	}
+
+	if (cman_start_notification(ch->c_cluster, process_cman_event) != 0) {
+		e = errno;
+		msg_close(ch->local_ctx);
+		pthread_mutex_unlock(&ch->c_lock);
+		msg_free_ctx((msgctx_t *)ch->local_ctx);
+		msg_free_ctx((msgctx_t *)ch->cluster_ctx);
+		errno = e;
+	}
+
+	ch->cluster_ctx = ctx;
+	pthread_mutex_unlock(&ch->c_lock);
+
+	pthread_mutex_lock(&context_lock);
+
+	memset(contexts, 0, sizeof(contexts));
+	contexts[0] = ctx;
+
+	ctx->type = MSG_CLUSTER;
+	ctx->u.cluster_info.port = RG_PORT; /* port! */
+	ctx->u.cluster_info.nodeid = 0; /* Broadcast! */
+	ctx->u.cluster_info.select_pipe[0] = -1;
+	ctx->u.cluster_info.select_pipe[1] = -1;
+	pthread_mutex_init(&ctx->u.cluster_info.mutex, NULL);
+	pthread_cond_init(&ctx->u.cluster_info.cond, NULL);
+	pthread_mutex_unlock(&context_lock);
+
+       	pthread_attr_init(&attrs);
+       	pthread_attr_setinheritsched(&attrs, PTHREAD_INHERIT_SCHED);
+       	pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
+
+	thread_running = 1;	
+	pthread_create(&comms_thread, &attrs, cluster_comms_thread, NULL);
+
+
+	pthread_attr_destroy(&attrs);
+
+	return 0;
+}
+
+
+int
+msg_print_ctx(int ctx)
+{
+	if (!contexts[ctx])
+		return -1;
+
+	printf("Cluster Message Context %d\n", ctx);
+	printf("  Node ID %d\n", contexts[ctx]->u.cluster_info.nodeid);
+	printf("  Remote %d\n", contexts[ctx]->u.cluster_info.remote_ctx);
+	return 0;
+}
+
+
+/* XXX INCOMPLETE */
+int
+msg_shutdown(chandle_t *ch)
+{
+	if (!ch) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	while (pthread_kill(comms_thread, 0) == 0)
+		sleep(1);
+
+	pthread_mutex_lock(&ch->c_lock);
+
+	/* xxx purge everything */
+	msg_close(ch->local_ctx);
+	cman_end_recv_data(ch->c_cluster);
+
+	msg_free_ctx(ch->local_ctx);
+	msg_free_ctx(ch->cluster_ctx);
+
+
+	pthread_mutex_unlock(&ch->c_lock);
+
+	return 0;
+}
+
+
+inline int
+msgctx_size(void)
+{
+	return sizeof(msgctx_t);
+}
+
+
+msgctx_t *
+msg_new_ctx(void)
+{
+	msgctx_t *p;
+	
+	printf("Alloc %d\n", sizeof(msgctx_t));
+	p = malloc(sizeof(msgctx_t));
+	if (!p)
+		return NULL;
+
+	memset(p, 0, sizeof(p));
+	p->type = MSG_NONE;
+
+	return p;
+}
+
+
+void
+msg_free_ctx(msgctx_t *dead)
+{
+	free(dead);
+}
+




More information about the Cluster-devel mailing list