[Cluster-devel] [PATCHv5 v5.13-rc1 dlm/next 09/15] fs: dlm: add functionality to re-transmit a message

Alexander Aring aahringo at redhat.com
Sat May 15 00:35:43 UTC 2021


This patch introduces a retransmit functionality for a lowcomms message
handle. It's just allocates a new buffer and transmit it again, no
special handling about prioritize it because keeping bytestream in order.

To avoid another connection look some refactor was done to make a new
buffer allocation with a preexisting connection pointer.

Signed-off-by: Alexander Aring <aahringo at redhat.com>
---
 fs/dlm/lowcomms.c | 85 ++++++++++++++++++++++++++++++++++++-----------
 fs/dlm/lowcomms.h |  1 +
 2 files changed, 67 insertions(+), 19 deletions(-)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 70fd54ebdbe1..fd5d4f5c13cf 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -126,6 +126,8 @@ struct writequeue_entry {
 
 struct dlm_msg {
 	struct writequeue_entry *entry;
+	struct dlm_msg *orig_msg;
+	bool retransmit;
 	void *ppc;
 	int len;
 	int idx; /* new()/commit() idx exchange */
@@ -1055,6 +1057,10 @@ static void free_entry(struct writequeue_entry *e)
 	struct dlm_msg *msg, *tmp;
 
 	list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
+		if (msg->orig_msg) {
+			msg->orig_msg->retransmit = false;
+			kref_put(&msg->orig_msg->ref, dlm_msg_release);
+		}
 		list_del(&msg->list);
 		kref_put(&msg->ref, dlm_msg_release);
 	}
@@ -1494,11 +1500,37 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
 	return e;
 };
 
+static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
+						gfp_t allocation, char **ppc,
+						void (*cb)(struct dlm_mhandle *mh),
+						struct dlm_mhandle *mh)
+{
+	struct writequeue_entry *e;
+	struct dlm_msg *msg;
+
+	msg = kzalloc(sizeof(*msg), allocation);
+	if (!msg)
+		return NULL;
+
+	kref_init(&msg->ref);
+
+	e = new_wq_entry(con, len, allocation, ppc, cb, mh);
+	if (!e) {
+		kfree(msg);
+		return NULL;
+	}
+
+	msg->ppc = *ppc;
+	msg->len = len;
+	msg->entry = e;
+
+	return msg;
+}
+
 struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
 				     char **ppc, void (*cb)(struct dlm_mhandle *mh),
 				     struct dlm_mhandle *mh)
 {
-	struct writequeue_entry *e;
 	struct connection *con;
 	struct dlm_msg *msg;
 	int idx;
@@ -1518,32 +1550,18 @@ struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
 		return NULL;
 	}
 
-	msg = kzalloc(sizeof(*msg), allocation);
+	msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, mh);
 	if (!msg) {
 		srcu_read_unlock(&connections_srcu, idx);
 		return NULL;
 	}
 
-	kref_init(&msg->ref);
-
-	e = new_wq_entry(con, len, allocation, ppc, cb, mh);
-	if (!e) {
-		srcu_read_unlock(&connections_srcu, idx);
-		kfree(msg);
-		return NULL;
-	}
-
-	msg->ppc = *ppc;
-	msg->len = len;
-	msg->entry = e;
-
 	/* we assume if successful commit must called */
 	msg->idx = idx;
-
 	return msg;
 }
 
-void dlm_lowcomms_commit_msg(struct dlm_msg *msg)
+static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
 {
 	struct writequeue_entry *e = msg->entry;
 	struct connection *con = e->con;
@@ -1561,20 +1579,49 @@ void dlm_lowcomms_commit_msg(struct dlm_msg *msg)
 	spin_unlock(&con->writequeue_lock);
 
 	queue_work(send_workqueue, &con->swork);
-	srcu_read_unlock(&connections_srcu, msg->idx);
 	return;
 
 out:
 	spin_unlock(&con->writequeue_lock);
-	srcu_read_unlock(&connections_srcu, msg->idx);
 	return;
 }
 
+void dlm_lowcomms_commit_msg(struct dlm_msg *msg)
+{
+	_dlm_lowcomms_commit_msg(msg);
+	srcu_read_unlock(&connections_srcu, msg->idx);
+}
+
 void dlm_lowcomms_put_msg(struct dlm_msg *msg)
 {
 	kref_put(&msg->ref, dlm_msg_release);
 }
 
+/* does not held connections_srcu, usage workqueue only */
+int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
+{
+	struct dlm_msg *msg_resend;
+	char *ppc;
+
+	if (msg->retransmit)
+		return 1;
+
+	msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len,
+					      GFP_ATOMIC, &ppc, NULL, NULL);
+	if (!msg_resend)
+		return -ENOMEM;
+
+	msg->retransmit = true;
+	kref_get(&msg->ref);
+	msg_resend->orig_msg = msg;
+
+	memcpy(ppc, msg->ppc, msg->len);
+	_dlm_lowcomms_commit_msg(msg_resend);
+	dlm_lowcomms_put_msg(msg_resend);
+
+	return 0;
+}
+
 /* Send a message */
 static void send_to_sock(struct connection *con)
 {
diff --git a/fs/dlm/lowcomms.h b/fs/dlm/lowcomms.h
index cdb8f066f0d8..a4384826442c 100644
--- a/fs/dlm/lowcomms.h
+++ b/fs/dlm/lowcomms.h
@@ -27,6 +27,7 @@ struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
 				     struct dlm_mhandle *mh);
 void dlm_lowcomms_commit_msg(struct dlm_msg *msg);
 void dlm_lowcomms_put_msg(struct dlm_msg *msg);
+int dlm_lowcomms_resend_msg(struct dlm_msg *msg);
 int dlm_lowcomms_connect_node(int nodeid);
 int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark);
 int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len);
-- 
2.26.3




More information about the Cluster-devel mailing list