[Cluster-devel] [PATCHv3 dlm/next 3/8] fs: dlm: make buffer handling per msg
Alexander Aring
aahringo at redhat.com
Fri Mar 26 17:33:32 UTC 2021
This patch makes the void pointer handle for lowcomms functionality per
message and not per page allocation entry. A refcount handling for the
handle was added to keep the message alive until the user doesn't need
it anymore.
There exists now a per message callback which will be called when
allocating a new buffer. This callback will be guaranteed to be called
according the order of the sending buffer, which can be used that the
caller increments a sequence number.
Signed-off-by: Alexander Aring <aahringo at redhat.com>
---
fs/dlm/lowcomms.c | 100 +++++++++++++++++++++++++++++++++++++++++-----
fs/dlm/lowcomms.h | 5 ++-
fs/dlm/midcomms.c | 8 +++-
3 files changed, 101 insertions(+), 12 deletions(-)
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 73cc1809050a..ba782ea84281 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -114,6 +114,17 @@ struct writequeue_entry {
int end;
int users;
struct connection *con;
+ struct list_head msgs;
+ struct kref ref;
+};
+
+struct dlm_msg {
+ struct writequeue_entry *entry;
+ void *ppc;
+ int len;
+
+ struct list_head list;
+ struct kref ref;
};
struct dlm_node_addr {
@@ -976,12 +987,36 @@ static int accept_from_sock(struct listen_connection *con)
return result;
}
-static void free_entry(struct writequeue_entry *e)
+static void dlm_page_release(struct kref *kref)
{
+ struct writequeue_entry *e = container_of(kref, struct writequeue_entry,
+ ref);
+
__free_page(e->page);
kfree(e);
}
+static void dlm_msg_release(struct kref *kref)
+{
+ struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);
+
+ kref_put(&msg->entry->ref, dlm_page_release);
+ kfree(msg);
+}
+
+static void free_entry(struct writequeue_entry *e)
+{
+ struct dlm_msg *msg, *tmp;
+
+ list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
+ list_del(&msg->list);
+ kref_put(&msg->ref, dlm_msg_release);
+ }
+
+ list_del(&e->list);
+ kref_put(&e->ref, dlm_page_release);
+}
+
/*
* writequeue_entry_complete - try to delete and free write queue entry
* @e: write queue entry to try to delete
@@ -994,10 +1029,8 @@ static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
e->offset += completed;
e->len -= completed;
- if (e->len == 0 && e->users == 0) {
- list_del(&e->list);
+ if (e->len == 0 && e->users == 0)
free_entry(e);
- }
}
/*
@@ -1363,12 +1396,16 @@ static struct writequeue_entry *new_writequeue_entry(struct connection *con,
entry->con = con;
entry->users = 1;
+ kref_init(&entry->ref);
+ INIT_LIST_HEAD(&entry->msgs);
return entry;
}
static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
- gfp_t allocation, char **ppc)
+ gfp_t allocation, char **ppc,
+ void (*cb)(void *buf, void *priv),
+ void *priv)
{
struct writequeue_entry *e;
@@ -1376,7 +1413,12 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
if (!list_empty(&con->writequeue)) {
e = list_last_entry(&con->writequeue, struct writequeue_entry, list);
if (DLM_WQ_REMAIN_BYTES(e) >= len) {
+ kref_get(&e->ref);
+
*ppc = page_address(e->page) + e->end;
+ if (cb)
+ cb(*ppc, priv);
+
e->end += len;
e->users++;
spin_unlock(&con->writequeue_lock);
@@ -1390,19 +1432,26 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
if (!e)
return NULL;
+ kref_get(&e->ref);
*ppc = page_address(e->page);
e->end += len;
spin_lock(&con->writequeue_lock);
+ if (cb)
+ cb(*ppc, priv);
+
list_add_tail(&e->list, &con->writequeue);
spin_unlock(&con->writequeue_lock);
return e;
};
-void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
+void *dlm_lowcomms_new_buffer(int nodeid, int len, gfp_t allocation, char **ppc,
+ void (*cb)(void *buf, void *priv), void *priv)
{
+ struct writequeue_entry *e;
struct connection *con;
+ struct dlm_msg *msg;
if (len > DEFAULT_BUFFER_SIZE ||
len < sizeof(struct dlm_header)) {
@@ -1416,16 +1465,36 @@ void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
if (!con)
return NULL;
- return new_wq_entry(con, len, allocation, ppc);
+ msg = kzalloc(sizeof(*msg), allocation);
+ if (!msg)
+ return NULL;
+
+ kref_init(&msg->ref);
+
+ e = new_wq_entry(con, len, allocation, ppc, cb, priv);
+ if (!e) {
+ kfree(msg);
+ return NULL;
+ }
+
+ msg->ppc = *ppc;
+ msg->len = len;
+ msg->entry = e;
+
+ return msg;
}
void dlm_lowcomms_commit_buffer(void *mh)
{
- struct writequeue_entry *e = (struct writequeue_entry *)mh;
+ struct dlm_msg *msg = mh;
+ struct writequeue_entry *e = msg->entry;
struct connection *con = e->con;
int users;
spin_lock(&con->writequeue_lock);
+ list_add(&msg->list, &e->msgs);
+ kref_get(&msg->ref);
+
users = --e->users;
if (users)
goto out;
@@ -1441,6 +1510,20 @@ void dlm_lowcomms_commit_buffer(void *mh)
return;
}
+void dlm_lowcomms_put_buffer(void *mh)
+{
+ struct dlm_msg *msg = mh;
+
+ kref_put(&msg->ref, dlm_msg_release);
+}
+
+void dlm_lowcomms_get_buffer(void *mh)
+{
+ struct dlm_msg *msg = mh;
+
+ kref_get(&msg->ref);
+}
+
/* Send a message */
static void send_to_sock(struct connection *con)
{
@@ -1519,7 +1602,6 @@ static void clean_one_writequeue(struct connection *con)
spin_lock(&con->writequeue_lock);
list_for_each_entry_safe(e, safe, &con->writequeue, list) {
- list_del(&e->list);
free_entry(e);
}
spin_unlock(&con->writequeue_lock);
diff --git a/fs/dlm/lowcomms.h b/fs/dlm/lowcomms.h
index 48bbc4e18761..fa735497dad8 100644
--- a/fs/dlm/lowcomms.h
+++ b/fs/dlm/lowcomms.h
@@ -22,11 +22,14 @@ void dlm_lowcomms_shutdown(void);
void dlm_lowcomms_stop(void);
void dlm_lowcomms_exit(void);
int dlm_lowcomms_close(int nodeid);
-void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc);
+void *dlm_lowcomms_new_buffer(int nodeid, int len, gfp_t allocation, char **ppc,
+ void (*cb)(void *buf, void *priv), void *priv);
void dlm_lowcomms_commit_buffer(void *mh);
int dlm_lowcomms_connect_node(int nodeid);
int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark);
int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len);
+void dlm_lowcomms_put_buffer(void *mh);
+void dlm_lowcomms_get_buffer(void *mh);
#endif /* __LOWCOMMS_DOT_H__ */
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index bbcb242e6101..2ea0449a82ab 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -30,23 +30,27 @@
void *dlm_midcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
{
- return dlm_lowcomms_get_buffer(nodeid, len, allocation, ppc);
+ return dlm_lowcomms_new_buffer(nodeid, len, allocation, ppc, NULL,
+ NULL);
}
void dlm_midcomms_commit_buffer(void *mh)
{
dlm_lowcomms_commit_buffer(mh);
+ dlm_lowcomms_put_buffer(mh);
}
void *dlm_midcomms_stateless_get_buffer(int nodeid, int len, gfp_t allocation,
char **ppc)
{
- return dlm_lowcomms_get_buffer(nodeid, len, allocation, ppc);
+ return dlm_lowcomms_new_buffer(nodeid, len, allocation, ppc, NULL,
+ NULL);
}
void dlm_midcomms_stateless_commit_buffer(void *mh)
{
dlm_lowcomms_commit_buffer(mh);
+ dlm_lowcomms_put_buffer(mh);
}
void midcomms_add_member(int nodeid)
--
2.26.3
More information about the Cluster-devel
mailing list