[Cluster-devel] GFS2: Use list_lru for quota lru list

Steven Whitehouse swhiteho at redhat.com
Fri Sep 13 11:44:03 UTC 2013


This patch is a first go (only compile tested so far) at what list_lru
support should look like for GFS2's quota subsystem. I've done this one
rather than the glock lru list first, since this is easier. There are
still some unresolved issues on the glock side with how to use the new
list_lru code.

The conversion is fairly straight forward in this case though, just
requiring an extra function to conditionally add items to the lru
list upon an atomic counter hitting zero. The other change is that
the locking as split into two, one the internal lock in the lru code,
and the other being a new quota spinlock, which is used for all the
non-lru locking.

Let me know if you can spot any problems, and we'll move on to doing
a bit of testing before we merge this. Once thats done, I'll start
looking at the glock issues again to see if I can make some headway
there too,

Steve.


diff --git a/fs/gfs2/main.c b/fs/gfs2/main.c
index 351586e..7296592 100644
--- a/fs/gfs2/main.c
+++ b/fs/gfs2/main.c
@@ -18,6 +18,7 @@
 #include <linux/rculist_bl.h>
 #include <linux/atomic.h>
 #include <linux/mempool.h>
+#include <linux/list_lru.h>
 
 #include "gfs2.h"
 #include "incore.h"
@@ -35,6 +36,7 @@ static struct shrinker qd_shrinker = {
 	.count_objects = gfs2_qd_shrink_count,
 	.scan_objects = gfs2_qd_shrink_scan,
 	.seeks = DEFAULT_SEEKS,
+	.flags = SHRINKER_NUMA_AWARE,
 };
 
 static void gfs2_init_inode_once(void *foo)
@@ -87,6 +89,10 @@ static int __init init_gfs2_fs(void)
 	if (error)
 		return error;
 
+	error = list_lru_init(&gfs2_qd_lru);
+	if (error)
+		goto fail_lru;
+
 	error = gfs2_glock_init();
 	if (error)
 		goto fail;
@@ -179,6 +185,8 @@ fail_wq:
 fail_unregister:
 	unregister_filesystem(&gfs2_fs_type);
 fail:
+	list_lru_destroy(&gfs2_qd_lru);
+fail_lru:
 	unregister_shrinker(&qd_shrinker);
 	gfs2_glock_exit();
 
@@ -221,6 +229,7 @@ static void __exit exit_gfs2_fs(void)
 	unregister_filesystem(&gfs2meta_fs_type);
 	destroy_workqueue(gfs_recovery_wq);
 	destroy_workqueue(gfs2_control_wq);
+	list_lru_destroy(&gfs2_qd_lru);
 
 	rcu_barrier();
 
diff --git a/fs/gfs2/quota.c b/fs/gfs2/quota.c
index db44135..726fab3 100644
--- a/fs/gfs2/quota.c
+++ b/fs/gfs2/quota.c
@@ -50,6 +50,7 @@
 #include <linux/freezer.h>
 #include <linux/quota.h>
 #include <linux/dqblk_xfs.h>
+#include <linux/list_lru.h>
 
 #include "gfs2.h"
 #include "incore.h"
@@ -71,54 +72,56 @@ struct gfs2_quota_change_host {
 	struct kqid qc_id;
 };
 
-static LIST_HEAD(qd_lru_list);
-static atomic_t qd_lru_count = ATOMIC_INIT(0);
-static DEFINE_SPINLOCK(qd_lru_lock);
+struct list_lru gfs2_qd_lru;
+static DEFINE_SPINLOCK(qd_lock);
 
-unsigned long gfs2_qd_shrink_scan(struct shrinker *shrink,
-				  struct shrink_control *sc)
+static enum lru_status gfs2_qd_walk(struct list_head *item, spinlock_t *lock, void *arg)
 {
 	struct gfs2_quota_data *qd;
 	struct gfs2_sbd *sdp;
-	int nr_to_scan = sc->nr_to_scan;
-	long freed = 0;
 
-	if (!(sc->gfp_mask & __GFP_FS))
-		return SHRINK_STOP;
+	qd = list_entry(item, struct gfs2_quota_data, qd_reclaim);
+	sdp = qd->qd_gl->gl_sbd;
 
-	spin_lock(&qd_lru_lock);
-	while (nr_to_scan && !list_empty(&qd_lru_list)) {
-		qd = list_entry(qd_lru_list.next,
-				struct gfs2_quota_data, qd_reclaim);
-		sdp = qd->qd_gl->gl_sbd;
+	spin_lock(&qd_lock);
+	if (atomic_read(&qd->qd_count)) {
+		spin_unlock(&qd_lock);
+		return LRU_SKIP;
+	}
 
-		/* Free from the filesystem-specific list */
-		list_del(&qd->qd_list);
+	/* Free from the filesystem-specific list */
+	list_del(&qd->qd_list);
+	spin_unlock(&qd_lock);
+	spin_unlock(lock);
 
-		gfs2_assert_warn(sdp, !qd->qd_change);
-		gfs2_assert_warn(sdp, !qd->qd_slot_count);
-		gfs2_assert_warn(sdp, !qd->qd_bh_count);
+	gfs2_assert_warn(sdp, !qd->qd_change);
+	gfs2_assert_warn(sdp, !qd->qd_slot_count);
+	gfs2_assert_warn(sdp, !qd->qd_bh_count);
 
-		gfs2_glock_put(qd->qd_gl);
-		atomic_dec(&sdp->sd_quota_count);
+	gfs2_glock_put(qd->qd_gl);
+	atomic_dec(&sdp->sd_quota_count);
 
-		/* Delete it from the common reclaim list */
-		list_del_init(&qd->qd_reclaim);
-		atomic_dec(&qd_lru_count);
-		spin_unlock(&qd_lru_lock);
-		kmem_cache_free(gfs2_quotad_cachep, qd);
-		spin_lock(&qd_lru_lock);
-		nr_to_scan--;
-		freed++;
-	}
-	spin_unlock(&qd_lru_lock);
-	return freed;
+	/* Delete it from the common reclaim list */
+	kmem_cache_free(gfs2_quotad_cachep, qd);
+	spin_lock(lock);
+
+	return LRU_REMOVED;
+}
+
+unsigned long gfs2_qd_shrink_scan(struct shrinker *shrink,
+				  struct shrink_control *sc)
+{
+	if (!(sc->gfp_mask & __GFP_FS))
+		return SHRINK_STOP;
+
+	return list_lru_walk_node(&gfs2_qd_lru, sc->nid, gfs2_qd_walk, NULL,
+				  &sc->nr_to_scan);
 }
 
 unsigned long gfs2_qd_shrink_count(struct shrinker *shrink,
 				   struct shrink_control *sc)
 {
-	return vfs_pressure_ratio(atomic_read(&qd_lru_count));
+	return vfs_pressure_ratio(list_lru_count_node(&gfs2_qd_lru, sc->nid));
 }
 
 static u64 qd2index(struct gfs2_quota_data *qd)
@@ -177,15 +180,9 @@ static int qd_get(struct gfs2_sbd *sdp, struct kqid qid,
 
 	for (;;) {
 		found = 0;
-		spin_lock(&qd_lru_lock);
+		spin_lock(&qd_lock);
 		list_for_each_entry(qd, &sdp->sd_quota_list, qd_list) {
 			if (qid_eq(qd->qd_id, qid)) {
-				if (!atomic_read(&qd->qd_count) &&
-				    !list_empty(&qd->qd_reclaim)) {
-					/* Remove it from reclaim list */
-					list_del_init(&qd->qd_reclaim);
-					atomic_dec(&qd_lru_count);
-				}
 				atomic_inc(&qd->qd_count);
 				found = 1;
 				break;
@@ -202,9 +199,10 @@ static int qd_get(struct gfs2_sbd *sdp, struct kqid qid,
 			new_qd = NULL;
 		}
 
-		spin_unlock(&qd_lru_lock);
+		spin_unlock(&qd_lock);
 
 		if (qd) {
+			list_lru_del(&gfs2_qd_lru, &qd->qd_reclaim);
 			if (new_qd) {
 				gfs2_glock_put(new_qd->qd_gl);
 				kmem_cache_free(gfs2_quotad_cachep, new_qd);
@@ -228,12 +226,7 @@ static void qd_hold(struct gfs2_quota_data *qd)
 
 static void qd_put(struct gfs2_quota_data *qd)
 {
-	if (atomic_dec_and_lock(&qd->qd_count, &qd_lru_lock)) {
-		/* Add to the reclaim list */
-		list_add_tail(&qd->qd_reclaim, &qd_lru_list);
-		atomic_inc(&qd_lru_count);
-		spin_unlock(&qd_lru_lock);
-	}
+	list_lru_add_on_zero(&gfs2_qd_lru, &qd->qd_reclaim, &qd->qd_count);
 }
 
 static int slot_get(struct gfs2_quota_data *qd)
@@ -242,10 +235,10 @@ static int slot_get(struct gfs2_quota_data *qd)
 	unsigned int c, o = 0, b;
 	unsigned char byte = 0;
 
-	spin_lock(&qd_lru_lock);
+	spin_lock(&qd_lock);
 
 	if (qd->qd_slot_count++) {
-		spin_unlock(&qd_lru_lock);
+		spin_unlock(&qd_lock);
 		return 0;
 	}
 
@@ -269,13 +262,13 @@ found:
 
 	sdp->sd_quota_bitmap[c][o] |= 1 << b;
 
-	spin_unlock(&qd_lru_lock);
+	spin_unlock(&qd_lock);
 
 	return 0;
 
 fail:
 	qd->qd_slot_count--;
-	spin_unlock(&qd_lru_lock);
+	spin_unlock(&qd_lock);
 	return -ENOSPC;
 }
 
@@ -283,23 +276,23 @@ static void slot_hold(struct gfs2_quota_data *qd)
 {
 	struct gfs2_sbd *sdp = qd->qd_gl->gl_sbd;
 
-	spin_lock(&qd_lru_lock);
+	spin_lock(&qd_lock);
 	gfs2_assert(sdp, qd->qd_slot_count);
 	qd->qd_slot_count++;
-	spin_unlock(&qd_lru_lock);
+	spin_unlock(&qd_lock);
 }
 
 static void slot_put(struct gfs2_quota_data *qd)
 {
 	struct gfs2_sbd *sdp = qd->qd_gl->gl_sbd;
 
-	spin_lock(&qd_lru_lock);
+	spin_lock(&qd_lock);
 	gfs2_assert(sdp, qd->qd_slot_count);
 	if (!--qd->qd_slot_count) {
 		gfs2_icbit_munge(sdp, sdp->sd_quota_bitmap, qd->qd_slot, 0);
 		qd->qd_slot = -1;
 	}
-	spin_unlock(&qd_lru_lock);
+	spin_unlock(&qd_lock);
 }
 
 static int bh_get(struct gfs2_quota_data *qd)
@@ -374,7 +367,7 @@ static int qd_fish(struct gfs2_sbd *sdp, struct gfs2_quota_data **qdp)
 	if (sdp->sd_vfs->s_flags & MS_RDONLY)
 		return 0;
 
-	spin_lock(&qd_lru_lock);
+	spin_lock(&qd_lock);
 
 	list_for_each_entry(qd, &sdp->sd_quota_list, qd_list) {
 		if (test_bit(QDF_LOCKED, &qd->qd_flags) ||
@@ -398,7 +391,7 @@ static int qd_fish(struct gfs2_sbd *sdp, struct gfs2_quota_data **qdp)
 	if (!found)
 		qd = NULL;
 
-	spin_unlock(&qd_lru_lock);
+	spin_unlock(&qd_lock);
 
 	if (qd) {
 		gfs2_assert_warn(sdp, qd->qd_change_sync);
@@ -423,11 +416,11 @@ static int qd_trylock(struct gfs2_quota_data *qd)
 	if (sdp->sd_vfs->s_flags & MS_RDONLY)
 		return 0;
 
-	spin_lock(&qd_lru_lock);
+	spin_lock(&qd_lock);
 
 	if (test_bit(QDF_LOCKED, &qd->qd_flags) ||
 	    !test_bit(QDF_CHANGE, &qd->qd_flags)) {
-		spin_unlock(&qd_lru_lock);
+		spin_unlock(&qd_lock);
 		return 0;
 	}
 
@@ -440,7 +433,7 @@ static int qd_trylock(struct gfs2_quota_data *qd)
 	gfs2_assert_warn(sdp, qd->qd_slot_count);
 	qd->qd_slot_count++;
 
-	spin_unlock(&qd_lru_lock);
+	spin_unlock(&qd_lock);
 
 	gfs2_assert_warn(sdp, qd->qd_change_sync);
 	if (bh_get(qd)) {
@@ -602,9 +595,9 @@ static void do_qc(struct gfs2_quota_data *qd, s64 change)
 	x = be64_to_cpu(qc->qc_change) + change;
 	qc->qc_change = cpu_to_be64(x);
 
-	spin_lock(&qd_lru_lock);
+	spin_lock(&qd_lock);
 	qd->qd_change = x;
-	spin_unlock(&qd_lru_lock);
+	spin_unlock(&qd_lock);
 
 	if (!x) {
 		gfs2_assert_warn(sdp, test_bit(QDF_CHANGE, &qd->qd_flags));
@@ -974,9 +967,9 @@ static int need_sync(struct gfs2_quota_data *qd)
 	if (!qd->qd_qb.qb_limit)
 		return 0;
 
-	spin_lock(&qd_lru_lock);
+	spin_lock(&qd_lock);
 	value = qd->qd_change;
-	spin_unlock(&qd_lru_lock);
+	spin_unlock(&qd_lock);
 
 	spin_lock(&gt->gt_spin);
 	num = gt->gt_quota_scale_num;
@@ -1067,9 +1060,9 @@ int gfs2_quota_check(struct gfs2_inode *ip, kuid_t uid, kgid_t gid)
 			continue;
 
 		value = (s64)be64_to_cpu(qd->qd_qb.qb_value);
-		spin_lock(&qd_lru_lock);
+		spin_lock(&qd_lock);
 		value += qd->qd_change;
-		spin_unlock(&qd_lru_lock);
+		spin_unlock(&qd_lock);
 
 		if (be64_to_cpu(qd->qd_qb.qb_limit) && (s64)be64_to_cpu(qd->qd_qb.qb_limit) < value) {
 			print_message(qd, "exceeded");
@@ -1258,11 +1251,11 @@ int gfs2_quota_init(struct gfs2_sbd *sdp)
 			qd->qd_slot = slot;
 			qd->qd_slot_count = 1;
 
-			spin_lock(&qd_lru_lock);
+			spin_lock(&qd_lock);
 			gfs2_icbit_munge(sdp, sdp->sd_quota_bitmap, slot, 1);
 			list_add(&qd->qd_list, &sdp->sd_quota_list);
 			atomic_inc(&sdp->sd_quota_count);
-			spin_unlock(&qd_lru_lock);
+			spin_unlock(&qd_lock);
 
 			found++;
 		}
@@ -1288,7 +1281,7 @@ void gfs2_quota_cleanup(struct gfs2_sbd *sdp)
 	struct gfs2_quota_data *qd;
 	unsigned int x;
 
-	spin_lock(&qd_lru_lock);
+	spin_lock(&qd_lock);
 	while (!list_empty(head)) {
 		qd = list_entry(head->prev, struct gfs2_quota_data, qd_list);
 
@@ -1296,20 +1289,15 @@ void gfs2_quota_cleanup(struct gfs2_sbd *sdp)
 		    (atomic_read(&qd->qd_count) &&
 		     !test_bit(QDF_CHANGE, &qd->qd_flags))) {
 			list_move(&qd->qd_list, head);
-			spin_unlock(&qd_lru_lock);
-			schedule();
-			spin_lock(&qd_lru_lock);
+			cond_resched_lock(&qd_lock);
 			continue;
 		}
 
 		list_del(&qd->qd_list);
+		spin_unlock(&qd_lock);
 		/* Also remove if this qd exists in the reclaim list */
-		if (!list_empty(&qd->qd_reclaim)) {
-			list_del_init(&qd->qd_reclaim);
-			atomic_dec(&qd_lru_count);
-		}
+		list_lru_del(&gfs2_qd_lru, &qd->qd_reclaim);
 		atomic_dec(&sdp->sd_quota_count);
-		spin_unlock(&qd_lru_lock);
 
 		if (!atomic_read(&qd->qd_count)) {
 			gfs2_assert_warn(sdp, !qd->qd_change);
@@ -1320,10 +1308,9 @@ void gfs2_quota_cleanup(struct gfs2_sbd *sdp)
 
 		gfs2_glock_put(qd->qd_gl);
 		kmem_cache_free(gfs2_quotad_cachep, qd);
-
-		spin_lock(&qd_lru_lock);
+		spin_lock(&qd_lock);
 	}
-	spin_unlock(&qd_lru_lock);
+	spin_unlock(&qd_lock);
 
 	gfs2_assert_warn(sdp, !atomic_read(&sdp->sd_quota_count));
 
@@ -1462,7 +1449,7 @@ static int gfs2_quota_get_xstate(struct super_block *sb,
 	}
 	fqs->qs_uquota.qfs_nextents = 1; /* unsupported */
 	fqs->qs_gquota = fqs->qs_uquota; /* its the same inode in both cases */
-	fqs->qs_incoredqs = atomic_read(&qd_lru_count);
+	fqs->qs_incoredqs = list_lru_count(&gfs2_qd_lru);
 	return 0;
 }
 
diff --git a/fs/gfs2/quota.h b/fs/gfs2/quota.h
index 0f64d9d..19ac982 100644
--- a/fs/gfs2/quota.h
+++ b/fs/gfs2/quota.h
@@ -58,5 +58,6 @@ extern unsigned long gfs2_qd_shrink_count(struct shrinker *shrink,
 extern unsigned long gfs2_qd_shrink_scan(struct shrinker *shrink,
 					 struct shrink_control *sc);
 extern const struct quotactl_ops gfs2_quotactl_ops;
+extern struct list_lru gfs2_qd_lru;
 
 #endif /* __QUOTA_DOT_H__ */
diff --git a/include/linux/list_lru.h b/include/linux/list_lru.h
index 3ce5417..69895c0 100644
--- a/include/linux/list_lru.h
+++ b/include/linux/list_lru.h
@@ -53,6 +53,22 @@ int list_lru_init(struct list_lru *lru);
 bool list_lru_add(struct list_lru *lru, struct list_head *item);
 
 /**
+ * list_lru_add_on_zero: decrement counter and atomically add to lru on zero
+ * @list_lru: the lru pointer
+ * @item: the item to potentially add to the lru
+ * @counter: the counter to decrement
+ *
+ * If the @counter is zero after decrementing, then the item will be
+ * atomically added to the lru, assuming that it is not already there.
+ * This is identical to list_lru_add, aside from the additional testing
+ * of an atomic counter.
+ *
+ * Return value: true if the list was updated, false otherwise
+ */
+bool list_lru_add_on_zero(struct list_lru *lru, struct list_head *item, atomic_t
+ *count);
+
+/**
  * list_lru_del: delete an element to the lru list
  * @list_lru: the lru pointer
  * @item: the item to be deleted.
diff --git a/mm/list_lru.c b/mm/list_lru.c
index 7246791..22a57b1 100644
--- a/mm/list_lru.c
+++ b/mm/list_lru.c
@@ -29,6 +29,26 @@ bool list_lru_add(struct list_lru *lru, struct list_head *item)
 }
 EXPORT_SYMBOL_GPL(list_lru_add);
 
+bool list_lru_add_on_zero(struct list_lru *lru, struct list_head *item, atomic_t *count)
+{
+	int nid = page_to_nid(virt_to_page(item));
+	struct list_lru_node *nlru = &lru->node[nid];
+
+	if (atomic_dec_and_lock(count, &nlru->lock)) {
+		WARN_ON_ONCE(nlru->nr_items < 0);
+		if (list_empty(item)) {
+			list_add_tail(item, &nlru->list);
+			if (nlru->nr_items++ == 0)
+				node_set(nid, lru->active_nodes);
+			spin_unlock(&nlru->lock);
+			return true;
+		}
+		spin_unlock(&nlru->lock);
+	}
+	return false;		
+}
+EXPORT_SYMBOL_GPL(list_lru_add_on_zero);
+
 bool list_lru_del(struct list_lru *lru, struct list_head *item)
 {
 	int nid = page_to_nid(virt_to_page(item));





More information about the Cluster-devel mailing list