[Linux-cluster] [gfs_controld] send messages through separate cpg
Steven Dake
sdake at redhat.com
Wed Jun 7 19:24:25 UTC 2006
Dave,
I'd say the cpg bits look really good except for the mcast operation
(where you have a FIXME).
I'd recommend not backing off here, but instead spinning on the transmit
if ERR_TRY_AGAIN is returned. Even on a heavily loaded system the delay
should not be very significant on a spin operation, unless this code has
certain timeouts (not sure about that) that would expire. It would
appear not since the code suggests backing off using a timer.
Regards
-steve
On Wed, 2006-06-07 at 12:27 -0500, David Teigland wrote:
> [new process requires all work to be sent to ml prior to cvs check-in]
>
> Set up a separate cpg for sending messages (e.g. for processing
> mount/unmount) instead of sending them through the cpg used to represent
> the mount group. Since we apply cpg changes to the mount group async,
> that cpg won't always contain all the nodes we need to process the
> mount/unmount. A mount from one node in parallel with unmount from
> another often won't work without this.
>
>
> diff -urN -X dontdiff cluster-HEAD/gfs/lock_dlm/daemon/Makefile cluster/gfs/lock_dlm/daemon/Makefile
> --- cluster-HEAD/gfs/lock_dlm/daemon/Makefile 2006-03-27 01:31:46.000000000 -0600
> +++ cluster/gfs/lock_dlm/daemon/Makefile 2006-06-06 17:19:40.740421037 -0500
> @@ -21,6 +21,7 @@
> -I../../include/ \
> -I../../../group/lib/ \
> -I../../../cman/lib/ \
> + -I../../../cman/daemon/openais/trunk/include/ \
> -I../../../dlm/lib/ \
> -I../../../gfs-kernel/src/dlm/
>
> @@ -33,12 +34,14 @@
>
> gfs_controld: main.o \
> member_cman.o \
> + cpg.o \
> group.o \
> plock.o \
> recover.o \
> withdraw.o \
> ../../../dlm/lib/libdlm_lt.a \
> ../../../cman/lib/libcman.a \
> + ../../../cman/daemon/openais/trunk/lib/libcpg.a \
> ../../../group/lib/libgroup.a
> $(CC) $(LDFLAGS) -o $@ $^
>
> @@ -49,6 +52,9 @@
> member_cman.o: member_cman.c
> $(CC) $(CFLAGS) -c -o $@ $<
>
> +cpg.o: cpg.c
> + $(CC) $(CFLAGS) -c -o $@ $<
> +
> recover.o: recover.c
> $(CC) $(CFLAGS) -c -o $@ $<
>
> diff -urN -X dontdiff cluster-HEAD/gfs/lock_dlm/daemon/cpg.c cluster/gfs/lock_dlm/daemon/cpg.c
> --- cluster-HEAD/gfs/lock_dlm/daemon/cpg.c 1969-12-31 18:00:00.000000000 -0600
> +++ cluster/gfs/lock_dlm/daemon/cpg.c 2006-06-07 11:54:28.478585576 -0500
> @@ -0,0 +1,212 @@
> +/******************************************************************************
> +*******************************************************************************
> +**
> +** Copyright (C) 2006 Red Hat, Inc. All rights reserved.
> +**
> +** This copyrighted material is made available to anyone wishing to use,
> +** modify, copy, or redistribute it subject to the terms and conditions
> +** of the GNU General Public License v.2.
> +**
> +*******************************************************************************
> +******************************************************************************/
> +
> +#include "lock_dlm.h"
> +#include "cpg.h"
> +
> +static cpg_handle_t daemon_handle;
> +static struct cpg_name daemon_name;
> +static int got_msg;
> +static int saved_nodeid;
> +static int saved_len;
> +static char saved_data[MAX_MSGLEN];
> +
> +void receive_journals(struct mountgroup *mg, char *buf, int len, int from);
> +void receive_options(struct mountgroup *mg, char *buf, int len, int from);
> +void receive_remount(struct mountgroup *mg, char *buf, int len, int from);
> +void receive_plock(struct mountgroup *mg, char *buf, int len, int from);
> +void receive_recovery_status(struct mountgroup *mg, char *buf, int len,
> + int from);
> +void receive_recovery_done(struct mountgroup *mg, char *buf, int len, int from);
> +
> +
> +static void do_deliver(int nodeid, char *data, int len)
> +{
> + struct mountgroup *mg;
> + struct gdlm_header *hd;
> +
> + hd = (struct gdlm_header *) data;
> +
> + mg = find_mg(hd->name);
> + if (!mg)
> + return;
> +
> + hd->version[0] = le16_to_cpu(hd->version[0]);
> + hd->version[1] = le16_to_cpu(hd->version[1]);
> + hd->version[2] = le16_to_cpu(hd->version[2]);
> + hd->type = le16_to_cpu(hd->type);
> + hd->nodeid = le32_to_cpu(hd->nodeid);
> + hd->to_nodeid = le32_to_cpu(hd->to_nodeid);
> +
> + if (hd->version[0] != GDLM_VER_MAJOR) {
> + log_error("reject message version %u.%u.%u",
> + hd->version[0], hd->version[1], hd->version[2]);
> + return;
> + }
> +
> + /* If there are some group messages between a new node being added to
> + the cpg group and being added to the app group, the new node should
> + discard them since they're only relevant to the app group. */
> +
> + if (!mg->last_callback) {
> + log_group(mg, "discard message type %d len %d from %d",
> + hd->type, len, nodeid);
> + return;
> + }
> +
> + switch (hd->type) {
> + case MSG_JOURNAL:
> + receive_journals(mg, data, len, nodeid);
> + break;
> +
> + case MSG_OPTIONS:
> + receive_options(mg, data, len, nodeid);
> + break;
> +
> + case MSG_REMOUNT:
> + receive_remount(mg, data, len, nodeid);
> + break;
> +
> + case MSG_PLOCK:
> + receive_plock(mg, data, len, nodeid);
> + break;
> +
> + case MSG_RECOVERY_STATUS:
> + receive_recovery_status(mg, data, len, nodeid);
> + break;
> +
> + case MSG_RECOVERY_DONE:
> + receive_recovery_done(mg, data, len, nodeid);
> + break;
> +
> + default:
> + log_error("unknown message type %d from %d",
> + hd->type, hd->nodeid);
> + }
> +}
> +
> +void deliver_cb(cpg_handle_t handle, struct cpg_name *group_name,
> + uint32_t nodeid, uint32_t pid, void *data, int data_len)
> +{
> + saved_nodeid = nodeid;
> + saved_len = data_len;
> + memcpy(saved_data, data, data_len);
> + got_msg = 1;
> +}
> +
> +void confchg_cb(cpg_handle_t handle, struct cpg_name *group_name,
> + struct cpg_address *member_list, int member_list_entries,
> + struct cpg_address *left_list, int left_list_entries,
> + struct cpg_address *joined_list, int joined_list_entries)
> +{
> +}
> +
> +static cpg_callbacks_t callbacks = {
> + .cpg_deliver_fn = deliver_cb,
> + .cpg_confchg_fn = confchg_cb,
> +};
> +
> +int process_cpg(void)
> +{
> + cpg_error_t error;
> +
> + got_msg = 0;
> + saved_len = 0;
> + saved_nodeid = 0;
> + memset(saved_data, 0, sizeof(saved_data));
> +
> + error = cpg_dispatch(daemon_handle, CPG_DISPATCH_ONE);
> + if (error != CPG_OK) {
> + log_error("cpg_dispatch error %d", error);
> + return -1;
> + }
> +
> + if (got_msg)
> + do_deliver(saved_nodeid, saved_data, saved_len);
> + return 0;
> +}
> +
> +int setup_cpg(void)
> +{
> + cpg_error_t error;
> + int fd = 0;
> +
> + error = cpg_initialize(&daemon_handle, &callbacks);
> + if (error != CPG_OK) {
> + log_error("cpg_initialize error %d", error);
> + return -1;
> + }
> +
> + cpg_fd_get(daemon_handle, &fd);
> + if (fd < 0)
> + return -1;
> +
> + memset(&daemon_name, 0, sizeof(daemon_name));
> + strcpy(daemon_name.value, "gfs_controld");
> + daemon_name.length = 12;
> +
> + retry:
> + error = cpg_join(daemon_handle, &daemon_name);
> + if (error == CPG_ERR_TRY_AGAIN) {
> + log_debug("setup_cpg cpg_join retry");
> + sleep(1);
> + goto retry;
> + }
> + if (error != CPG_OK) {
> + log_error("cpg_join error %d", error);
> + cpg_finalize(daemon_handle);
> + return -1;
> + }
> +
> + log_debug("cpg %d", fd);
> + return fd;
> +}
> +
> +static int _send_message(cpg_handle_t h, void *buf, int len)
> +{
> + struct iovec iov;
> + cpg_error_t error;
> + int retries = 0;
> +
> + iov.iov_base = buf;
> + iov.iov_len = len;
> +
> + retry:
> + error = cpg_mcast_joined(h, CPG_TYPE_AGREED, &iov, 1);
> + if (error != CPG_OK)
> + log_error("cpg_mcast_joined error %d handle %llx", error, h);
> + if (error == CPG_ERR_TRY_AGAIN) {
> + /* FIXME: backoff say .25 sec, .5 sec, .75 sec, 1 sec */
> + retries++;
> + if (retries > 3)
> + sleep(1);
> + goto retry;
> + }
> +
> + return 0;
> +}
> +
> +int send_group_message(struct mountgroup *mg, int len, char *buf)
> +{
> + struct gdlm_header *hd = (struct gdlm_header *) buf;
> +
> + hd->version[0] = cpu_to_le16(GDLM_VER_MAJOR);
> + hd->version[1] = cpu_to_le16(GDLM_VER_MINOR);
> + hd->version[2] = cpu_to_le16(GDLM_VER_PATCH);
> + hd->type = cpu_to_le16(hd->type);
> + hd->nodeid = cpu_to_le32(hd->nodeid);
> + hd->to_nodeid = cpu_to_le32(hd->to_nodeid);
> + memcpy(hd->name, mg->name, strlen(mg->name));
> +
> + return _send_message(daemon_handle, buf, len);
> +}
> +
> diff -urN -X dontdiff cluster-HEAD/gfs/lock_dlm/daemon/group.c cluster/gfs/lock_dlm/daemon/group.c
> --- cluster-HEAD/gfs/lock_dlm/daemon/group.c 2006-06-07 12:10:32.102338261 -0500
> +++ cluster/gfs/lock_dlm/daemon/group.c 2006-06-06 17:23:06.523976113 -0500
> @@ -21,25 +21,14 @@
> static int cb_event_nr;
> static unsigned int cb_id;
> static int cb_type;
> -static int cb_nodeid;
> -static int cb_len;
> static int cb_member_count;
> static int cb_members[MAX_GROUP_MEMBERS];
> -static char cb_message[MAX_MSGLEN+1];
>
> int do_stop(struct mountgroup *mg);
> int do_finish(struct mountgroup *mg);
> int do_terminate(struct mountgroup *mg);
> int do_start(struct mountgroup *mg, int type, int count, int *nodeids);
>
> -void receive_journals(struct mountgroup *mg, char *buf, int len, int from);
> -void receive_options(struct mountgroup *mg, char *buf, int len, int from);
> -void receive_remount(struct mountgroup *mg, char *buf, int len, int from);
> -void receive_plock(struct mountgroup *mg, char *buf, int len, int from);
> -void receive_recovery_status(struct mountgroup *mg, char *buf, int len,
> - int from);
> -void receive_recovery_done(struct mountgroup *mg, char *buf, int len, int from);
> -
>
> static void stop_cbfn(group_handle_t h, void *private, char *name)
> {
> @@ -87,17 +76,9 @@
> static void deliver_cbfn(group_handle_t h, void *private, char *name,
> int nodeid, int len, char *buf)
> {
> - int n;
> - cb_action = DO_DELIVER;
> - strncpy(cb_name, name, MAX_GROUP_NAME_LEN);
> - cb_nodeid = nodeid;
> - cb_len = n = len;
> - if (len > MAX_MSGLEN)
> - n = MAX_MSGLEN;
> - memcpy(&cb_message, buf, n);
> }
>
> -group_callbacks_t callbacks = {
> +static group_callbacks_t callbacks = {
> stop_cbfn,
> start_cbfn,
> finish_cbfn,
> @@ -106,53 +87,6 @@
> deliver_cbfn
> };
>
> -static void do_deliver(struct mountgroup *mg)
> -{
> - struct gdlm_header *hd;
> -
> - hd = (struct gdlm_header *) cb_message;
> -
> - /* If there are some group messages between a new node being added to
> - the cpg group and being added to the app group, the new node should
> - discard them since they're only relevant to the app group. */
> -
> - if (!mg->last_callback) {
> - log_group(mg, "discard message type %d len %d from %d",
> - hd->type, cb_len, cb_nodeid);
> - return;
> - }
> -
> - switch (hd->type) {
> - case MSG_JOURNAL:
> - receive_journals(mg, cb_message, cb_len, cb_nodeid);
> - break;
> -
> - case MSG_OPTIONS:
> - receive_options(mg, cb_message, cb_len, cb_nodeid);
> - break;
> -
> - case MSG_REMOUNT:
> - receive_remount(mg, cb_message, cb_len, cb_nodeid);
> - break;
> -
> - case MSG_PLOCK:
> - receive_plock(mg, cb_message, cb_len, cb_nodeid);
> - break;
> -
> - case MSG_RECOVERY_STATUS:
> - receive_recovery_status(mg, cb_message, cb_len, cb_nodeid);
> - break;
> -
> - case MSG_RECOVERY_DONE:
> - receive_recovery_done(mg, cb_message, cb_len, cb_nodeid);
> - break;
> -
> - default:
> - log_error("unknown message type %d from %d",
> - hd->type, hd->nodeid);
> - }
> -}
> -
> char *str_members(void)
> {
> static char buf[MAXLINE];
> @@ -222,12 +156,6 @@
> mg->id = cb_id;
> break;
>
> - case DO_DELIVER:
> - log_debug("groupd callback: deliver %s len %d nodeid %d",
> - cb_name, cb_len, cb_nodeid);
> - do_deliver(mg);
> - break;
> -
> default:
> error = -EINVAL;
> }
> @@ -257,15 +185,3 @@
> return rv;
> }
>
> -int send_group_message(struct mountgroup *mg, int len, char *buf)
> -{
> - int error;
> -
> - error = group_send(gh, mg->name, len, buf);
> - if (error < 0)
> - log_error("group_send error %d errno %d", error, errno);
> - else
> - error = 0;
> - return error;
> -}
> -
> diff -urN -X dontdiff cluster-HEAD/gfs/lock_dlm/daemon/lock_dlm.h cluster/gfs/lock_dlm/daemon/lock_dlm.h
> --- cluster-HEAD/gfs/lock_dlm/daemon/lock_dlm.h 2006-05-25 14:30:40.000000000 -0500
> +++ cluster/gfs/lock_dlm/daemon/lock_dlm.h 2006-06-06 17:18:25.510916543 -0500
> @@ -201,11 +201,16 @@
> MSG_RECOVERY_DONE,
> };
>
> +#define GDLM_VER_MAJOR 1
> +#define GDLM_VER_MINOR 0
> +#define GDLM_VER_PATCH 0
> +
> struct gdlm_header {
> uint16_t version[3];
> uint16_t type; /* MSG_ */
> uint32_t nodeid; /* sender */
> uint32_t to_nodeid; /* 0 if to all */
> + char name[MAXNAME];
> };
>
>
> @@ -214,6 +219,8 @@
>
> int setup_cman(void);
> int process_cman(void);
> +int setup_cpg(void);
> +int process_cpg(void);
> int setup_groupd(void);
> int process_groupd(void);
> int setup_libdlm(void);
> diff -urN -X dontdiff cluster-HEAD/gfs/lock_dlm/daemon/main.c cluster/gfs/lock_dlm/daemon/main.c
> --- cluster-HEAD/gfs/lock_dlm/daemon/main.c 2006-04-21 14:54:10.000000000 -0500
> +++ cluster/gfs/lock_dlm/daemon/main.c 2006-06-07 11:59:12.248223925 -0500
> @@ -25,6 +25,7 @@
> static struct pollfd pollfd[MAX_CLIENTS];
>
> static int cman_fd;
> +static int cpg_fd;
> static int listen_fd;
> static int groupd_fd;
> static int uevent_fd;
> @@ -249,6 +250,11 @@
> goto out;
> client_add(cman_fd, &maxi);
>
> + rv = cpg_fd = setup_cpg();
> + if (rv < 0)
> + goto out;
> + client_add(cpg_fd, &maxi);
> +
> rv = groupd_fd = setup_groupd();
> if (rv < 0)
> goto out;
> @@ -272,6 +278,8 @@
> goto out;
> client_add(plocks_fd, &maxi);
>
> + log_debug("setup done");
> +
> for (;;) {
> rv = poll(pollfd, maxi + 1, -1);
> if (rv < 0)
> @@ -296,6 +304,8 @@
> process_groupd();
> else if (pollfd[i].fd == cman_fd)
> process_cman();
> + else if (pollfd[i].fd == cpg_fd)
> + process_cpg();
> else if (pollfd[i].fd == uevent_fd)
> process_uevent();
> else if (!no_withdraw &&
> @@ -310,7 +320,6 @@
> if (pollfd[i].revents & POLLHUP) {
> if (pollfd[i].fd == cman_fd)
> exit_cman();
> - log_debug("closing fd %d", pollfd[i].fd);
> close(pollfd[i].fd);
> }
> }
>
> --
> Linux-cluster mailing list
> Linux-cluster at redhat.com
> https://www.redhat.com/mailman/listinfo/linux-cluster
More information about the Linux-cluster
mailing list