[lvm-devel] [PATCH 1/2] Change cmirrord to multi-thread
dongmao zhang
dmzhang at suse.com
Sun Jan 6 08:46:35 UTC 2013
From: dongmao zhang <deanraccoon at gmail.com>
As noted in https://bugzilla.redhat.com/show_bug.cgi?id=612221#c11,
when a mirrored log is used in a cluster environment, cmirrord
needs to be multi-threaded.
The main thread (do_local_work) receives netlink packets from the kernel.
Whenever a new mirrored device is created, a new thread (cluster_thread_fn)
is started to handle requests arriving from the cluster.
---
daemons/cmirrord/cluster.c | 94 +++++++++++++++++++++++++++--------------
daemons/cmirrord/functions.c | 6 +-
daemons/cmirrord/local.c | 3 +-
3 files changed, 67 insertions(+), 36 deletions(-)
diff --git a/daemons/cmirrord/cluster.c b/daemons/cmirrord/cluster.c
index 70c76c3..58a1f3f 100644
--- a/daemons/cmirrord/cluster.c
+++ b/daemons/cmirrord/cluster.c
@@ -22,6 +22,7 @@
#include <errno.h>
#include <signal.h>
#include <unistd.h>
+#include <pthread.h>
#if CMIRROR_HAS_CHECKPOINT
#include <openais/saAis.h>
#include <openais/saCkpt.h>
@@ -118,9 +119,11 @@ struct clog_cpg {
struct checkpoint_data *checkpoint_list;
int idx;
char debugging[DEBUGGING_HISTORY][128];
+ pthread_t thread_pid;
};
static struct dm_list clog_cpg_list;
+static pthread_rwlock_t clog_cpg_lock =PTHREAD_RWLOCK_INITIALIZER;
/*
* cluster_send
@@ -135,12 +138,14 @@ int cluster_send(struct clog_request *rq)
struct iovec iov;
struct clog_cpg *entry;
+ pthread_rwlock_rdlock(&clog_cpg_lock);
dm_list_iterate_items(entry, &clog_cpg_list)
if (!strncmp(entry->name.value, rq->u_rq.uuid,
CPG_MAX_NAME_LENGTH)) {
found = 1;
break;
}
+ pthread_rwlock_unlock(&clog_cpg_lock);
if (!found) {
rq->u_rq.error = -ENOENT;
@@ -221,11 +226,11 @@ static struct clog_request *get_matching_rq(struct clog_request *rq,
return NULL;
}
-static char rq_buffer[DM_ULOG_REQUEST_SIZE];
static int handle_cluster_request(struct clog_cpg *entry __attribute__((unused)),
struct clog_request *rq, int server)
{
int r = 0;
+ char rq_buffer[DM_ULOG_REQUEST_SIZE];
struct clog_request *tmp = (struct clog_request *)rq_buffer;
/*
@@ -332,9 +337,13 @@ static struct clog_cpg *find_clog_cpg(cpg_handle_t handle)
{
struct clog_cpg *match;
+ pthread_rwlock_rdlock(&clog_cpg_lock);
dm_list_iterate_items(match, &clog_cpg_list)
- if (match->handle == handle)
+ if (match->handle == handle) {
+ pthread_rwlock_unlock(&clog_cpg_lock);
return match;
+ }
+ pthread_rwlock_unlock(&clog_cpg_lock);
return NULL;
}
@@ -939,28 +948,17 @@ static int resend_requests(struct clog_cpg *entry)
return r;
}
-static int do_cluster_work(void *data __attribute__((unused)))
+static void cluster_thread_fn(void *data)
{
int r = CS_OK;
- struct clog_cpg *entry, *tmp;
-
- dm_list_iterate_items_safe(entry, tmp, &clog_cpg_list) {
- r = cpg_dispatch(entry->handle, CS_DISPATCH_ALL);
- if (r != CS_OK)
- LOG_ERROR("cpg_dispatch failed: %d", r);
-
- if (entry->free_me) {
- free(entry);
- continue;
- }
- do_checkpoints(entry, 0);
-
- resend_requests(entry);
- }
+ struct clog_cpg * match = data;
+ r = cpg_dispatch (match->handle, CS_DISPATCH_BLOCKING);
+ if (r != CS_OK)
+ LOG_DBG("cpg_dispatch failed");
- return (r == CS_OK) ? 0 : -1; /* FIXME: good error number? */
}
+
static int flush_startup_list(struct clog_cpg *entry)
{
int r = 0;
@@ -1011,23 +1009,35 @@ static int flush_startup_list(struct clog_cpg *entry)
return 0;
}
+static void do_cpg_message_callback(struct clog_cpg * match , uint32_t nodeid, void *msg, size_t msg_len);
+
static void cpg_message_callback(cpg_handle_t handle, const struct cpg_name *gname __attribute__((unused)),
uint32_t nodeid, uint32_t pid __attribute__((unused)),
void *msg, size_t msg_len)
{
+ struct clog_cpg * entry;
+
+ entry = find_clog_cpg(handle);
+ if (!entry) {
+ LOG_ERROR("Unable to find clog_cpg for cluster message");
+ return;
+ }
+ do_cpg_message_callback(entry, nodeid, msg, msg_len);
+
+ do_checkpoints(entry, 0);
+ resend_requests(entry);
+
+}
+
+static void do_cpg_message_callback(struct clog_cpg * match , uint32_t nodeid, void *msg, size_t msg_len)
+{
int i;
int r = 0;
int i_am_server;
int response = 0;
struct clog_request *rq = msg;
struct clog_request *tmp_rq;
- struct clog_cpg *match;
- match = find_clog_cpg(handle);
- if (!match) {
- LOG_ERROR("Unable to find clog_cpg for cluster message");
- return;
- }
/*
* Perform necessary endian and version compatibility conversions
@@ -1324,7 +1334,7 @@ static void cpg_leave_callback(struct clog_cpg *match,
size_t member_list_entries)
{
unsigned i;
- int j, fd;
+ int j;
uint32_t lowest = match->lowest_id;
struct clog_request *rq, *n;
struct checkpoint_data *p_cp, *c_cp;
@@ -1335,10 +1345,9 @@ static void cpg_leave_callback(struct clog_cpg *match,
/* Am I leaving? */
if (my_cluster_id == left->nodeid) {
LOG_DBG("Finalizing leave...");
+ pthread_rwlock_wrlock(&clog_cpg_lock);
dm_list_del(&match->list);
-
- cpg_fd_get(match->handle, &fd);
- links_unregister(fd);
+ pthread_rwlock_unlock(&clog_cpg_lock);
cluster_postsuspend(match->name.value, match->luid);
@@ -1466,11 +1475,13 @@ static void cpg_config_callback(cpg_handle_t handle, const struct cpg_name *gnam
struct clog_cpg *match;
int found = 0;
+ pthread_rwlock_rdlock(&clog_cpg_lock);
dm_list_iterate_items(match, &clog_cpg_list)
if (match->handle == handle) {
found = 1;
break;
}
+ pthread_rwlock_unlock(&clog_cpg_lock);
if (!found) {
LOG_ERROR("Unable to find match for CPG config callback");
@@ -1487,6 +1498,16 @@ static void cpg_config_callback(cpg_handle_t handle, const struct cpg_name *gnam
else
cpg_leave_callback(match, left_list,
member_list, member_list_entries);
+
+
+ if (match->free_me) {
+ LOG_DBG("closing thread %x", (unsigned int)match->thread_pid);
+ free(match);
+ return;
+ }
+
+ do_checkpoints(match, 0);
+ resend_requests(match);
}
cpg_callbacks_t cpg_callbacks = {
@@ -1554,12 +1575,16 @@ int create_cluster_cpg(char *uuid, uint64_t luid)
size_t size;
struct clog_cpg *new = NULL;
struct clog_cpg *tmp;
+ pthread_t new_pid;
+ pthread_rwlock_rdlock(&clog_cpg_lock);
dm_list_iterate_items(tmp, &clog_cpg_list)
if (!strncmp(tmp->name.value, uuid, CPG_MAX_NAME_LENGTH)) {
LOG_ERROR("Log entry already exists: %s", uuid);
+ pthread_rwlock_unlock(&clog_cpg_lock);
return -EEXIST;
}
+ pthread_rwlock_unlock(&clog_cpg_lock);
new = malloc(sizeof(*new));
if (!new) {
@@ -1601,13 +1626,16 @@ int create_cluster_cpg(char *uuid, uint64_t luid)
}
new->cpg_state = VALID;
+ pthread_rwlock_wrlock(&clog_cpg_lock);
dm_list_add(&clog_cpg_list, &new->list);
+ pthread_rwlock_unlock(&clog_cpg_lock);
+
LOG_DBG("New handle: %llu", (unsigned long long)new->handle);
LOG_DBG("New name: %s", new->name.value);
- /* FIXME: better variable */
- cpg_fd_get(new->handle, &r);
- links_register(r, "cluster", do_cluster_work, NULL);
+ pthread_create(&new_pid, NULL, (void *)cluster_thread_fn, (void*)new);
+ new->thread_pid = new_pid;
+ pthread_detach(new_pid);
return 0;
}
@@ -1676,9 +1704,11 @@ int destroy_cluster_cpg(char *uuid)
{
struct clog_cpg *del, *tmp;
+ pthread_rwlock_rdlock(&clog_cpg_lock);
dm_list_iterate_items_safe(del, tmp, &clog_cpg_list)
if (!strncmp(del->name.value, uuid, CPG_MAX_NAME_LENGTH))
_destroy_cluster_cpg(del);
+ pthread_rwlock_unlock(&clog_cpg_lock);
return 0;
}
diff --git a/daemons/cmirrord/functions.c b/daemons/cmirrord/functions.c
index f6e0918..bb6acfd 100644
--- a/daemons/cmirrord/functions.c
+++ b/daemons/cmirrord/functions.c
@@ -818,9 +818,9 @@ no_disk:
if (commit_log && (lc->disk_fd >= 0)) {
rq->error = write_log(lc);
if (rq->error)
- LOG_ERROR("Failed initial disk log write");
+ LOG_ERROR("[%s] Failed initial disk log write", SHORT_UUID(lc->uuid));
else
- LOG_DBG("Disk log initialized");
+ LOG_DBG("[%s] Disk log initialized", SHORT_UUID(lc->uuid));
lc->touched = 0;
}
out:
@@ -1910,7 +1910,6 @@ void log_debug(void)
LOG_ERROR("");
LOG_ERROR("LOG COMPONENT DEBUGGING::");
- LOG_ERROR("Official log list:");
LOG_ERROR("Pending log list:");
dm_list_iterate_items(lc, &log_pending_list) {
LOG_ERROR("%s", lc->uuid);
@@ -1920,6 +1919,7 @@ void log_debug(void)
print_bits(lc->clean_bits, 1);
}
+ LOG_ERROR("Official log list:");
dm_list_iterate_items(lc, &log_list) {
LOG_ERROR("%s", lc->uuid);
LOG_ERROR(" recoverer : %" PRIu32, lc->recoverer);
diff --git a/daemons/cmirrord/local.c b/daemons/cmirrord/local.c
index 500f6dc..b5ccc3f 100644
--- a/daemons/cmirrord/local.c
+++ b/daemons/cmirrord/local.c
@@ -29,13 +29,13 @@
static int cn_fd = -1; /* Connector (netlink) socket fd */
static char recv_buf[2048];
-static char send_buf[2048];
/* FIXME: merge this function with kernel_send_helper */
static int kernel_ack(uint32_t seq, int error)
{
int r;
+ char send_buf[2048];
struct nlmsghdr *nlh = (struct nlmsghdr *)send_buf;
struct cn_msg *msg = NLMSG_DATA(nlh);
@@ -179,6 +179,7 @@ static int kernel_send_helper(void *data, uint16_t out_size)
int r;
struct nlmsghdr *nlh;
struct cn_msg *msg;
+ char send_buf[2048];
memset(send_buf, 0, sizeof(send_buf));
--
1.7.3.4
More information about the lvm-devel
mailing list