[Cluster-devel] cluster/cmirror/src cluster.c functions.c func ...
jbrassow at sourceware.org
jbrassow at sourceware.org
Mon Jan 14 22:52:20 UTC 2008
CVSROOT: /cvs/cluster
Module name: cluster
Branch: RHEL5
Changes by: jbrassow at sourceware.org 2008-01-14 22:52:17
Modified files:
cmirror/src : cluster.c functions.c functions.h link_mon.c
link_mon.h local.c logging.h queues.c queues.h
rbtree.c
Log message:
- Several small bug fixes
-- More correct method of leaving CPG (on suspend)
-- close log file descriptor when finished using it
-- fix problem with overlapping recoveries
-- clean up postsuspend so remote requests do not get lost
-- add missing 'break' statement that caused a segfault
-- better error checking
Patches:
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/cluster.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.7&r2=1.1.2.8
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/functions.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.6&r2=1.1.2.7
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/functions.h.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.2&r2=1.1.2.3
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/link_mon.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.2&r2=1.1.2.3
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/link_mon.h.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.1&r2=1.1.2.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/local.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.5&r2=1.1.2.6
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/logging.h.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.2&r2=1.1.2.3
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/queues.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.1&r2=1.1.2.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/queues.h.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.1&r2=1.1.2.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/rbtree.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.2&r2=1.1.2.3
--- cluster/cmirror/src/Attic/cluster.c 2007/11/19 18:00:20 1.1.2.7
+++ cluster/cmirror/src/Attic/cluster.c 2008/01/14 22:52:17 1.1.2.8
@@ -162,7 +162,7 @@
* Errors from previous functions are in the tfr struct.
*/
- LOG_DBG("Sending respose to %u on cluster: [%s/%llu]",
+ LOG_DBG("Sending response to %u on cluster: [%s/%llu]",
tfr->originator,
RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE),
tfr->seq);
@@ -704,6 +704,12 @@
if (!match->valid) {
LOG_DBG("Log not valid yet, storing request");
startup_tfr = queue_remove(free_queue);
+ if (!startup_tfr) {
+ LOG_ERROR("Supply of transfer structs exhausted");
+ r = -ENOMEM; /* FIXME: Better error #? */
+ goto out;
+ }
+
memcpy(startup_tfr, tfr, sizeof(*tfr) + tfr->data_size);
queue_add_tail(startup_tfr, match->startup_queue);
goto out;
@@ -724,7 +730,7 @@
struct cpg_address *left_list, int left_list_entries,
struct cpg_address *joined_list, int joined_list_entries)
{
- int i, j;
+ int i, j, fd;
int my_pid = getpid();
int found = 0;
struct clog_cpg *match, *tmp;
@@ -740,8 +746,8 @@
LOG_PRINT("* MEMBERS (%d):", member_list_entries);
for (i = 0; i < member_list_entries; i++)
- LOG_PRINT("* [%d] nodeid: %d, pid: %d",
- i, member_list[i].nodeid, member_list[i].pid);
+ LOG_PRINT("* nodeid: %d, pid: %d",
+ member_list[i].nodeid, member_list[i].pid);
LOG_PRINT("* LEAVING (%d):", left_list_entries);
for (i = 0; i < left_list_entries; i++)
@@ -768,6 +774,46 @@
goto out;
}
+ /* Am I leaving? */
+ for (i = 0; i < left_list_entries; i++)
+ if (my_cluster_id == left_list[i].nodeid) {
+ struct clog_tfr *tfr;
+
+ LOG_DBG("Finalizing leave...");
+ list_del_init(&match->list);
+
+ cpg_fd_get(match->handle, &fd);
+ links_unregister(fd);
+
+ cluster_postsuspend(match->name.value);
+
+ while (!queue_empty(cluster_queue)) {
+ tfr = queue_remove(cluster_queue);
+
+ /*
+ * A postsuspend is placed directly into
+ * the cluster_queue, without going out
+ * to the cluster. This means that only
+ * our postsuspend will ever exist in the
+ * cluster_queue.
+ */
+ if (tfr->request_type == DM_CLOG_POSTSUSPEND)
+ kernel_send(tfr);
+ else
+ queue_add(tfr, free_queue);
+ }
+
+ cpg_finalize(match->handle);
+
+ if (match->startup_queue->count)
+ LOG_ERROR("Startup items remain in cluster log");
+
+ free(match->startup_queue);
+ free(match);
+
+ goto out;
+ }
+
/* Am I the very first to join? */
if (!left_list_entries &&
(member_list_entries == 1) && (joined_list_entries == 1) &&
@@ -887,7 +933,7 @@
/* FIXME: better variable */
cpg_fd_get(new->handle, &r);
- links_register(r, do_cluster_work, NULL);
+ links_register(r, "cluster", do_cluster_work, NULL);
EXIT();
return 0;
@@ -895,26 +941,15 @@
int destroy_cluster_cpg(char *str)
{
- int fd;
+ int r;
struct clog_cpg *del, *tmp;
ENTER();
list_for_each_entry_safe(del, tmp, &clog_cpg_list, list)
if (!strncmp(del->name.value, str, CPG_MAX_NAME_LENGTH)) {
- list_del_init(&del->list);
-
- cpg_fd_get(del->handle, &fd);
- links_unregister(fd);
-
- cpg_leave(del->handle, &del->name);
- cpg_finalize(del->handle);
-
- if (del->startup_queue->count)
- LOG_ERROR("Startup items remain in cluster log");
-
- free(del->startup_queue);
- free(del);
-
+ r = cpg_leave(del->handle, &del->name);
+ if (r != CPG_OK)
+ LOG_ERROR("Error leaving CPG!");
break;
}
--- cluster/cmirror/src/Attic/functions.c 2007/12/06 22:12:21 1.1.2.6
+++ cluster/cmirror/src/Attic/functions.c 2008/01/14 22:52:17 1.1.2.7
@@ -28,6 +28,12 @@
#define MIRROR_DISK_VERSION 2
#define LOG_OFFSET 2
+struct log_header {
+ uint32_t magic;
+ uint32_t version;
+ uint64_t nr_regions;
+};
+
/*
* Used by the 'touched' variable, these macros mean:
* LOG_CHANGED - bits in the in-memory log have changed
@@ -36,12 +42,6 @@
#define LOG_CHANGED 1
#define LOG_FLUSH 2
-struct log_header {
- uint32_t magic;
- uint32_t version;
- uint64_t nr_regions;
-};
-
struct log_c {
struct list_head list;
char uuid[DM_UUID_LEN];
@@ -68,6 +68,8 @@
uint32_t state; /* current operational state of the log */
struct rb_tree mark_tree; /* Tree that tracks all mark requests */
+
+ uint32_t recovery_halted;
struct recovery_request *recovery_request_list;
int disk_fd; /* -1 means no disk log */
@@ -326,7 +328,7 @@
}
-static int find_disk_path(char *major_minor_str, char *path_rtn)
+static int find_disk_path(char *major_minor_str, char *path_rtn, int *unlink_path)
{
int r;
DIR *dp;
@@ -372,6 +374,11 @@
sprintf(path_rtn, "/dev/mapper/%d-%d", major, minor);
r = mknod(path_rtn, S_IFBLK | S_IRUSR | S_IWUSR, MKDEV(major, minor));
+ /*
+ * If we have to make the path, we unlink it after we open it
+ */
+ *unlink_path = 1;
+
return r ? -errno : 0;
}
@@ -387,6 +394,7 @@
int disk_log = 0;
char disk_path[128];
+ int unlink_path = 0;
size_t page_size;
int pages;
@@ -403,7 +411,7 @@
goto fail;
}
- r = find_disk_path(argv[0], disk_path);
+ r = find_disk_path(argv[0], disk_path, &unlink_path);
if (r) {
LOG_ERROR("Unable to find path to device %s", argv[0]);
goto fail;
@@ -497,11 +505,14 @@
r = errno;
goto fail;
}
+ if (unlink_path)
+ unlink(disk_path);
+
lc->disk_fd = r;
lc->disk_size = pages * page_size;
r = posix_memalign(&(lc->disk_buffer), page_size,
- lc->disk_size);
+ lc->disk_size);
if (r) {
LOG_ERROR("Unable to allocate memory for disk_buffer");
goto fail;
@@ -511,6 +522,8 @@
list_add(&lc->list, &log_pending_list);
+ LOG_DBG("Log UUID = %s, mark_tree = 0x%p",
+ SHORT_UUID(lc->uuid), &lc->mark_tree);
EXIT();
return 0;
fail:
@@ -557,7 +570,7 @@
LOG_ERROR("Received constructor request with bad data");
LOG_DBG("strlen(tfr->data)[%d] != tfr->data_size[%d]",
strlen(tfr->data), tfr->data_size);
- LOG_DBG("tfr->data = %s", tfr->data);
+ LOG_DBG("tfr->data = '%s' [%d]", tfr->data, strlen(tfr->data));
return -EINVAL;
}
@@ -591,6 +604,7 @@
LOG_ERROR("Failed to create cluster log (%s)", tfr->uuid);
else
LOG_PRINT("Cluster log created (%s)", tfr->uuid);
+
return r;
}
@@ -603,14 +617,29 @@
{
struct log_c *lc = get_log(tfr->uuid);
- if (!lc)
- return -EINVAL;
-
- destroy_cluster_cpg(tfr->uuid);
+ if (!lc) {
+ /* Is the log in the pending list? */
+ lc = get_pending_log(tfr->uuid);
+ if (!lc) {
+ LOG_ERROR("clog_dtr called on log that is not official or pending");
+ return -EINVAL;
+ }
+ } else {
+ LOG_DBG("[%s] clog_dtr: leaving CPG", SHORT_UUID(lc->uuid));
+ /*
+ * If postsuspend had done the destroy_cluster_cpg,
+ * the log context would be in the pending list
+ */
+ destroy_cluster_cpg(tfr->uuid);
+ }
LOG_PRINT("Cluster log removed (%s)", lc->uuid);
list_del_init(&lc->list);
+ if (lc->disk_fd != -1)
+ close(lc->disk_fd);
+ if (lc->disk_buffer)
+ free(lc->disk_buffer);
free(lc->clean_bits);
free(lc->sync_bits);
free(lc);
@@ -634,6 +663,7 @@
LOG_DBG("WARNING: log still marked as 'touched' during suspend");
lc->state = LOG_SUSPENDED;
+ lc->recovery_halted = 1;
return 0;
}
@@ -650,7 +680,31 @@
if (!lc)
return -EINVAL;
+ LOG_DBG("[%s] clog_postsuspend: leaving CPG", SHORT_UUID(lc->uuid));
+ destroy_cluster_cpg(tfr->uuid);
+
+ return 0;
+}
+
+/*
+ * cluster_postsuspend
+ * @uuid: UUID of the cluster log
+ * Resets resume_override and moves the log to log_pending_list.
+ */
+int cluster_postsuspend(char *uuid)
+{
+ struct log_c *lc = get_log(uuid);
+
+ if (!lc)
+ return -EINVAL;
+
+ LOG_DBG("[%s] clog_postsuspend: finalizing", SHORT_UUID(lc->uuid));
lc->resume_override = 0;
+
+ /* move log to pending list */
+ list_del_init(&lc->list);
+ list_add(&lc->list, &log_pending_list);
+
return 0;
}
@@ -687,12 +741,13 @@
LOG_ERROR("Error:: partial bit loading (just clean_bits)");
return -EINVAL;
case 3:
- LOG_DBG("Non-master resume: bits pre-loaded");
+ LOG_DBG("[%s] Non-master resume: bits pre-loaded",
+ SHORT_UUID(lc->uuid));
lc->resume_override = 1000;
lc->sync_count = count_bits32(lc->clean_bits, lc->bitset_uint32_count);
- LOG_DBG("Initial sync_count = %llu", lc->sync_count);
+ LOG_DBG("[%s] Initial sync_count = %llu",
+ SHORT_UUID(lc->uuid), lc->sync_count);
goto out;
- return 0;
default:
LOG_ERROR("Error:: multiple loading of bits (%d)", lc->resume_override);
return -EINVAL;
@@ -708,12 +763,15 @@
switch (tfr->error) {
case 0:
if (lc->disk_nr_regions < lc->region_count)
- LOG_DBG("Mirror has grown, updating log bits");
+ LOG_DBG("[%s] Mirror has grown, updating log bits",
+ SHORT_UUID(lc->uuid));
else if (lc->disk_nr_regions > lc->region_count)
- LOG_DBG("Mirror has shrunk, updating log bits");
+ LOG_DBG("[%s] Mirror has shrunk, updating log bits",
+ SHORT_UUID(lc->uuid));
break;
case -EINVAL:
- LOG_DBG("Read log failed: not yet initialized");
+ LOG_DBG("[%s] Read log failed: not yet initialized",
+ SHORT_UUID(lc->uuid));
lc->disk_nr_regions = 0;
break;
default:
@@ -737,7 +795,8 @@
/* copy clean across to sync */
memcpy(lc->sync_bits, lc->clean_bits, size);
lc->sync_count = count_bits32(lc->clean_bits, lc->bitset_uint32_count);
- LOG_DBG("Initial sync_count = %llu", lc->sync_count);
+ LOG_DBG("[%s] Initial sync_count = %llu",
+ SHORT_UUID(lc->uuid), lc->sync_count);
lc->sync_search = 0;
/*
@@ -747,6 +806,7 @@
lc->touched = LOG_FLUSH;
out:
lc->state = LOG_RESUMED;
+ lc->recovery_halted = 0;
return tfr->error;
}
@@ -858,9 +918,11 @@
*rtn = log_test_bit(lc->sync_bits, region);
if (*rtn)
- LOG_DBG(" Region is in-sync: %llu", region);
+ LOG_DBG("[%s] Region is in-sync: %llu",
+ SHORT_UUID(lc->uuid), region);
else
- LOG_DBG(" Region is not in-sync: %llu", region);
+ LOG_DBG("[%s] Region is not in-sync: %llu",
+ SHORT_UUID(lc->uuid), region);
tfr->data_size = sizeof(*rtn);
@@ -879,14 +941,6 @@
if (!lc)
return -EINVAL;
- /*
- * Are we trying to flush when a mark request conflicts
- * with a recovering region?
- */
- if ((lc->recovering_region != -1) &&
- !log_test_bit(lc->clean_bits, lc->recovering_region))
- return -EAGAIN;
-
/*
* Actual disk flush happens in 'commit_log()'
* Clear LOG_CHANGED and set LOG_FLUSH
@@ -992,8 +1046,8 @@
srsm_count_var = 0;
mark_list = rbt_search_plus(&lc->mark_tree, ®ion, srsm_count, &who);
if (!mark_list || !srsm_count_var) {
- LOG_DBG("Clear issued on region that is not marked: %llu/%u",
- region, who);
+ LOG_DBG("[%s] Clear issued by %u on region not marked: %llu",
+ SHORT_UUID(lc->uuid), who, region);
goto set_bit;
}
@@ -1082,14 +1136,14 @@
* FIXME: handle intermittent errors during recovery
* by resetting sync_search... but not to many times.
*/
- LOG_DBG(" Recovery has finished");
+ LOG_DBG("[%s] Recovery has finished", SHORT_UUID(lc->uuid));
pkg->i = 0;
return 0;
}
if (lc->recovering_region != (uint64_t)-1) {
- LOG_DBG("Someone is already recovering region %Lu",
- lc->recovering_region);
+ LOG_DBG("[%s] Someone is already recovering region %Lu",
+ SHORT_UUID(lc->uuid), lc->recovering_region);
pkg->i = 0;
return 0;
}
@@ -1104,14 +1158,17 @@
free(del);
if (!log_test_bit(lc->sync_bits, pkg->r)) {
- LOG_DBG("Assigning priority resync work to %u: %llu",
- tfr->originator, pkg->r);
+ LOG_DBG("[%s] Assigning priority resync work to %u: %llu",
+ SHORT_UUID(lc->uuid), tfr->originator, pkg->r);
#ifdef DEBUG
- LOG_DBG("Priority work remaining:");
+ LOG_DBG("[%s] Priority work remaining:",
+ SHORT_UUID(lc->uuid));
for (del = lc->recovery_request_list; del; del = del->next)
- LOG_DBG(" %llu", del->region);
+ LOG_DBG("[%s] %llu", SHORT_UUID(lc->uuid),
+ del->region);
#endif
pkg->i = 1;
+ lc->recovering_region = pkg->r;
return 0;
}
}
@@ -1127,8 +1184,10 @@
lc->sync_search = pkg->r + 1;
- LOG_DBG(" Assigning resync work: region = %llu\n", pkg->r);
+ LOG_DBG("[%s] Assigning resync work to %u: region = %llu\n",
+ SHORT_UUID(lc->uuid), tfr->originator, pkg->r);
pkg->i = 1;
+ lc->recovering_region = pkg->r;
return 0;
}
@@ -1153,10 +1212,16 @@
} else {
log_set_bit(lc, lc->sync_bits, pkg->region);
lc->sync_count++;
+ LOG_DBG("[%s] sync_count = %llu, Region %llu marked in-sync by %u",
+ SHORT_UUID(lc->uuid), lc->sync_count,
+ pkg->region, tfr->originator);
}
} else if (log_test_bit(lc->sync_bits, pkg->region)) {
lc->sync_count--;
log_clear_bit(lc, lc->sync_bits, pkg->region);
+ LOG_DBG("[%s] sync_count = %llu, Region %llu marked not in-sync by %u",
+ SHORT_UUID(lc->uuid), lc->sync_count,
+ pkg->region, tfr->originator);
}
tfr->data_size = 0;
@@ -1301,21 +1366,41 @@
if (region > lc->region_count)
return -EINVAL;
- *rtn = !log_test_bit(lc->sync_bits, region);
- if (*rtn) {
+ if (lc->recovery_halted) {
+ LOG_DBG("[%s] Recovery halted... [not remote recovering]: %llu",
+ SHORT_UUID(lc->uuid), region);
+ *rtn = 0;
+ } else {
+ *rtn = !log_test_bit(lc->sync_bits, region);
+ LOG_DBG("[%s] Region is %s: %llu",
+ SHORT_UUID(lc->uuid),
+ (region == lc->recovering_region) ?
+ "currently remote recovering" :
+ (*rtn) ? "pending remote recovery" :
+ "not remote recovering", region);
+ }
+
+ if (*rtn && (region != lc->recovering_region)) {
struct recovery_request *rr;
- LOG_DBG(" Region is busy recovering: %llu", region);
+
+ /* Already in the list? */
+ for (rr = lc->recovery_request_list; rr; rr = rr->next)
+ if (rr->region == region)
+ goto out;
/* Failure to allocated simply means we can't prioritize it */
rr = malloc(sizeof(*rr));
- if (rr) {
- rr->region = region;
- rr->next = lc->recovery_request_list;
- lc->recovery_request_list = rr;
- }
- } else
- LOG_DBG(" Region is not recovering: %llu", region);
+ if (!rr)
+ goto out;
+ LOG_DBG("[%s] Adding region to priority list: %llu",
+ SHORT_UUID(lc->uuid), region);
+ rr->region = region;
+ rr->next = lc->recovery_request_list;
+ lc->recovery_request_list = rr;
+ }
+
+out:
tfr->data_size = sizeof(*rtn);
return 0;
@@ -1336,7 +1421,6 @@
{
int r;
- /* FIXME: lock */
/* FIXME: ENTER discards null check */
ENTER("%s", RQ_TYPE(tfr->request_type));
@@ -1415,7 +1499,6 @@
tfr->data_size = 0;
}
- /* FIXME: unlock */
EXIT();
return 0;
}
@@ -1434,7 +1517,7 @@
struct log_c *lc;
ENTER();
- /* FIXME: lock */
+
lc = get_log(tfr->uuid);
if (!lc) {
@@ -1453,7 +1536,7 @@
LOG_ERROR("Error writing to disk log");
return -EIO;
}
- LOG_DBG("Disk log written");
+ LOG_DBG("[%s] Disk log written", SHORT_UUID(lc->uuid));
}
if (lc->touched & LOG_CHANGED)
@@ -1461,7 +1544,6 @@
lc->touched &= ~LOG_FLUSH;
- /* FIXME: unlock */
out:
EXIT();
return 0;
@@ -1511,11 +1593,11 @@
if (!strncmp(which, "sync_bits", 9)) {
memcpy(*buf, lc->sync_bits, bitset_size);
- LOG_DBG("storing sync_bits:");
+ LOG_DBG("[%s] storing sync_bits:", SHORT_UUID(lc->uuid));
print_bits(*buf, bitset_size);
} else if (!strncmp(which, "clean_bits", 9)) {
memcpy(*buf, lc->clean_bits, bitset_size);
- LOG_DBG("storing clean_bits:");
+ LOG_DBG("[%s] storing clean_bits:", SHORT_UUID(lc->uuid));
print_bits(*buf, bitset_size);
}
@@ -1545,12 +1627,12 @@
if (!strncmp(which, "sync_bits", 9)) {
lc->resume_override += 1;
memcpy(lc->sync_bits, buf, bitset_size);
- LOG_DBG("loading sync_bits:");
+ LOG_DBG("[%s] loading sync_bits:", SHORT_UUID(lc->uuid));
print_bits((char *)lc->sync_bits, bitset_size);
} else if (!strncmp(which, "clean_bits", 9)) {
lc->resume_override += 2;
memcpy(lc->clean_bits, buf, bitset_size);
- LOG_DBG("loading clean_bits:");
+ LOG_DBG("[%s] loading clean_bits:", SHORT_UUID(lc->uuid));
print_bits((char *)lc->clean_bits, bitset_size);
}
--- cluster/cmirror/src/Attic/functions.h 2007/11/05 22:44:03 1.1.2.2
+++ cluster/cmirror/src/Attic/functions.h 2008/01/14 22:52:17 1.1.2.3
@@ -7,6 +7,7 @@
#define LOG_SUSPENDED 2
int local_resume(struct clog_tfr *tfr);
+int cluster_postsuspend(char *);
int do_request(struct clog_tfr *tfr);
int commit_log(struct clog_tfr *tfr);
int store_bits(const char *uuid, const char *which, char **buf);
--- cluster/cmirror/src/Attic/link_mon.c 2007/11/05 22:44:03 1.1.2.2
+++ cluster/cmirror/src/Attic/link_mon.c 2008/01/14 22:52:17 1.1.2.3
@@ -6,7 +6,7 @@
struct link_callback {
int fd;
-
+ char *name;
void *data;
int (*callback)(void *data);
@@ -18,7 +18,7 @@
static struct pollfd *pfds = NULL;
static struct link_callback *callbacks = NULL;
-int links_register(int fd, int (*callback)(void *data), void *data)
+int links_register(int fd, char *name, int (*callback)(void *data), void *data)
{
int i;
struct link_callback *lc;
@@ -40,6 +40,7 @@
}
lc->fd = fd;
+ lc->name = name;
lc->data = data;
lc->callback = callback;
@@ -64,6 +65,9 @@
lc->next = callbacks;
callbacks = lc;
+ LOG_DBG("Adding %s/%d", lc->name, lc->fd);
+ LOG_DBG(" used_pfds = %d, free_pfds = %d",
+ used_pfds, free_pfds);
EXIT();
return 0;
@@ -72,14 +76,27 @@
int links_unregister(int fd)
{
int i;
+ struct link_callback *p, *c;
- for (i = 0; i < used_pfds; i++) {
+ for (i = 0; i < used_pfds; i++)
if (fd == pfds[i].fd) {
- pfds[i].fd = pfds[used_pfds - 1].fd;
+ /* entire struct is copied (overwritten) */
+ pfds[i] = pfds[used_pfds - 1];
used_pfds--;
free_pfds++;
}
- }
+
+ for (p = NULL, c = callbacks; c; p = c, c = c->next)
+ if (fd == c->fd) {
+ LOG_DBG("Freeing up %s/%d", c->name, c->fd);
+ LOG_DBG(" used_pfds = %d, free_pfds = %d",
+ used_pfds, free_pfds);
+ if (p)
+ p->next = c->next;
+ else
+ callbacks = c->next;
+ free(c);
+ }
return 0;
}
@@ -102,6 +119,8 @@
/* FIXME: handle POLLHUP */
for (i = 0; i < used_pfds; i++)
if (pfds[i].revents & POLLIN) {
+ LOG_DBG("Data ready on %d", pfds[i].fd);
+
/* FIXME: Add this back return 1;*/
r++;
}
@@ -118,10 +137,13 @@
if (pfds[i].revents & POLLIN)
for (lc = callbacks; lc; lc = lc->next)
if (pfds[i].fd == lc->fd) {
+ LOG_DBG("Issuing callback on %s/%d",
+ lc->name, lc->fd);
r = lc->callback(lc->data);
if (r)
- LOG_ERROR("Bad callback on file desc, %d",
- lc->fd);
+ LOG_ERROR("Bad callback on %s/%d",
+ lc->name, lc->fd);
+ break;
}
return 0;
}
--- cluster/cmirror/src/Attic/link_mon.h 2007/08/23 19:57:31 1.1.2.1
+++ cluster/cmirror/src/Attic/link_mon.h 2008/01/14 22:52:17 1.1.2.2
@@ -1,7 +1,7 @@
#ifndef __LINK_MON_DOT_H__
#define __LINK_MON_DOT_H__
-int links_register(int fd, int (*callback)(void *data), void *data);
+int links_register(int fd, char *name, int (*callback)(void *data), void *data);
int links_unregister(int fd);
int links_monitor(void);
int links_issue_callbacks(void);
--- cluster/cmirror/src/Attic/local.c 2007/11/09 05:47:05 1.1.2.5
+++ cluster/cmirror/src/Attic/local.c 2008/01/14 22:52:17 1.1.2.6
@@ -84,7 +84,7 @@
return -ENOMEM;
}
- memset(*tfr, 0, sizeof(struct clog_tfr));
+ memset(*tfr, 0, DM_CLOG_TFR_SIZE);
r = kernel_recv_helper(*tfr, DM_CLOG_TFR_SIZE);
if (r) {
@@ -158,6 +158,7 @@
case DM_CLOG_DTR:
case DM_CLOG_STATUS_INFO:
case DM_CLOG_STATUS_TABLE:
+ case DM_CLOG_PRESUSPEND:
r = do_request(tfr);
if (r)
LOG_DBG("Returning failed request to kernel [%s]",
@@ -168,6 +169,19 @@
RQ_TYPE(tfr->request_type));
break;
+ case DM_CLOG_POSTSUSPEND:
+ r = do_request(tfr);
+ if (r) {
+ LOG_DBG("Returning failed request to kernel [%s]",
+ RQ_TYPE(tfr->request_type));
+ r = kernel_send(tfr);
+ if (r)
+ LOG_ERROR("Failed to respond to kernel [%s]",
+ RQ_TYPE(tfr->request_type));
+ }
+ queue_add_tail(tfr, cluster_queue);
+
+ break;
case DM_CLOG_RESUME:
/*
* Resume is a special case that requires a local
@@ -189,6 +203,9 @@
/* Add before send_to_cluster, so cluster code can find it */
queue_add_tail(tfr, cluster_queue);
r = cluster_send(tfr);
+ if (r)
+ LOG_ERROR("Unable to send request to cluster: %s",
+ strerror(-r));
break;
}
@@ -298,7 +315,7 @@
r = fcntl(cn_fd, F_SETFL, FNDELAY);
*/
- links_register(cn_fd, do_local_work, NULL);
+ links_register(cn_fd, "local", do_local_work, NULL);
EXIT();
return 0;
--- cluster/cmirror/src/Attic/logging.h 2007/11/03 18:53:03 1.1.2.2
+++ cluster/cmirror/src/Attic/logging.h 2008/01/14 22:52:17 1.1.2.3
@@ -4,6 +4,9 @@
#include <stdio.h>
#include <syslog.h>
+/* SHORT_UUID - pointer to the last 8 chars of a string (requires strlen(x) >= 8) */
+#define SHORT_UUID(x) ((x) + (strlen(x) - 8))
+
extern int log_tabbing;
extern int log_is_open;
--- cluster/cmirror/src/Attic/queues.c 2007/08/23 19:57:31 1.1.2.1
+++ cluster/cmirror/src/Attic/queues.c 2008/01/14 22:52:17 1.1.2.2
@@ -81,7 +81,18 @@
int queue_status(void)
{
+ int i=1;
+ struct clog_tfr *tfr;
+ struct list_head *p, *n;
+
LOG_PRINT("cluster_queue: %d", cluster_queue->count);
+ list_for_each_safe(p, n, &cluster_queue->list) {
+ tfr = (struct clog_tfr *)p;
+ LOG_PRINT(" %d) %s, originator = %u",
+ i++, RQ_TYPE(tfr->request_type),
+ tfr->originator);
+ }
+
LOG_PRINT("free_queue : %d", free_queue->count);
return cluster_queue->count;
--- cluster/cmirror/src/Attic/queues.h 2007/08/23 19:57:31 1.1.2.1
+++ cluster/cmirror/src/Attic/queues.h 2008/01/14 22:52:17 1.1.2.2
@@ -24,6 +24,6 @@
int (*f)(struct clog_tfr *, struct clog_tfr *),
struct clog_tfr *tfr_cmp);
void queue_remove_all(struct list_head *l, struct queue *q);
-int queue_emtpy(struct queue *q);
+int queue_empty(struct queue *q);
#endif /* __CLUSTER_LOG_QUEUES_DOT_H__ */
--- cluster/cmirror/src/Attic/rbtree.c 2007/11/09 05:47:05 1.1.2.2
+++ cluster/cmirror/src/Attic/rbtree.c 2008/01/14 22:52:17 1.1.2.3
@@ -51,6 +51,13 @@
{
struct rb_node *tmp;
+ if (RIGHT(axis) == NIL(t)) {
+ LOG_ERROR("Tree error: unable to left rotate");
+ return axis;
+ }
+
+ LOG_DBG("TREE[0x%p]: left_rotate [axis = %llu]",
+ t, *((uint64_t *)KEY(axis)));
tmp = RIGHT(axis);
RIGHT(axis) = LEFT(tmp);
if (LEFT(tmp) != NIL(t))
@@ -74,6 +81,13 @@
{
struct rb_node *tmp;
+ if (LEFT(axis) == NIL(t)) {
+ LOG_ERROR("Tree error: unable to right rotate");
+ return axis;
+ }
+
+ LOG_DBG("TREE[0x%p]: right_rotate [axis = %llu]",
+ t, *((uint64_t *)KEY(axis)));
tmp = LEFT(axis);
LEFT(axis) = RIGHT(tmp);
if (RIGHT(tmp) != NIL(t))
@@ -128,9 +142,13 @@
t->in_use_nodes++;
if (t->in_use_nodes > t->max_nodes) {
t->max_nodes = t->in_use_nodes;
- LOG_DBG("Maximum tree nodes now at %d", t->max_nodes);
+ LOG_PRINT("TREE[0x%p]: Maximum tree nodes now at %d",
+ t, t->max_nodes);
}
+ LOG_DBG("TREE[0x%p]: allocating node (in_use_nodes = %d, max_nodes = %d)",
+ t, t->in_use_nodes, t->max_nodes);
+
if (t->free_list) {
new = t->free_list;
t->free_list = new->rb_next;
@@ -150,6 +168,9 @@
d->rb_next = t->free_list;
t->free_list = d;
+ LOG_DBG("TREE[0x%p]: freeing node (in_use_nodes = %d, max_nodes = %d)",
+ t, t->in_use_nodes, t->max_nodes);
+
/* FIXME: We never free any nodes */
}
@@ -157,7 +178,8 @@
{
struct rb_node *tmp;
- LOG_DBG("TREE: insert %llu", *((uint64_t *)KEY(new)));
+ LOG_DBG("TREE[0x%p]: inserting %llu",
+ t, *((uint64_t *)KEY(new)));
if (!t || !new) {
errno = EINVAL;
return -1;
@@ -221,6 +243,7 @@
left_rotate(t, PARENT(x));
w = RIGHT(PARENT(x));
}
+
if ((COLOR(LEFT(w)) == BLACK) && (COLOR(RIGHT(w)) == BLACK)) {
COLOR(w) = RED;
x = PARENT(x);
@@ -296,7 +319,8 @@
{
struct rb_node *x, *y;
- LOG_DBG("TREE: remove %llu", *((uint64_t *)KEY(del)));
+ LOG_DBG("TREE[0x%p]: removing %llu",
+ t, *((uint64_t *)KEY(del)));
if ((LEFT(del) == NIL(t)) || (RIGHT(del) == NIL(t)))
y = del;
else
More information about the Cluster-devel
mailing list