[Cluster-devel] cluster/cmirror/src cluster.c functions.c func ...
jbrassow at sourceware.org
jbrassow at sourceware.org
Mon Nov 5 22:44:06 UTC 2007
CVSROOT: /cvs/cluster
Module name: cluster
Branch: RHEL5
Changes by: jbrassow at sourceware.org 2007-11-05 22:44:04
Modified files:
cmirror/src : cluster.c functions.c functions.h link_mon.c
local.c
Log message:
- Fix a problem with recovery work assignment (priority recovery still
needs to be added; otherwise, I/O will stall for long periods
during mirror resync)
- Clean up the checkpointing code and fix a couple of bugs there that
prevented proper start-up.
Patches:
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/cluster.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.3&r2=1.1.2.4
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/functions.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.2&r2=1.1.2.3
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/functions.h.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.1&r2=1.1.2.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/link_mon.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.1&r2=1.1.2.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/local.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.2&r2=1.1.2.3
--- cluster/cmirror/src/Attic/cluster.c 2007/11/03 18:53:03 1.1.2.3
+++ cluster/cmirror/src/Attic/cluster.c 2007/11/05 22:44:03 1.1.2.4
@@ -49,6 +49,8 @@
int valid;
struct queue *startup_queue;
+ int checkpoints_needed;
+ uint32_t checkpoint_requesters[10];
struct checkpoint_data *checkpoint_list;
};
@@ -384,7 +386,7 @@
return 0;
}
-static int import_checkpoint(struct clog_cpg *entry)
+static int import_checkpoint(struct clog_cpg *entry, int no_read)
{
int rtn = 0;
SaCkptCheckpointHandleT h;
@@ -422,6 +424,11 @@
saCkptCheckpointUnlink(ckpt_handle, &name);
+ if (no_read) {
+ LOG_DBG("Checkpoint for this log already recieved");
+ goto no_read;
+ }
+
init_retry:
rv = saCkptSectionIterationInitialize(h, SA_CKPT_SECTIONS_ANY, 0, &itr);
if (rv == SA_AIS_ERR_TRY_AGAIN) {
@@ -516,7 +523,7 @@
fail:
saCkptSectionIterationFinalize(itr);
-
+no_read:
saCkptCheckpointClose(h);
free(bitmap);
@@ -542,11 +549,18 @@
* notice in tfr in export_checkpoint function
* by setting tfr->error
*/
- export_checkpoint(cp);
-
- entry->checkpoint_list = cp->next;
- free_checkpoint(cp);
- cp = entry->checkpoint_list;
+ switch (export_checkpoint(cp)) {
+ case -EEXIST:
+ LOG_DBG("Checkpoint already handled by someone else");
+ case 0:
+ entry->checkpoint_list = cp->next;
+ free_checkpoint(cp);
+ cp = entry->checkpoint_list;
+ break;
+ default:
+ /* FIXME: Skipping will cause list corruption */
+ LOG_ERROR("Failed to export checkpoint");
+ }
}
}
EXIT();
@@ -556,7 +570,9 @@
static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname,
uint32_t nodeid, uint32_t pid, void *msg, int msg_len)
{
+ int i;
int r = 0;
+ int i_am_server;
struct clog_tfr *tfr = msg;
struct clog_tfr *startup_tfr = NULL;
struct clog_tfr *cp_tfr = NULL;
@@ -567,9 +583,6 @@
if (msg_len != (sizeof(*tfr) + tfr->data_size))
LOG_ERROR("Badly sized message recieved from cluster.");
- LOG_DBG("Message (len = %d) from node/pid %u/%d", msg_len,
- nodeid, pid);
-
if (tfr->request_type & DM_CLOG_RESPONSE)
LOG_DBG("Response from cluster recieved %s",
RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE));
@@ -587,13 +600,31 @@
LOG_ERROR("Unable to find clog_cpg for cluster message");
goto out;
}
+ i_am_server = (my_cluster_id == match->lowest_id) ? 1 : 0;
if (tfr->request_type == DM_CLOG_CHECKPOINT_READY) {
- if ((!match->valid) && (my_cluster_id == tfr->originator)) {
- switch (import_checkpoint(match)) {
+ /* Redundant checkpoints ignored due to match->valid */
+ if (my_cluster_id == tfr->originator) {
+ switch (import_checkpoint(match, match->valid)) {
case 0:
- LOG_DBG("Checkpoint data recieved. Log is now valid");
- match->valid = 1;
+ if (!match->valid) {
+ LOG_DBG("Checkpoint data recieved. Log is now valid");
+ match->valid = 1;
+ while ((startup_tfr = queue_remove(match->startup_queue))) {
+ LOG_DBG("Processing delayed request %d: %s",
+ match->startup_queue->count,
+ RQ_TYPE(startup_tfr->request_type));
+ r = handle_cluster_request(startup_tfr, i_am_server);
+
+ if (r) {
+ LOG_ERROR("Error while processing delayed CPG message");
+ goto out;
+ } else {
+ queue_add(startup_tfr, free_queue);
+ }
+ }
+ }
+
break;
case -EAGAIN:
LOG_PRINT("Checkpoint data empty. Requesting new checkpoint.");
@@ -628,28 +659,28 @@
goto out;
}
- if (tfr->request_type == DM_CLOG_CHECKPOINT_REQUEST) {
- if (tfr->originator == my_cluster_id) {
- /*
- * The checkpoint includes any request up to the
- * request for checkpoint. So, we must clear any
- * previous requests we were storing.
- */
- while ((startup_tfr = queue_remove(match->startup_queue)))
- queue_add(startup_tfr, free_queue);
- } else if (my_cluster_id == match->lowest_id) {
- struct checkpoint_data *new;
-
- new = prepare_checkpoint(match, tfr->originator);
- if (!new) {
- /* FIXME: Need better error handling */
- LOG_ERROR("Failed to prepare checkpoint!!!");
- goto out;
- }
- new->next = match->checkpoint_list;
- match->checkpoint_list = new;
+ /*
+ * If the log is now valid, we can queue the checkpoints
+ */
+ for (i = match->checkpoints_needed; i;) {
+ struct checkpoint_data *new;
+
+ i--;
+ if (log_get_state(tfr) != LOG_RESUMED) {
+ LOG_DBG("Skipping checkpoint for %u, my log is not ready",
+ match->checkpoint_requesters[i]);
+ continue;
}
- goto out;
+ new = prepare_checkpoint(match, match->checkpoint_requesters[i]);
+ if (!new) {
+ /* FIXME: Need better error handling */
+ LOG_ERROR("Failed to prepare checkpoint for %u!!!",
+ match->checkpoint_requesters[i]);
+ break;
+ }
+ match->checkpoints_needed = i;
+ new->next = match->checkpoint_list;
+ match->checkpoint_list = new;
}
if (tfr->request_type & DM_CLOG_RESPONSE)
@@ -665,21 +696,7 @@
goto out;
}
- while ((startup_tfr = queue_remove(match->startup_queue))) {
- LOG_DBG("Processing delayed request %d: %s",
- match->startup_queue->count,
- RQ_TYPE(startup_tfr->request_type));
- r = handle_cluster_request(startup_tfr,
- (my_cluster_id == match->lowest_id) ? 1 : 0);
- if (r) {
- LOG_ERROR("Error while processing delayed CPG message");
- goto out;
- } else {
- queue_add(startup_tfr, free_queue);
- }
- }
-
- r = handle_cluster_request(tfr, (my_cluster_id == match->lowest_id) ? 1 : 0);
+ r = handle_cluster_request(tfr, i_am_server);
}
out:
@@ -694,16 +711,14 @@
struct cpg_address *left_list, int left_list_entries,
struct cpg_address *joined_list, int joined_list_entries)
{
- int i;
+ int i, j;
int my_pid = getpid();
int found = 0;
- int i_was_server = 0;
- int do_checkpoint = 0;
struct clog_cpg *match, *tmp;
ENTER();
- LOG_PRINT("* CPG config callback *********************");
+ LOG_PRINT("****** CPG config callback ****************");
LOG_PRINT("* JOINING (%d):", joined_list_entries);
for (i = 0; i < joined_list_entries; i++)
@@ -751,6 +766,7 @@
goto out;
}
+ /* Assign my_cluster_id */
if (my_cluster_id == 0xDEAD) {
for (i = 0; i < joined_list_entries; i++) {
LOG_DBG("My pid = %d\t\t[%u/%d]", my_pid,
@@ -764,42 +780,30 @@
LOG_PRINT("Setting my cluster id: %u", my_cluster_id);
}
}
- } else if (match->lowest_id == my_cluster_id) {
- LOG_DBG("I was the server (lowest_id = %u, my_cluster_id = %u)",
- match->lowest_id, my_cluster_id);
- i_was_server = 1;
+ goto out;
}
+ /* Find the lowest_id, i.e. the server */
for (i = 0, match->lowest_id = member_list[0].nodeid;
i < member_list_entries; i++)
if (match->lowest_id > member_list[i].nodeid)
match->lowest_id = member_list[i].nodeid;
- if (match->lowest_id == my_cluster_id) {
- /* I am the server now */
- if (!i_was_server)
- LOG_PRINT("I am the new log server for %s", match->name.value);
- else if (joined_list_entries) {
- LOG_PRINT("I must send checkpoint data.");
- do_checkpoint = 1;
- }
- } else if (i_was_server && joined_list_entries) {
- LOG_PRINT("Giving server ownership to %u", match->lowest_id);
- LOG_PRINT("I must send checkpoint data.");
- do_checkpoint = 1;
- }
+ LOG_PRINT("Server is now %u", match->lowest_id);
- if (do_checkpoint) {
- struct checkpoint_data *new;
+ /*
+ * If I am part of the joining list, I do not send checkpoints
+ * FIXME: What are the cases where multiple nodes can join?
+ */
+ for (i = 0; i < joined_list_entries; i++)
+ if (joined_list[i].nodeid == my_cluster_id)
+ goto out;
- for (i = 0; i < joined_list_entries; i++) {
- new = prepare_checkpoint(match, joined_list[i].nodeid);
- if (!new)
- goto out;
- new->next = match->checkpoint_list;
- match->checkpoint_list = new;
- }
+ for (i = 0, j = match->checkpoints_needed; i < joined_list_entries; i++) {
+ LOG_DBG("Joining node, %u needs checkpoint", joined_list[i].nodeid);
+ match->checkpoint_requesters[i + j] = joined_list[i].nodeid;
}
+ match->checkpoints_needed += i;
out:
EXIT();
--- cluster/cmirror/src/Attic/functions.c 2007/11/03 18:37:48 1.1.2.2
+++ cluster/cmirror/src/Attic/functions.c 2007/11/05 22:44:03 1.1.2.3
@@ -56,6 +56,8 @@
FORCESYNC, /* Force a sync to happen */
} sync;
+ uint32_t state; /* current operational state of the log */
+
int disk_fd; /* -1 means no disk log */
int log_dev_failed;
uint64_t disk_nr_regions;
@@ -563,6 +565,8 @@
if (lc->touched)
LOG_DBG("WARNING: log still marked as 'touched' during suspend");
+ lc->state = LOG_SUSPENDED;
+
return 0;
}
@@ -583,12 +587,12 @@
}
/*
- * _clog_resume
+ * clog_resume
* @tfr
*
* Does the main work of resuming.
*/
-static int _clog_resume(struct clog_tfr *tfr)
+static int clog_resume(struct clog_tfr *tfr)
{
uint32_t i;
struct log_c *lc = get_log(tfr->uuid);
@@ -617,6 +621,9 @@
case 3:
LOG_DBG("Non-master resume: bits pre-loaded");
lc->resume_override = 1000;
+ lc->sync_count = count_bits32(lc->clean_bits, lc->bitset_uint32_count);
+ LOG_DBG("sync_count = %llu", lc->sync_count);
+ goto out;
return 0;
default:
LOG_ERROR("Error:: multiple loading of bits (%d)", lc->resume_override);
@@ -662,36 +669,29 @@
/* copy clean across to sync */
memcpy(lc->sync_bits, lc->clean_bits, size);
lc->sync_count = count_bits32(lc->clean_bits, lc->bitset_uint32_count);
+ LOG_DBG("sync_count = %llu", lc->sync_count);
lc->sync_search = 0;
/*
- tfr->error = write_log(lc);
- if (tfr->error) {
- lc->log_dev_failed = 1;
- LOG_ERROR("Failed to write initial disk log");
- } else
- lc->log_dev_failed = 0;
- */
- /*
* We mark 'touched' so that only the master commits
* the log via 'commit_log'
*/
lc->touched = 1;
-
+out:
+ lc->state = LOG_RESUMED;
+
return tfr->error;
}
/*
- * clog_resume
+ * local_resume
* @tfr
*
* If the log is pending, we must first join the cpg and
* put the log in the official list.
*
- * If the log is in the official list, then we call
- * _clog_resume.
*/
-static int clog_resume(struct clog_tfr *tfr)
+int local_resume(struct clog_tfr *tfr)
{
int r;
struct log_c *lc = get_log(tfr->uuid);
@@ -714,12 +714,9 @@
/* move log to official list */
list_del_init(&lc->list);
list_add(&lc->list, &log_list);
-
- return 0;
}
- /* log is in the official list, try to resume */
- return _clog_resume(tfr);
+ return 0;
}
/*
@@ -896,7 +893,6 @@
*/
static int clog_get_resync_work(struct clog_tfr *tfr)
{
- int sync_search, conflict=0;
struct {int i; uint64_t r; } *pkg = (void *)tfr->data;
struct log_c *lc = get_log(tfr->uuid);
@@ -905,42 +901,35 @@
tfr->data_size = sizeof(*pkg);
- /*
- * Check if we are done recovering, or if there
- * is already someone recovering.
- */
- if ((lc->sync_search >= lc->region_count) ||
- (lc->recovering_region != (uint64_t)-1)) {
+ if (lc->sync_search >= lc->region_count) {
+ /*
+ * FIXME: handle intermittent errors during recovery
+ * by resetting sync_search... but not to many times.
+ */
+ LOG_DBG(" Recovery has finished");
pkg->i = 0;
return 0;
}
- for (sync_search = lc->sync_search;
- sync_search < lc->region_count;
- sync_search += (pkg->r + 1)) {
- pkg->r = find_next_zero_bit(lc->sync_bits,
- lc->region_count,
- sync_search);
-
- /*
- * If the region is currently marked, we cannot
- * recover it yet.
- */
- if (!log_test_bit(lc->clean_bits, pkg->r))
- conflict = 1;
- else
- break;
+ if (lc->recovering_region != (uint64_t)-1) {
+ LOG_DBG("Someone is already recovering region %Lu",
+ lc->recovering_region);
+ pkg->i = 0;
+ return 0;
}
- if (!conflict)
- lc->sync_search = pkg->r + 1;
+ pkg->r = find_next_zero_bit(lc->sync_bits,
+ lc->region_count,
+ lc->sync_search);
if (pkg->r >= lc->region_count) {
pkg->i = 0;
return 0;
}
- LOG_DBG("Assigning resync work: region = %llu\n", pkg->r);
+ lc->sync_search = pkg->r + 1;
+
+ LOG_DBG(" Assigning resync work: region = %llu\n", pkg->r);
pkg->i = 1;
return 0;
}
@@ -984,6 +973,8 @@
return -EINVAL;
*sync_count = lc->sync_count;
+ LOG_DBG("sync_count = %llu", *sync_count);
+
tfr->data_size = sizeof(*sync_count);
return 0;
@@ -1347,6 +1338,17 @@
return 0;
}
+int log_get_state(struct clog_tfr *tfr)
+{
+ struct log_c *lc;
+
+ lc = get_log(tfr->uuid);
+ if (!lc)
+ return -EINVAL;
+
+ return lc->state;
+}
+
int log_status(void)
{
int found = 0;
@@ -1380,3 +1382,4 @@
}
return found;
}
+
--- cluster/cmirror/src/Attic/functions.h 2007/08/23 19:57:31 1.1.2.1
+++ cluster/cmirror/src/Attic/functions.h 2007/11/05 22:44:03 1.1.2.2
@@ -3,9 +3,14 @@
#include <linux/dm-clog-tfr.h>
+#define LOG_RESUMED 1
+#define LOG_SUSPENDED 2
+
+int local_resume(struct clog_tfr *tfr);
int do_request(struct clog_tfr *tfr);
int commit_log(struct clog_tfr *tfr);
int store_bits(const char *uuid, const char *which, char **buf);
int load_bits(const char *uuid, const char *which, char *buf, int size);
+int log_get_state(struct clog_tfr *tfr);
int log_status(void);
#endif /* __CLOG_FUNCTIONS_DOT_H__ */
--- cluster/cmirror/src/Attic/link_mon.c 2007/08/23 19:57:31 1.1.2.1
+++ cluster/cmirror/src/Attic/link_mon.c 2007/11/05 22:44:03 1.1.2.2
@@ -102,8 +102,6 @@
/* FIXME: handle POLLHUP */
for (i = 0; i < used_pfds; i++)
if (pfds[i].revents & POLLIN) {
- LOG_DBG("Data waiting on file descriptor, %d.",
- pfds[i].fd);
/* FIXME: Add this back return 1;*/
r++;
}
--- cluster/cmirror/src/Attic/local.c 2007/11/03 18:37:48 1.1.2.2
+++ cluster/cmirror/src/Attic/local.c 2007/11/05 22:44:03 1.1.2.3
@@ -170,7 +170,7 @@
* component to join the CPG, and a cluster component
* to handle the request.
*/
- r = do_request(tfr);
+ r = local_resume(tfr);
if (r) {
LOG_DBG("Returning failed request to kernel [%s]",
RQ_TYPE(tfr->request_type));
More information about the Cluster-devel
mailing list