[Cluster-devel] [PATCH 11/13] GFS2: Reduce glock_work_func to a single call to state_machine

Bob Peterson rpeterso at redhat.com
Mon Nov 19 13:29:29 UTC 2018


Before this patch, function glock_work_func would call into the
state machine for GL_FINISH_XMOTE, then GL_RUN, plus some work
related to dropping references and requeueing itself. This patch
moves all that functionality to a new GL_WORK state. This reduces
glock_work_func to a single call to the state machine.

The goal here is to allow for patches in the future that will
bypass the delayed workqueue altogether to improve performance.

Signed-off-by: Bob Peterson <rpeterso at redhat.com>
---
 fs/gfs2/glock.c | 97 ++++++++++++++++++++++++++++---------------------
 fs/gfs2/glock.h |  1 +
 2 files changed, 56 insertions(+), 42 deletions(-)

diff --git a/fs/gfs2/glock.c b/fs/gfs2/glock.c
index 858f42e66698..22ddeda90199 100644
--- a/fs/gfs2/glock.c
+++ b/fs/gfs2/glock.c
@@ -624,9 +624,25 @@ static void __state_machine(struct gfs2_glock *gl, int new_state,
 			    const int nonblock)
 {
 	struct gfs2_holder *gh = NULL;
+	unsigned long delay = 0;
+	unsigned int drop_refs = 0;
 	int ret;
 
 	BUG_ON(!spin_is_locked(&gl->gl_lockref.lock));
+	if (new_state == GL_ST_WORK) {
+		drop_refs = 1;
+		/**
+		 * Before we can do the rest of the work, we need to finish
+		 * any xmotes due to a reply from dlm. Note that since we did
+		 * not change new_state, we'll drop back into GL_ST_WORK when
+		 * the GL_ST_FINISH_XMOTE completes its cycle, regardless
+		 * of how many other states it passes through.
+		 */
+		if (test_and_clear_bit(GLF_REPLY_PENDING, &gl->gl_flags)) {
+			gl->gl_mch = GL_ST_FINISH_XMOTE;
+			drop_refs++;
+		}
+	}
 
 	do {
 		switch (gl->gl_mch) {
@@ -716,8 +732,41 @@ static void __state_machine(struct gfs2_glock *gl, int new_state,
 			else
 				gl->gl_mch = GL_ST_PROMOTE;
 			break;
+
+		case GL_ST_WORK:
+			if (test_bit(GLF_PENDING_DEMOTE, &gl->gl_flags) &&
+			    gl->gl_state != LM_ST_UNLOCKED &&
+			    gl->gl_demote_state != LM_ST_EXCLUSIVE) {
+				unsigned long holdtime, now = jiffies;
+
+				holdtime = gl->gl_tchange + gl->gl_hold_time;
+				if (time_before(now, holdtime))
+					delay = holdtime - now;
+
+				if (!delay) {
+					clear_bit(GLF_PENDING_DEMOTE, &gl->gl_flags);
+					set_bit(GLF_DEMOTE, &gl->gl_flags);
+				}
+			}
+			gl->gl_mch = GL_ST_RUN;
+			break;
 		}
-	} while (gl->gl_mch != GL_ST_IDLE);
+	} while (gl->gl_mch != GL_ST_IDLE || new_state != GL_ST_IDLE);
+
+	/* Now check if a delayed re-queue of the work is needed */
+	if (delay) {
+		/* Keep one glock reference for the work we requeue. */
+		drop_refs--;
+		if (gl->gl_name.ln_type != LM_TYPE_INODE)
+			delay = 0;
+		__gfs2_glock_queue_work(gl, delay);
+	}
+	/*
+	 * Drop the remaining glock references manually here. (Mind that
+	 * __gfs2_glock_queue_work depends on the lockref spinlock begin held
+	 * here as well.)
+	 */
+	gl->gl_lockref.count -= drop_refs;
 }
 
 /**
@@ -734,6 +783,10 @@ __acquires(&gl->gl_lockref.lock)
 {
 	spin_lock(&gl->gl_lockref.lock);
 	__state_machine(gl, new_state, nonblock);
+	if (new_state == GL_ST_WORK && !gl->gl_lockref.count) {
+		__gfs2_glock_put(gl);
+		return;
+	}
 	spin_unlock(&gl->gl_lockref.lock);
 }
 
@@ -761,49 +814,9 @@ static void delete_work_func(struct work_struct *work)
 
 static void glock_work_func(struct work_struct *work)
 {
-	unsigned long delay = 0;
 	struct gfs2_glock *gl = container_of(work, struct gfs2_glock, gl_work.work);
-	unsigned int drop_refs = 1;
-
-	if (test_and_clear_bit(GLF_REPLY_PENDING, &gl->gl_flags)) {
-		state_machine(gl, GL_ST_FINISH_XMOTE, 0);
-		drop_refs++;
-	}
-	spin_lock(&gl->gl_lockref.lock);
-	if (test_bit(GLF_PENDING_DEMOTE, &gl->gl_flags) &&
-	    gl->gl_state != LM_ST_UNLOCKED &&
-	    gl->gl_demote_state != LM_ST_EXCLUSIVE) {
-		unsigned long holdtime, now = jiffies;
-
-		holdtime = gl->gl_tchange + gl->gl_hold_time;
-		if (time_before(now, holdtime))
-			delay = holdtime - now;
 
-		if (!delay) {
-			clear_bit(GLF_PENDING_DEMOTE, &gl->gl_flags);
-			set_bit(GLF_DEMOTE, &gl->gl_flags);
-		}
-	}
-	__state_machine(gl, GL_ST_RUN, 0);
-	if (delay) {
-		/* Keep one glock reference for the work we requeue. */
-		drop_refs--;
-		if (gl->gl_name.ln_type != LM_TYPE_INODE)
-			delay = 0;
-		__gfs2_glock_queue_work(gl, delay);
-	}
-
-	/*
-	 * Drop the remaining glock references manually here. (Mind that
-	 * __gfs2_glock_queue_work depends on the lockref spinlock begin held
-	 * here as well.)
-	 */
-	gl->gl_lockref.count -= drop_refs;
-	if (!gl->gl_lockref.count) {
-		__gfs2_glock_put(gl);
-		return;
-	}
-	spin_unlock(&gl->gl_lockref.lock);
+	state_machine(gl, GL_ST_WORK, 0);
 }
 
 static struct gfs2_glock *find_insert_glock(struct lm_lockname *name,
diff --git a/fs/gfs2/glock.h b/fs/gfs2/glock.h
index 0239d3a9040c..0b1dffb92e8a 100644
--- a/fs/gfs2/glock.h
+++ b/fs/gfs2/glock.h
@@ -130,6 +130,7 @@ enum gl_machine_states {
 	GL_ST_BLOCKING_DEMOTE = 5, /* Demote is in progress - blocking */
 	GL_ST_PROMOTE = 6,	   /* Promote the lock */
 	GL_ST_RUN = 7,		/* "Run" or progress the lock */
+	GL_ST_WORK = 8,		/* Perform general glock work */
 };
 
 struct lm_lockops {
-- 
2.19.1




More information about the Cluster-devel mailing list