[Cluster-devel] [RFC dlm/next 10/15] fs: dlm: introduce reconnect work

Alexander Aring aahringo at redhat.com
Wed Jun 23 15:14:49 UTC 2021


This patch adds another work item to close the sockets, which we cannot
do inside the lowcomms_error_report() handler. This patch will also close
the "othercon" sock if present.

Signed-off-by: Alexander Aring <aahringo at redhat.com>
---
 fs/dlm/lowcomms.c | 163 +++++++++++++++++++++++++---------------------
 1 file changed, 88 insertions(+), 75 deletions(-)

diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index d2febefe1d0d..a54ed3cf0b45 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -73,10 +73,8 @@ struct connection {
 #define CF_APP_LIMITED 3
 #define CF_SHUTDOWN 4
 #define CF_CONNECTED 5
-#define CF_RECONNECT 6
-#define CF_DELAY_CONNECT 7
-#define CF_EOF 8
-#define CF_STOP 9
+#define CF_EOF 6
+#define CF_STOP 7
 	struct list_head writequeue;  /* List of outgoing writequeue_entries */
 	spinlock_t writequeue_lock;
 	atomic_t writequeue_cnt;
@@ -89,7 +87,9 @@ struct connection {
 	struct mutex rwork_lock;
 	struct work_struct rwork;
 	struct mutex swork_lock;
-	struct work_struct swork;
+	struct delayed_work swork;
+	struct work_struct cwork;
+	int sk_err;
 	wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */
 	unsigned char *rx_buf;
 	int rx_buflen;
@@ -184,6 +184,7 @@ DEFINE_STATIC_SRCU(connections_srcu);
 
 static const struct dlm_proto_ops *dlm_proto_ops;
 
+static void process_close_sockets(struct work_struct *work);
 static void process_recv_sockets(struct work_struct *work);
 static void process_send_sockets(struct work_struct *work);
 
@@ -208,6 +209,16 @@ static inline void dlm_io_queue(struct connection *con,
 	queue_work(io_workqueue, work);
 }
 
+static inline void dlm_io_delayed_queue(struct connection *con,
+					struct delayed_work *dwork,
+					unsigned long delay)
+{
+	if (test_bit(CF_STOP, &con->flags))
+		return;
+
+	queue_delayed_work(io_workqueue, dwork, delay);
+}
+
 static inline struct connection *dlm_sendcon(struct connection *con)
 {
 	if (test_bit(CF_IS_OTHERCON, &con->flags))
@@ -260,8 +271,9 @@ static int dlm_con_init(struct connection *con, int nodeid)
 	INIT_LIST_HEAD(&con->writequeue);
 	spin_lock_init(&con->writequeue_lock);
 	atomic_set(&con->writequeue_cnt, 0);
-	INIT_WORK(&con->swork, process_send_sockets);
+	INIT_DELAYED_WORK(&con->swork, process_send_sockets);
 	INIT_WORK(&con->rwork, process_recv_sockets);
+	INIT_WORK(&con->cwork, process_close_sockets);
 	init_waitqueue_head(&con->shutdown_wait);
 
 	return 0;
@@ -551,19 +563,10 @@ static void lowcomms_write_space(struct sock *sk)
 	}
 
 out:
-	dlm_io_queue(con, &con->swork);
+	dlm_io_delayed_queue(con, &con->swork, 0);
 	read_unlock_bh(&sk->sk_callback_lock);
 }
 
-static inline void lowcomms_connect_sock(struct connection *con)
-{
-	if (test_bit(CF_CLOSE, &con->flags))
-		return;
-
-	dlm_io_queue(con, &con->swork);
-	cond_resched();
-}
-
 static void lowcomms_state_change(struct sock *sk)
 {
 	/* SCTP layer is not calling sk_data_ready when the connection
@@ -579,27 +582,6 @@ static void lowcomms_state_change(struct sock *sk)
 	}
 }
 
-int dlm_lowcomms_connect_node(int nodeid)
-{
-	struct connection *con;
-	int idx;
-
-	if (nodeid == dlm_our_nodeid())
-		return 0;
-
-	idx = srcu_read_lock(&connections_srcu);
-	con = nodeid2con(nodeid, GFP_NOFS);
-	if (!con) {
-		srcu_read_unlock(&connections_srcu, idx);
-		return -ENOMEM;
-	}
-
-	lowcomms_connect_sock(con);
-	srcu_read_unlock(&connections_srcu, idx);
-
-	return 0;
-}
-
 int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
 {
 	struct dlm_node_addr *na;
@@ -659,20 +641,8 @@ static void lowcomms_error_report(struct sock *sk)
 				   sk->sk_err_soft);
 	}
 
-	/* below sendcon only handling */
-	if (test_bit(CF_IS_OTHERCON, &con->flags))
-		con = con->sendcon;
-
-	switch (sk->sk_err) {
-	case ECONNREFUSED:
-		set_bit(CF_DELAY_CONNECT, &con->flags);
-		break;
-	default:
-		break;
-	}
-
-	if (!test_and_set_bit(CF_RECONNECT, &con->flags))
-		dlm_io_queue(con, &con->swork);
+	con->sk_err = sk->sk_err;
+	dlm_io_queue(dlm_sendcon(con), &con->cwork);
 
 out:
 	read_unlock_bh(&sk->sk_callback_lock);
@@ -837,8 +807,6 @@ static void close_connection(struct connection *con, bool and_other)
 	con->retries = 0;
 	clear_bit(CF_APP_LIMITED, &con->flags);
 	clear_bit(CF_CONNECTED, &con->flags);
-	clear_bit(CF_DELAY_CONNECT, &con->flags);
-	clear_bit(CF_RECONNECT, &con->flags);
 	clear_bit(CF_EOF, &con->flags);
 
 	/* handling for tcp shutdown */
@@ -851,10 +819,13 @@ static void cancel_io_work(struct connection *con, bool and_other)
 	struct connection *sendcon = dlm_sendcon(con);
 
 	set_bit(CF_STOP, &sendcon->flags);
-	cancel_work_sync(&sendcon->swork);
 	cancel_work_sync(&sendcon->rwork);
-	if (sendcon->othercon && and_other)
+	cancel_work_sync(&sendcon->cwork);
+	cancel_delayed_work_sync(&sendcon->swork);
+	if (sendcon->othercon && and_other) {
 		cancel_work_sync(&sendcon->othercon->rwork);
+		cancel_work_sync(&sendcon->othercon->cwork);
+	}
 
 	dlm_con_lock(sendcon);
 	close_connection(con, and_other);
@@ -911,7 +882,7 @@ static void dlm_tcp_shutdown(struct connection *con)
 	}
 
 	/* flush all send */
-	flush_work(&con->swork);
+	flush_delayed_work(&con->swork);
 	shutdown_connection(con, con);
 }
 
@@ -1002,9 +973,10 @@ static void receive_from_sock(struct connection *con,
 	return;
 
 out_close:
-	if (!test_and_set_bit(CF_RECONNECT, &sendcon->flags))
-		dlm_io_queue(sendcon, &sendcon->swork);
-
+	dlm_con_lock(sendcon);
+	close_connection(con, false);
+	dlm_con_unlock(sendcon);
+	dlm_io_delayed_queue(sendcon, &sendcon->swork, 0);
 	return;
 
 out_eof:
@@ -1376,7 +1348,7 @@ static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
 	e->len = DLM_WQ_LENGTH_BYTES(e);
 	spin_unlock(&con->writequeue_lock);
 
-	dlm_io_queue(con, &con->swork);
+	dlm_io_delayed_queue(con, &con->swork, 0);
 	return;
 
 out:
@@ -1430,7 +1402,7 @@ static void send_to_sock(struct connection *con)
 
 	mutex_lock(&con->swork_lock);
 	if (con->sock == NULL) {
-		dlm_io_queue(con, &con->swork);
+		dlm_io_delayed_queue(con, &con->swork, 0);
 		goto out;
 	}
 
@@ -1611,33 +1583,74 @@ static int dlm_connect(struct connection *con)
 	    result != -EPROTONOSUPPORT) {
 		log_print("connect %d try %d error %d", con->nodeid,
 			  con->retries, result);
-		msleep(1000);
-		lowcomms_connect_sock(con);
+		dlm_io_delayed_queue(con, &con->swork,
+				     msecs_to_jiffies(1000));
 	}
 
 	return result;
 }
 
-/* Send workqueue function */
-static void process_send_sockets(struct work_struct *work)
+int dlm_lowcomms_connect_node(int nodeid)
 {
-	struct connection *con = container_of(work, struct connection, swork);
+	struct connection *con;
+	int idx;
 
-	WARN_ON(test_bit(CF_IS_OTHERCON, &con->flags));
+	if (nodeid == dlm_our_nodeid())
+		return 0;
 
-	if (test_and_clear_bit(CF_RECONNECT, &con->flags)) {
-		dlm_con_lock(con);
-		close_connection(con, false);
-		dlm_midcomms_unack_msg_resend(con->nodeid);
-		dlm_con_unlock(con);
+	idx = srcu_read_lock(&connections_srcu);
+	con = nodeid2con(nodeid, GFP_NOFS);
+	if (!con) {
+		srcu_read_unlock(&connections_srcu, idx);
+		return -ENOMEM;
 	}
 
-	if (con->sock == NULL) { /* not mutex protected so check it inside too */
-		if (test_and_clear_bit(CF_DELAY_CONNECT, &con->flags))
-			msleep(1000);
+	if (test_bit(CF_CLOSE, &con->flags)) {
+		srcu_read_unlock(&connections_srcu, idx);
+		return 0;
+	}
 
+	dlm_con_lock(con);
+	dlm_connect(con);
+	dlm_con_unlock(con);
+	srcu_read_unlock(&connections_srcu, idx);
+
+	cond_resched();
+	return 0;
+}
+
+static void process_close_sockets(struct work_struct *work)
+{
+	struct connection *con = container_of(work, struct connection, cwork);
+	struct connection *sendcon = dlm_sendcon(con);
+	unsigned int delay = 0;
+
+	dlm_con_lock(sendcon);
+	close_connection(con, false);
+	dlm_con_unlock(sendcon);
+
+	switch (con->sk_err) {
+	case ECONNREFUSED:
+		delay = msecs_to_jiffies(1000);
+		break;
+	default:
+		break;
+	}
+
+	dlm_io_delayed_queue(sendcon, &sendcon->swork, delay);
+}
+
+/* Send workqueue function */
+static void process_send_sockets(struct work_struct *work)
+{
+	struct connection *con = container_of(work, struct connection,
+					      swork.work);
+
+	/* be used to connect socket */
+	if (con->sock == NULL) {
 		dlm_con_lock(con);
 		dlm_connect(con);
+		dlm_midcomms_unack_msg_resend(con->nodeid);
 		dlm_con_unlock(con);
 	}
 
-- 
2.26.3




More information about the Cluster-devel mailing list