[Cluster-devel] cluster/rgmanager/src clulib/fdops.c clulib/lo ...
lhh at sourceware.org
lhh at sourceware.org
Tue Jun 13 19:22:39 UTC 2006
CVSROOT: /cvs/cluster
Module name: cluster
Changes by: lhh at sourceware.org 2006-06-13 19:22:38
Added files:
rgmanager/src/clulib: fdops.c lock.c lockspace.c members.c
message.c
Removed files:
rgmanager/src/daemons: members.c
Log message:
Include missing .c files in src/clulib; remove defunct src/daemons/members.c
Patches:
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/fdops.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/lock.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/lockspace.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/members.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/message.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/members.c.diff?cvsroot=cluster&r1=1.4&r2=NONE
/cvs/cluster/cluster/rgmanager/src/clulib/fdops.c,v --> standard output
revision 1.1
--- cluster/rgmanager/src/clulib/fdops.c
+++ - 2006-06-13 19:22:38.586032000 +0000
@@ -0,0 +1,193 @@
+/*
+ Copyright Red Hat, Inc. 2002-2003
+
+ This program is free software; you can redistribute it and/or modify it
+ under the terms of the GNU General Public License as published by the
+ Free Software Foundation; either version 2, or (at your option) any
+ later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 675 Mass Ave, Cambridge,
+ MA 02139, USA.
+*/
+/** @file
+ * Wrapper functions around read/write/select to retry in the event
+ * of interrupts.
+ */
+#include <unistd.h>
+#include <sys/types.h>
+#include <fcntl.h>
+#include <errno.h>
+
+/**
+ * This is a wrapper around select which will retry in the case we receive
+ * EINTR. This is necessary for _read_retry, since it wouldn't make sense
+ * to have _read_retry terminate if and only if two EINTRs were received
+ * in a row - one during the read() call, one during the select call...
+ *
+ * See select(2) for description of parameters.
+ */
/**
 * select(2) wrapper that restarts automatically when the call is
 * interrupted by a signal (EINTR).  Every other outcome -- ready fds,
 * timeout, or a hard error such as EBADF/EINVAL/ENOMEM -- is passed
 * straight back to the caller.
 *
 * See select(2) for the meaning of the parameters.
 */
int
_select_retry(int fdmax, fd_set * rfds, fd_set * wfds, fd_set * xfds,
	      struct timeval *timeout)
{
	int ret;

	do {
		ret = select(fdmax, rfds, wfds, xfds, timeout);
	} while ((ret == -1) && (errno == EINTR));

	return ret;
}
+
+/**
+ * Retries a write in the event of a non-blocked interrupt signal.
+ *
+ * @param fd File descriptor to which we are writing.
+ * @param buf Data buffer to send.
+ * @param count Number of bytes in buf to send.
+ * @param timeout (struct timeval) telling us how long we should retry.
+ * @return The number of bytes written to the file descriptor,
+ * or -1 on error (with errno set appropriately).
+ */
/**
 * Retries a write in the event of a non-blocked interrupt signal.
 * Waits for the fd to become writable before each write attempt.
 *
 * @param fd File descriptor to which we are writing.
 * @param buf Data buffer to send.
 * @param count Number of bytes in buf to send.
 * @param timeout (struct timeval) telling us how long we should retry.
 * @return The number of bytes written to the file descriptor
 *         (always == count on success), or -1 on error with errno set
 *         (ETIMEDOUT on timeout, EPIPE on closed/exceptional fd).
 */
ssize_t
_write_retry(int fd, void *buf, int count, struct timeval * timeout)
{
	int n, total = 0, remain = count, rv = 0;
	fd_set wfds, xfds;

	while (total < count) {

		/* Create the write FD set of 1... */
		FD_ZERO(&wfds);
		FD_SET(fd, &wfds);
		FD_ZERO(&xfds);
		FD_SET(fd, &xfds);

		/* wait for the fd to be available for writing */
		rv = _select_retry(fd + 1, NULL, &wfds, &xfds, timeout);
		if (rv == -1)
			return -1;
		else if (rv == 0) {
			errno = ETIMEDOUT;
			return -1;
		}

		if (FD_ISSET(fd, &xfds)) {
			errno = EPIPE;
			return -1;
		}

		/*
		 * Attempt to write to fd.  FIX: cast to char * for the
		 * offset; pointer arithmetic on void * is a GNU extension
		 * and not valid ISO C.
		 */
		n = write(fd, (char *)buf + total, remain);

		/*
		 * When we know our fd was select()ed and we receive 0 bytes
		 * when we write, the fd was closed.
		 */
		if ((n == 0) && (rv == 1)) {
			errno = EPIPE;
			return -1;
		}

		if (n == -1) {
			if ((errno == EAGAIN) || (errno == EINTR)) {
				/* Not ready?  Try again. */
				continue;
			}

			/* Other errors: EIO, EINVAL, etc */
			return -1;
		}

		total += n;
		remain -= n;
	}

	return total;
}
+
+/**
+ * Retry reads until we (a) time out or (b) get our data. Of course, if
+ * timeout is NULL, it'll wait forever.
+ *
+ * @param sockfd File descriptor we want to read from.
+ * @param buf Preallocated buffer into which we will read data.
+ * @param count Number of bytes to read.
+ * @param timeout (struct timeval) describing how long we should retry.
+ * @return The number of bytes read on success, or -1 on failure.
+ Note that we will always return (count) or (-1).
+ */
/**
 * Retry reads until we (a) time out or (b) get our data. Of course, if
 * timeout is NULL, it'll wait forever.
 *
 * @param sockfd File descriptor we want to read from.
 * @param buf Preallocated buffer into which we will read data.
 * @param count Number of bytes to read.
 * @param timeout (struct timeval) describing how long we should retry.
 * @return The number of bytes read on success, or -1 on failure with
 *         errno set (ETIMEDOUT / EPIPE).  Note that we will always
 *         return (count) or (-1).
 */
ssize_t
_read_retry(int sockfd, void *buf, int count, struct timeval * timeout)
{
	int n, total = 0, remain = count, rv = 0;
	fd_set rfds, xfds;

	while (total < count) {
		FD_ZERO(&rfds);
		FD_SET(sockfd, &rfds);
		FD_ZERO(&xfds);
		FD_SET(sockfd, &xfds);

		/*
		 * Select on the socket, in case it closes while we're not
		 * looking...
		 */
		rv = _select_retry(sockfd + 1, &rfds, NULL, &xfds, timeout);
		if (rv == -1)
			return -1;
		else if (rv == 0) {
			errno = ETIMEDOUT;
			return -1;
		}

		if (FD_ISSET(sockfd, &xfds)) {
			errno = EPIPE;
			return -1;
		}

		/*
		 * Attempt to read off the socket.  FIX: cast to char * for
		 * the offset; pointer arithmetic on void * is a GNU
		 * extension and not valid ISO C.
		 */
		n = read(sockfd, (char *)buf + total, remain);

		/*
		 * When we know our socket was select()ed and we receive 0
		 * bytes when we read, the socket was closed.
		 */
		if ((n == 0) && (rv == 1)) {
			errno = EPIPE;
			return -1;
		}

		if (n == -1) {
			if ((errno == EAGAIN) || (errno == EINTR)) {
				/* Not ready?  Wait for data. */
				continue;
			}

			/* Other errors: EPIPE, EINVAL, etc */
			return -1;
		}

		total += n;
		remain -= n;
	}

	return total;
}
/cvs/cluster/cluster/rgmanager/src/clulib/lock.c,v --> standard output
revision 1.1
--- cluster/rgmanager/src/clulib/lock.c
+++ - 2006-06-13 19:22:38.669122000 +0000
@@ -0,0 +1,153 @@
+/*
+ Copyright Red Hat, Inc. 2004-2006
+
+ The Magma Cluster API Library is free software; you can redistribute
+ it and/or modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either version
+ 2.1 of the License, or (at your option) any later version.
+
+ The Magma Cluster API Library is distributed in the hope that it will
+ be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ USA.
+ */
+/** @file
+ * Locking.
+ */
+#include <errno.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <stdio.h>
+#include <assert.h>
+#include <sys/ioctl.h>
+#include <lock.h>
+#include <sys/types.h>
+#include <sys/select.h>
+#include <pthread.h>
+
+
/* Dummy AST (asynchronous system trap) callback.  Lock completion is
   detected by watching the lockspace fd in wait_for_dlm_event(), so
   there is deliberately nothing to do here. */
static void
ast_function(void * __attribute__ ((unused)) arg)
{
	/* intentionally empty */
}
+
+
+static int
+wait_for_dlm_event(dlm_lshandle_t *ls)
+{
+ fd_set rfds;
+ int fd = dlm_ls_get_fd(ls);
+
+ FD_ZERO(&rfds);
+ FD_SET(fd, &rfds);
+
+ if (select(fd + 1, &rfds, NULL, NULL, NULL) == 1)
+ return dlm_dispatch(fd);
+
+ return -1;
+}
+
+
+int
+clu_lock(dlm_lshandle_t ls,
+ int mode,
+ struct dlm_lksb *lksb,
+ int options,
+ char *resource)
+{
+ int ret;
+
+ if (!ls || !lksb || !resource || !strlen(resource)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ ret = dlm_ls_lock(ls, mode, lksb, options, resource,
+ strlen(resource), 0, ast_function, lksb,
+ NULL, NULL);
+
+ if (ret < 0) {
+ if (errno == ENOENT)
+ assert(0);
+
+ return -1;
+ }
+
+ if ((ret = (wait_for_dlm_event(ls) < 0))) {
+ fprintf(stderr, "wait_for_dlm_event: %d / %d\n",
+ ret, errno);
+ return -1;
+ }
+
+ return 0;
+}
+
+
+
+dlm_lshandle_t
+clu_acquire_lockspace(const char *lsname)
+{
+ dlm_lshandle_t ls = NULL;
+
+ while (!ls) {
+ ls = dlm_open_lockspace(lsname);
+ if (ls)
+ break;
+
+ ls = dlm_create_lockspace(lsname, 0644);
+ if (ls)
+ break;
+
+ /* Work around race: Someone was closing lockspace as
+ we were trying to open it. Retry. */
+ if (errno == ENOENT)
+ continue;
+
+ fprintf(stderr, "failed acquiring lockspace: %s\n",
+ strerror(errno));
+
+ return NULL;
+ }
+
+ return ls;
+}
+
+
+
+int
+clu_unlock(dlm_lshandle_t ls, struct dlm_lksb *lksb)
+{
+ int ret;
+
+ if (!ls || !lksb) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ ret = dlm_ls_unlock(ls, lksb->sb_lkid, 0, lksb, NULL);
+
+ if (ret != 0)
+ return ret;
+
+ /* lksb->sb_status should be EINPROG at this point */
+
+ if (wait_for_dlm_event(ls) < 0) {
+ errno = lksb->sb_status;
+ return -1;
+ }
+
+ return 0;
+}
+
+
/**
 * Tear down a lockspace handle obtained from clu_acquire_lockspace().
 * Note: dlm_release_lockspace takes the name FIRST, then the handle.
 */
int
clu_release_lockspace(dlm_lshandle_t ls, char *name)
{
	return dlm_release_lockspace(name, ls, 0);
}
/cvs/cluster/cluster/rgmanager/src/clulib/lockspace.c,v --> standard output
revision 1.1
--- cluster/rgmanager/src/clulib/lockspace.c
+++ - 2006-06-13 19:22:38.753795000 +0000
@@ -0,0 +1,75 @@
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <stdio.h>
+#include <assert.h>
+#include <sys/ioctl.h>
+#include <sys/select.h>
+#include <lock.h>
+#include <pthread.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <clulog.h>
+#include <signal.h>
+#include <gettid.h>
+#include <libdlm.h>
+#include <errno.h>
+
+
+#define RGMGR_LOCKSPACE "rgmanager"
+
+static pthread_mutex_t _default_lock = PTHREAD_MUTEX_INITIALIZER;
+static char _init = 0;
+static pid_t _holder_id = 0;
+static dlm_lshandle_t _default_ls;
+
+
+static int
+_init_lockspace(void)
+{
+ _default_ls = clu_acquire_lockspace(RGMGR_LOCKSPACE);
+ if (!_default_ls) {
+ return -1;
+ }
+ _init = 1;
+ return 0;
+}
+
+
+dlm_lshandle_t
+ls_hold_default(void)
+{
+ pthread_mutex_lock(&_default_lock);
+ if (!_init && (_init_lockspace() < 0)) {
+ pthread_mutex_unlock(&_default_lock);
+ errno = ENOLCK;
+ return NULL;
+ }
+
+ if (_holder_id != 0) {
+ pthread_mutex_unlock(&_default_lock);
+ errno = EAGAIN;
+ return NULL;
+ }
+
+ _holder_id = gettid();
+ pthread_mutex_unlock(&_default_lock);
+ return _default_ls;
+}
+
+
+void
+ls_release_default(void)
+{
+ pthread_mutex_lock(&_default_lock);
+ if (_holder_id != gettid()) {
+ clulog(LOG_ERR, "Attempt to release lockspace when I am not"
+ "the holder!\n");
+ raise(SIGSTOP);
+ }
+
+ _holder_id = 0;
+ pthread_mutex_unlock(&_default_lock);
+}
+
+
/cvs/cluster/cluster/rgmanager/src/clulib/members.c,v --> standard output
revision 1.1
--- cluster/rgmanager/src/clulib/members.c
+++ - 2006-06-13 19:22:38.840838000 +0000
@@ -0,0 +1,397 @@
+#include <sys/types.h>
+#include <arpa/inet.h>
+#include <stdint.h>
+#include <malloc.h>
+#include <libcman.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <members.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <rg_types.h>
+#include <pthread.h>
+
+static int my_node_id = -1;
+static pthread_rwlock_t memblock = PTHREAD_RWLOCK_INITIALIZER;
+static cluster_member_list_t *membership = NULL;
+
+
+/**
+ Return the stored node ID. Since this should never
+ change during the duration of running rgmanager, it is
+ not protected by a lock.
+ */
int
my_id(void)
{
	/* my_node_id is written only via set_my_id(); per the comment
	   above, it never changes after startup, so no lock is taken. */
	return my_node_id;
}
+
+
/* Store the local node ID for later retrieval via my_id().
   Always returns 0. */
int
set_my_id(int id)
{
	my_node_id = id;
	return 0;
}
+
+
+/**
+ Determine and store the local node ID. This should
+ only ever be called once during initialization.
+ */
+int
+get_my_nodeid(cman_handle_t h)
+{
+ cman_node_t node;
+
+ if (cman_get_node(h, CMAN_NODEID_US, &node) != 0)
+ return -1;
+
+ return node.cn_nodeid;
+}
+
+
+
+/**
+ Generate and return a list of members which are now online in a new
+ membership list, given the old membership list. User must call
+ @ref free_member_list
+ to free the returned
+ @ref cluster_member_list_t
+ structure.
+
+ @param old Old membership list
+ @param new New membership list
+ @return NULL if no members were gained, or a newly
+ allocated cluster_member_list_t structure.
+ */
+cluster_member_list_t *
+memb_gained(cluster_member_list_t *old, cluster_member_list_t *new)
+{
+ int count, x, y;
+ char in_old = 0;
+ cluster_member_list_t *gained;
+
+ /* No nodes in new? Then nothing could have been gained */
+ if (!new || !new->cml_count)
+ return NULL;
+
+ /* Nothing in old? Duplicate 'new' and return it. */
+ if (!old || !old->cml_count) {
+ gained = cml_alloc(new->cml_count);
+ if (!gained)
+ return NULL;
+ memcpy(gained, new, cml_size(new->cml_count));
+ return gained;
+ }
+
+ /* Use greatest possible count */
+ count = (old->cml_count > new->cml_count ?
+ cml_size(old->cml_count) : cml_size(new->cml_count));
+
+ gained = malloc(count);
+ if (!gained)
+ return NULL;
+ memset(gained, 0, count);
+
+ for (x = 0; x < new->cml_count; x++) {
+
+ /* This one isn't active at the moment; it could not have
+ been gained. */
+ if (!new->cml_members[x].cn_member)
+ continue;
+
+ in_old = 0;
+ for (y = 0; y < old->cml_count; y++) {
+ if ((new->cml_members[x].cn_nodeid !=
+ old->cml_members[y].cn_nodeid) ||
+ !old->cml_members[y].cn_member)
+ continue;
+ in_old = 1;
+ break;
+ }
+
+ if (in_old)
+ continue;
+ memcpy(&gained->cml_members[gained->cml_count],
+ &new->cml_members[x], sizeof(cman_node_t));
+ }
+
+ if (gained->cml_count == 0) {
+ free(gained);
+ gained = NULL;
+ }
+
+ return gained;
+}
+
+
+/**
+ Generate and return a list of members which are lost or no longer online
+ in a new membership list, given the old membership list. User must call
+ @ref free_member_list
+ to free the returned
+ @ref cluster_member_list_t
+ structure.
+
+ @param old Old membership list
+ @param new New membership list
+ @return NULL if no members were lost, or a newly
+ allocated cluster_member_list_t structure.
+ */
+cluster_member_list_t *
+memb_lost(cluster_member_list_t *old, cluster_member_list_t *new)
+{
+ cluster_member_list_t *ret;
+ int x;
+
+ /* Reverse. ;) */
+ ret = memb_gained(new, old);
+ if (!ret)
+ return NULL;
+
+ for (x = 0; x < ret->cml_count; x++) {
+ ret->cml_members[x].cn_member = 0;
+ }
+
+ return ret;
+}
+
+
+
+void
+member_list_update(cluster_member_list_t *new_ml)
+{
+ pthread_rwlock_wrlock(&memblock);
+ if (membership)
+ free_member_list(membership);
+ if (new_ml)
+ membership = member_list_dup(new_ml);
+ else
+ membership = NULL;
+ pthread_rwlock_unlock(&memblock);
+}
+
+
+cluster_member_list_t *
+member_list(void)
+{
+ cluster_member_list_t *ret = NULL;
+ pthread_rwlock_rdlock(&memblock);
+ if (membership)
+ ret = member_list_dup(membership);
+ pthread_rwlock_unlock(&memblock);
+ return ret;
+}
+
+
+char *
+member_name(uint64_t id, char *buf, int buflen)
+{
+ char *n;
+
+ if (!buf || !buflen)
+ return NULL;
+
+ pthread_rwlock_rdlock(&memblock);
+ n = memb_id_to_name(membership, id);
+ if (n) {
+ strncpy(buf, n, buflen);
+ } else {
+ buf[0] = 0;
+ }
+ pthread_rwlock_unlock(&memblock);
+ return buf;
+}
+
+
+
+cluster_member_list_t *
+get_member_list(cman_handle_t h)
+{
+ int c;
+ cluster_member_list_t *ml = NULL;
+ cman_node_t *nodes = NULL;
+
+ do {
+ if (nodes)
+ free(nodes);
+
+ c = cman_get_node_count(h);
+ if (c <= 0)
+ return NULL;
+
+ if (!ml)
+ ml = malloc(sizeof(*ml));
+ if (!ml)
+ return NULL;
+
+ nodes = malloc(sizeof(*nodes) * c);
+ if (!nodes) {
+ free(ml);
+ return NULL;
+ }
+
+ memset(ml, 0, sizeof(*ml));
+ memset(nodes, 0, sizeof(*nodes)*c);
+
+ cman_get_nodes(h, c, &ml->cml_count, nodes);
+
+ } while (ml->cml_count != c);
+
+ ml->cml_members = nodes;
+ ml->cml_count = c;
+ return ml;
+}
+
+
+void
+free_member_list(cluster_member_list_t *ml)
+{
+ if (ml) {
+ if (ml->cml_members)
+ free(ml->cml_members);
+ free(ml);
+ }
+}
+
+
+int
+memb_online(cluster_member_list_t *ml, int nodeid)
+{
+ int x = 0;
+
+ for (x = 0; x < ml->cml_count; x++) {
+ if (ml->cml_members[x].cn_nodeid == nodeid)
+ return ml->cml_members[x].cn_member;
+ }
+
+ return 0;
+}
+
+
+int
+memb_count(cluster_member_list_t *ml)
+{
+ int x = 0, count = 0;
+
+ for (x = 0; x < ml->cml_count; x++) {
+ if (ml->cml_members[x].cn_member)
+ ++count;
+ }
+
+ return count;
+}
+
+
+int
+memb_mark_down(cluster_member_list_t *ml, int nodeid)
+{
+ int x = 0;
+
+ for (x = 0; x < ml->cml_count; x++) {
+ if (ml->cml_members[x].cn_nodeid == nodeid)
+ ml->cml_members[x].cn_member = 0;
+ }
+
+ return 0;
+}
+
+
+
+char *
+memb_id_to_name(cluster_member_list_t *ml, int nodeid)
+{
+ int x = 0;
+
+ for (x = 0; x < ml->cml_count; x++) {
+ if (ml->cml_members[x].cn_nodeid == nodeid)
+ return ml->cml_members[x].cn_name;
+ }
+
+ return 0;
+}
+
+
+cman_node_t *
+memb_id_to_p(cluster_member_list_t *ml, int nodeid)
+{
+ int x = 0;
+
+ for (x = 0; x < ml->cml_count; x++) {
+ if (ml->cml_members[x].cn_nodeid == nodeid)
+ return &ml->cml_members[x];
+ }
+
+ return 0;
+}
+
+
+int
+memb_online_name(cluster_member_list_t *ml, char *name)
+{
+ int x = 0;
+
+ for (x = 0; x < ml->cml_count; x++) {
+ if (!strcasecmp(ml->cml_members[x].cn_name, name))
+ return ml->cml_members[x].cn_member;
+ }
+
+ return 0;
+}
+
+
+int
+memb_name_to_id(cluster_member_list_t *ml, char *name)
+{
+ int x = 0;
+
+ for (x = 0; x < ml->cml_count; x++) {
+ if (!strcasecmp(ml->cml_members[x].cn_name, name))
+ return ml->cml_members[x].cn_nodeid;
+ }
+
+ return 0;
+}
+
+
+cman_node_t *
+memb_name_to_p(cluster_member_list_t *ml, char *name)
+{
+ int x = 0;
+
+ for (x = 0; x < ml->cml_count; x++) {
+ if (!strcasecmp(ml->cml_members[x].cn_name, name))
+ return &ml->cml_members[x];
+ }
+
+ return 0;
+}
+
+/**
+ Duplicate and return a cluster member list structure, sans the DNS resolution
+ information.
+
+ @param orig List to duplicate.
+ @return NULL if there is nothing to duplicate or duplication
+ fails, or a newly allocated cluster_member_list_t
+ structure.
+ */
+cluster_member_list_t *
+member_list_dup(cluster_member_list_t *orig)
+{
+ cluster_member_list_t *ret = NULL;
+
+ if (!orig)
+ return NULL;
+
+ ret = malloc(cml_size(orig->cml_count));
+ memset(ret, 0, cml_size(orig->cml_count));
+ memcpy(ret, orig, cml_size(orig->cml_count));
+
+ return ret;
+}
+
/cvs/cluster/cluster/rgmanager/src/clulib/message.c,v --> standard output
revision 1.1
--- cluster/rgmanager/src/clulib/message.c
+++ - 2006-06-13 19:22:38.921541000 +0000
@@ -0,0 +1,1357 @@
+#define _MESSAGE_BUILD
+#include <message.h>
+#include <stdio.h>
+#include <pthread.h>
+#include <libcman.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <sys/types.h>
+#include <stdlib.h>
+#include <rg_types.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/un.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <errno.h>
+#include <sys/time.h>
+#include <fdops.h>
+#include <resgroup.h>
+
+
+
+/* Ripped from ccsd's setup_local_socket */
+#define RGMGR_SOCK "/var/run/cluster/rgmanager.sk"
+#define MAX_CONTEXTS 32 /* Testing; production should be 1024-ish */
+
+/* Context 0 is reserved for control messages */
+
+/* Local-ish contexts */
+static pthread_mutex_t context_lock = PTHREAD_MUTEX_INITIALIZER;
+static msgctx_t *contexts[MAX_CONTEXTS];
+static uint32_t context_index = 1;
+static chandle_t *gch;
+pthread_t comms_thread;
+int thread_running;
+
+
+#define is_established(ctx) \
+ (((ctx->type == MSG_CLUSTER) && \
+ (ctx->u.cluster_info.remote_ctx && ctx->u.cluster_info.local_ctx)) || \
+ ((ctx->type == MSG_SOCKET) && \
+ (ctx->u.local_info.sockfd != -1)))
+
+
+static int
+local_connect(void)
+{
+ struct sockaddr_un sun;
+ int sock = -1, error = 0;
+
+ memset(&sun, 0, sizeof(sun));
+ sun.sun_family = PF_LOCAL;
+ snprintf(sun.sun_path, sizeof(sun.sun_path), RGMGR_SOCK);
+
+ sock = socket(PF_LOCAL, SOCK_STREAM, 0);
+ if (sock < 0) {
+ error = errno;
+ goto fail;
+ }
+
+ error = connect(sock, (struct sockaddr *)&sun, sizeof(sun));
+ if (error < 0) {
+ error = errno;
+ goto fail;
+ }
+
+ sock = error;
+fail:
+
+ return sock;
+}
+
+
+static int
+send_cluster_message(msgctx_t *ctx, void *msg, size_t len)
+{
+ char buf[4096];
+ cluster_msg_hdr_t *h = (void *)buf;
+ int ret;
+ char *msgptr = (buf + sizeof(*h));
+
+ if ((len + sizeof(*h)) > sizeof(buf)) {
+ errno = E2BIG;
+ return -1;
+ }
+
+ h->msg_control = M_DATA;
+ h->src_ctx = ctx->u.cluster_info.local_ctx;
+ h->dest_ctx = ctx->u.cluster_info.remote_ctx;
+ h->msg_port = ctx->u.cluster_info.port;
+ memcpy(msgptr, msg, len);
+
+ /*
+ printf("sending cluster message, length = %d to nodeid %d port %d\n",
+ len + sizeof(*h), ctx->u.cluster_info.nodeid, ctx->u.cluster_info.port);
+ */
+
+ pthread_mutex_lock(&gch->c_lock);
+ h->src_nodeid = gch->c_nodeid;
+
+ swab_cluster_msg_hdr_t(h);
+
+ ret = cman_send_data(gch->c_cluster, (void *)h, len + sizeof(*h),
+ ctx->u.cluster_info.nodeid,
+ ctx->u.cluster_info.port, 0);
+
+ pthread_mutex_unlock(&gch->c_lock);
+
+ return len + sizeof(h);
+}
+
+
+/**
+ Wrapper around write(2)
+ */
+static int
+send_socket_message(msgctx_t *ctx, void *msg, size_t len)
+{
+ char buf[4096];
+ local_msg_hdr_t *h = (local_msg_hdr_t *)buf;
+ char *msgptr = (buf + sizeof(*h));
+
+ /* encapsulate ... ? */
+ if ((len + sizeof(*h)) > sizeof(buf)) {
+ errno = E2BIG;
+ return -1;
+ }
+
+ h->msg_control = M_DATA;
+ h->msg_len = len;
+ memcpy(msgptr, msg, len);
+
+ return _write_retry(ctx->u.local_info.sockfd, msg, len + sizeof(*h), NULL);
+}
+
+
+/**
+ Message sending API. Sends to the cluster or a socket, depending on
+ the context.
+ */
+int
+msg_send(msgctx_t *ctx, void *msg, size_t len)
+{
+ if (!ctx || !msg || !len) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ switch(ctx->type) {
+ case MSG_CLUSTER:
+ return send_cluster_message(ctx, msg, len);
+ case MSG_SOCKET:
+ return send_socket_message(ctx, msg, len);
+ default:
+ break;
+ }
+
+ errno = EINVAL;
+ return -1;
+}
+
+
+/**
+ Assign a (free) cluster context ID
+ */
+static int
+assign_ctx(msgctx_t *ctx)
+{
+ int start;
+
+ /* Assign context index */
+ ctx->type = MSG_CLUSTER;
+
+ pthread_mutex_lock(&context_lock);
+ start = context_index++;
+ if (context_index >= MAX_CONTEXTS || context_index <= 0)
+ context_index = 1;
+ do {
+ if (contexts[context_index]) {
+ ++context_index;
+ if (context_index >= MAX_CONTEXTS)
+ context_index = 1;
+
+ if (context_index == start) {
+ pthread_mutex_unlock(&context_lock);
+ errno = EAGAIN;
+ return -1;
+ }
+
+ continue;
+ }
+
+ contexts[context_index] = ctx;
+ ctx->u.cluster_info.local_ctx = context_index;
+
+ } while (0);
+ pthread_mutex_unlock(&context_lock);
+
+ return 0;
+}
+
+
+/* See if anything's on the cluster socket. If so, dispatch it
+ on to the requisite queues
+ XXX should be passed a connection arg! */
+static int
+poll_cluster_messages(int timeout)
+{
+ int ret = -1;
+ fd_set rfds;
+ int fd;
+ struct timeval tv;
+ struct timeval *p = NULL;
+
+ if (timeout >= 0) {
+ p = &tv;
+ tv.tv_sec = tv.tv_usec = timeout;
+ }
+ printf("%s\n", __FUNCTION__);
+
+ FD_ZERO(&rfds);
+
+ //pthread_mutex_lock(&gch->c_lock);
+ fd = cman_get_fd(gch->c_cluster);
+ FD_SET(fd, &rfds);
+
+ if (select(fd + 1, &rfds, NULL, NULL, p) == 1) {
+ cman_dispatch(gch->c_cluster, 0);
+ ret = 0;
+ }
+ //pthread_mutex_unlock(&gch->c_lock);
+
+ return ret;
+}
+
+
+/**
+ This is used to establish and tear down pseudo-private
+ contexts which are shared with the cluster context.
+ */
+static int
+cluster_send_control_msg(msgctx_t *ctx, int type)
+{
+ cluster_msg_hdr_t cm;
+
+ cm.msg_control = (uint8_t)type;
+ cm.src_nodeid = gch->c_nodeid;
+ cm.dest_ctx = ctx->u.cluster_info.remote_ctx;
+ cm.src_ctx = ctx->u.cluster_info.local_ctx;
+ cm.msg_port = ctx->u.cluster_info.port;
+
+ swab_cluster_msg_hdr_t(&cm);
+
+ return (cman_send_data(gch->c_cluster, (void *)&cm, sizeof(cm),
+ ctx->u.cluster_info.nodeid,
+ ctx->u.cluster_info.port, 0));
+}
+
+
+/**
+ Wait for a message on a context.
+ */
+static int
+cluster_msg_wait(msgctx_t *ctx, int timeout)
+{
+ struct timespec ts = {0, 0};
+ int req = M_NONE;
+ struct timeval start;
+ struct timeval now;
+
+
+ if (timeout > 0)
+ gettimeofday(&start, NULL);
+
+ ts.tv_sec = !!timeout;
+
+ pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+ while (1) {
+ /* See if we dispatched any messages on to our queue */
+ if (ctx->u.cluster_info.queue) {
+ req = ctx->u.cluster_info.queue->message->msg_control;
+ /*printf("Queue not empty CTX%d : %d\n",
+ ctx->u.cluster_info.local_ctx, req);*/
+ break;
+ }
+
+ if (timeout == 0)
+ break;
+
+ /* Ok, someone else has the mutex on our FD. Go to
+ sleep on a cond; maybe they'll wake us up */
+ if (pthread_cond_timedwait(&ctx->u.cluster_info.cond,
+ &ctx->u.cluster_info.mutex,
+ &ts) < 0) {
+
+ /* Mutex held */
+ if (errno == ETIMEDOUT) {
+ if (timeout < 0) {
+ ts.tv_sec = 1;
+ ts.tv_nsec = 0;
+ continue;
+ }
+
+ ts.tv_sec = !!timeout;
+
+ /* Done */
+ break;
+ }
+ }
+
+ if (timeout > 0) {
+ gettimeofday(&now, NULL);
+ /* XXX imprecise */
+ if (now.tv_sec - start.tv_sec > timeout)
+ break;
+ }
+ }
+ pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+
+ return req;
+}
+
+
+static int
+peekypeeky(int fd)
+{
+ local_msg_hdr_t h;
+ int ret;
+
+ while ((ret = recv(fd, (void *)&h, sizeof(h), MSG_PEEK)) < 0) {
+ if (errno == EINTR)
+ continue;
+ return -1;
+ }
+
+ if (ret == sizeof(h))
+ return h.msg_control;
+
+ if (ret == 0)
+ /* Socket closed? */
+ return M_CLOSE;
+
+ /* XXX */
+ printf("PROTOCOL ERROR: Invalid message\n");
+ return M_CLOSE;
+}
+
+
+static int
+local_msg_wait(msgctx_t *ctx, int timeout)
+{
+ fd_set rfds;
+ struct timeval tv = {0, 0};
+ struct timeval *p = NULL;
+
+ if (timeout >= 0) {
+ tv.tv_sec = timeout;
+ p = &tv;
+ }
+
+ FD_ZERO(&rfds);
+ FD_SET(ctx->u.local_info.sockfd, &rfds);
+
+ if (_select_retry(ctx->u.local_info.sockfd + 1, &rfds,
+ NULL, NULL, p) == 1) {
+ return peekypeeky(ctx->u.local_info.sockfd);
+ }
+
+ return M_NONE;
+}
+
+
+int
+msg_get_nodeid(msgctx_t *ctx)
+{
+ switch(ctx->type) {
+ case MSG_CLUSTER:
+ return ctx->u.cluster_info.nodeid;
+ case MSG_SOCKET:
+ return 0;
+ default:
+ break;
+ }
+
+ return -1;
+}
+
+
/**
 * Add a context's readable descriptor to an fd_set for select(2),
 * updating *max with the highest fd seen.
 *
 * Cluster contexts have no real fd, so a notification pipe is created
 * lazily and its read end is added (presumably the dispatch side
 * writes to the pipe when a message is queued -- TODO confirm against
 * the writer, which is outside this view).
 *
 * @param ctx Message context (cluster or local socket).
 * @param fds Descriptor set to add to.
 * @param max In/out: updated if our fd is larger.
 * @return 0 on success, -1 on error.
 */
int
msg_fd_set(msgctx_t *ctx, fd_set *fds, int *max)
{
	int e;
	switch(ctx->type) {
	case MSG_CLUSTER:
		pthread_mutex_lock(&ctx->u.cluster_info.mutex);
		if (ctx->u.cluster_info.select_pipe[0] < 0) {
			/* First use: create the notification pipe. */
			if (pipe(ctx->u.cluster_info.select_pipe) < 0) {
				/* Preserve errno across the unlock. */
				e = errno;
				pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
				errno = e;
				return -1;
			}

			printf("%s: Created cluster CTX select pipe "
			       "rd=%d wr=%d\n", __FUNCTION__,
			       ctx->u.cluster_info.select_pipe[0],
			       ctx->u.cluster_info.select_pipe[1]);

		}

		/* Read end of the pipe stands in for the context's fd. */
		e = ctx->u.cluster_info.select_pipe[0];
		printf("%s: cluster %d\n", __FUNCTION__, e);
		FD_SET(e, fds);
		pthread_mutex_unlock(&ctx->u.cluster_info.mutex);

		if (e > *max)
			*max = e;
		return 0;

	case MSG_SOCKET:
		if (ctx->u.local_info.sockfd >= 0) {
			printf("%s: local %d\n", __FUNCTION__,
			       ctx->u.local_info.sockfd);
			FD_SET(ctx->u.local_info.sockfd, fds);

			if (ctx->u.local_info.sockfd > *max)
				*max = ctx->u.local_info.sockfd;
			return 0;
		}
		return -1;
	default:
		break;
	}

	return -1;
}
+
+
+int
+msg_fd_isset(msgctx_t *ctx, fd_set *fds)
+{
+ errno = EINVAL;
+
+ if (!fds || !ctx)
+ return -1;
+
+ switch(ctx->type) {
+ case MSG_CLUSTER:
+ pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+ if (ctx->u.cluster_info.select_pipe[0] >= 0 &&
+ FD_ISSET(ctx->u.cluster_info.select_pipe[0], fds)) {
+ pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+ return 1;
+ }
+ pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+ return 0;
+ case MSG_SOCKET:
+ if (ctx->u.local_info.sockfd >= 0 &&
+ FD_ISSET(ctx->u.local_info.sockfd, fds)) {
+ return 1;
+ }
+ return 0;
+ default:
+ break;
+ }
+
+ return -1;
+}
+
+
+int
+msg_fd_clr(msgctx_t *ctx, fd_set *fds)
+{
+ errno = EINVAL;
+
+ if (!fds || !ctx)
+ return -1;
+
+ switch(ctx->type) {
+ case MSG_CLUSTER:
+ pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+ if (ctx->u.cluster_info.select_pipe[0] >= 0) {
+ FD_CLR(ctx->u.cluster_info.select_pipe[0], fds);
+ pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+ return 1;
+ }
+ pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+ return 0;
+ case MSG_SOCKET:
+ if (ctx->u.local_info.sockfd >= 0) {
+ FD_CLR(ctx->u.local_info.sockfd, fds);
+ return 1;
+ }
+ return 0;
+ default:
+ break;
+ }
+
+ return -1;
+}
+
+
+/**
+ This polls the context for 'timeout' seconds waiting for data
+ to become available. Return codes are M_DATA, M_CLOSE, and M_OPEN
+
+ M_DATA - data available
+ M_OPEN - needs msg_accept(
+ M_CLOSE - context / socket closed by remote host
+ M_NONE - nothing available
+
+ For the cluster connection, the return code could also map to one of
+ the CMAN return codes
+
+ M_STATECHANGE - node has changed state
+
+ */
+int
+msg_wait(msgctx_t *ctx, int timeout)
+{
+
+ if (!ctx) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ switch(ctx->type) {
+ case MSG_CLUSTER:
+ return cluster_msg_wait(ctx, timeout);
+ case MSG_SOCKET:
+ return local_msg_wait(ctx, timeout);
+ default:
+ break;
+ }
+
+ errno = EINVAL;
+ return -1;
+}
+
+
/**
 * Dequeue and decode one message from a cluster context's receive
 * queue.  Non-blocking: fails with EAGAIN if the queue is empty.
 *
 * @param ctx Cluster message context.
 * @param msg If non-NULL, receives a malloc'd copy of the payload on
 *            M_DATA (ownership passes to the caller); data messages
 *            are dropped with a warning when msg is NULL.
 * @param len If non-NULL, receives the payload length.
 * @return Payload length for M_DATA; 0 for control messages handled
 *         here (M_CLOSE / M_OPEN_ACK); -1 on error (EBADF / EAGAIN)
 *         or on an unexpected message type.
 */
int
_cluster_msg_receive(msgctx_t *ctx, void **msg, size_t *len)
{
	cluster_msg_hdr_t *m;
	msg_q_t *n;
	int ret = 0;

	if (msg)
		*msg = NULL;
	if (len)
		*len = 0;

	if (ctx->u.cluster_info.local_ctx < 0 ||
	    ctx->u.cluster_info.local_ctx >= MAX_CONTEXTS) {
		errno = EBADF;
		return -1;
	}

	/* Pop the queue head under the context lock. */
	pthread_mutex_lock(&ctx->u.cluster_info.mutex);

	n = ctx->u.cluster_info.queue;
	if (n == NULL) {
		pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
		errno = EAGAIN;
		return -1;
	}

	list_remove(&ctx->u.cluster_info.queue, n);

	pthread_mutex_unlock(&ctx->u.cluster_info.mutex);

	m = n->message;
	switch(m->msg_control) {
	case M_CLOSE:
		/* Peer closed its half of the pseudo-connection. */
		ctx->u.cluster_info.remote_ctx = 0;
		break;
	case M_OPEN_ACK:
		/* Response to new connection */
		ctx->u.cluster_info.remote_ctx = m->src_ctx;
		break;
	case M_DATA:
		/* Slide the payload over the header in place so the whole
		   allocation can be handed to the caller as-is. */
		memmove(m, &m[1], n->len - sizeof(*m));
		if (msg)
			*msg = (void *)m;
		else {
			printf("Warning: dropping data message\n");
			free(m);
		}
		if (len)
			*len = (n->len - sizeof(*m));
		ret = (n->len - sizeof(*m));
		free(n);

		printf("Message received\n");
		return ret;
	case M_OPEN:
		/* Someone is trying to open a connection */
	default:
		/* ?!! */
		ret = -1;
		break;
	}

	free(m);
	free(n);

	return ret;
}
+
+
+/**
+ Receive a message from a cluster-context. This copies out the contents
+ into the user-specified buffer, and does random other things.
+ */
+static int
+cluster_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
+{
+ int req;
+ void *priv_msg;
+ size_t priv_len;
+
+ if (!msg || !maxlen) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ req = cluster_msg_wait(ctx, timeout);
+
+ switch (req) {
+ case M_DATA:
+ /* Copy out. */
+ req = _cluster_msg_receive(ctx, &priv_msg, &priv_len);
+ if (req < 0) {
+ printf("Ruh roh!\n");
+ return -1;
+ }
+
+ priv_len = (priv_len < maxlen ? priv_len : maxlen);
+
+ memcpy(msg, priv_msg, priv_len);
+ free(priv_msg);
+ return req;
+ case M_CLOSE:
+ errno = ECONNRESET;
+ return -1;
+ case 0:
+ /*printf("Nothing on queue\n");*/
+ return 0;
+ default:
+ printf("PROTOCOL ERROR: Received %d\n", req);
+ return -1;
+ }
+
+ printf("%s: CODE PATH ERROR\n", __FUNCTION__);
+ return -1;
+}
+
+
+static int
+_local_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
+{
+ struct timeval tv = {0, 0};
+ struct timeval *p = NULL;
+ local_msg_hdr_t h;
+
+ if (timeout >= 0) {
+ tv.tv_sec = timeout;
+ p = &tv;
+ }
+
+ if (_read_retry(ctx->u.local_info.sockfd, &h, sizeof(h), p) < 0)
+ return -1;
+
+ if (maxlen < h.msg_len) {
+ printf("WARNING: Buffer too small for message!\n");
+ h.msg_len = maxlen;
+ }
+
+ return _read_retry(ctx->u.local_info.sockfd, msg, h.msg_len, p);
+}
+
+
+/**
+ Receive a message from a cluster-context. This copies out the contents
+ into the user-specified buffer, and does random other things.
+ */
+static int
+local_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
+{
+ int req;
+ char priv_msg[4096];
+ size_t priv_len;
+
+ if (!msg || !maxlen) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ switch (req) {
+ case M_DATA:
+ /* Copy out. */
+ req = _local_msg_receive(ctx, priv_msg, priv_len, timeout);
+ if (req <= 0)
+ return -1;
+
+ priv_len = (priv_len < maxlen ? priv_len : maxlen);
+
+ memcpy(msg, priv_msg, priv_len);
+ free(msg);
+ return req;
+ case M_CLOSE:
+ errno = ECONNRESET;
+ return -1;
+ case 0:
+ /*printf("Nothing on queue\n");*/
+ return 0;
+ default:
+ printf("PROTOCOL ERROR: Received %d\n", req);
+ return -1;
+ }
+
+ printf("%s: CODE PATH ERROR\n", __FUNCTION__);
+ return -1;
+}
+
+
+int
+msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
+{
+ if (!ctx || !msg || !maxlen) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ switch(ctx->type) {
+ case MSG_CLUSTER:
+ return cluster_msg_receive(ctx, msg, maxlen, timeout);
+ case MSG_SOCKET:
+ return local_msg_receive(ctx, msg, maxlen, timeout);
+ default:
+ break;
+ }
+
+ errno = EINVAL;
+ return -1;
+}
+
+
/**
  Open a connection to the specified node ID.
  If the specified node is CMAN_NODEID_US, this connects to ourself
  via the local socket in /var/run/cluster...; otherwise a pseudo
  channel over the CMAN cluster socket is negotiated with an
  M_OPEN / M_OPEN_ACK handshake.

  @param nodeid	Target node ID, or CMAN_NODEID_US for local.
  @param port	Cluster port (cluster contexts only).
  @param ctx	Caller-allocated context; initialized here.
  @param timeout	Seconds to wait for the handshake to complete.
  @return		0 on success, -1 on failure (errno set).
 */
int
msg_open(int nodeid, int port, msgctx_t *ctx, int timeout)
{
	int t = 0;

	errno = EINVAL;
	if (!ctx)
		return -1;


	/*printf("Opening pseudo channel to node %d\n", nodeid);*/

	memset(ctx, 0, sizeof(*ctx));
	if (nodeid == CMAN_NODEID_US) {
		/* Local case: plain socket connect; no handshake needed. */
		if ((ctx->u.local_info.sockfd = local_connect()) < 0) {
			return -1;
		}
		ctx->type = MSG_SOCKET;
		return 0;
	}

	/* Cluster case: initialize queue and synchronization state. */
	ctx->type = MSG_CLUSTER;
	ctx->u.cluster_info.nodeid = nodeid;
	ctx->u.cluster_info.port = port;
	ctx->u.cluster_info.local_ctx = -1;
	ctx->u.cluster_info.remote_ctx = 0;
	ctx->u.cluster_info.queue = NULL;
	ctx->u.cluster_info.select_pipe[0] = -1;
	ctx->u.cluster_info.select_pipe[1] = -1;
	pthread_mutex_init(&ctx->u.cluster_info.mutex, NULL);
	pthread_cond_init(&ctx->u.cluster_info.cond, NULL);

	/* Assign context index */
	if (assign_ctx(ctx) < 0)
		return -1;

	//printf("  Local CTX: %d\n", ctx->u.cluster_info.local_ctx);

	/* Send open */

	//printf("  Sending control message M_OPEN\n");
	if (cluster_send_control_msg(ctx, M_OPEN) < 0) {
		/* NOTE(review): ctx remains registered in contexts[] on
		   this path -- presumably it should be msg_close()d
		   like the timeout path below; confirm. */
		return -1;
	}

	/* Ok, wait for a response; msg_wait() blocks up to 1 second
	   per iteration, so t approximates elapsed seconds. */
	while (!is_established(ctx)) {
		++t;
		if (t > timeout) {
			msg_close(ctx);
			errno = ETIMEDOUT;
			return -1;
		}

		switch(msg_wait(ctx, 1)) {
		case M_OPEN_ACK:
			/* Consume the ACK off the queue; its payload is
			   processed by the queue machinery, not here. */
			_cluster_msg_receive(ctx, NULL, NULL);
			break;
		case M_NONE:
			continue;
		default:
			printf("PROTO ERROR: M_OPEN_ACK not received \n");
		}
	}

	/*
	printf("  Remote CTX: %d\n",
	       ctx->u.cluster_info.remote_ctx);
	printf("  Pseudo channel established!\n");
	*/


	return 0;
}
+
+
/**
  Close a connection context (cluster or socket; it doesn't matter)
  In the case of a cluster context, we need to clear out the
  receive queue and what-not.  This isn't a big deal.  Also, we
  need to tell the other end that we're done -- just in case it does
  not know yet ;)

  With a socket, the O/S cleans up the buffers for us.

  NOTE(review): the cluster context's mutex and condition variable
  are never destroyed here -- presumably pthread_mutex_destroy /
  pthread_cond_destroy belong in this path; confirm.

  @param ctx	Context to close.
  @return		0 on success, -1 with errno = EINVAL otherwise.
 */
int
msg_close(msgctx_t *ctx)
{
	msg_q_t *n = NULL;

	if (!ctx) {
		errno = EINVAL;
		return -1;
	}

	switch (ctx->type) {
	case MSG_CLUSTER:
		if (ctx->u.cluster_info.local_ctx >= MAX_CONTEXTS) {
			errno = EINVAL;
			return -1;
		}
		pthread_mutex_lock(&context_lock);
		/* Other threads should not be able to see this again */
		if (contexts[ctx->u.cluster_info.local_ctx])
			contexts[ctx->u.cluster_info.local_ctx] = NULL;
		pthread_mutex_unlock(&context_lock);
		/* Clear receive queue.  NOTE(review): drained without
		   holding the per-context mutex -- safe only if nothing
		   can still enqueue after deregistration above; confirm
		   against process_cman_msg()'s locking. */
		while ((n = ctx->u.cluster_info.queue) != NULL) {
			list_remove(&ctx->u.cluster_info.queue, n);
			free(n->message);
			free(n);
		}
		/* Send close message, but only if the open handshake ever
		   completed (remote_ctx == 0 means never established). */
		if (ctx->u.cluster_info.remote_ctx != 0) {
			cluster_send_control_msg(ctx, M_CLOSE);
		}

		/* Close pipe if it's open */
		if (ctx->u.cluster_info.select_pipe[0] >= 0) {
			close(ctx->u.cluster_info.select_pipe[0]);
			ctx->u.cluster_info.select_pipe[0] = -1;
		}
		if (ctx->u.cluster_info.select_pipe[1] >= 0) {
			close(ctx->u.cluster_info.select_pipe[1]);
			ctx->u.cluster_info.select_pipe[1] = -1;
		}
		return 0;
	case MSG_SOCKET:
		close(ctx->u.local_info.sockfd);
		ctx->u.local_info.sockfd = -1;
		return 0;
	default:
		break;
	}

	errno = EINVAL;
	return -1;
}
+
+
/**
  Called by cman_dispatch to deal with messages coming across the
  cluster socket.  This function deals with fanning out the requests
  and putting them on the per-context queues.  We don't have
  the benefits of pre-configured buffers, so we need this.

  Runs on the cluster comms thread; consumers are the per-context
  wait/receive functions.

  @param h	CMAN handle (unused here).
  @param priv	Private callback data (unused here).
  @param buf	Raw message: cluster_msg_hdr_t followed by payload.
  @param len	Length of buf in bytes.
  @param port	CMAN port the message arrived on (unused here).
  @param nodeid	Originating node ID (used only for debug output).
 */
static void
process_cman_msg(cman_handle_t h, void *priv, char *buf, int len,
		 uint8_t port, int nodeid)
{
	cluster_msg_hdr_t *m = (cluster_msg_hdr_t *)buf;
	msg_q_t *node;
	msgctx_t *ctx;

	/* NOTE(review): signed/unsigned comparison -- harmless as long
	   as CMAN never hands us a negative len; confirm. */
	if (len < sizeof(*m)) {
		printf("Message too short.\n");
		return;
	}

	/* Header arrives in wire byte order; convert in place. */
	swab_cluster_msg_hdr_t(m);

#if 0
	printf("Processing ");
	switch(m->msg_control) {
	case M_NONE:
		printf("M_NONE\n");
		break;
	case M_OPEN:
		printf("M_OPEN\n");
		break;
	case M_OPEN_ACK:
		printf("M_OPEN_ACK\n");
		break;
	case M_DATA:
		printf("M_DATA\n");
		break;
	case M_CLOSE:
		printf("M_CLOSE\n");
		break;
	}

	printf("  Node ID: %d %d\n", m->src_nodeid, nodeid);
	printf("  Remote CTX: %d  Local CTX: %d\n", m->src_ctx, m->dest_ctx);
#endif

	if (m->dest_ctx >= MAX_CONTEXTS) {
		printf("Context invalid; ignoring\n");
		return;
	}

	/* Never drop a message on OOM; block and retry until memory
	   becomes available. */
	while ((node = malloc(sizeof(*node))) == NULL) {
		sleep(1);
	}
	memset(node, 0, sizeof(*node));
	while ((node->message = malloc(len)) == NULL) {
		sleep(1);
	}
	memcpy(node->message, buf, len);
	node->len = len;

	pthread_mutex_lock(&context_lock);
	ctx = contexts[m->dest_ctx];
	if (!ctx) {
		/* We received a close for something we've already
		   detached from our list.  No big deal, just
		   ignore. */
		free(node->message);
		free(node);
		pthread_mutex_unlock(&context_lock);
		return;
	}

	pthread_mutex_lock(&ctx->u.cluster_info.mutex);
	list_insert(&ctx->u.cluster_info.queue, node);
	pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
	/* If a select pipe was set up, wake it up */
	if (ctx->u.cluster_info.select_pipe[1] >= 0)
		write(ctx->u.cluster_info.select_pipe[1], "", 1);
	pthread_mutex_unlock(&context_lock);

	/* NOTE(review): signaled after context_lock is dropped; if
	   another thread msg_close()s this ctx in the window, this
	   touches freed state -- confirm lifetime guarantees. */
	pthread_cond_signal(&ctx->u.cluster_info.cond);
}
+
+
/**
  Accept a new pseudo-private connection coming in over the
  cluster socket.

  Scans the listening context's queue for an M_OPEN control message,
  initializes acceptctx from it, and replies with M_OPEN_ACK.

  NOTE(review): acceptctx's select_pipe fields are not initialized
  here -- presumably the caller zeroes the context (msg_new_ctx)
  before calling; confirm.

  @param listenctx	Listening cluster context (local_ctx must be 0).
  @param acceptctx	Context to initialize for the new connection.
  @return		0 on success (including "no M_OPEN found"),
			-1 with errno = EINVAL/EAGAIN on error.
 */
static int
cluster_msg_accept(msgctx_t *listenctx, msgctx_t *acceptctx)
{
	/* Note: statement before declarations below requires C99. */
	errno = EINVAL;
	cluster_msg_hdr_t *m;
	msg_q_t *n;
	char done = 0;
	char foo;

	if (!listenctx || !acceptctx)
		return -1;
	/* Only the broadcast/listen context (index 0) may accept. */
	if (listenctx->u.cluster_info.local_ctx != 0)
		return -1;

	pthread_mutex_lock(&listenctx->u.cluster_info.mutex);

	n = listenctx->u.cluster_info.queue;
	if (n == NULL) {
		pthread_mutex_unlock(&listenctx->u.cluster_info.mutex);
		errno = EAGAIN;
		return -1;
	}

	/* the OPEN should be the first thing on the list; this loop
	   is probably not necessary */
	list_do(&listenctx->u.cluster_info.queue, n) {

		m = n->message;
		switch(m->msg_control) {
		case M_OPEN:
			list_remove(&listenctx->u.cluster_info.queue, n);
			/*printf("Accepting connection from %d %d\n",
				  m->src_nodeid, m->src_ctx);*/

			/* New connection */
			pthread_mutex_init(&acceptctx->u.cluster_info.mutex,
					   NULL);
			pthread_cond_init(&acceptctx->u.cluster_info.cond,
					  NULL);
			acceptctx->u.cluster_info.queue = NULL;
			acceptctx->u.cluster_info.remote_ctx = m->src_ctx;
			acceptctx->u.cluster_info.nodeid = m->src_nodeid;
			acceptctx->u.cluster_info.port = m->msg_port;

			/* Register in contexts[] and ACK the opener. */
			assign_ctx(acceptctx);
			cluster_send_control_msg(acceptctx, M_OPEN_ACK);

			/* Consume one wakeup byte written when the OPEN
			   was queued, keeping the pipe level-consistent. */
			if (listenctx->u.cluster_info.select_pipe[0] >= 0) {
				read(listenctx->u.cluster_info.select_pipe[0],
				     &foo, 1);
			}

			done = 1;
			free(m);
			free(n);

			break;
		case M_DATA:
			/* Data messages (i.e. from broadcast msgs) are
			   okay too!... but we don't handle them here */
			break;
		default:
			/* Other message?! */
			printf("Odd...  %d\n", m->msg_control);
			break;
		}

		if (done)
			break;

	} while (!list_done(&listenctx->u.cluster_info.queue, n));

	pthread_mutex_unlock(&listenctx->u.cluster_info.mutex);

	return 0;
}
+
+
+/* XXX INCOMPLETE */
+int
+msg_accept(msgctx_t *listenctx, msgctx_t *acceptctx)
+{
+ switch(listenctx->type) {
+ case MSG_CLUSTER:
+ return cluster_msg_accept(listenctx, acceptctx);
+ case MSG_SOCKET:
+ return 0;
+ default:
+ break;
+ }
+
+ return -1;
+}
+
+
+static int
+local_listener_sk(void)
+{
+ int sock;
+ struct sockaddr_un su;
+ mode_t om;
+
+ sock = socket(PF_LOCAL, SOCK_STREAM, 0);
+ if (sock < 0)
+ return -1;
+
+ unlink(RGMGR_SOCK);
+ om = umask(077);
+ su.sun_family = PF_LOCAL;
+ snprintf(su.sun_path, sizeof(su.sun_path), RGMGR_SOCK);
+
+ if (bind(sock, &su, sizeof(su)) < 0) {
+ umask(om);
+ goto fail;
+ }
+ umask(om);
+
+ if (listen(sock, SOMAXCONN) < 0)
+ goto fail;
+
+ return sock;
+fail:
+ if (sock >= 0)
+ close(sock);
+ return -1;
+}
+
+
+/**
+ This waits for events on the cluster comms FD and
+ dispatches them using cman_dispatch. Initially,
+ the design had no permanent threads, but that model
+ proved difficult to implement correctly.
+ */
+static void *
+cluster_comms_thread(void *arg)
+{
+ while (thread_running) {
+ poll_cluster_messages(2);
+ }
+
+ return NULL;
+}
+
+
/*
   Transliterates a CMAN event to a control message and queues it on
   the broadcast context (contexts[0]) so the main loop can pick it up.
   The event's integer argument is appended to the message body.
 */
static void
process_cman_event(cman_handle_t handle, void *private, int reason, int arg)
{
	cluster_msg_hdr_t *msg;
	int *argp;
	msg_q_t *node;
	msgctx_t *ctx;

	/* Allocate queue node; never drop an event on OOM. */
	while ((node = malloc(sizeof(*node))) == NULL) {
		sleep(1);
	}
	memset(node, 0, sizeof(*node));

	/* Allocate message: header + int (for arg) */
	while ((msg = malloc(sizeof(int) +
			     sizeof(cluster_msg_hdr_t))) == NULL) {
		sleep(1);
	}
	memset(msg, 0, sizeof(int)+sizeof(cluster_msg_hdr_t));


	/* NOTE(review): no default case -- an unrecognized reason
	   leaves msg_control zeroed (presumably M_NONE) and is still
	   queued; confirm that is the intended behavior. */
	switch(reason) {
#if defined(LIBCMAN_VERSION) && LIBCMAN_VERSION >= 2
	case CMAN_REASON_PORTOPENED:
		msg->msg_control = M_PORTOPENED;
		break;
	case CMAN_REASON_TRY_SHUTDOWN:
		msg->msg_control = M_TRY_SHUTDOWN;
		break;
#endif
	case CMAN_REASON_PORTCLOSED:
		msg->msg_control = M_PORTCLOSED;
		break;
	case CMAN_REASON_STATECHANGE:
		msg->msg_control = M_STATECHANGE;
		break;
	}

	/* Stash the event argument just past the header (GCC void-
	   pointer arithmetic extension). */
	argp = ((void *)msg + sizeof(cluster_msg_hdr_t));
	*argp = arg;

	node->len = sizeof(cluster_msg_hdr_t) + sizeof(int);
	node->message = msg;

	pthread_mutex_lock(&context_lock);
	ctx = contexts[0];	/* This is the cluster context... */
	if (!ctx) {
		/* We received a close for something we've already
		   detached from our list.  No big deal, just
		   ignore. */
		free(node->message);
		free(node);
		pthread_mutex_unlock(&context_lock);
		return;
	}

	pthread_mutex_lock(&ctx->u.cluster_info.mutex);
	list_insert(&ctx->u.cluster_info.queue, node);
	pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
	/* If a select pipe was set up, wake it up */
	if (ctx->u.cluster_info.select_pipe[1] >= 0)
		write(ctx->u.cluster_info.select_pipe[1], "", 1);
	pthread_mutex_unlock(&context_lock);

	/* NOTE(review): signaled after context_lock is dropped; see
	   the matching concern in process_cman_msg(). */
	pthread_cond_signal(&ctx->u.cluster_info.cond);
}
+
+
+/* XXX INCOMPLETE */
+int
+msg_init(chandle_t *ch)
+{
+ int e;
+ pthread_attr_t attrs;
+ msgctx_t *ctx;
+
+ pthread_mutex_lock(&ch->c_lock);
+
+ /* Set up local context */
+
+ ctx = msg_new_ctx();
+ if (!ctx) {
+ pthread_mutex_unlock(&ch->c_lock);
+ return -1;
+ }
+
+ ctx->type = MSG_SOCKET;
+ ctx->u.local_info.sockfd = local_listener_sk();
+ ctx->u.local_info.flags = SKF_LISTEN;
+
+ ch->local_ctx = ctx;
+
+ ctx = msg_new_ctx();
+
+ if (!ctx) {
+ pthread_mutex_unlock(&ch->c_lock);
+ msg_free_ctx((msgctx_t *)ch->local_ctx);
+ return -1;
+ }
+
+ gch = ch;
+
+ if (cman_start_recv_data(ch->c_cluster, process_cman_msg,
+ RG_PORT) != 0) {
+ e = errno;
+ msg_close(ch->local_ctx);
+ pthread_mutex_unlock(&ch->c_lock);
+ msg_free_ctx((msgctx_t *)ch->local_ctx);
+ msg_free_ctx((msgctx_t *)ch->cluster_ctx);
+ errno = e;
+ return -1;
+ }
+
+ if (cman_start_notification(ch->c_cluster, process_cman_event) != 0) {
+ e = errno;
+ msg_close(ch->local_ctx);
+ pthread_mutex_unlock(&ch->c_lock);
+ msg_free_ctx((msgctx_t *)ch->local_ctx);
+ msg_free_ctx((msgctx_t *)ch->cluster_ctx);
+ errno = e;
+ }
+
+ ch->cluster_ctx = ctx;
+ pthread_mutex_unlock(&ch->c_lock);
+
+ pthread_mutex_lock(&context_lock);
+
+ memset(contexts, 0, sizeof(contexts));
+ contexts[0] = ctx;
+
+ ctx->type = MSG_CLUSTER;
+ ctx->u.cluster_info.port = RG_PORT; /* port! */
+ ctx->u.cluster_info.nodeid = 0; /* Broadcast! */
+ ctx->u.cluster_info.select_pipe[0] = -1;
+ ctx->u.cluster_info.select_pipe[1] = -1;
+ pthread_mutex_init(&ctx->u.cluster_info.mutex, NULL);
+ pthread_cond_init(&ctx->u.cluster_info.cond, NULL);
+ pthread_mutex_unlock(&context_lock);
+
+ pthread_attr_init(&attrs);
+ pthread_attr_setinheritsched(&attrs, PTHREAD_INHERIT_SCHED);
+ pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
+
+ thread_running = 1;
+ pthread_create(&comms_thread, &attrs, cluster_comms_thread, NULL);
+
+
+ pthread_attr_destroy(&attrs);
+
+ return 0;
+}
+
+
+int
+msg_print_ctx(int ctx)
+{
+ if (!contexts[ctx])
+ return -1;
+
+ printf("Cluster Message Context %d\n", ctx);
+ printf(" Node ID %d\n", contexts[ctx]->u.cluster_info.nodeid);
+ printf(" Remote %d\n", contexts[ctx]->u.cluster_info.remote_ctx);
+ return 0;
+}
+
+
+/* XXX INCOMPLETE */
+int
+msg_shutdown(chandle_t *ch)
+{
+ if (!ch) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ while (pthread_kill(comms_thread, 0) == 0)
+ sleep(1);
+
+ pthread_mutex_lock(&ch->c_lock);
+
+ /* xxx purge everything */
+ msg_close(ch->local_ctx);
+ cman_end_recv_data(ch->c_cluster);
+
+ msg_free_ctx(ch->local_ctx);
+ msg_free_ctx(ch->cluster_ctx);
+
+
+ pthread_mutex_unlock(&ch->c_lock);
+
+ return 0;
+}
+
+
/**
  Size of a message context in bytes, for callers that must allocate
  contexts without seeing the msgctx_t definition.
 */
inline int
msgctx_size(void)
{
	return sizeof(msgctx_t);
}
+
+
+msgctx_t *
+msg_new_ctx(void)
+{
+ msgctx_t *p;
+
+ printf("Alloc %d\n", sizeof(msgctx_t));
+ p = malloc(sizeof(msgctx_t));
+ if (!p)
+ return NULL;
+
+ memset(p, 0, sizeof(p));
+ p->type = MSG_NONE;
+
+ return p;
+}
+
+
/**
  Free a context allocated with msg_new_ctx().  The context should
  already have been closed with msg_close(); this releases only the
  memory (free(NULL) is a safe no-op).
 */
void
msg_free_ctx(msgctx_t *dead)
{
	free(dead);
}
+
More information about the Cluster-devel
mailing list