Mike,

I'm NACKing this patch here.  You've taken the comment from my latest
patch and attached it to something else.  I don't know if you've taken
an older patch and updated it, or taken the latest patch and downgraded
it.

For instance, the latest patch uses the name 'struct dm_buffer_cache',
but it looks like you've renamed it to 'struct buffer_cache' and then
posted another patch in the series that renames it back to
dm_buffer_cache.

I also see the comment change:

The original "A refcount_t for hold_count does not work." has changed
to "NOTE: attempting to use refcount_t instead of atomic_t caused
crashes!".  Talk about over-dramatisation (if the hold_count is
incorrect we detect it and BUG).

I can't stand by my code if I don't know what's being sent up in my name.

Please use the patchset from this branch:
https://github.com/jthornber/linux/tree/2023-03-21-thin-concurrency-8

- Joe


On Wed, Mar 22, 2023 at 6:19 PM Mike Snitzer <snitzer@kernel.org> wrote:

From: Joe Thornber <ejt@redhat.com>

When multiple threads perform IO to a thin device, the underlying
dm_bufio object can become a bottleneck, slowing down access to btree
nodes that store the thin metadata. Prior to this commit, each bufio
instance had a single mutex that was taken for every bufio operation.

This commit concentrates on improving the common case, in which a user
of dm_bufio wishes to access, but not modify, a buffer that is already
in the dm_bufio cache.

Implementation::

  The code has been refactored, pulling out an 'lru' abstraction and a
  'buffer cache' abstraction.
 LRU:
  A clock algorithm is used in the LRU abstraction.  This avoids
  relinking list nodes, which would require a write lock protecting
  the list.

  None of the lru methods are threadsafe; locking must be done at a
  higher level.
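
  As a concrete illustration of the clock/second-chance idea, here is a
  minimal userspace sketch (this is not part of the patch; the names and
  the array layout are made up for illustration, whereas the patch uses
  a circular list_head, a cursor and atomic reference bits):

  #include <stdbool.h>
  #include <stdio.h>

  #define NR_ENTRIES 8

  struct clock_entry {
      bool referenced;   /* set on access, cleared as the hand passes */
      int payload;       /* stand-in for a dm_buffer */
  };

  struct clock_cache {
      struct clock_entry entries[NR_ENTRIES];
      unsigned hand;     /* the clock "cursor" */
  };

  /* Mark an entry as recently used: a flag write, no list relinking. */
  static void clock_reference(struct clock_cache *c, unsigned i)
  {
      c->entries[i].referenced = true;
  }

  /*
   * Approximate LRU eviction: sweep the hand, giving referenced entries
   * a second chance by clearing their bit, and evict the first
   * unreferenced entry found.
   */
  static int clock_evict(struct clock_cache *c)
  {
      for (;;) {
          struct clock_entry *e = &c->entries[c->hand];

          c->hand = (c->hand + 1) % NR_ENTRIES;
          if (e->referenced)
              e->referenced = false;   /* second chance */
          else
              return e->payload;       /* victim */
      }
  }

  int main(void)
  {
      struct clock_cache c = { .hand = 0 };

      for (int i = 0; i < NR_ENTRIES; i++)
          c.entries[i].payload = i;

      clock_reference(&c, 0);  /* entry 0 is "hot", survives the sweep */
      printf("evicted payload %d\n", clock_evict(&c));  /* prints 1 */
      return 0;
  }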

 Buffer cache:
  The buffer cache is responsible for managing the holder count,
  tracking clean/dirty state, and choosing buffers via predicates.
  Higher level code is responsible for allocation of buffers, IO and
  eviction/cache sizing.

  The buffer cache has thread-safe methods for acquiring a reference
  to an existing buffer. All other methods in the buffer cache are
  _not_ threadsafe, and only contain enough locking to guarantee the
  safe methods.

  Rather than a single mutex, sharded rw_semaphores are used to allow
  concurrent threads to 'get' buffers. Each rw_semaphore protects its
  own rbtree of buffer entries.
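
  As an aside, a minimal userspace sketch of this sharding scheme (not
  part of the patch; pthread rw-locks stand in for rw_semaphores, the
  per-shard rbtree is stubbed out, and all names are illustrative):

  #include <pthread.h>
  #include <stdint.h>
  #include <stdio.h>

  #define NR_LOCKS   64u
  #define LOCKS_MASK (NR_LOCKS - 1)

  struct shard {
      pthread_rwlock_t lock;
      void *tree;          /* stand-in for a per-shard rb_root */
  };

  static struct shard shards[NR_LOCKS];

  static void shards_init(void)
  {
      for (unsigned i = 0; i < NR_LOCKS; i++)
          pthread_rwlock_init(&shards[i].lock, NULL);
  }

  /* Low bits of the block number pick the shard, as in cache_index(). */
  static inline unsigned shard_index(uint64_t block)
  {
      return block & LOCKS_MASK;
  }

  /*
   * The common 'get' path takes the read side of a single shard's lock,
   * so threads getting blocks that hash to different shards do not
   * contend with each other.
   */
  static void *cache_get_sketch(uint64_t block,
                                void *(*find)(void *tree, uint64_t block))
  {
      struct shard *s = &shards[shard_index(block)];
      void *b;

      pthread_rwlock_rdlock(&s->lock);
      b = find(s->tree, block);    /* the real code also bumps hold_count */
      pthread_rwlock_unlock(&s->lock);

      return b;
  }

  static void *find_stub(void *tree, uint64_t block)
  {
      (void)tree;
      (void)block;
      return NULL;                 /* tree lookup elided in this sketch */
  }

  int main(void)
  {
      shards_init();
      printf("block 5 -> shard %u, block 70 -> shard %u\n",
             shard_index(5), shard_index(70));
      (void)cache_get_sketch(5, find_stub);
      return 0;
  }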

Testing::

  Some of the low-level functions have been unit tested using dm-unit:
    https://github.com/jthornber/dm-unit/blob/main/src/tests/bufio.rs

  Higher level concurrency and IO are tested via a test-only target
  found here:
    https://github.com/jthornber/linux/blob/2023-02-28-thin-concurrency-7/drivers/md/dm-bufio-test.c

  The associated userland side of these tests is here:
    https://github.com/jthornber/dmtest-python/blob/main/src/dmtest/bufio/bufio_tests.py

  In addition, the full dmtest suite of tests (dm-thin, dm-cache, etc.)
  has been run (~450 tests).

Performance::

  Most bufio operations have unchanged performance. But if multiple
  threads are attempting to get buffers concurrently, and these
  buffers are already in the cache, then there's a big speed-up. E.g.,
  one test has 16 'hotspot' threads simulating btree lookups while
  another thread dirties the whole device. In this case the hotspot
  threads acquire the buffers about 25 times faster.

Signed-off-by: Joe Thornber <ejt@redhat.com>
Signed-off-by: Mike Snitzer <snitzer@kernel.org>
---
 drivers/md/dm-bufio.c | 1769 ++++++++++++++++++++++++++++++-----------
 1 file changed, 1292 insertions(+), 477 deletions(-)

diff --git a/drivers/md/dm-bufio.c b/drivers/md/dm-bufio.c<br>
index 1de1bdcda1ce..56ef160331ce 100644<br>
--- a/drivers/md/dm-bufio.c<br>
+++ b/drivers/md/dm-bufio.c<br>
@@ -66,6 +66,247 @@<br>
 #define LIST_DIRTY     1<br>
 #define LIST_SIZE      2<br>
<br>
+/*--------------------------------------------------------------*/<br>
+<br>
+/*<br>
+ * Rather than use an LRU list, we use a clock algorithm where entries<br>
+ * are held in a circular list.  When an entry is 'hit' a reference bit<br>
+ * is set.  The least recently used entry is approximated by running a<br>
+ * cursor around the list selecting unreferenced entries. Referenced<br>
+ * entries have their reference bit cleared as the cursor passes them.<br>
+ */<br>
+struct lru_entry {<br>
+       struct list_head list;<br>
+       atomic_t referenced;<br>
+};<br>
+<br>
+struct lru_iter {<br>
+       struct lru *lru;<br>
+       struct list_head list;<br>
+       struct lru_entry *stop;<br>
+       struct lru_entry *e;<br>
+};<br>
+<br>
+struct lru {<br>
+       struct list_head *cursor;<br>
+       unsigned long count;<br>
+<br>
+       struct list_head iterators;<br>
+};<br>
+<br>
+/*--------------*/<br>
+<br>
+static void lru_init(struct lru *lru)<br>
+{<br>
+       lru->cursor = NULL;<br>
+       lru->count = 0;<br>
+       INIT_LIST_HEAD(&lru->iterators);<br>
+}<br>
+<br>
+static void lru_destroy(struct lru *lru)<br>
+{<br>
+       BUG_ON(lru->cursor);<br>
+       BUG_ON(!list_empty(&lru->iterators));<br>
+}<br>
+<br>
+static inline unsigned int lru_count(struct lru *lru)<br>
+{<br>
+       return lru->count;<br>
+}<br>
+<br>
+/*<br>
+ * Insert a new entry into the lru.<br>
+ */<br>
+static void lru_insert(struct lru *lru, struct lru_entry *le)<br>
+{<br>
+       /*<br>
+        * Don't be tempted to set to 1, makes the lru aspect<br>
+        * perform poorly.<br>
+        */<br>
+       atomic_set(&le->referenced, 0);<br>
+<br>
+       if (lru->cursor)<br>
+               list_add_tail(&le->list, lru->cursor);<br>
+<br>
+       else {<br>
+               INIT_LIST_HEAD(&le->list);<br>
+               lru->cursor = &le->list;<br>
+       }<br>
+       lru->count++;<br>
+}<br>
+<br>
+/*--------------*/<br>
+<br>
+/*<br>
+ * Convert a list_head pointer to an lru_entry pointer.<br>
+ */<br>
+static inline struct lru_entry *to_le(struct list_head *l)<br>
+{<br>
+       return container_of(l, struct lru_entry, list);<br>
+}<br>
+<br>
+/*<br>
+ * Initialize an lru_iter and add it to the list of cursors in the lru.<br>
+ */<br>
+static void lru_iter_begin(struct lru *lru, struct lru_iter *it)<br>
+{<br>
+       it->lru = lru;<br>
+       it->stop = lru->cursor ? to_le(lru->cursor->prev) : NULL;<br>
+       it->e = lru->cursor ? to_le(lru->cursor) : NULL;<br>
+       list_add(&it->list, &lru->iterators);<br>
+}<br>
+<br>
+/*<br>
+ * Remove an lru_iter from the list of cursors in the lru.<br>
+ */<br>
+static void lru_iter_end(struct lru_iter *it)<br>
+{<br>
+       list_del(&it->list);<br>
+}<br>
+<br>
+/* Predicate function type to be used with lru_iter_next */<br>
+typedef bool (*iter_predicate)(struct lru_entry *le, void *context);<br>
+<br>
+/*<br>
+ * Advance the cursor to the next entry that passes the<br>
+ * predicate, and return that entry.  Returns NULL if the<br>
+ * iteration is complete.<br>
+ */<br>
+static struct lru_entry *lru_iter_next(struct lru_iter *it,<br>
+                                      iter_predicate pred, void *context)<br>
+{<br>
+       struct lru_entry *e;<br>
+<br>
+       while (it->e) {<br>
+               e = it->e;<br>
+<br>
+               /* advance the cursor */<br>
+               if (it->e == it->stop)<br>
+                       it->e = NULL;<br>
+               else<br>
+                       it->e = to_le(it->e->list.next);<br>
+<br>
+               if (pred(e, context))<br>
+                       return e;<br>
+       }<br>
+<br>
+       return NULL;<br>
+}<br>
+<br>
+/*<br>
+ * Invalidate a specific lru_entry and update all cursors in<br>
+ * the lru accordingly.<br>
+ */<br>
+static void lru_iter_invalidate(struct lru *lru, struct lru_entry *e)<br>
+{<br>
+       struct lru_iter *it;<br>
+<br>
+       list_for_each_entry(it, &lru->iterators, list) {<br>
+               /* Move c->e forwards if necc. */<br>
+               if (it->e == e) {<br>
+                       it->e = to_le(it->e->list.next);<br>
+                       if (it->e == e)<br>
+                               it->e = NULL;<br>
+               }<br>
+<br>
+               /* Move it->stop backwards if necc. */<br>
+               if (it->stop == e) {<br>
+                       it->stop = to_le(it->stop->list.prev);<br>
+                       if (it->stop == e)<br>
+                               it->stop = NULL;<br>
+               }<br>
+       }<br>
+}<br>
+<br>
+/*--------------*/<br>
+<br>
+/*<br>
+ * Remove a specific entry from the lru.<br>
+ */<br>
+static void lru_remove(struct lru *lru, struct lru_entry *le)<br>
+{<br>
+       lru_iter_invalidate(lru, le);<br>
+       if (lru->count == 1)<br>
+               lru->cursor = NULL;<br>
+       else {<br>
+               if (lru->cursor == &le->list)<br>
+                       lru->cursor = lru->cursor->next;<br>
+               list_del(&le->list);<br>
+       }<br>
+       lru->count--;<br>
+}<br>
+<br>
+/*<br>
+ * Mark as referenced.<br>
+ */<br>
+static inline void lru_reference(struct lru_entry *le)<br>
+{<br>
+       atomic_set(&le->referenced, 1);<br>
+}<br>
+<br>
+/*--------------*/<br>
+<br>
+/*<br>
+ * Remove the least recently used entry (approx), that passes the predicate.<br>
+ * Returns NULL on failure.<br>
+ */<br>
+enum evict_result {<br>
+       ER_EVICT,<br>
+       ER_DONT_EVICT,<br>
+       ER_STOP, /* stop looking for something to evict */<br>
+};<br>
+<br>
+typedef enum evict_result (*le_predicate)(struct lru_entry *le, void *context);<br>
+<br>
+static struct lru_entry *lru_evict(struct lru *lru, le_predicate pred, void *context)<br>
+{<br>
+       unsigned int tested = 0;<br>
+       struct list_head *h = lru->cursor;<br>
+       struct lru_entry *le;<br>
+<br>
+       if (!h)<br>
+               return NULL;<br>
+       /*<br>
+        * In the worst case we have to loop around twice. Once to clear<br>
+        * the reference flags, and then again to discover the predicate<br>
+        * fails for all entries.<br>
+        */<br>
+       while (tested < lru->count) {<br>
+               le = container_of(h, struct lru_entry, list);<br>
+<br>
+               if (atomic_read(&le->referenced))<br>
+                       atomic_set(&le->referenced, 0);<br>
+               else {<br>
+                       tested++;<br>
+                       switch (pred(le, context)) {<br>
+                       case ER_EVICT:<br>
+                               /*<br>
+                                * Adjust the cursor, so we start the next<br>
+                                * search from here.<br>
+                                */<br>
+                               lru->cursor = le->list.next;<br>
+                               lru_remove(lru, le);<br>
+                               return le;<br>
+<br>
+                       case ER_DONT_EVICT:<br>
+                               break;<br>
+<br>
+                       case ER_STOP:<br>
+                               lru->cursor = le->list.next;<br>
+                               return NULL;<br>
+                       }<br>
+               }<br>
+<br>
+               h = h->next;<br>
+<br>
+               cond_resched();<br>
+       }<br>
+<br>
+       return NULL;<br>
+}<br>
+<br>
+/*--------------------------------------------------------------*/<br>
+<br>
 /*<br>
  * Buffer state bits.<br>
  */<br>
@@ -86,28 +327,37 @@ enum data_mode {<br>
 };<br>
<br>
 struct dm_buffer {<br>
+       /* protected by the locks in buffer_cache */<br>
        struct rb_node node;<br>
-       struct list_head lru_list;<br>
-       struct list_head global_list;<br>
<br>
+       /* immutable, so don't need protecting */<br>
        sector_t block;<br>
        void *data;<br>
        unsigned char data_mode;                /* DATA_MODE_* */<br>
<br>
-       unsigned int accessed;<br>
-       unsigned int hold_count;<br>
+       /*<br>
+        * These two fields are used in isolation, so do not need<br>
+        * a surrounding lock. NOTE: attempting to use refcount_t<br>
+        * instead of atomic_t caused crashes!<br>
+        */<br>
+       atomic_t hold_count;<br>
        unsigned long last_accessed;<br>
<br>
+       /*<br>
+        * Everything else is protected by the mutex in<br>
+        * dm_bufio_client<br>
+        */<br>
+       unsigned long state;<br>
+       struct lru_entry lru;<br>
        unsigned char list_mode;                /* LIST_* */<br>
        blk_status_t read_error;<br>
        blk_status_t write_error;<br>
-       unsigned long state;<br>
        unsigned int dirty_start;<br>
        unsigned int dirty_end;<br>
        unsigned int write_start;<br>
        unsigned int write_end;<br>
-       struct dm_bufio_client *c;<br>
        struct list_head write_list;<br>
+       struct dm_bufio_client *c;<br>
        void (*end_io)(struct dm_buffer *b, blk_status_t bs);<br>
 #ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING<br>
 #define MAX_STACK 10<br>
@@ -116,9 +366,579 @@ struct dm_buffer {<br>
 #endif<br>
 };<br>
<br>
+/*--------------------------------------------------------------*/<br>
+<br>
+/*<br>
+ * The buffer cache manages buffers, particularly:<br>
+ *  - inc/dec of holder count<br>
+ *  - setting the last_accessed field<br>
+ *  - maintains clean/dirty state along with lru<br>
+ *  - selecting buffers that match predicates<br>
+ *<br>
+ * It does *not* handle:<br>
+ *  - allocation/freeing of buffers.<br>
+ *  - IO<br>
+ *  - Eviction or cache sizing.<br>
+ *<br>
+ * cache_get() and cache_put() are threadsafe, you do not need to<br>
+ * protect these calls with a surrounding mutex.  All the other<br>
+ * methods are not threadsafe; they do use locking primitives, but<br>
+ * only enough to ensure get/put are threadsafe.<br>
+ */<br>
+<br>
+#define NR_LOCKS 64<br>
+#define LOCKS_MASK (NR_LOCKS - 1)<br>
+<br>
+struct tree_lock {<br>
+       struct rw_semaphore lock;<br>
+} ____cacheline_aligned_in_smp;<br>
+<br>
+struct buffer_cache {<br>
+       /*<br>
+        * We spread entries across multiple trees to reduce contention<br>
+        * on the locks.<br>
+        */<br>
+       struct tree_lock locks[NR_LOCKS];<br>
+       struct rb_root roots[NR_LOCKS];<br>
+       struct lru lru[LIST_SIZE];<br>
+};<br>
+<br>
+/*<br>
+ * We want to scatter the buffers across the different rbtrees<br>
+ * to improve concurrency, but we also want to encourage the<br>
+ * lock_history optimisation (below). Scattering runs of 16<br>
+ * buffers seems to be a good compromise.<br>
+ */<br>
+static inline unsigned int cache_index(sector_t block)<br>
+{<br>
+       return block & LOCKS_MASK;<br>
+}<br>
+<br>
+static inline void cache_read_lock(struct buffer_cache *bc, sector_t block)<br>
+{<br>
+       down_read(&bc->locks[cache_index(block)].lock);<br>
+}<br>
+<br>
+static inline void cache_read_unlock(struct buffer_cache *bc, sector_t block)<br>
+{<br>
+       up_read(&bc->locks[cache_index(block)].lock);<br>
+}<br>
+<br>
+static inline void cache_write_lock(struct buffer_cache *bc, sector_t block)<br>
+{<br>
+       down_write(&bc->locks[cache_index(block)].lock);<br>
+}<br>
+<br>
+static inline void cache_write_unlock(struct buffer_cache *bc, sector_t block)<br>
+{<br>
+       up_write(&bc->locks[cache_index(block)].lock);<br>
+}<br>
+<br>
+/*<br>
+ * Sometimes we want to repeatedly get and drop locks as part of an iteration.<br>
+ * This struct helps avoid redundant drop and gets of the same lock.<br>
+ */<br>
+struct lock_history {<br>
+       struct buffer_cache *cache;<br>
+       bool write;<br>
+       unsigned int previous;<br>
+};<br>
+<br>
+static void lh_init(struct lock_history *lh, struct buffer_cache *cache, bool write)<br>
+{<br>
+       lh->cache = cache;<br>
+       lh->write = write;<br>
+       lh->previous = NR_LOCKS;  /* indicates no previous */<br>
+}<br>
+<br>
+static void __lh_lock(struct lock_history *lh, unsigned int index)<br>
+{<br>
+       if (lh->write)<br>
+               down_write(&lh->cache->locks[index].lock);<br>
+       else<br>
+               down_read(&lh->cache->locks[index].lock);<br>
+}<br>
+<br>
+static void __lh_unlock(struct lock_history *lh, unsigned int index)<br>
+{<br>
+       if (lh->write)<br>
+               up_write(&lh->cache->locks[index].lock);<br>
+       else<br>
+               up_read(&lh->cache->locks[index].lock);<br>
+}<br>
+<br>
+/*<br>
+ * Make sure you call this since it will unlock the final lock.<br>
+ */<br>
+static void lh_exit(struct lock_history *lh)<br>
+{<br>
+       if (lh->previous != NR_LOCKS) {<br>
+               __lh_unlock(lh, lh->previous);<br>
+               lh->previous = NR_LOCKS;<br>
+       }<br>
+}<br>
+<br>
+/*<br>
+ * Named 'next' because there is no corresponding<br>
+ * 'up/unlock' call since it's done automatically.<br>
+ */<br>
+static void lh_next(struct lock_history *lh, sector_t b)<br>
+{<br>
+       unsigned int index = cache_index(b);<br>
+<br>
+       if (lh->previous != NR_LOCKS) {<br>
+               if (lh->previous != index) {<br>
+                       __lh_unlock(lh, lh->previous);<br>
+                       __lh_lock(lh, index);<br>
+                       lh->previous = index;<br>
+               }<br>
+       } else {<br>
+               __lh_lock(lh, index);<br>
+               lh->previous = index;<br>
+       }<br>
+}<br>
+<br>
+static struct dm_buffer *le_to_buffer(struct lru_entry *le)<br>
+{<br>
+       return container_of(le, struct dm_buffer, lru);<br>
+}<br>
+<br>
+static struct dm_buffer *list_to_buffer(struct list_head *l)<br>
+{<br>
+       struct lru_entry *le = list_entry(l, struct lru_entry, list);<br>
+<br>
+       if (!le)<br>
+               return NULL;<br>
+<br>
+       return le_to_buffer(le);<br>
+}<br>
+<br>
+static void cache_init(struct buffer_cache *bc)<br>
+{<br>
+       unsigned int i;<br>
+<br>
+       for (i = 0; i < NR_LOCKS; i++) {<br>
+               init_rwsem(&bc->locks[i].lock);<br>
+               bc->roots[i] = RB_ROOT;<br>
+       }<br>
+<br>
+       lru_init(&bc->lru[LIST_CLEAN]);<br>
+       lru_init(&bc->lru[LIST_DIRTY]);<br>
+}<br>
+<br>
+static void cache_destroy(struct buffer_cache *bc)<br>
+{<br>
+       unsigned int i;<br>
+<br>
+       for (i = 0; i < NR_LOCKS; i++)<br>
+               BUG_ON(!RB_EMPTY_ROOT(&bc->roots[i]));<br>
+<br>
+       lru_destroy(&bc->lru[LIST_CLEAN]);<br>
+       lru_destroy(&bc->lru[LIST_DIRTY]);<br>
+}<br>
+<br>
+/*<br>
+ * not threadsafe, or racey depending how you look at it<br>
+ */<br>
+static unsigned long cache_count(struct buffer_cache *bc, int list_mode)<br>
+{<br>
+       return lru_count(&bc->lru[list_mode]);<br>
+}<br>
+<br>
+/*--------------*/<br>
+<br>
+/*<br>
+ * not threadsafe<br>
+ */<br>
+static unsigned long cache_total(struct buffer_cache *bc)<br>
+{<br>
+       return lru_count(&bc->lru[LIST_CLEAN]) +<br>
+               lru_count(&bc->lru[LIST_DIRTY]);<br>
+}<br>
+<br>
+/*--------------*/<br>
+<br>
+/*<br>
+ * Gets a specific buffer, indexed by block.<br>
+ * If the buffer is found then its holder count will be incremented and<br>
+ * lru_reference will be called.<br>
+ *<br>
+ * threadsafe<br>
+ */<br>
+static struct dm_buffer *__cache_get(const struct rb_root *root, sector_t block)<br>
+{<br>
+       struct rb_node *n = root->rb_node;<br>
+       struct dm_buffer *b;<br>
+<br>
+       while (n) {<br>
+               b = container_of(n, struct dm_buffer, node);<br>
+<br>
+               if (b->block == block)<br>
+                       return b;<br>
+<br>
+               n = block < b->block ? n->rb_left : n->rb_right;<br>
+       }<br>
+<br>
+       return NULL;<br>
+}<br>
+<br>
+static void __cache_inc_buffer(struct dm_buffer *b)<br>
+{<br>
+       atomic_inc(&b->hold_count);<br>
+       WRITE_ONCE(b->last_accessed, jiffies);<br>
+}<br>
+<br>
+static struct dm_buffer *cache_get(struct buffer_cache *bc, sector_t block)<br>
+{<br>
+       struct dm_buffer *b;<br>
+<br>
+       cache_read_lock(bc, block);<br>
+       b = __cache_get(&bc->roots[cache_index(block)], block);<br>
+       if (b) {<br>
+               lru_reference(&b->lru);<br>
+               __cache_inc_buffer(b);<br>
+       }<br>
+       cache_read_unlock(bc, block);<br>
+<br>
+       return b;<br>
+}<br>
+<br>
+/*--------------*/<br>
+<br>
+/*<br>
+ * Returns true if the hold count hits zero.<br>
+ * threadsafe<br>
+ */<br>
+static bool cache_put(struct buffer_cache *bc, struct dm_buffer *b)<br>
+{<br>
+       bool r;<br>
+<br>
+       cache_read_lock(bc, b->block);<br>
+       BUG_ON(!atomic_read(&b->hold_count));<br>
+       r = atomic_dec_and_test(&b->hold_count);<br>
+       cache_read_unlock(bc, b->block);<br>
+<br>
+       return r;<br>
+}<br>
+<br>
+/*--------------*/<br>
+<br>
+typedef enum evict_result (*b_predicate)(struct dm_buffer *, void *);<br>
+<br>
+/*<br>
+ * Evicts a buffer based on a predicate.  The oldest buffer that<br>
+ * matches the predicate will be selected.  In addition to the<br>
+ * predicate the hold_count of the selected buffer will be zero.<br>
+ */<br>
+struct evict_wrapper {<br>
+       struct lock_history *lh;<br>
+       b_predicate pred;<br>
+       void *context;<br>
+};<br>
+<br>
+/*<br>
+ * Wraps the buffer predicate turning it into an lru predicate.  Adds<br>
+ * extra test for hold_count.<br>
+ */<br>
+static enum evict_result __evict_pred(struct lru_entry *le, void *context)<br>
+{<br>
+       struct evict_wrapper *w = context;<br>
+       struct dm_buffer *b = le_to_buffer(le);<br>
+<br>
+       lh_next(w->lh, b->block);<br>
+<br>
+       if (atomic_read(&b->hold_count))<br>
+               return ER_DONT_EVICT;<br>
+<br>
+       return w->pred(b, w->context);<br>
+}<br>
+<br>
+static struct dm_buffer *__cache_evict(struct buffer_cache *bc, int list_mode,<br>
+                                      b_predicate pred, void *context,<br>
+                                      struct lock_history *lh)<br>
+{<br>
+       struct evict_wrapper w = {.lh = lh, .pred = pred, .context = context};<br>
+       struct lru_entry *le;<br>
+       struct dm_buffer *b;<br>
+<br>
+       le = lru_evict(&bc->lru[list_mode], __evict_pred, &w);<br>
+       if (!le)<br>
+               return NULL;<br>
+<br>
+       b = le_to_buffer(le);<br>
+       /* __evict_pred will have locked the appropriate tree. */<br>
+       rb_erase(&b->node, &bc->roots[cache_index(b->block)]);<br>
+<br>
+       return b;<br>
+}<br>
+<br>
+static struct dm_buffer *cache_evict(struct buffer_cache *bc, int list_mode,<br>
+                                    b_predicate pred, void *context)<br>
+{<br>
+       struct dm_buffer *b;<br>
+       struct lock_history lh;<br>
+<br>
+       lh_init(&lh, bc, true);<br>
+       b = __cache_evict(bc, list_mode, pred, context, &lh);<br>
+       lh_exit(&lh);<br>
+<br>
+       return b;<br>
+}<br>
+<br>
+/*--------------*/<br>
+<br>
+/*<br>
+ * Mark a buffer as clean or dirty. Not threadsafe.<br>
+ */<br>
+static void cache_mark(struct buffer_cache *bc, struct dm_buffer *b, int list_mode)<br>
+{<br>
+       cache_write_lock(bc, b->block);<br>
+       if (list_mode != b->list_mode) {<br>
+               lru_remove(&bc->lru[b->list_mode], &b->lru);<br>
+               b->list_mode = list_mode;<br>
+               lru_insert(&bc->lru[b->list_mode], &b->lru);<br>
+       }<br>
+       cache_write_unlock(bc, b->block);<br>
+}<br>
+<br>
+/*--------------*/<br>
+<br>
+/*<br>
+ * Runs through the lru associated with 'old_mode', if the predicate matches then<br>
+ * it moves them to 'new_mode'.  Not threadsafe.<br>
+ */<br>
+static void __cache_mark_many(struct buffer_cache *bc, int old_mode, int new_mode,<br>
+                             b_predicate pred, void *context, struct lock_history *lh)<br>
+{<br>
+       struct lru_entry *le;<br>
+       struct dm_buffer *b;<br>
+       struct evict_wrapper w = {.lh = lh, .pred = pred, .context = context};<br>
+<br>
+       while (true) {<br>
+               le = lru_evict(&bc->lru[old_mode], __evict_pred, &w);<br>
+               if (!le)<br>
+                       break;<br>
+<br>
+               b = le_to_buffer(le);<br>
+               b->list_mode = new_mode;<br>
+               lru_insert(&bc->lru[b->list_mode], &b->lru);<br>
+       }<br>
+}<br>
+<br>
+static void cache_mark_many(struct buffer_cache *bc, int old_mode, int new_mode,<br>
+                           b_predicate pred, void *context)<br>
+{<br>
+       struct lock_history lh;<br>
+<br>
+       lh_init(&lh, bc, true);<br>
+       __cache_mark_many(bc, old_mode, new_mode, pred, context, &lh);<br>
+       lh_exit(&lh);<br>
+}<br>
+<br>
+/*--------------*/<br>
+<br>
+/*<br>
+ * Iterates through all clean or dirty entries calling a function for each<br>
+ * entry.  The callback may terminate the iteration early.  Not threadsafe.<br>
+ */<br>
+<br>
+/*<br>
+ * Iterator functions should return one of these actions to indicate<br>
+ * how the iteration should proceed.<br>
+ */<br>
+enum it_action {<br>
+       IT_NEXT,<br>
+       IT_COMPLETE,<br>
+};<br>
+<br>
+typedef enum it_action (*iter_fn)(struct dm_buffer *b, void *context);<br>
+<br>
+static void __cache_iterate(struct buffer_cache *bc, int list_mode,<br>
+                           iter_fn fn, void *context, struct lock_history *lh)<br>
+{<br>
+       struct lru *lru = &bc->lru[list_mode];<br>
+       struct lru_entry *le, *first;<br>
+<br>
+       if (!lru->cursor)<br>
+               return;<br>
+<br>
+       first = le = to_le(lru->cursor);<br>
+       do {<br>
+               struct dm_buffer *b = le_to_buffer(le);<br>
+<br>
+               lh_next(lh, b->block);<br>
+<br>
+               switch (fn(b, context)) {<br>
+               case IT_NEXT:<br>
+                       break;<br>
+<br>
+               case IT_COMPLETE:<br>
+                       return;<br>
+               }<br>
+               cond_resched();<br>
+<br>
+               le = to_le(le->list.next);<br>
+       } while (le != first);<br>
+}<br>
+<br>
+static void cache_iterate(struct buffer_cache *bc, int list_mode,<br>
+                         iter_fn fn, void *context)<br>
+{<br>
+       struct lock_history lh;<br>
+<br>
+       lh_init(&lh, bc, false);<br>
+       __cache_iterate(bc, list_mode, fn, context, &lh);<br>
+       lh_exit(&lh);<br>
+}<br>
+<br>
+/*--------------*/<br>
+<br>
+/*<br>
+ * Passes ownership of the buffer to the cache. Returns false if the<br>
+ * buffer was already present (in which case ownership does not pass).<br>
+ * eg, a race with another thread.<br>
+ *<br>
+ * Holder count should be 1 on insertion.<br>
+ *<br>
+ * Not threadsafe.<br>
+ */<br>
+static bool __cache_insert(struct rb_root *root, struct dm_buffer *b)<br>
+{<br>
+       struct rb_node **new = &root->rb_node, *parent = NULL;<br>
+       struct dm_buffer *found;<br>
+<br>
+       while (*new) {<br>
+               found = container_of(*new, struct dm_buffer, node);<br>
+<br>
+               if (found->block == b->block)<br>
+                       return false;<br>
+<br>
+               parent = *new;<br>
+               new = b->block < found->block ?<br>
+                       &found->node.rb_left : &found->node.rb_right;<br>
+       }<br>
+<br>
+       rb_link_node(&b->node, parent, new);<br>
+       rb_insert_color(&b->node, root);<br>
+<br>
+       return true;<br>
+}<br>
+<br>
+static bool cache_insert(struct buffer_cache *bc, struct dm_buffer *b)<br>
+{<br>
+       bool r;<br>
+<br>
+       BUG_ON(b->list_mode >= LIST_SIZE);<br>
+<br>
+       cache_write_lock(bc, b->block);<br>
+       BUG_ON(atomic_read(&b->hold_count) != 1);<br>
+       r = __cache_insert(&bc->roots[cache_index(b->block)], b);<br>
+       if (r)<br>
+               lru_insert(&bc->lru[b->list_mode], &b->lru);<br>
+       cache_write_unlock(bc, b->block);<br>
+<br>
+       return r;<br>
+}<br>
+<br>
+/*--------------*/<br>
+<br>
+/*<br>
+ * Removes buffer from cache, ownership of the buffer passes back to the caller.<br>
+ * Fails if the hold_count is not one (ie. the caller holds the only reference).<br>
+ *<br>
+ * Not threadsafe.<br>
+ */<br>
+static bool cache_remove(struct buffer_cache *bc, struct dm_buffer *b)<br>
+{<br>
+       bool r;<br>
+<br>
+       cache_write_lock(bc, b->block);<br>
+<br>
+       if (atomic_read(&b->hold_count) != 1)<br>
+               r = false;<br>
+<br>
+       else {<br>
+               r = true;<br>
+               rb_erase(&b->node, &bc->roots[cache_index(b->block)]);<br>
+               lru_remove(&bc->lru[b->list_mode], &b->lru);<br>
+       }<br>
+<br>
+       cache_write_unlock(bc, b->block);<br>
+<br>
+       return r;<br>
+}<br>
+<br>
+/*--------------*/<br>
+<br>
+typedef void (*b_release)(struct dm_buffer *);<br>
+<br>
+static struct dm_buffer *__find_next(struct rb_root *root, sector_t block)<br>
+{<br>
+       struct rb_node *n = root->rb_node;<br>
+       struct dm_buffer *b;<br>
+       struct dm_buffer *best = NULL;<br>
+<br>
+       while (n) {<br>
+               b = container_of(n, struct dm_buffer, node);<br>
+<br>
+               if (b->block == block)<br>
+                       return b;<br>
+<br>
+               if (block <= b->block) {<br>
+                       n = n->rb_left;<br>
+                       best = b;<br>
+               } else<br>
+                       n = n->rb_right;<br>
+       }<br>
+<br>
+       return best;<br>
+}<br>
+<br>
+static void __remove_range(struct buffer_cache *bc,<br>
+                          struct rb_root *root,<br>
+                          sector_t begin, sector_t end,<br>
+                          b_predicate pred, b_release release)<br>
+{<br>
+       struct dm_buffer *b;<br>
+<br>
+       while (true) {<br>
+               cond_resched();<br>
+<br>
+               b = __find_next(root, begin);<br>
+               if (!b || (b->block >= end))<br>
+                       break;<br>
+<br>
+               begin = b->block + 1;<br>
+<br>
+               if (atomic_read(&b->hold_count))<br>
+                       continue;<br>
+<br>
+               if (pred(b, NULL) == ER_EVICT) {<br>
+                       rb_erase(&b->node, root);<br>
+                       lru_remove(&bc->lru[b->list_mode], &b->lru);<br>
+                       release(b);<br>
+               }<br>
+       }<br>
+}<br>
+<br>
+static void cache_remove_range(struct buffer_cache *bc,<br>
+                              sector_t begin, sector_t end,<br>
+                              b_predicate pred, b_release release)<br>
+{<br>
+       unsigned int i;<br>
+<br>
+       for (i = 0; i < NR_LOCKS; i++) {<br>
+               down_write(&bc->locks[i].lock);<br>
+               __remove_range(bc, &bc->roots[i], begin, end, pred, release);<br>
+               up_write(&bc->locks[i].lock);<br>
+       }<br>
+}<br>
+<br>
+/*----------------------------------------------------------------*/<br>
+<br>
 /*<br>
  * Linking of buffers:<br>
- *     All buffers are linked to buffer_tree with their node field.<br>
+ *     All buffers are linked to buffer_cache with their node field.<br>
  *<br>
  *     Clean buffers that are not being written (B_WRITING not set)<br>
  *     are linked to lru[LIST_CLEAN] with their lru_list field.<br>
@@ -136,9 +956,6 @@ struct dm_bufio_client {<br>
        spinlock_t spinlock;<br>
        bool no_sleep;<br>
<br>
-       struct list_head lru[LIST_SIZE];<br>
-       unsigned long n_buffers[LIST_SIZE];<br>
-<br>
        struct block_device *bdev;<br>
        unsigned int block_size;<br>
        s8 sectors_per_block_bits;<br>
@@ -153,7 +970,7 @@ struct dm_bufio_client {<br>
<br>
        unsigned int minimum_buffers;<br>
<br>
-       struct rb_root buffer_tree;<br>
+       struct buffer_cache cache;<br>
        wait_queue_head_t free_buffer_wait;<br>
<br>
        sector_t start;<br>
@@ -165,6 +982,11 @@ struct dm_bufio_client {<br>
        struct shrinker shrinker;<br>
        struct work_struct shrink_work;<br>
        atomic_long_t need_shrink;<br>
+<br>
+       /*<br>
+        * Used by global_cleanup to sort the clients list.<br>
+        */<br>
+       unsigned long oldest_buffer;<br>
 };<br>
<br>
 static DEFINE_STATIC_KEY_FALSE(no_sleep_enabled);<br>
@@ -181,14 +1003,6 @@ static void dm_bufio_lock(struct dm_bufio_client *c)<br>
                mutex_lock_nested(&c->lock, dm_bufio_in_request());<br>
 }<br>
<br>
-static int dm_bufio_trylock(struct dm_bufio_client *c)<br>
-{<br>
-       if (static_branch_unlikely(&no_sleep_enabled) && c->no_sleep)<br>
-               return spin_trylock_bh(&c->spinlock);<br>
-       else<br>
-               return mutex_trylock(&c->lock);<br>
-}<br>
-<br>
 static void dm_bufio_unlock(struct dm_bufio_client *c)<br>
 {<br>
        if (static_branch_unlikely(&no_sleep_enabled) && c->no_sleep)<br>
@@ -217,10 +1031,6 @@ static unsigned long dm_bufio_cache_size_latch;<br>
<br>
 static DEFINE_SPINLOCK(global_spinlock);<br>
<br>
-static LIST_HEAD(global_queue);<br>
-<br>
-static unsigned long global_num;<br>
-<br>
 /*<br>
  * Buffers are freed after this timeout<br>
  */<br>
@@ -254,7 +1064,6 @@ static struct workqueue_struct *dm_bufio_wq;<br>
 static struct delayed_work dm_bufio_cleanup_old_work;<br>
 static struct work_struct dm_bufio_replacement_work;<br>
<br>
-<br>
 #ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING<br>
 static void buffer_record_stack(struct dm_buffer *b)<br>
 {<br>
@@ -262,78 +1071,6 @@ static void buffer_record_stack(struct dm_buffer *b)<br>
 }<br>
 #endif<br>
<br>
-/*<br>
- *----------------------------------------------------------------<br>
- * A red/black tree acts as an index for all the buffers.<br>
- *----------------------------------------------------------------<br>
- */<br>
-static struct dm_buffer *__find(struct dm_bufio_client *c, sector_t block)<br>
-{<br>
-       struct rb_node *n = c->buffer_tree.rb_node;<br>
-       struct dm_buffer *b;<br>
-<br>
-       while (n) {<br>
-               b = container_of(n, struct dm_buffer, node);<br>
-<br>
-               if (b->block == block)<br>
-                       return b;<br>
-<br>
-               n = block < b->block ? n->rb_left : n->rb_right;<br>
-       }<br>
-<br>
-       return NULL;<br>
-}<br>
-<br>
-static struct dm_buffer *__find_next(struct dm_bufio_client *c, sector_t block)<br>
-{<br>
-       struct rb_node *n = c->buffer_tree.rb_node;<br>
-       struct dm_buffer *b;<br>
-       struct dm_buffer *best = NULL;<br>
-<br>
-       while (n) {<br>
-               b = container_of(n, struct dm_buffer, node);<br>
-<br>
-               if (b->block == block)<br>
-                       return b;<br>
-<br>
-               if (block <= b->block) {<br>
-                       n = n->rb_left;<br>
-                       best = b;<br>
-               } else {<br>
-                       n = n->rb_right;<br>
-               }<br>
-       }<br>
-<br>
-       return best;<br>
-}<br>
-<br>
-static void __insert(struct dm_bufio_client *c, struct dm_buffer *b)<br>
-{<br>
-       struct rb_node **new = &c->buffer_tree.rb_node, *parent = NULL;<br>
-       struct dm_buffer *found;<br>
-<br>
-       while (*new) {<br>
-               found = container_of(*new, struct dm_buffer, node);<br>
-<br>
-               if (found->block == b->block) {<br>
-                       BUG_ON(found != b);<br>
-                       return;<br>
-               }<br>
-<br>
-               parent = *new;<br>
-               new = b->block < found->block ?<br>
-                       &found->node.rb_left : &found->node.rb_right;<br>
-       }<br>
-<br>
-       rb_link_node(&b->node, parent, new);<br>
-       rb_insert_color(&b->node, &c->buffer_tree);<br>
-}<br>
-<br>
-static void __remove(struct dm_bufio_client *c, struct dm_buffer *b)<br>
-{<br>
-       rb_erase(&b->node, &c->buffer_tree);<br>
-}<br>
-<br>
 /*----------------------------------------------------------------*/<br>
<br>
 static void adjust_total_allocated(struct dm_buffer *b, bool unlink)<br>
@@ -361,16 +1098,9 @@ static void adjust_total_allocated(struct dm_buffer *b, bool unlink)<br>
        if (dm_bufio_current_allocated > dm_bufio_peak_allocated)<br>
                dm_bufio_peak_allocated = dm_bufio_current_allocated;<br>
<br>
-       b->accessed = 1;<br>
-<br>
        if (!unlink) {<br>
-               list_add(&b->global_list, &global_queue);<br>
-               global_num++;<br>
                if (dm_bufio_current_allocated > dm_bufio_cache_size)<br>
                        queue_work(dm_bufio_wq, &dm_bufio_replacement_work);<br>
-       } else {<br>
-               list_del(&b->global_list);<br>
-               global_num--;<br>
        }<br>
<br>
        spin_unlock(&global_spinlock);<br>
@@ -486,8 +1216,9 @@ static void free_buffer_data(struct dm_bufio_client *c,<br>
  */<br>
 static struct dm_buffer *alloc_buffer(struct dm_bufio_client *c, gfp_t gfp_mask)<br>
 {<br>
-       struct dm_buffer *b = kmem_cache_alloc(c->slab_buffer, gfp_mask);<br>
+       struct dm_buffer *b;<br>
<br>
+       b = kmem_cache_alloc(c->slab_buffer, gfp_mask);<br>
        if (!b)<br>
                return NULL;<br>
<br>
@@ -498,6 +1229,7 @@ static struct dm_buffer *alloc_buffer(struct dm_bufio_client *c, gfp_t gfp_mask)<br>
                kmem_cache_free(c->slab_buffer, b);<br>
                return NULL;<br>
        }<br>
+       adjust_total_allocated(b, false);<br>
<br>
 #ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING<br>
        b->stack_len = 0;<br>
@@ -512,61 +1244,11 @@ static void free_buffer(struct dm_buffer *b)<br>
 {<br>
        struct dm_bufio_client *c = b->c;<br>
<br>
+       adjust_total_allocated(b, true);<br>
        free_buffer_data(c, b->data, b->data_mode);<br>
        kmem_cache_free(c->slab_buffer, b);<br>
 }<br>
<br>
-/*<br>
- * Link buffer to the buffer tree and clean or dirty queue.<br>
- */<br>
-static void __link_buffer(struct dm_buffer *b, sector_t block, int dirty)<br>
-{<br>
-       struct dm_bufio_client *c = b->c;<br>
-<br>
-       c->n_buffers[dirty]++;<br>
-       b->block = block;<br>
-       b->list_mode = dirty;<br>
-       list_add(&b->lru_list, &c->lru[dirty]);<br>
-       __insert(b->c, b);<br>
-       b->last_accessed = jiffies;<br>
-<br>
-       adjust_total_allocated(b, false);<br>
-}<br>
-<br>
-/*<br>
- * Unlink buffer from the buffer tree and dirty or clean queue.<br>
- */<br>
-static void __unlink_buffer(struct dm_buffer *b)<br>
-{<br>
-       struct dm_bufio_client *c = b->c;<br>
-<br>
-       BUG_ON(!c->n_buffers[b->list_mode]);<br>
-<br>
-       c->n_buffers[b->list_mode]--;<br>
-       __remove(b->c, b);<br>
-       list_del(&b->lru_list);<br>
-<br>
-       adjust_total_allocated(b, true);<br>
-}<br>
-<br>
-/*<br>
- * Place the buffer to the head of dirty or clean LRU queue.<br>
- */<br>
-static void __relink_lru(struct dm_buffer *b, int dirty)<br>
-{<br>
-       struct dm_bufio_client *c = b->c;<br>
-<br>
-       b->accessed = 1;<br>
-<br>
-       BUG_ON(!c->n_buffers[b->list_mode]);<br>
-<br>
-       c->n_buffers[b->list_mode]--;<br>
-       c->n_buffers[dirty]++;<br>
-       b->list_mode = dirty;<br>
-       list_move(&b->lru_list, &c->lru[dirty]);<br>
-       b->last_accessed = jiffies;<br>
-}<br>
-<br>
 /*<br>
  *--------------------------------------------------------------------------<br>
  * Submit I/O on the buffer.<br>
@@ -806,7 +1488,7 @@ static void __flush_write_list(struct list_head *write_list)<br>
  */<br>
 static void __make_buffer_clean(struct dm_buffer *b)<br>
 {<br>
-       BUG_ON(b->hold_count);<br>
+       BUG_ON(atomic_read(&b->hold_count));<br>
<br>
        /* smp_load_acquire() pairs with read_endio()'s smp_mb__before_atomic() */<br>
        if (!smp_load_acquire(&b->state))       /* fast case */<br>
@@ -817,6 +1499,26 @@ static void __make_buffer_clean(struct dm_buffer *b)<br>
        wait_on_bit_io(&b->state, B_WRITING, TASK_UNINTERRUPTIBLE);<br>
 }<br>
<br>
+static enum evict_result is_clean(struct dm_buffer *b, void *context)<br>
+{<br>
+       struct dm_bufio_client *c = context;<br>
+<br>
+       BUG_ON(test_bit(B_WRITING, &b->state));<br>
+       BUG_ON(test_bit(B_DIRTY, &b->state));<br>
+<br>
+       if (static_branch_unlikely(&no_sleep_enabled) && c->no_sleep &&<br>
+           unlikely(test_bit(B_READING, &b->state)))<br>
+               return ER_DONT_EVICT;<br>
+<br>
+       return ER_EVICT;<br>
+}<br>
+<br>
+static enum evict_result is_dirty(struct dm_buffer *b, void *context)<br>
+{<br>
+       BUG_ON(test_bit(B_READING, &b->state));<br>
+       return ER_EVICT;<br>
+}<br>
+<br>
 /*<br>
  * Find some buffer that is not held by anybody, clean it, unlink it and<br>
  * return it.<br>
@@ -825,34 +1527,20 @@ static struct dm_buffer *__get_unclaimed_buffer(struct dm_bufio_client *c)<br>
 {<br>
        struct dm_buffer *b;<br>
<br>
-       list_for_each_entry_reverse(b, &c->lru[LIST_CLEAN], lru_list) {<br>
-               BUG_ON(test_bit(B_WRITING, &b->state));<br>
-               BUG_ON(test_bit(B_DIRTY, &b->state));<br>
-<br>
-               if (static_branch_unlikely(&no_sleep_enabled) && c->no_sleep &&<br>
-                   unlikely(test_bit_acquire(B_READING, &b->state)))<br>
-                       continue;<br>
-<br>
-               if (!b->hold_count) {<br>
-                       __make_buffer_clean(b);<br>
-                       __unlink_buffer(b);<br>
-                       return b;<br>
-               }<br>
-               cond_resched();<br>
+       b = cache_evict(&c->cache, LIST_CLEAN, is_clean, c);<br>
+       if (b) {<br>
+               /* this also waits for pending reads */<br>
+               __make_buffer_clean(b);<br>
+               return b;<br>
        }<br>
<br>
        if (static_branch_unlikely(&no_sleep_enabled) && c->no_sleep)<br>
                return NULL;<br>
<br>
-       list_for_each_entry_reverse(b, &c->lru[LIST_DIRTY], lru_list) {<br>
-               BUG_ON(test_bit(B_READING, &b->state));<br>
-<br>
-               if (!b->hold_count) {<br>
-                       __make_buffer_clean(b);<br>
-                       __unlink_buffer(b);<br>
-                       return b;<br>
-               }<br>
-               cond_resched();<br>
+       b = cache_evict(&c->cache, LIST_DIRTY, is_dirty, NULL);<br>
+       if (b) {<br>
+               __make_buffer_clean(b);<br>
+               return b;<br>
        }<br>
<br>
        return NULL;<br>
@@ -873,7 +1561,12 @@ static void __wait_for_free_buffer(struct dm_bufio_client *c)<br>
        set_current_state(TASK_UNINTERRUPTIBLE);<br>
        dm_bufio_unlock(c);<br>
<br>
-       io_schedule();<br>
+       /*<br>
+        * It's possible to miss a wake up event since we don't always<br>
+        * hold c->lock when wake_up is called.  So we have a timeout here,<br>
+        * just in case.<br>
+        */<br>
+       io_schedule_timeout(5 * HZ);<br>
<br>
        remove_wait_queue(&c->free_buffer_wait, &wait);<br>
<br>
@@ -931,9 +1624,8 @@ static struct dm_buffer *__alloc_buffer_wait_no_callback(struct dm_bufio_client<br>
                }<br>
<br>
                if (!list_empty(&c->reserved_buffers)) {<br>
-                       b = list_entry(c->reserved_buffers.next,<br>
-                                      struct dm_buffer, lru_list);<br>
-                       list_del(&b->lru_list);<br>
+                       b = list_to_buffer(c->reserved_buffers.next);<br>
+                       list_del(&b->lru.list);<br>
                        c->need_reserved_buffers++;<br>
<br>
                        return b;<br>
@@ -967,36 +1659,55 @@ static void __free_buffer_wake(struct dm_buffer *b)<br>
 {<br>
        struct dm_bufio_client *c = b->c;<br>
<br>
+       b->block = -1;<br>
        if (!c->need_reserved_buffers)<br>
                free_buffer(b);<br>
        else {<br>
-               list_add(&b->lru_list, &c->reserved_buffers);<br>
+               list_add(&b->lru.list, &c->reserved_buffers);<br>
                c->need_reserved_buffers--;<br>
        }<br>
<br>
        wake_up(&c->free_buffer_wait);<br>
 }<br>
<br>
+static enum evict_result cleaned(struct dm_buffer *b, void *context)<br>
+{<br>
+       BUG_ON(test_bit(B_READING, &b->state));<br>
+<br>
+       if (test_bit(B_DIRTY, &b->state) || test_bit(B_WRITING, &b->state))<br>
+               return ER_DONT_EVICT;<br>
+       else<br>
+               return ER_EVICT;<br>
+}<br>
+<br>
+static void __move_clean_buffers(struct dm_bufio_client *c)<br>
+{<br>
+       cache_mark_many(&c->cache, LIST_DIRTY, LIST_CLEAN, cleaned, NULL);<br>
+}<br>
+<br>
+struct write_context {<br>
+       int no_wait;<br>
+       struct list_head *write_list;<br>
+};<br>
+<br>
+static enum it_action write_one(struct dm_buffer *b, void *context)<br>
+{<br>
+       struct write_context *wc = context;<br>
+<br>
+       if (wc->no_wait && test_bit(B_WRITING, &b->state))<br>
+               return IT_COMPLETE;<br>
+<br>
+       __write_dirty_buffer(b, wc->write_list);<br>
+       return IT_NEXT;<br>
+}<br>
+<br>
 static void __write_dirty_buffers_async(struct dm_bufio_client *c, int no_wait,<br>
                                        struct list_head *write_list)<br>
 {<br>
-       struct dm_buffer *b, *tmp;<br>
+       struct write_context wc = {.no_wait = no_wait, .write_list = write_list};<br>
<br>
-       list_for_each_entry_safe_reverse(b, tmp, &c->lru[LIST_DIRTY], lru_list) {<br>
-               BUG_ON(test_bit(B_READING, &b->state));<br>
-<br>
-               if (!test_bit(B_DIRTY, &b->state) &&<br>
-                   !test_bit(B_WRITING, &b->state)) {<br>
-                       __relink_lru(b, LIST_CLEAN);<br>
-                       continue;<br>
-               }<br>
-<br>
-               if (no_wait && test_bit(B_WRITING, &b->state))<br>
-                       return;<br>
-<br>
-               __write_dirty_buffer(b, write_list);<br>
-               cond_resched();<br>
-       }<br>
+       __move_clean_buffers(c);<br>
+       cache_iterate(&c->cache, LIST_DIRTY, write_one, &wc);<br>
 }<br>
<br>
 /*<br>
@@ -1007,7 +1718,8 @@ static void __write_dirty_buffers_async(struct dm_bufio_client *c, int no_wait,<br>
 static void __check_watermark(struct dm_bufio_client *c,<br>
                              struct list_head *write_list)<br>
 {<br>
-       if (c->n_buffers[LIST_DIRTY] > c->n_buffers[LIST_CLEAN] * DM_BUFIO_WRITEBACK_RATIO)<br>
+       if (cache_count(&c->cache, LIST_DIRTY) ><br>
+               cache_count(&c->cache, LIST_CLEAN) * DM_BUFIO_WRITEBACK_RATIO)<br>
                __write_dirty_buffers_async(c, 1, write_list);<br>
 }<br>
<br>
@@ -1017,20 +1729,30 @@ static void __check_watermark(struct dm_bufio_client *c,<br>
  *--------------------------------------------------------------<br>
  */<br>
<br>
+static void cache_put_and_wake(struct dm_bufio_client *c, struct dm_buffer *b)<br>
+{<br>
+       /*<br>
+        * Relying on waitqueue_active() is racey, but we sleep<br>
+        * with schedule_timeout anyway.<br>
+        */<br>
+       if (cache_put(&c->cache, b) &&<br>
+           unlikely(waitqueue_active(&c->free_buffer_wait)))<br>
+               wake_up(&c->free_buffer_wait);<br>
+}<br>
+<br>
+/*<br>
+ * This assumes you have already checked the cache to see if the buffer<br>
+ * is already present (it will recheck after dropping the lock for allocation).<br>
+ */<br>
 static struct dm_buffer *__bufio_new(struct dm_bufio_client *c, sector_t block,<br>
                                     enum new_flag nf, int *need_submit,<br>
                                     struct list_head *write_list)<br>
 {<br>
        struct dm_buffer *b, *new_b = NULL;<br>
-<br>
        *need_submit = 0;<br>
<br>
-       b = __find(c, block);<br>
-       if (b)<br>
-               goto found_buffer;<br>
-<br>
-       if (nf == NF_GET)<br>
-               return NULL;<br>
+       /* This can't be called with NF_GET */<br>
+       BUG_ON(nf == NF_GET);<br>
<br>
        new_b = __alloc_buffer_wait(c, nf);<br>
        if (!new_b)<br>
@@ -1040,7 +1762,7 @@ static struct dm_buffer *__bufio_new(struct dm_bufio_client *c, sector_t block,<br>
         * We've had a period where the mutex was unlocked, so need to<br>
         * recheck the buffer tree.<br>
         */<br>
-       b = __find(c, block);<br>
+       b = cache_get(&c->cache, block);<br>
        if (b) {<br>
                __free_buffer_wake(new_b);<br>
                goto found_buffer;<br>
@@ -1049,24 +1771,35 @@ static struct dm_buffer *__bufio_new(struct dm_bufio_client *c, sector_t block,<br>
        __check_watermark(c, write_list);<br>
<br>
        b = new_b;<br>
-       b->hold_count = 1;<br>
+       atomic_set(&b->hold_count, 1);<br>
+       WRITE_ONCE(b->last_accessed, jiffies);<br>
+       b->block = block;<br>
        b->read_error = 0;<br>
        b->write_error = 0;<br>
-       __link_buffer(b, block, LIST_CLEAN);<br>
+       b->list_mode = LIST_CLEAN;<br>
<br>
-       if (nf == NF_FRESH) {<br>
+       if (nf == NF_FRESH)<br>
                b->state = 0;<br>
-               return b;<br>
+       else {<br>
+               b->state = 1 << B_READING;<br>
+               *need_submit = 1;<br>
        }<br>
<br>
-       b->state = 1 << B_READING;<br>
-       *need_submit = 1;<br>
+       /*<br>
+        * We mustn't insert into the cache until the B_READING state<br>
+        * is set.  Otherwise another thread could get it and use<br>
+        * it before it had been read.<br>
+        */<br>
+       cache_insert(&c->cache, b);<br>
<br>
        return b;<br>
<br>
 found_buffer:<br>
-       if (nf == NF_PREFETCH)<br>
+       if (nf == NF_PREFETCH) {<br>
+               cache_put_and_wake(c, b);<br>
                return NULL;<br>
+       }<br>
+<br>
        /*<br>
         * Note: it is essential that we don't wait for the buffer to be<br>
         * read if dm_bufio_get function is used. Both dm_bufio_get and<br>
@@ -1074,12 +1807,11 @@ static struct dm_buffer *__bufio_new(struct dm_bufio_client *c, sector_t block,<br>
         * If the user called both dm_bufio_prefetch and dm_bufio_get on<br>
         * the same buffer, it would deadlock if we waited.<br>
         */<br>
-       if (nf == NF_GET && unlikely(test_bit_acquire(B_READING, &b->state)))<br>
+       if (nf == NF_GET && unlikely(test_bit_acquire(B_READING, &b->state))) {<br>
+               cache_put_and_wake(c, b);<br>
                return NULL;<br>
+       }<br>
<br>
-       b->hold_count++;<br>
-       __relink_lru(b, test_bit(B_DIRTY, &b->state) ||<br>
-                    test_bit(B_WRITING, &b->state));<br>
        return b;<br>
 }<br>
<br>
@@ -1109,18 +1841,49 @@ static void read_endio(struct dm_buffer *b, blk_status_t status)<br>
 static void *new_read(struct dm_bufio_client *c, sector_t block,<br>
                      enum new_flag nf, struct dm_buffer **bp)<br>
 {<br>
-       int need_submit;<br>
+       int need_submit = 0;<br>
        struct dm_buffer *b;<br>
<br>
        LIST_HEAD(write_list);<br>
+       *bp = NULL;<br>
+<br>
+       /*<br>
+        * Fast path, hopefully the block is already in the cache.  No need<br>
+        * to get the client lock for this.<br>
+        */<br>
+       b = cache_get(&c->cache, block);<br>
+       if (b) {<br>
+               if (nf == NF_PREFETCH) {<br>
+                       cache_put_and_wake(c, b);<br>
+                       return NULL;<br>
+               }<br>
+<br>
+               /*<br>
+                * Note: it is essential that we don't wait for the buffer to be<br>
+                * read if dm_bufio_get function is used. Both dm_bufio_get and<br>
+                * dm_bufio_prefetch can be used in the driver request routine.<br>
+                * If the user called both dm_bufio_prefetch and dm_bufio_get on<br>
+                * the same buffer, it would deadlock if we waited.<br>
+                */<br>
+               if (nf == NF_GET && unlikely(test_bit_acquire(B_READING, &b->state))) {<br>
+                       cache_put_and_wake(c, b);<br>
+                       return NULL;<br>
+               }<br>
+       }<br>
+<br>
+       if (!b) {<br>
+               if (nf == NF_GET)<br>
+                       return NULL;<br>
+<br>
+               dm_bufio_lock(c);<br>
+               b = __bufio_new(c, block, nf, &need_submit, &write_list);<br>
+               dm_bufio_unlock(c);<br>
+       }<br>
<br>
-       dm_bufio_lock(c);<br>
-       b = __bufio_new(c, block, nf, &need_submit, &write_list);<br>
 #ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING<br>
-       if (b && b->hold_count == 1)<br>
+       if (b && (atomic_read(&b->hold_count) == 1))<br>
                buffer_record_stack(b);<br>
 #endif<br>
-       dm_bufio_unlock(c);<br>
<br>
        __flush_write_list(&write_list);<br>
<br>
@@ -1141,7 +1904,6 @@ static void *new_read(struct dm_bufio_client *c, sector_t block,<br>
        }<br>
<br>
        *bp = b;<br>
-<br>
        return b->data;<br>
 }<br>
<br>
@@ -1156,7 +1918,6 @@ void *dm_bufio_read(struct dm_bufio_client *c, sector_t block,<br>
                    struct dm_buffer **bp)<br>
 {<br>
        BUG_ON(dm_bufio_in_request());<br>
-<br>
        return new_read(c, block, NF_READ, bp);<br>
 }<br>
 EXPORT_SYMBOL_GPL(dm_bufio_read);<br>
@@ -1165,7 +1926,6 @@ void *dm_bufio_new(struct dm_bufio_client *c, sector_t block,<br>
                   struct dm_buffer **bp)<br>
 {<br>
        BUG_ON(dm_bufio_in_request());<br>
-<br>
        return new_read(c, block, NF_FRESH, bp);<br>
 }<br>
 EXPORT_SYMBOL_GPL(dm_bufio_new);<br>
@@ -1180,12 +1940,19 @@ void dm_bufio_prefetch(struct dm_bufio_client *c,<br>
        BUG_ON(dm_bufio_in_request());<br>
<br>
        blk_start_plug(&plug);<br>
-       dm_bufio_lock(c);<br>
<br>
        for (; n_blocks--; block++) {<br>
                int need_submit;<br>
                struct dm_buffer *b;<br>
<br>
+               b = cache_get(&c->cache, block);<br>
+               if (b) {<br>
+                       /* already in cache */<br>
+                       cache_put_and_wake(c, b);<br>
+                       continue;<br>
+               }<br>
+<br>
+               dm_bufio_lock(c);<br>
                b = __bufio_new(c, block, NF_PREFETCH, &need_submit,<br>
                                &write_list);<br>
                if (unlikely(!list_empty(&write_list))) {<br>
@@ -1195,6 +1962,7 @@ void dm_bufio_prefetch(struct dm_bufio_client *c,<br>
                        blk_start_plug(&plug);<br>
                        dm_bufio_lock(c);<br>
                }<br>
+<br>
                if (unlikely(b != NULL)) {<br>
                        dm_bufio_unlock(c);<br>
<br>
@@ -1208,10 +1976,9 @@ void dm_bufio_prefetch(struct dm_bufio_client *c,<br>
                                goto flush_plug;<br>
                        dm_bufio_lock(c);<br>
                }<br>
+               dm_bufio_unlock(c);<br>
        }<br>
<br>
-       dm_bufio_unlock(c);<br>
-<br>
 flush_plug:<br>
        blk_finish_plug(&plug);<br>
 }<br>
@@ -1221,29 +1988,28 @@ void dm_bufio_release(struct dm_buffer *b)<br>
 {<br>
        struct dm_bufio_client *c = b->c;<br>
<br>
-       dm_bufio_lock(c);<br>
+       /*<br>
+        * If there were errors on the buffer, and the buffer is not<br>
+        * to be written, free the buffer. There is no point in caching<br>
+        * invalid buffer.<br>
+        */<br>
+       if ((b->read_error || b->write_error) &&<br>
+           !test_bit_acquire(B_READING, &b->state) &&<br>
+           !test_bit(B_WRITING, &b->state) &&<br>
+           !test_bit(B_DIRTY, &b->state)) {<br>
+               dm_bufio_lock(c);<br>
<br>
-       BUG_ON(!b->hold_count);<br>
-<br>
-       b->hold_count--;<br>
-       if (!b->hold_count) {<br>
-               wake_up(&c->free_buffer_wait);<br>
-<br>
-               /*<br>
-                * If there were errors on the buffer, and the buffer is not<br>
-                * to be written, free the buffer. There is no point in caching<br>
-                * invalid buffer.<br>
-                */<br>
-               if ((b->read_error || b->write_error) &&<br>
-                   !test_bit_acquire(B_READING, &b->state) &&<br>
-                   !test_bit(B_WRITING, &b->state) &&<br>
-                   !test_bit(B_DIRTY, &b->state)) {<br>
-                       __unlink_buffer(b);<br>
+               /* cache remove can fail if there are other holders */<br>
+               if (cache_remove(&c->cache, b)) {<br>
                        __free_buffer_wake(b);<br>
+                       dm_bufio_unlock(c);<br>
+                       return;<br>
                }<br>
+<br>
+               dm_bufio_unlock(c);<br>
        }<br>
<br>
-       dm_bufio_unlock(c);<br>
+       cache_put_and_wake(c, b);<br>
 }<br>
 EXPORT_SYMBOL_GPL(dm_bufio_release);<br>
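<br>
dm_bufio_release() now ends in cache_put_and_wake(): drop the holder count without the client mutex, and only bother the free-buffer waiters when a buffer actually becomes free.  A minimal userspace stand-in is below, with a condition variable in place of the kernel waitqueue; names and types are illustrative, not the driver's.<br>
<br>
#include <pthread.h><br>
#include <stdatomic.h><br>
#include <stdlib.h><br>
<br>
struct client {<br>
        pthread_mutex_t wait_lock;<br>
        pthread_cond_t free_buffer_wait;  /* stands in for the waitqueue */<br>
};<br>
<br>
struct buffer {<br>
        atomic_int hold_count;<br>
};<br>
<br>
/* Drop one reference; only the 1 -> 0 transition has to wake sleepers. */<br>
static void cache_put_and_wake(struct client *c, struct buffer *b)<br>
{<br>
        int prev = atomic_fetch_sub(&b->hold_count, 1);<br>
<br>
        if (prev <= 0)<br>
                abort();          /* a put without a matching get is a bug */<br>
<br>
        if (prev == 1) {<br>
                pthread_mutex_lock(&c->wait_lock);<br>
                pthread_cond_broadcast(&c->free_buffer_wait);<br>
                pthread_mutex_unlock(&c->wait_lock);<br>
        }<br>
}<br>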
<br>
@@ -1262,7 +2028,7 @@ void dm_bufio_mark_partial_buffer_dirty(struct dm_buffer *b,<br>
        if (!test_and_set_bit(B_DIRTY, &b->state)) {<br>
                b->dirty_start = start;<br>
                b->dirty_end = end;<br>
-               __relink_lru(b, LIST_DIRTY);<br>
+               cache_mark(&c->cache, b, LIST_DIRTY);<br>
        } else {<br>
                if (start < b->dirty_start)<br>
                        b->dirty_start = start;<br>
@@ -1300,11 +2066,19 @@ EXPORT_SYMBOL_GPL(dm_bufio_write_dirty_buffers_async);<br>
  *<br>
  * Finally, we flush hardware disk cache.<br>
  */<br>
+static bool is_writing(struct lru_entry *e, void *context)<br>
+{<br>
+       struct dm_buffer *b = le_to_buffer(e);<br>
+<br>
+       return test_bit(B_WRITING, &b->state);<br>
+}<br>
+<br>
 int dm_bufio_write_dirty_buffers(struct dm_bufio_client *c)<br>
 {<br>
        int a, f;<br>
-       unsigned long buffers_processed = 0;<br>
-       struct dm_buffer *b, *tmp;<br>
+       unsigned long nr_buffers;<br>
+       struct lru_entry *e;<br>
+       struct lru_iter it;<br>
<br>
        LIST_HEAD(write_list);<br>
<br>
@@ -1314,52 +2088,32 @@ int dm_bufio_write_dirty_buffers(struct dm_bufio_client *c)<br>
        __flush_write_list(&write_list);<br>
        dm_bufio_lock(c);<br>
<br>
-again:<br>
-       list_for_each_entry_safe_reverse(b, tmp, &c->lru[LIST_DIRTY], lru_list) {<br>
-               int dropped_lock = 0;<br>
-<br>
-               if (buffers_processed < c->n_buffers[LIST_DIRTY])<br>
-                       buffers_processed++;<br>
+       nr_buffers = cache_count(&c->cache, LIST_DIRTY);<br>
+       lru_iter_begin(&c->cache.lru[LIST_DIRTY], &it);<br>
+       while ((e = lru_iter_next(&it, is_writing, c))) {<br>
+               struct dm_buffer *b = le_to_buffer(e);<br>
+               __cache_inc_buffer(b);<br>
<br>
                BUG_ON(test_bit(B_READING, &b->state));<br>
<br>
-               if (test_bit(B_WRITING, &b->state)) {<br>
-                       if (buffers_processed < c->n_buffers[LIST_DIRTY]) {<br>
-                               dropped_lock = 1;<br>
-                               b->hold_count++;<br>
-                               dm_bufio_unlock(c);<br>
-                               wait_on_bit_io(&b->state, B_WRITING,<br>
-                                              TASK_UNINTERRUPTIBLE);<br>
-                               dm_bufio_lock(c);<br>
-                               b->hold_count--;<br>
-                       } else<br>
-                               wait_on_bit_io(&b->state, B_WRITING,<br>
-                                              TASK_UNINTERRUPTIBLE);<br>
-               }<br>
+               if (nr_buffers) {<br>
+                       nr_buffers--;<br>
+                       dm_bufio_unlock(c);<br>
+                       wait_on_bit_io(&b->state, B_WRITING, TASK_UNINTERRUPTIBLE);<br>
+                       dm_bufio_lock(c);<br>
+               } else<br>
+                       wait_on_bit_io(&b->state, B_WRITING, TASK_UNINTERRUPTIBLE);<br>
<br>
-               if (!test_bit(B_DIRTY, &b->state) &&<br>
-                   !test_bit(B_WRITING, &b->state))<br>
-                       __relink_lru(b, LIST_CLEAN);<br>
+               if (!test_bit(B_DIRTY, &b->state) && !test_bit(B_WRITING, &b->state))<br>
+                       cache_mark(&c->cache, b, LIST_CLEAN);<br>
+<br>
+               cache_put_and_wake(c, b);<br>
<br>
                cond_resched();<br>
-<br>
-               /*<br>
-                * If we dropped the lock, the list is no longer consistent,<br>
-                * so we must restart the search.<br>
-                *<br>
-                * In the most common case, the buffer just processed is<br>
-                * relinked to the clean list, so we won't loop scanning the<br>
-                * same buffer again and again.<br>
-                *<br>
-                * This may livelock if there is another thread simultaneously<br>
-                * dirtying buffers, so we count the number of buffers walked<br>
-                * and if it exceeds the total number of buffers, it means that<br>
-                * someone is doing some writes simultaneously with us.  In<br>
-                * this case, stop, dropping the lock.<br>
-                */<br>
-               if (dropped_lock)<br>
-                       goto again;<br>
        }<br>
+<br>
+       lru_iter_end(&it);<br>
+<br>
        wake_up(&c->free_buffer_wait);<br>
        dm_bufio_unlock(c);<br>
<br>
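The dirty-buffer writeback loop above relies on an iterator that filters entries through a predicate (is_writing) instead of restarting the whole list walk after every sleep.  A stripped-down sketch of that iterator shape follows; the real lru_iter also registers itself with the list so concurrent removals can fix up live cursors, which is omitted here, and the list is a plain NULL-terminated one rather than the driver's circular LRU.<br>
<br>
#include <stdbool.h><br>
#include <stddef.h><br>
<br>
struct lru_entry {<br>
        struct lru_entry *next;<br>
};<br>
<br>
struct lru_iter {<br>
        struct lru_entry *cursor;<br>
};<br>
<br>
typedef bool (*iter_predicate)(struct lru_entry *e, void *context);<br>
<br>
static void lru_iter_begin(struct lru_entry *head, struct lru_iter *it)<br>
{<br>
        it->cursor = head;<br>
}<br>
<br>
/* Advance the cursor first, then hand out the entry if the predicate accepts it. */<br>
static struct lru_entry *lru_iter_next(struct lru_iter *it,<br>
                                       iter_predicate pred, void *context)<br>
{<br>
        while (it->cursor) {<br>
                struct lru_entry *e = it->cursor;<br>
<br>
                it->cursor = e->next;<br>
                if (pred(e, context))<br>
                        return e;<br>
        }<br>
        return NULL;<br>
}<br>
<br>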
@@ -1418,12 +2172,22 @@ int dm_bufio_issue_discard(struct dm_bufio_client *c, sector_t block, sector_t c<br>
 }<br>
 EXPORT_SYMBOL_GPL(dm_bufio_issue_discard);<br>
<br>
-static void forget_buffer_locked(struct dm_buffer *b)<br>
+static bool forget_buffer(struct dm_bufio_client *c, sector_t block)<br>
 {<br>
-       if (likely(!b->hold_count) && likely(!smp_load_acquire(&b->state))) {<br>
-               __unlink_buffer(b);<br>
-               __free_buffer_wake(b);<br>
+       struct dm_buffer *b;<br>
+<br>
+       b = cache_get(&c->cache, block);<br>
+       if (b) {<br>
+               if (likely(!smp_load_acquire(&b->state))) {<br>
+                       if (cache_remove(&c->cache, b))<br>
+                               __free_buffer_wake(b);<br>
+                       else<br>
+                               cache_put_and_wake(c, b);<br>
+               } else<br>
+                       cache_put_and_wake(c, b);<br>
        }<br>
+<br>
+       return b ? true : false;<br>
 }<br>
<br>
 /*<br>
@@ -1434,38 +2198,22 @@ static void forget_buffer_locked(struct dm_buffer *b)<br>
  */<br>
 void dm_bufio_forget(struct dm_bufio_client *c, sector_t block)<br>
 {<br>
-       struct dm_buffer *b;<br>
-<br>
        dm_bufio_lock(c);<br>
-<br>
-       b = __find(c, block);<br>
-       if (b)<br>
-               forget_buffer_locked(b);<br>
-<br>
+       forget_buffer(c, block);<br>
        dm_bufio_unlock(c);<br>
 }<br>
 EXPORT_SYMBOL_GPL(dm_bufio_forget);<br>
<br>
+static enum evict_result idle(struct dm_buffer *b, void *context)<br>
+{<br>
+       return b->state ? ER_DONT_EVICT : ER_EVICT;<br>
+}<br>
+<br>
 void dm_bufio_forget_buffers(struct dm_bufio_client *c, sector_t block, sector_t n_blocks)<br>
 {<br>
-       struct dm_buffer *b;<br>
-       sector_t end_block = block + n_blocks;<br>
-<br>
-       while (block < end_block) {<br>
-               dm_bufio_lock(c);<br>
-<br>
-               b = __find_next(c, block);<br>
-               if (b) {<br>
-                       block = b->block + 1;<br>
-                       forget_buffer_locked(b);<br>
-               }<br>
-<br>
-               dm_bufio_unlock(c);<br>
-<br>
-               if (!b)<br>
-                       break;<br>
-       }<br>
-<br>
+       dm_bufio_lock(c);<br>
+       cache_remove_range(&c->cache, block, block + n_blocks, idle, __free_buffer_wake);<br>
+       dm_bufio_unlock(c);<br>
 }<br>
 EXPORT_SYMBOL_GPL(dm_bufio_forget_buffers);<br>
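<br>
dm_bufio_forget_buffers() now pushes the range walk down into cache_remove_range(), which pairs a predicate ("is this buffer idle?") with a release callback.  A simplified sketch over a sorted singly linked list is below, with a bool predicate standing in for the driver's enum evict_result, purely to show the shape.<br>
<br>
#include <stdbool.h><br>
#include <stdint.h><br>
<br>
struct buffer {<br>
        uint64_t block;<br>
        struct buffer *next;<br>
};<br>
<br>
typedef bool (*evict_predicate)(struct buffer *b, void *context);<br>
typedef void (*release_fn)(struct buffer *b);<br>
<br>
/*<br>
 * Remove every buffer with begin <= block < end that the predicate lets go,<br>
 * handing each one to the release callback.  The list is assumed to be<br>
 * sorted by block and protected by the caller's lock.<br>
 */<br>
static void cache_remove_range(struct buffer **head,<br>
                               uint64_t begin, uint64_t end,<br>
                               evict_predicate pred, void *context,<br>
                               release_fn release)<br>
{<br>
        struct buffer **pp = head;<br>
<br>
        while (*pp) {<br>
                struct buffer *b = *pp;<br>
<br>
                if (b->block >= end)<br>
                        break;<br>
<br>
                if (b->block >= begin && pred(b, context)) {<br>
                        *pp = b->next;   /* unlink */<br>
                        release(b);<br>
                        continue;<br>
                }<br>
                pp = &b->next;<br>
        }<br>
}<br>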
<br>
@@ -1527,11 +2275,26 @@ struct dm_bufio_client *dm_bufio_get_client(struct dm_buffer *b)<br>
 }<br>
 EXPORT_SYMBOL_GPL(dm_bufio_get_client);<br>
<br>
+static enum it_action warn_leak(struct dm_buffer *b, void *context)<br>
+{<br>
+       bool *warned = context;<br>
+<br>
+       WARN_ON(!(*warned));<br>
+       *warned = true;<br>
+       DMERR("leaked buffer %llx, hold count %u, list %d",<br>
+             (unsigned long long)b->block, atomic_read(&b->hold_count), b->list_mode);<br>
+#ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING<br>
+       stack_trace_print(b->stack_entries, b->stack_len, 1);<br>
+       /* mark unclaimed to avoid BUG_ON below */<br>
+       atomic_set(&b->hold_count, 0);<br>
+#endif<br>
+       return IT_NEXT;<br>
+}<br>
+<br>
 static void drop_buffers(struct dm_bufio_client *c)<br>
 {<br>
-       struct dm_buffer *b;<br>
        int i;<br>
-       bool warned = false;<br>
+       struct dm_buffer *b;<br>
<br>
        BUG_ON(dm_bufio_in_request());<br>
<br>
@@ -1545,18 +2308,11 @@ static void drop_buffers(struct dm_bufio_client *c)<br>
        while ((b = __get_unclaimed_buffer(c)))<br>
                __free_buffer_wake(b);<br>
<br>
-       for (i = 0; i < LIST_SIZE; i++)<br>
-               list_for_each_entry(b, &c->lru[i], lru_list) {<br>
-                       WARN_ON(!warned);<br>
-                       warned = true;<br>
-                       DMERR("leaked buffer %llx, hold count %u, list %d",<br>
-                             (unsigned long long)b->block, b->hold_count, i);<br>
-#ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING<br>
-                       stack_trace_print(b->stack_entries, b->stack_len, 1);<br>
-                       /* mark unclaimed to avoid BUG_ON below */<br>
-                       b->hold_count = 0;<br>
-#endif<br>
-               }<br>
+       for (i = 0; i < LIST_SIZE; i++) {<br>
+               bool warned = false;<br>
+<br>
+               cache_iterate(&c->cache, i, warn_leak, &warned);<br>
+       }<br>
<br>
 #ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING<br>
        while ((b = __get_unclaimed_buffer(c)))<br>
@@ -1564,39 +2320,11 @@ static void drop_buffers(struct dm_bufio_client *c)<br>
 #endif<br>
<br>
        for (i = 0; i < LIST_SIZE; i++)<br>
-               BUG_ON(!list_empty(&c->lru[i]));<br>
+               BUG_ON(cache_count(&c->cache, i));<br>
<br>
        dm_bufio_unlock(c);<br>
 }<br>
<br>
-/*<br>
- * We may not be able to evict this buffer if IO pending or the client<br>
- * is still using it.  Caller is expected to know buffer is too old.<br>
- *<br>
- * And if GFP_NOFS is used, we must not do any I/O because we hold<br>
- * dm_bufio_clients_lock and we would risk deadlock if the I/O gets<br>
- * rerouted to different bufio client.<br>
- */<br>
-static bool __try_evict_buffer(struct dm_buffer *b, gfp_t gfp)<br>
-{<br>
-       if (!(gfp & __GFP_FS) ||<br>
-           (static_branch_unlikely(&no_sleep_enabled) && b->c->no_sleep)) {<br>
-               if (test_bit_acquire(B_READING, &b->state) ||<br>
-                   test_bit(B_WRITING, &b->state) ||<br>
-                   test_bit(B_DIRTY, &b->state))<br>
-                       return false;<br>
-       }<br>
-<br>
-       if (b->hold_count)<br>
-               return false;<br>
-<br>
-       __make_buffer_clean(b);<br>
-       __unlink_buffer(b);<br>
-       __free_buffer_wake(b);<br>
-<br>
-       return true;<br>
-}<br>
-<br>
 static unsigned long get_retain_buffers(struct dm_bufio_client *c)<br>
 {<br>
        unsigned long retain_bytes = READ_ONCE(dm_bufio_retain_bytes);<br>
@@ -1612,22 +2340,30 @@ static unsigned long get_retain_buffers(struct dm_bufio_client *c)<br>
 static void __scan(struct dm_bufio_client *c)<br>
 {<br>
        int l;<br>
-       struct dm_buffer *b, *tmp;<br>
+       struct dm_buffer *b;<br>
        unsigned long freed = 0;<br>
-       unsigned long count = c->n_buffers[LIST_CLEAN] +<br>
-                             c->n_buffers[LIST_DIRTY];<br>
        unsigned long retain_target = get_retain_buffers(c);<br>
+       unsigned long count = cache_total(&c->cache);<br>
<br>
        for (l = 0; l < LIST_SIZE; l++) {<br>
-               list_for_each_entry_safe_reverse(b, tmp, &c->lru[l], lru_list) {<br>
+               while (true) {<br>
                        if (count - freed <= retain_target)<br>
                                atomic_long_set(&c->need_shrink, 0);<br>
+<br>
                        if (!atomic_long_read(&c->need_shrink))<br>
-                               return;<br>
-                       if (__try_evict_buffer(b, GFP_KERNEL)) {<br>
-                               atomic_long_dec(&c->need_shrink);<br>
-                               freed++;<br>
-                       }<br>
+                               break;<br>
+<br>
+                       b = cache_evict(&c->cache, l,<br>
+                                       l == LIST_CLEAN ? is_clean : is_dirty, c);<br>
+                       if (!b)<br>
+                               break;<br>
+<br>
+                       BUG_ON(b->list_mode != l);<br>
+                       __make_buffer_clean(b);<br>
+                       __free_buffer_wake(b);<br>
+<br>
+                       atomic_long_dec(&c->need_shrink);<br>
+                       freed++;<br>
                        cond_resched();<br>
                }<br>
        }<br>
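<br>
cache_evict() as used by __scan() above asks the per-list LRU for the first entry its predicate will release.  A single-threaded model of a second-chance ("clock") sweep with that calling shape is sketched below; the circular singly linked list, the bool referenced flag and the two-pass bound are simplifications of the sketch, not necessarily the driver's data layout (insertion and the setting of the referenced bit on access are not shown).<br>
<br>
#include <stdbool.h><br>
#include <stddef.h><br>
<br>
struct lru_entry {<br>
        struct lru_entry *next;   /* circular, singly linked for brevity */<br>
        bool referenced;          /* set on access; the "second chance" bit */<br>
};<br>
<br>
struct lru {<br>
        struct lru_entry *cursor; /* the clock hand */<br>
        unsigned long count;<br>
};<br>
<br>
typedef bool (*evict_pred)(struct lru_entry *e, void *context);<br>
<br>
/*<br>
 * Sweep the clock hand: referenced entries get their bit cleared and a<br>
 * second chance; the first unreferenced entry the predicate accepts is<br>
 * unlinked and returned.  Two full passes are enough to evict or give up.<br>
 */<br>
static struct lru_entry *lru_evict(struct lru *lru, evict_pred pred, void *ctx)<br>
{<br>
        unsigned long max_steps;<br>
        struct lru_entry *prev, *e;<br>
<br>
        if (!lru->count)<br>
                return NULL;<br>
<br>
        max_steps = lru->count * 2;<br>
        prev = lru->cursor;<br>
<br>
        while (max_steps--) {<br>
                e = prev->next;<br>
<br>
                if (e->referenced) {<br>
                        e->referenced = false;          /* spare it this time */<br>
                } else if (pred(e, ctx)) {<br>
                        if (e == prev) {                /* it was the only entry */<br>
                                lru->cursor = NULL;<br>
                        } else {<br>
                                prev->next = e->next;   /* unlink; nothing else moves */<br>
                                lru->cursor = prev;<br>
                        }<br>
                        lru->count--;<br>
                        e->next = NULL;<br>
                        return e;<br>
                }<br>
                prev = e;<br>
        }<br>
        return NULL;   /* everything is referenced or the predicate said no */<br>
}<br>
<br>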
@@ -1656,8 +2392,7 @@ static unsigned long dm_bufio_shrink_scan(struct shrinker *shrink, struct shrink<br>
 static unsigned long dm_bufio_shrink_count(struct shrinker *shrink, struct shrink_control *sc)<br>
 {<br>
        struct dm_bufio_client *c = container_of(shrink, struct dm_bufio_client, shrinker);<br>
-       unsigned long count = READ_ONCE(c->n_buffers[LIST_CLEAN]) +<br>
-                             READ_ONCE(c->n_buffers[LIST_DIRTY]);<br>
+       unsigned long count = cache_total(&c->cache);<br>
        unsigned long retain_target = get_retain_buffers(c);<br>
        unsigned long queued_for_cleanup = atomic_long_read(&c->need_shrink);<br>
<br>
@@ -1685,7 +2420,6 @@ struct dm_bufio_client *dm_bufio_client_create(struct block_device *bdev, unsign<br>
 {<br>
        int r;<br>
        struct dm_bufio_client *c;<br>
-       unsigned int i;<br>
        char slab_name[27];<br>
<br>
        if (!block_size || block_size & ((1 << SECTOR_SHIFT) - 1)) {<br>
@@ -1699,7 +2433,7 @@ struct dm_bufio_client *dm_bufio_client_create(struct block_device *bdev, unsign<br>
                r = -ENOMEM;<br>
                goto bad_client;<br>
        }<br>
-       c->buffer_tree = RB_ROOT;<br>
+       cache_init(&c->cache);<br>
<br>
        c->bdev = bdev;<br>
        c->block_size = block_size;<br>
@@ -1716,11 +2450,6 @@ struct dm_bufio_client *dm_bufio_client_create(struct block_device *bdev, unsign<br>
                static_branch_inc(&no_sleep_enabled);<br>
        }<br>
<br>
-       for (i = 0; i < LIST_SIZE; i++) {<br>
-               INIT_LIST_HEAD(&c->lru[i]);<br>
-               c->n_buffers[i] = 0;<br>
-       }<br>
-<br>
        mutex_init(&c->lock);<br>
        spin_lock_init(&c->spinlock);<br>
        INIT_LIST_HEAD(&c->reserved_buffers);<br>
@@ -1792,9 +2521,9 @@ struct dm_bufio_client *dm_bufio_client_create(struct block_device *bdev, unsign<br>
<br>
 bad:<br>
        while (!list_empty(&c->reserved_buffers)) {<br>
-               struct dm_buffer *b = list_entry(c->reserved_buffers.next,<br>
-                                                struct dm_buffer, lru_list);<br>
-               list_del(&b->lru_list);<br>
+               struct dm_buffer *b = list_to_buffer(c->reserved_buffers.next);<br>
+<br>
+               list_del(&b->lru.list);<br>
                free_buffer(b);<br>
        }<br>
        kmem_cache_destroy(c->slab_cache);<br>
@@ -1831,23 +2560,23 @@ void dm_bufio_client_destroy(struct dm_bufio_client *c)<br>
<br>
        mutex_unlock(&dm_bufio_clients_lock);<br>
<br>
-       BUG_ON(!RB_EMPTY_ROOT(&c->buffer_tree));<br>
        BUG_ON(c->need_reserved_buffers);<br>
<br>
        while (!list_empty(&c->reserved_buffers)) {<br>
-               struct dm_buffer *b = list_entry(c->reserved_buffers.next,<br>
-                                                struct dm_buffer, lru_list);<br>
-               list_del(&b->lru_list);<br>
+               struct dm_buffer *b = list_to_buffer(c->reserved_buffers.next);<br>
+<br>
+               list_del(&b->lru.list);<br>
                free_buffer(b);<br>
        }<br>
<br>
        for (i = 0; i < LIST_SIZE; i++)<br>
-               if (c->n_buffers[i])<br>
-                       DMERR("leaked buffer count %d: %ld", i, c->n_buffers[i]);<br>
+               if (cache_count(&c->cache, i))<br>
+                       DMERR("leaked buffer count %d: %lu", i, cache_count(&c->cache, i));<br>
<br>
        for (i = 0; i < LIST_SIZE; i++)<br>
-               BUG_ON(c->n_buffers[i]);<br>
+               BUG_ON(cache_count(&c->cache, i));<br>
<br>
+       cache_destroy(&c->cache);<br>
        kmem_cache_destroy(c->slab_cache);<br>
        kmem_cache_destroy(c->slab_buffer);<br>
        dm_io_client_destroy(c->dm_io);<br>
@@ -1864,6 +2593,8 @@ void dm_bufio_set_sector_offset(struct dm_bufio_client *c, sector_t start)<br>
 }<br>
 EXPORT_SYMBOL_GPL(dm_bufio_set_sector_offset);<br>
<br>
+/*--------------------------------------------------------------*/<br>
+<br>
 static unsigned int get_max_age_hz(void)<br>
 {<br>
        unsigned int max_age = READ_ONCE(dm_bufio_max_age);<br>
@@ -1876,13 +2607,74 @@ static unsigned int get_max_age_hz(void)<br>
<br>
 static bool older_than(struct dm_buffer *b, unsigned long age_hz)<br>
 {<br>
-       return time_after_eq(jiffies, b->last_accessed + age_hz);<br>
+       return time_after_eq(jiffies, READ_ONCE(b->last_accessed) + age_hz);<br>
 }<br>
<br>
-static void __evict_old_buffers(struct dm_bufio_client *c, unsigned long age_hz)<br>
+struct evict_params {<br>
+       gfp_t gfp;<br>
+       unsigned long age_hz;<br>
+<br>
+       /*<br>
+        * This gets updated with the largest last_accessed (ie. most<br>
+        * recently used) of the evicted buffers.  It will not be reinitialised<br>
+        * by __evict_many(), so you can use it across multiple invocations.<br>
+        */<br>
+       unsigned long last_accessed;<br>
+};<br>
+<br>
+/*<br>
+ * We may not be able to evict this buffer if IO is pending or the client<br>
+ * is still using it.<br>
+ *<br>
+ * And if GFP_NOFS is used, we must not do any I/O because we hold<br>
+ * dm_bufio_clients_lock and we would risk deadlock if the I/O gets<br>
+ * rerouted to different bufio client.<br>
+ */<br>
+static enum evict_result select_for_evict(struct dm_buffer *b, void *context)<br>
+{<br>
+       struct evict_params *params = context;<br>
+<br>
+       if (!(params->gfp & __GFP_FS) ||<br>
+           (static_branch_unlikely(&no_sleep_enabled) && b->c->no_sleep)) {<br>
+               if (test_bit_acquire(B_READING, &b->state) ||<br>
+                   test_bit(B_WRITING, &b->state) ||<br>
+                   test_bit(B_DIRTY, &b->state))<br>
+                       return ER_DONT_EVICT;<br>
+       }<br>
+<br>
+       return older_than(b, params->age_hz) ? ER_EVICT : ER_STOP;<br>
+}<br>
+<br>
+static unsigned int __evict_many(struct dm_bufio_client *c,<br>
+                                struct evict_params *params,<br>
+                                int list_mode, unsigned int max_count)<br>
+{<br>
+       unsigned int count;<br>
+       unsigned long last_accessed;<br>
+       struct dm_buffer *b;<br>
+<br>
+       for (count = 0; count < max_count; count++) {<br>
+               b = cache_evict(&c->cache, list_mode, select_for_evict, params);<br>
+               if (!b)<br>
+                       break;<br>
+<br>
+               last_accessed = READ_ONCE(b->last_accessed);<br>
+               if (time_after_eq(params->last_accessed, last_accessed))<br>
+                       params->last_accessed = last_accessed;<br>
+<br>
+               __make_buffer_clean(b);<br>
+               __free_buffer_wake(b);<br>
+<br>
+               cond_resched();<br>
+       }<br>
+<br>
+       return count;<br>
+}<br>
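+<br>
select_for_evict() combines two concerns: never touch a busy buffer when we are not allowed to block on I/O, and stop the walk as soon as the entries get too young.  A compact restatement is below, folding the __GFP_FS and no_sleep checks into a single flag for the sake of the example; the types and field names are illustrative, not the driver's.<br>
<br>
#include <stdbool.h><br>
<br>
enum evict_result { ER_EVICT, ER_DONT_EVICT, ER_STOP };<br>
<br>
struct evict_params {<br>
        bool may_block_on_io;     /* roughly: was __GFP_FS allowed?        */<br>
        unsigned long age;        /* only entries at least this old may go */<br>
        unsigned long now;<br>
};<br>
<br>
struct buffer_state {<br>
        bool reading, writing, dirty;<br>
        unsigned long last_accessed;<br>
};<br>
<br>
/*<br>
 * If we must not block on I/O, busy buffers are skipped rather than waited<br>
 * for.  Because the LRU hands entries out roughly oldest-first, the first<br>
 * buffer that is too young can stop the whole walk (ER_STOP).<br>
 */<br>
static enum evict_result select_for_evict(const struct buffer_state *b,<br>
                                          const struct evict_params *p)<br>
{<br>
        if (!p->may_block_on_io && (b->reading || b->writing || b->dirty))<br>
                return ER_DONT_EVICT;<br>
<br>
        return (p->now - b->last_accessed >= p->age) ? ER_EVICT : ER_STOP;<br>
}<br>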
+<br>
+static void evict_old_buffers(struct dm_bufio_client *c, unsigned long age_hz)<br>
 {<br>
-       struct dm_buffer *b, *tmp;<br>
-       unsigned long retain_target = get_retain_buffers(c);<br>
+       struct evict_params params = {.gfp = 0, .age_hz = age_hz, .last_accessed = 0};<br>
+       unsigned long retain = get_retain_buffers(c);<br>
        unsigned long count;<br>
        LIST_HEAD(write_list);<br>
<br>
@@ -1895,91 +2687,13 @@ static void __evict_old_buffers(struct dm_bufio_client *c, unsigned long age_hz)<br>
                dm_bufio_lock(c);<br>
        }<br>
<br>
-       count = c->n_buffers[LIST_CLEAN] + c->n_buffers[LIST_DIRTY];<br>
-       list_for_each_entry_safe_reverse(b, tmp, &c->lru[LIST_CLEAN], lru_list) {<br>
-               if (count <= retain_target)<br>
-                       break;<br>
-<br>
-               if (!older_than(b, age_hz))<br>
-                       break;<br>
-<br>
-               if (__try_evict_buffer(b, 0))<br>
-                       count--;<br>
-<br>
-               cond_resched();<br>
-       }<br>
+       count = cache_total(&c->cache);<br>
+       if (count > retain)<br>
+               __evict_many(c, &params, LIST_CLEAN, count - retain);<br>
<br>
        dm_bufio_unlock(c);<br>
 }<br>
<br>
-static void do_global_cleanup(struct work_struct *w)<br>
-{<br>
-       struct dm_bufio_client *locked_client = NULL;<br>
-       struct dm_bufio_client *current_client;<br>
-       struct dm_buffer *b;<br>
-       unsigned int spinlock_hold_count;<br>
-       unsigned long threshold = dm_bufio_cache_size -<br>
-               dm_bufio_cache_size / DM_BUFIO_LOW_WATERMARK_RATIO;<br>
-       unsigned long loops = global_num * 2;<br>
-<br>
-       mutex_lock(&dm_bufio_clients_lock);<br>
-<br>
-       while (1) {<br>
-               cond_resched();<br>
-<br>
-               spin_lock(&global_spinlock);<br>
-               if (unlikely(dm_bufio_current_allocated <= threshold))<br>
-                       break;<br>
-<br>
-               spinlock_hold_count = 0;<br>
-get_next:<br>
-               if (!loops--)<br>
-                       break;<br>
-               if (unlikely(list_empty(&global_queue)))<br>
-                       break;<br>
-               b = list_entry(global_queue.prev, struct dm_buffer, global_list);<br>
-<br>
-               if (b->accessed) {<br>
-                       b->accessed = 0;<br>
-                       list_move(&b->global_list, &global_queue);<br>
-                       if (likely(++spinlock_hold_count < 16))<br>
-                               goto get_next;<br>
-                       spin_unlock(&global_spinlock);<br>
-                       continue;<br>
-               }<br>
-<br>
-               current_client = b->c;<br>
-               if (unlikely(current_client != locked_client)) {<br>
-                       if (locked_client)<br>
-                               dm_bufio_unlock(locked_client);<br>
-<br>
-                       if (!dm_bufio_trylock(current_client)) {<br>
-                               spin_unlock(&global_spinlock);<br>
-                               dm_bufio_lock(current_client);<br>
-                               locked_client = current_client;<br>
-                               continue;<br>
-                       }<br>
-<br>
-                       locked_client = current_client;<br>
-               }<br>
-<br>
-               spin_unlock(&global_spinlock);<br>
-<br>
-               if (unlikely(!__try_evict_buffer(b, GFP_KERNEL))) {<br>
-                       spin_lock(&global_spinlock);<br>
-                       list_move(&b->global_list, &global_queue);<br>
-                       spin_unlock(&global_spinlock);<br>
-               }<br>
-       }<br>
-<br>
-       spin_unlock(&global_spinlock);<br>
-<br>
-       if (locked_client)<br>
-               dm_bufio_unlock(locked_client);<br>
-<br>
-       mutex_unlock(&dm_bufio_clients_lock);<br>
-}<br>
-<br>
 static void cleanup_old_buffers(void)<br>
 {<br>
        unsigned long max_age_hz = get_max_age_hz();<br>
@@ -1990,7 +2704,7 @@ static void cleanup_old_buffers(void)<br>
        __cache_size_refresh();<br>
<br>
        list_for_each_entry(c, &dm_bufio_all_clients, client_list)<br>
-               __evict_old_buffers(c, max_age_hz);<br>
+               evict_old_buffers(c, max_age_hz);<br>
<br>
        mutex_unlock(&dm_bufio_clients_lock);<br>
 }<br>
@@ -2003,6 +2717,107 @@ static void work_fn(struct work_struct *w)<br>
                           DM_BUFIO_WORK_TIMER_SECS * HZ);<br>
 }<br>
<br>
+/*--------------------------------------------------------------*/<br>
+<br>
+/*<br>
+ * Global cleanup tries to evict the oldest buffers from across _all_<br>
+ * the clients.  It does this by repeatedly evicting a few buffers from<br>
+ * the client that holds the oldest buffer.  It's approximate, but hopefully<br>
+ * good enough.<br>
+ */<br>
+static struct dm_bufio_client *__pop_client(void)<br>
+{<br>
+       struct list_head *h;<br>
+<br>
+       if (list_empty(&dm_bufio_all_clients))<br>
+               return NULL;<br>
+<br>
+       h = dm_bufio_all_clients.next;<br>
+       list_del(h);<br>
+       return container_of(h, struct dm_bufio_client, client_list);<br>
+}<br>
+<br>
+/*<br>
+ * Inserts the client in the global client list based on its<br>
+ * 'oldest_buffer' field.<br>
+ */<br>
+static void __insert_client(struct dm_bufio_client *new_client)<br>
+{<br>
+       struct dm_bufio_client *c;<br>
+       struct list_head *h = dm_bufio_all_clients.next;<br>
+<br>
+       while (h != &dm_bufio_all_clients) {<br>
+               c = container_of(h, struct dm_bufio_client, client_list);<br>
+               if (time_after_eq(c->oldest_buffer, new_client->oldest_buffer))<br>
+                       break;<br>
+               h = h->next;<br>
+       }<br>
+<br>
+       list_add_tail(&new_client->client_list, h);<br>
+}<br>
+<br>
+static unsigned int __evict_a_few(unsigned int nr_buffers)<br>
+{<br>
+       unsigned int count;<br>
+       struct dm_bufio_client *c;<br>
+       struct evict_params params = {<br>
+               .gfp = GFP_KERNEL,<br>
+               .age_hz = 0,<br>
+               /* set to jiffies in case there are no buffers in this client */<br>
+               .last_accessed = jiffies<br>
+       };<br>
+<br>
+       c = __pop_client();<br>
+       if (!c)<br>
+               return 0;<br>
+<br>
+       dm_bufio_lock(c);<br>
+       count = __evict_many(c, &params, LIST_CLEAN, nr_buffers);<br>
+       dm_bufio_unlock(c);<br>
+<br>
+       if (count)<br>
+               c->oldest_buffer = params.last_accessed;<br>
+       __insert_client(c);<br>
+<br>
+       return count;<br>
+}<br>
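+<br>
The global cleanup works because dm_bufio_all_clients is kept ordered by each client's oldest_buffer: pop the head, evict a few of its oldest clean buffers, then re-insert it at its new position.  The ordering logic can be shown with a plain singly linked list; the driver uses list_head and time_after_eq(), and timestamp wraparound is ignored here for brevity.<br>
<br>
struct client {<br>
        unsigned long oldest_buffer;   /* age of this client's oldest buffer */<br>
        struct client *next;<br>
};<br>
<br>
/* The head of the list is always the client with the oldest buffer. */<br>
static struct client *pop_oldest(struct client **head)<br>
{<br>
        struct client *c = *head;<br>
<br>
        if (c)<br>
                *head = c->next;<br>
        return c;<br>
}<br>
<br>
/* Re-insert a client, keeping the list ordered by oldest_buffer. */<br>
static void insert_sorted(struct client **head, struct client *c)<br>
{<br>
        struct client **pp = head;<br>
<br>
        while (*pp && (*pp)->oldest_buffer < c->oldest_buffer)<br>
                pp = &(*pp)->next;<br>
<br>
        c->next = *pp;<br>
        *pp = c;<br>
}<br>
<br>
Because the head always belongs to the client holding the oldest buffer, repeated rounds spread the eviction across clients roughly in age order.<br>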
+<br>
+static void check_watermarks(void)<br>
+{<br>
+       LIST_HEAD(write_list);<br>
+       struct dm_bufio_client *c;<br>
+<br>
+       mutex_lock(&dm_bufio_clients_lock);<br>
+       list_for_each_entry(c, &dm_bufio_all_clients, client_list) {<br>
+               dm_bufio_lock(c);<br>
+               __check_watermark(c, &write_list);<br>
+               dm_bufio_unlock(c);<br>
+       }<br>
+       mutex_unlock(&dm_bufio_clients_lock);<br>
+<br>
+       __flush_write_list(&write_list);<br>
+}<br>
+<br>
+static void evict_old(void)<br>
+{<br>
+       unsigned long threshold = dm_bufio_cache_size -<br>
+               dm_bufio_cache_size / DM_BUFIO_LOW_WATERMARK_RATIO;<br>
+<br>
+       mutex_lock(&dm_bufio_clients_lock);<br>
+       while (dm_bufio_current_allocated > threshold) {<br>
+               if (!__evict_a_few(64))<br>
+                       break;<br>
+               cond_resched();<br>
+       }<br>
+       mutex_unlock(&dm_bufio_clients_lock);<br>
+}<br>
+<br>
+static void do_global_cleanup(struct work_struct *w)<br>
+{<br>
+       check_watermarks();<br>
+       evict_old();<br>
+}<br>
+<br>
 /*<br>
  *--------------------------------------------------------------<br>
  * Module setup<br>
-- <br>
2.40.0<br>
<br>
</blockquote></div>