[Cluster-devel] [RFC dlm/next 11/15] fs: dlm: introduce process workqueue
Alexander Aring
aahringo at redhat.com
Wed Jun 23 15:14:50 UTC 2021
To avoid blocking further receive handling, this patch introduces a
process workqueue which calls dlm_process_incoming_buffer().
While dlm messages are being processed, the current send should come
to an end and no new swork may be queued. This is done with the newly
introduced connection flag CF_STOP_SEND.
Signed-off-by: Alexander Aring <aahringo at redhat.com>
---
fs/dlm/lowcomms.c | 219 +++++++++++++++++++++++++++++++---------------
fs/dlm/midcomms.c | 38 ++++++--
fs/dlm/midcomms.h | 3 +-
3 files changed, 178 insertions(+), 82 deletions(-)
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index a54ed3cf0b45..28d97f8187a5 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -24,15 +24,15 @@
* responsibility to resolve these into IP address or
* whatever it needs for inter-node communication.
*
- * The comms level is two kernel threads that deal mainly with
- * the receiving of messages from other nodes and passing them
- * up to the mid-level comms layer (which understands the
- * message format) for execution by the locking core, and
- * a send thread which does all the setting up of connections
- * to remote nodes and the sending of data. Threads are not allowed
- * to send their own data because it may cause them to wait in times
- * of high load. Also, this way, the sending thread can collect together
- * messages bound for one node and send them in one block.
+ * Each connection can send and receive at the same time; this is considered
+ * the hot path. Closing a connection or accepting a new one is not hot path
+ * and blocks all sending and receiving on that connection. To prevent
+ * sending while dlm messages are being processed, the connection flag
+ * CF_STOP_SEND was introduced: it stops any further dequeuing from the
+ * connection writequeue and suppresses queuing of new connection swork.
+ * Once all received dlm messages have been processed, the flag is cleared
+ * and swork is queued, so all messages committed while dlm messages were
+ * being processed are sent together.
*
* lowcomms will choose to use either TCP or SCTP as its transport layer
* depending on the configuration variable 'protocol'. This should be set
@@ -75,6 +75,7 @@ struct connection {
#define CF_CONNECTED 5
#define CF_EOF 6
#define CF_STOP 7
+#define CF_STOP_SEND 8
struct list_head writequeue; /* List of outgoing writequeue_entries */
spinlock_t writequeue_lock;
atomic_t writequeue_cnt;
@@ -91,9 +92,12 @@ struct connection {
struct work_struct cwork;
int sk_err;
wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */
- unsigned char *rx_buf;
- int rx_buflen;
+ unsigned char rx_leftover_buf[DLM_MAX_SOCKET_BUFSIZE];
int rx_leftover;
+ struct work_struct pwork;
+ struct list_head processqueue;
+ spinlock_t processqueue_lock;
+ struct mutex process_lock;
struct rcu_head rcu;
};
#define sock2con(x) ((struct connection *)(x)->sk_user_data)
@@ -121,6 +125,13 @@ struct writequeue_entry {
struct kref ref;
};
+struct processqueue_entry { /* received bytes awaiting process_dlm_messages() */
+ unsigned char *buf; /* kmalloc()ed; freed by free_processqueue_entry() */
+ int buflen; /* length of the complete, validated messages in buf */
+
+ struct list_head list; /* entry in connection->processqueue */
+};
+
struct dlm_msg {
struct writequeue_entry *entry;
struct dlm_msg *orig_msg;
@@ -177,6 +188,7 @@ int dlm_allow_conn;
/* Work queues */
static struct workqueue_struct *io_workqueue;
+static struct workqueue_struct *process_workqueue;
static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_SPINLOCK(connections_lock);
@@ -185,6 +197,7 @@ DEFINE_STATIC_SRCU(connections_srcu);
static const struct dlm_proto_ops *dlm_proto_ops;
static void process_close_sockets(struct work_struct *work);
+static void process_dlm_messages(struct work_struct *work);
static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
@@ -227,11 +240,25 @@ static inline struct connection *dlm_sendcon(struct connection *con)
return con;
}
+static inline void con_stop_send(struct connection *con)
+{
+ set_bit(CF_STOP_SEND, &con->flags); /* con_next_wq() returns NULL; commits skip swork */
+}
+
+static inline void con_resume_send(struct connection *con)
+{
+ clear_bit(CF_STOP_SEND, &con->flags);
+ dlm_io_delayed_queue(con, &con->swork, 0); /* send what was held back */
+}
+
/* need to held writequeue_lock */
static struct writequeue_entry *con_next_wq(struct connection *con)
{
struct writequeue_entry *e;
+ if (test_bit(CF_STOP_SEND, &con->flags))
+ return NULL;
+
if (list_empty(&con->writequeue))
return NULL;
@@ -260,13 +287,8 @@ static bool tcp_eof_condition(struct connection *con)
return atomic_read(&con->writequeue_cnt);
}
-static int dlm_con_init(struct connection *con, int nodeid)
+static void dlm_con_init(struct connection *con, int nodeid)
{
- con->rx_buflen = dlm_config.ci_buffer_size;
- con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS);
- if (!con->rx_buf)
- return -ENOMEM;
-
con->nodeid = nodeid;
INIT_LIST_HEAD(&con->writequeue);
spin_lock_init(&con->writequeue_lock);
@@ -275,8 +297,6 @@ static int dlm_con_init(struct connection *con, int nodeid)
INIT_WORK(&con->rwork, process_recv_sockets);
INIT_WORK(&con->cwork, process_close_sockets);
init_waitqueue_head(&con->shutdown_wait);
-
- return 0;
}
/*
@@ -286,7 +306,7 @@ static int dlm_con_init(struct connection *con, int nodeid)
static struct connection *nodeid2con(int nodeid, gfp_t alloc)
{
struct connection *con, *tmp;
- int r, ret;
+ int r;
r = nodeid_hash(nodeid);
con = __find_con(nodeid, r);
@@ -297,16 +317,17 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
if (!con)
return NULL;
- ret = dlm_con_init(con, nodeid);
- if (ret) {
- kfree(con);
- return NULL;
- }
+ dlm_con_init(con, nodeid);
mutex_init(&con->rwork_lock);
mutex_init(&con->swork_lock);
mutex_init(&con->wq_alloc);
+ mutex_init(&con->process_lock);
+ INIT_LIST_HEAD(&con->processqueue);
+ spin_lock_init(&con->processqueue_lock);
+ INIT_WORK(&con->pwork, process_dlm_messages);
+
spin_lock(&connections_lock);
/* Because multiple workqueues/threads calls this function it can
* race on multiple cpu's. Instead of locking hot path __find_con()
@@ -317,7 +338,6 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
tmp = __find_con(nodeid, r);
if (tmp) {
spin_unlock(&connections_lock);
- kfree(con->rx_buf);
kfree(con);
return tmp;
}
@@ -819,13 +839,17 @@ static void cancel_io_work(struct connection *con, bool and_other)
struct connection *sendcon = dlm_sendcon(con);
set_bit(CF_STOP, &sendcon->flags);
+ /* stop receiving */
cancel_work_sync(&sendcon->rwork);
cancel_work_sync(&sendcon->cwork);
- cancel_delayed_work_sync(&sendcon->swork);
if (sendcon->othercon && and_other) {
cancel_work_sync(&sendcon->othercon->rwork);
cancel_work_sync(&sendcon->othercon->cwork);
}
+ /* flush pending processes which might trigger swork */
+ flush_work(&sendcon->pwork);
+ /* stop sending */
+ cancel_delayed_work_sync(&sendcon->swork);
dlm_con_lock(sendcon);
close_connection(con, and_other);
@@ -874,45 +898,77 @@ static void shutdown_connection(struct connection *con,
static void dlm_tcp_shutdown(struct connection *con)
{
/* flush pending processes which might trigger send */
- flush_work(&con->rwork);
+ flush_work(&con->pwork); /* NOTE(review): rwork is not flushed; it may still queue pwork */
+ /* flush all send, including swork queued by con_resume_send() */
+ flush_delayed_work(&con->swork);
- if (con->othercon) {
- flush_work(&con->othercon->rwork);
+ if (con->othercon)
shutdown_connection(con->othercon, con);
- }
- /* flush all send */
- flush_delayed_work(&con->swork);
shutdown_connection(con, con);
}
-static int con_realloc_receive_buf(struct connection *con, int newlen)
+static struct processqueue_entry *new_processqueue_entry(int nodeid,
+ int buflen) /* NOTE(review): nodeid is currently unused */
{
- unsigned char *newbuf;
+ struct processqueue_entry *pentry;
- newbuf = kmalloc(newlen, GFP_NOFS);
- if (!newbuf)
- return -ENOMEM;
+ pentry = kmalloc(sizeof(*pentry), GFP_NOFS);
+ if (!pentry)
+ return NULL;
- /* copy any leftover from last receive */
- if (con->rx_leftover)
- memmove(newbuf, con->rx_buf, con->rx_leftover);
+ pentry->buf = kmalloc(buflen, GFP_NOFS);
+ if (!pentry->buf) {
+ kfree(pentry);
+ return NULL;
+ }
- /* swap to new buffer space */
- kfree(con->rx_buf);
- con->rx_buflen = newlen;
- con->rx_buf = newbuf;
+ return pentry; /* caller fills buf, sets buflen and queues list */
+}
- return 0;
+static void free_processqueue_entry(struct processqueue_entry *pentry)
+{
+ kfree(pentry->buf);
+ kfree(pentry);
+}
+
+static void process_dlm_messages(struct work_struct *work)
+{
+ struct connection *con = container_of(work, struct connection, pwork);
+ struct processqueue_entry *pentry;
+
+ mutex_lock(&con->process_lock);
+ con_stop_send(con);
+ /* drain the queue; each entry is dispatched outside processqueue_lock */
+ for (;;) {
+ spin_lock(&con->processqueue_lock);
+ if (list_empty(&con->processqueue)) {
+ spin_unlock(&con->processqueue_lock);
+ break;
+ }
+
+ pentry = list_first_entry(&con->processqueue,
+ struct processqueue_entry, list);
+ list_del(&pentry->list);
+ spin_unlock(&con->processqueue_lock);
+
+ dlm_process_incoming_buffer(con->nodeid, pentry->buf,
+ pentry->buflen);
+ free_processqueue_entry(pentry);
+ }
+ /* queue drained: allow sending again and kick swork once */
+ con_resume_send(con);
+ mutex_unlock(&con->process_lock);
}
/* Data received from remote end */
static void receive_from_sock(struct connection *con,
struct connection *sendcon)
{
+ struct processqueue_entry *pentry;
+ int ret, buflen, buflen_real;
struct msghdr msg;
struct kvec iov;
- int ret, buflen;
mutex_lock(&sendcon->rwork_lock);
if (con->sock == NULL) {
@@ -920,20 +976,21 @@ static void receive_from_sock(struct connection *con,
return;
}
- /* realloc if we get new buffer size to read out */
- buflen = dlm_config.ci_buffer_size;
- if (con->rx_buflen != buflen && con->rx_leftover <= buflen) {
- ret = con_realloc_receive_buf(con, buflen);
- if (ret < 0)
+ buflen = READ_ONCE(dlm_config.ci_buffer_size);
+ for (;;) {
+ pentry = new_processqueue_entry(con->nodeid, buflen);
+ if (!pentry) {
+ mutex_unlock(&sendcon->rwork_lock);
goto out_resched;
- }
+ }
+
+ memcpy(pentry->buf, con->rx_leftover_buf, con->rx_leftover);
- for (;;) {
/* calculate new buffer parameter regarding last receive and
* possible leftover bytes
*/
- iov.iov_base = con->rx_buf + con->rx_leftover;
- iov.iov_len = con->rx_buflen - con->rx_leftover;
+ iov.iov_base = pentry->buf + con->rx_leftover;
+ iov.iov_len = buflen - con->rx_leftover;
memset(&msg, 0, sizeof(msg));
msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
@@ -941,28 +998,39 @@ static void receive_from_sock(struct connection *con,
msg.msg_flags);
if (ret == 0) {
mutex_unlock(&sendcon->rwork_lock);
+ free_processqueue_entry(pentry);
goto out_eof;
} else if (ret < 0) {
+ free_processqueue_entry(pentry);
break;
}
/* new buflen according readed bytes and leftover from last receive */
- buflen = ret + con->rx_leftover;
- ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen);
+ buflen_real = ret + con->rx_leftover;
+ ret = dlm_validate_incoming_buffer(con->nodeid, pentry->buf,
+ buflen_real);
if (ret < 0) {
mutex_unlock(&sendcon->rwork_lock);
+ free_processqueue_entry(pentry);
goto out_close;
}
+ pentry->buflen = ret;
+
/* calculate leftover bytes from process and put it into begin of
* the receive buffer, so next receive we have the full message
* at the start address of the receive buffer.
*/
- con->rx_leftover = buflen - ret;
- if (con->rx_leftover) {
- memmove(con->rx_buf, con->rx_buf + ret,
+ con->rx_leftover = buflen_real - ret;
+ if (con->rx_leftover)
+ memmove(con->rx_leftover_buf, pentry->buf + ret,
con->rx_leftover);
- }
+
+ spin_lock(&sendcon->processqueue_lock);
+ list_add_tail(&pentry->list, &sendcon->processqueue);
+ spin_unlock(&sendcon->processqueue_lock);
+
+ queue_work(process_workqueue, &sendcon->pwork);
}
mutex_unlock(&sendcon->rwork_lock);
@@ -983,6 +1051,9 @@ static void receive_from_sock(struct connection *con,
log_print("connection %p got EOF from %d",
con, con->nodeid);
+ /* flush pending processing which might trigger send */
+ flush_work(&sendcon->pwork);
+
if (dlm_proto_ops->eof_condition &&
dlm_proto_ops->eof_condition(con)) {
set_bit(CF_EOF, &con->flags);
@@ -1063,13 +1134,7 @@ static int accept_from_sock(struct listen_connection *con)
goto accept_err;
}
- result = dlm_con_init(othercon, nodeid);
- if (result < 0) {
- kfree(othercon);
- dlm_con_unlock(newcon);
- srcu_read_unlock(&connections_srcu, idx);
- goto accept_err;
- }
+ dlm_con_init(othercon, nodeid);
set_bit(CF_IS_OTHERCON, &othercon->flags);
newcon->othercon = othercon;
@@ -1099,7 +1164,6 @@ static int accept_from_sock(struct listen_connection *con)
* to the read_sockets list
*/
dlm_io_queue(newcon, &addcon->rwork);
-
srcu_read_unlock(&connections_srcu, idx);
return 0;
@@ -1348,7 +1412,9 @@ static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
e->len = DLM_WQ_LENGTH_BYTES(e);
spin_unlock(&con->writequeue_lock);
- dlm_io_delayed_queue(con, &con->swork, 0);
+ if (!test_bit(CF_STOP_SEND, &con->flags))
+ dlm_io_delayed_queue(con, &con->swork, 0);
+
return;
out:
@@ -1660,13 +1726,23 @@ static void process_send_sockets(struct work_struct *work)
static void work_stop(void)
{
destroy_workqueue(io_workqueue);
+ destroy_workqueue(process_workqueue); /* NOTE(review): confirm no pwork (which queues swork) is pending here */
}
static int work_start(void)
{
+ process_workqueue = alloc_workqueue("dlm_process",
+ WQ_HIGHPRI | WQ_UNBOUND |
+ WQ_MEM_RECLAIM, 0);
+ if (!process_workqueue) {
+ log_print("can't start dlm_process");
+ return -ENOMEM;
+ }
+
io_workqueue = alloc_workqueue("dlm_io",
WQ_UNBOUND | WQ_MEM_RECLAIM, 0);
if (!io_workqueue) {
+ destroy_workqueue(process_workqueue);
log_print("can't start dlm_io");
return -ENOMEM;
}
@@ -1702,7 +1778,6 @@ static void connection_release(struct rcu_head *rcu)
{
struct connection *con = container_of(rcu, struct connection, rcu);
- kfree(con->rx_buf);
kfree(con);
}
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index e3de268898ed..483f7c54c217 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -872,12 +872,7 @@ static void dlm_midcomms_receive_buffer_3_1(union dlm_packet *p, int nodeid)
dlm_receive_buffer(p, nodeid);
}
-/*
- * Called from the low-level comms layer to process a buffer of
- * commands.
- */
-
-int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
+int dlm_validate_incoming_buffer(int nodeid, unsigned char *buf, int len)
{
const unsigned char *ptr = buf;
const struct dlm_header *hd;
@@ -885,7 +880,7 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
int ret = 0;
while (len >= sizeof(struct dlm_header)) {
- hd = (struct dlm_header *)ptr;
+ hd = (const struct dlm_header *)ptr;
/* no message should be more than DLM_MAX_SOCKET_BUFSIZE or
* less than dlm_header size.
@@ -912,6 +907,33 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
if (msglen > len)
break;
+ ret += msglen;
+ len -= msglen;
+ ptr += msglen;
+ }
+
+ return ret;
+}
+
+/*
+ * Called from the low-level comms layer to dispatch every complete message
+ * in @buf. @buf is expected to hold only bytes already accepted by
+ * dlm_validate_incoming_buffer(), so header lengths are trusted here; the
+ * check against @len is only a defensive stop at a trailing partial message.
+ */
+void dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
+{
+ const unsigned char *ptr = buf;
+ const struct dlm_header *hd;
+ uint16_t msglen;
+
+ while (len >= sizeof(struct dlm_header)) {
+ hd = (const struct dlm_header *)ptr;
+
+ msglen = le16_to_cpu(hd->h_length);
+ if (msglen > len)
+ break;
+
switch (le32_to_cpu(hd->h_version)) {
case DLM_VERSION_3_1:
dlm_midcomms_receive_buffer_3_1((union dlm_packet *)ptr, nodeid);
@@ -929,8 +951,6 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
len -= msglen;
ptr += msglen;
}
-
- return ret;
}
void dlm_midcomms_unack_msg_resend(int nodeid)
diff --git a/fs/dlm/midcomms.h b/fs/dlm/midcomms.h
index 579abc6929be..a62c1ad786ef 100644
--- a/fs/dlm/midcomms.h
+++ b/fs/dlm/midcomms.h
@@ -14,7 +14,8 @@
struct midcomms_node;
-int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int buflen);
+void dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int buflen);
+int dlm_validate_incoming_buffer(int nodeid, unsigned char *buf, int len);
struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
gfp_t allocation, char **ppc);
void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh);
--
2.26.3
More information about the Cluster-devel
mailing list