[Cluster-devel] [PATCH 15/29] gfs2: gfs2_glock_get: Wait on freeing glocks

Bob Peterson rpeterso at redhat.com
Mon Sep 4 02:51:03 UTC 2017


From: Andreas Gruenbacher <agruenba at redhat.com>

Keep glocks in their hash table until they are freed instead of removing
them when their last reference is dropped.  This allows us to wait for
any previous instance of a glock to go away in gfs2_glock_get before
creating a new glock.

Special thanks to Andy Price for finding and fixing a problem which also
required us to delete the rcu_read_unlock from the error case in function
gfs2_glock_get.

Signed-off-by: Andreas Gruenbacher <agruenba at redhat.com>
Signed-off-by: Bob Peterson <rpeterso at redhat.com>
---
 fs/gfs2/glock.c | 126 ++++++++++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 104 insertions(+), 22 deletions(-)

diff --git a/fs/gfs2/glock.c b/fs/gfs2/glock.c
index 1029340fc8ba..11d48b964047 100644
--- a/fs/gfs2/glock.c
+++ b/fs/gfs2/glock.c
@@ -15,6 +15,7 @@
 #include <linux/buffer_head.h>
 #include <linux/delay.h>
 #include <linux/sort.h>
+#include <linux/hash.h>
 #include <linux/jhash.h>
 #include <linux/kallsyms.h>
 #include <linux/gfs2_ondisk.h>
@@ -80,6 +81,66 @@ static struct rhashtable_params ht_parms = {
 
 static struct rhashtable gl_hash_table;
 
+#define GLOCK_WAIT_TABLE_BITS 12
+#define GLOCK_WAIT_TABLE_SIZE (1 << GLOCK_WAIT_TABLE_BITS)
+static wait_queue_head_t glock_wait_table[GLOCK_WAIT_TABLE_SIZE] __cacheline_aligned;
+
+struct wait_glock_queue {
+	struct lm_lockname *name;
+	wait_queue_entry_t wait;
+};
+
+static int glock_wake_function(wait_queue_entry_t *wait, unsigned int mode,
+			       int sync, void *key)
+{
+	struct wait_glock_queue *wait_glock =
+		container_of(wait, struct wait_glock_queue, wait);
+	struct lm_lockname *wait_name = wait_glock->name;
+	struct lm_lockname *wake_name = key;
+
+	if (wake_name->ln_sbd != wait_name->ln_sbd ||
+	    wake_name->ln_number != wait_name->ln_number ||
+	    wake_name->ln_type != wait_name->ln_type)
+		return 0;
+	return autoremove_wake_function(wait, mode, sync, key);
+}
+
+static wait_queue_head_t *glock_waitqueue(struct lm_lockname *name)
+{
+	u32 hash = jhash2((u32 *)name, sizeof(*name) / 4, 0);
+
+	return glock_wait_table + hash_32(hash, GLOCK_WAIT_TABLE_BITS);
+}
+
+static void prepare_to_wait_on_glock(wait_queue_head_t **wq,
+				     struct wait_glock_queue *wait,
+				     struct lm_lockname *name)
+{
+	wait->name = name;
+	init_wait(&wait->wait);
+	wait->wait.func = glock_wake_function;
+	*wq = glock_waitqueue(name);
+	prepare_to_wait(*wq, &wait->wait, TASK_UNINTERRUPTIBLE);
+}
+
+static void finish_wait_on_glock(wait_queue_head_t *wq,
+				 struct wait_glock_queue *wait)
+{
+	finish_wait(wq, &wait->wait);
+}
+
+/**
+ * wake_up_glock  -  Wake up waiters on a glock
+ * @gl: the glock
+ */
+static void wake_up_glock(struct gfs2_glock *gl)
+{
+	wait_queue_head_t *wq = glock_waitqueue(&gl->gl_name);
+
+	if (waitqueue_active(wq))
+		__wake_up(wq, TASK_NORMAL, 1, &gl->gl_name);
+}
+
 static void gfs2_glock_dealloc(struct rcu_head *rcu)
 {
 	struct gfs2_glock *gl = container_of(rcu, struct gfs2_glock, gl_rcu);
@@ -96,6 +157,9 @@ void gfs2_glock_free(struct gfs2_glock *gl)
 {
 	struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
 
+	rhashtable_remove_fast(&gl_hash_table, &gl->gl_node, ht_parms);
+	smp_mb();
+	wake_up_glock(gl);
 	call_rcu(&gl->gl_rcu, gfs2_glock_dealloc);
 	if (atomic_dec_and_test(&sdp->sd_glock_disposal))
 		wake_up(&sdp->sd_glock_wait);
@@ -194,7 +258,6 @@ static void __gfs2_glock_put(struct gfs2_glock *gl)
 
 	gfs2_glock_remove_from_lru(gl);
 	spin_unlock(&gl->gl_lockref.lock);
-	rhashtable_remove_fast(&gl_hash_table, &gl->gl_node, ht_parms);
 	GLOCK_BUG_ON(gl, !list_empty(&gl->gl_holders));
 	GLOCK_BUG_ON(gl, mapping && mapping->nrpages);
 	trace_gfs2_glock_put(gl);
@@ -679,6 +742,36 @@ static void glock_work_func(struct work_struct *work)
 	spin_unlock(&gl->gl_lockref.lock);
 }
 
+static struct gfs2_glock *find_insert_glock(struct lm_lockname *name,
+					    struct gfs2_glock *new)
+{
+	struct wait_glock_queue wait;
+	wait_queue_head_t *wq;
+	struct gfs2_glock *gl;
+
+again:
+	prepare_to_wait_on_glock(&wq, &wait, name);
+	rcu_read_lock();
+	if (new) {
+		gl = rhashtable_lookup_get_insert_fast(&gl_hash_table,
+			&new->gl_node, ht_parms);
+		if (IS_ERR(gl))
+			goto out;
+	} else {
+		gl = rhashtable_lookup_fast(&gl_hash_table,
+			name, ht_parms);
+	}
+	if (gl && !lockref_get_not_dead(&gl->gl_lockref)) {
+		rcu_read_unlock();
+		schedule();
+		goto again;
+	}
+out:
+	rcu_read_unlock();
+	finish_wait_on_glock(wq, &wait);
+	return gl;
+}
+
 /**
  * gfs2_glock_get() - Get a glock, or create one if one doesn't exist
  * @sdp: The GFS2 superblock
@@ -705,15 +798,11 @@ int gfs2_glock_get(struct gfs2_sbd *sdp, u64 number,
 	struct kmem_cache *cachep;
 	int ret = 0;
 
-	rcu_read_lock();
-	gl = rhashtable_lookup_fast(&gl_hash_table, &name, ht_parms);
-	if (gl && !lockref_get_not_dead(&gl->gl_lockref))
-		gl = NULL;
-	rcu_read_unlock();
-
-	*glp = gl;
-	if (gl)
+	gl = find_insert_glock(&name, NULL);
+	if (gl) {
+		*glp = gl;
 		return 0;
+	}
 	if (!create)
 		return -ENOENT;
 
@@ -767,10 +856,7 @@ int gfs2_glock_get(struct gfs2_sbd *sdp, u64 number,
 		mapping->writeback_index = 0;
 	}
 
-again:
-	rcu_read_lock();
-	tmp = rhashtable_lookup_get_insert_fast(&gl_hash_table, &gl->gl_node,
-						ht_parms);
+	tmp = find_insert_glock(&name, gl);
 	if (!tmp) {
 		*glp = gl;
 		goto out;
@@ -779,13 +865,7 @@ int gfs2_glock_get(struct gfs2_sbd *sdp, u64 number,
 		ret = PTR_ERR(tmp);
 		goto out_free;
 	}
-	if (lockref_get_not_dead(&tmp->gl_lockref)) {
-		*glp = tmp;
-		goto out_free;
-	}
-	rcu_read_unlock();
-	cond_resched();
-	goto again;
+	*glp = tmp;
 
 out_free:
 	kfree(gl->gl_lksb.sb_lvbptr);
@@ -793,7 +873,6 @@ int gfs2_glock_get(struct gfs2_sbd *sdp, u64 number,
 	atomic_dec(&sdp->sd_glock_disposal);
 
 out:
-	rcu_read_unlock();
 	return ret;
 }
 
@@ -1806,7 +1885,7 @@ static int gfs2_sbstats_seq_show(struct seq_file *seq, void *iter_ptr)
 
 int __init gfs2_glock_init(void)
 {
-	int ret;
+	int i, ret;
 
 	ret = rhashtable_init(&gl_hash_table, &ht_parms);
 	if (ret < 0)
@@ -1835,6 +1914,9 @@ int __init gfs2_glock_init(void)
 		return ret;
 	}
 
+	for (i = 0; i < GLOCK_WAIT_TABLE_SIZE; i++)
+		init_waitqueue_head(glock_wait_table + i);
+
 	return 0;
 }
 
-- 
2.13.5




More information about the Cluster-devel mailing list