[dm-devel] [PATCH 6/15] block copy: use asynchronous notification

Mikulas Patocka mpatocka at redhat.com
Tue Jul 15 19:37:37 UTC 2014


block copy: use asynchronous notification

In the dm-snapshot target there may be a large number of copy requests in
progress. If every pending copy request consumed a process context, it
would put too much load on the system.

To avoid this load, we need asynchronous notification when a copy finishes.
We can pass a callback to the function blkdev_issue_copy. If the callback
is non-NULL, blkdev_issue_copy returns once it has submitted all the copy
bios, and the callback is called when the copy operation finishes. If the
callback is NULL, blkdev_issue_copy waits for the copy to finish, as before.

With the callback mechanism, there can be a large number of in-progress copy
requests and we do not need a process context for each of them.

Signed-off-by: Mikulas Patocka <mpatocka at redhat.com>

---
 block/blk-lib.c           |  152 ++++++++++++++++++++++++++++++++--------------
 block/ioctl.c             |    2 
 include/linux/blk_types.h |    5 -
 include/linux/blkdev.h    |    2 
 4 files changed, 114 insertions(+), 47 deletions(-)

Index: linux-3.16-rc5/block/blk-lib.c
===================================================================
--- linux-3.16-rc5.orig/block/blk-lib.c	2014-07-15 15:27:59.000000000 +0200
+++ linux-3.16-rc5/block/blk-lib.c	2014-07-15 16:16:53.000000000 +0200
@@ -305,6 +305,17 @@ int blkdev_issue_zeroout(struct block_de
 }
 EXPORT_SYMBOL(blkdev_issue_zeroout);
 
+struct bio_copy_batch {
+	atomic_long_t done;
+	int async_error;
+	int sync_error;
+	sector_t sync_copied;
+	atomic64_t first_error;
+	void (*callback)(void *data, int error);
+	void *data;
+	sector_t *copied;
+};
+
 #define BLK_COPY_TIMEOUT	(10 * HZ)
 
 static void blk_copy_timeout(unsigned long bc_)
@@ -329,6 +340,18 @@ static void blk_copy_timeout(unsigned lo
 		bio_endio(bio1, -ETIMEDOUT);
 }
 
+static void blk_copy_batch_finish(struct bio_copy_batch *batch)
+{
+	void (*fn)(void *, int) = batch->callback;
+	void *data = batch->data;
+	int error = unlikely(batch->sync_error) ? batch->sync_error : batch->async_error;
+	if (batch->copied)
+		*batch->copied = min(batch->sync_copied, (sector_t)atomic64_read(&batch->first_error));
+	kfree(batch);
+	if (fn)
+		fn(data, error);
+}
+
 static void bio_copy_end_io(struct bio *bio, int error)
 {
 	struct bio_copy *bc = bio->bi_copy;
@@ -350,22 +373,22 @@ static void bio_copy_end_io(struct bio *
 	}
 	bio_put(bio);
 	if (atomic_dec_and_test(&bc->in_flight)) {
-		struct bio_batch *bb = bc->private;
+		struct bio_copy_batch *batch = bc->batch;
 		if (unlikely(bc->error < 0)) {
 			u64 first_error;
-			if (!ACCESS_ONCE(bb->error))
-				ACCESS_ONCE(bb->error) = bc->error;
+			if (!ACCESS_ONCE(batch->async_error))
+				ACCESS_ONCE(batch->async_error) = bc->error;
 			do {
-				first_error = atomic64_read(bc->first_error);
+				first_error = atomic64_read(&batch->first_error);
 				if (bc->offset >= first_error)
 					break;
-			} while (unlikely(atomic64_cmpxchg(bc->first_error,
+			} while (unlikely(atomic64_cmpxchg(&batch->first_error,
 				first_error, bc->offset) != first_error));
 		}
 		del_timer_sync(&bc->timer);
 		kfree(bc);
-		if (atomic_dec_and_test(&bb->done))
-			complete(bb->wait);
+		if (atomic_long_dec_and_test(&batch->done))
+			blk_copy_batch_finish(batch);
 	}
 }
 
@@ -394,6 +417,18 @@ static unsigned blkdev_copy_merge(struct
 	}
 }
 
+struct bio_copy_completion {
+	struct completion wait;
+	int error;
+};
+
+static void bio_copy_sync_callback(void *ptr, int error)
+{
+	struct bio_copy_completion *comp = ptr;
+	comp->error = error;
+	complete(&comp->wait);
+}
+
 /**
  * blkdev_issue_copy - queue a copy same operation
  * @src_bdev:	source blockdev
@@ -408,69 +443,95 @@ static unsigned blkdev_copy_merge(struct
  */
 int blkdev_issue_copy(struct block_device *src_bdev, sector_t src_sector,
 		      struct block_device *dst_bdev, sector_t dst_sector,
-		      sector_t nr_sects, gfp_t gfp_mask, sector_t *copied)
+		      sector_t nr_sects, gfp_t gfp_mask,
+		      void (*callback)(void *, int), void *data,
+		      sector_t *copied)
 {
 	DECLARE_COMPLETION_ONSTACK(wait);
 	struct request_queue *sq = bdev_get_queue(src_bdev);
 	struct request_queue *dq = bdev_get_queue(dst_bdev);
 	unsigned int max_copy_sectors;
-	struct bio_batch bb;
-	int ret = 0;
-	atomic64_t first_error = ATOMIC64_INIT(nr_sects);
-	sector_t offset = 0;
+	int ret;
+	struct bio_copy_batch *batch;
+	struct bio_copy_completion comp;
 
 	if (copied)
 		*copied = 0;
 
-	if (!sq || !dq)
-		return -ENXIO;
+	if (!sq || !dq) {
+		ret = -ENXIO;
+		goto end_callback;
+	}
 
 	max_copy_sectors = min(sq->limits.max_copy_sectors,
 			       dq->limits.max_copy_sectors);
 
-	if (max_copy_sectors == 0)
-		return -EOPNOTSUPP;
+	if (max_copy_sectors == 0) {
+		ret = -EOPNOTSUPP;
+		goto end_callback;
+	}
 
 	if (src_sector + nr_sects < src_sector ||
-	    dst_sector + nr_sects < dst_sector)
-		return -EINVAL;
+	    dst_sector + nr_sects < dst_sector) {
+		ret = -EINVAL;
+		goto end_callback;
+	}
 
 	/* Do not support overlapping copies */
 	if (src_bdev == dst_bdev &&
-	    abs64((u64)dst_sector - (u64)src_sector) < nr_sects)
-		return -EOPNOTSUPP;
+	    abs64((u64)dst_sector - (u64)src_sector) < nr_sects) {
+		ret = -EOPNOTSUPP;
+		goto end_callback;
+	}
+
+	batch = kmalloc(sizeof(struct bio_copy_batch), gfp_mask);
+	if (!batch) {
+		ret = -ENOMEM;
+		goto end_callback;
+	}
 
-	atomic_set(&bb.done, 1);
-	bb.error = 0;
-	bb.wait = &wait;
+	batch->done = (atomic_long_t)ATOMIC_LONG_INIT(1);
+	batch->async_error = 0;
+	batch->sync_error = 0;
+	batch->sync_copied = 0;
+	batch->first_error = (atomic64_t)ATOMIC64_INIT(nr_sects);
+	batch->copied = copied;
+	if (callback) {
+		batch->callback = callback;
+		batch->data = data;
+	} else {
+		comp.wait = COMPLETION_INITIALIZER_ONSTACK(comp.wait);
+		batch->callback = bio_copy_sync_callback;
+		batch->data = &comp;
+	}
 
-	while (nr_sects && !ACCESS_ONCE(bb.error)) {
+	while (nr_sects && !ACCESS_ONCE(batch->async_error)) {
 		struct bio *read_bio, *write_bio;
 		struct bio_copy *bc;
 		unsigned chunk = (unsigned)min(nr_sects, (sector_t)max_copy_sectors);
 
 		chunk = blkdev_copy_merge(src_bdev, sq, READ | REQ_COPY, src_sector, chunk);
 		if (!chunk) {
-			ret = -EOPNOTSUPP;
+			batch->sync_error = -EOPNOTSUPP;
 			break;
 		}
 
 		chunk = blkdev_copy_merge(dst_bdev, dq, WRITE | REQ_COPY, dst_sector, chunk);
 		if (!chunk) {
-			ret = -EOPNOTSUPP;
+			batch->sync_error = -EOPNOTSUPP;
 			break;
 		}
 
 		bc = kmalloc(sizeof(struct bio_copy), gfp_mask);
 		if (!bc) {
-			ret = -ENOMEM;
+			batch->sync_error = -ENOMEM;
 			break;
 		}
 
 		read_bio = bio_alloc(gfp_mask, 1);
 		if (!read_bio) {
 			kfree(bc);
-			ret = -ENOMEM;
+			batch->sync_error = -ENOMEM;
 			break;
 		}
 
@@ -478,7 +539,7 @@ int blkdev_issue_copy(struct block_devic
 		if (!write_bio) {
 			bio_put(read_bio);
 			kfree(bc);
-			ret = -ENOMEM;
+			batch->sync_error = -ENOMEM;
 			break;
 		}
 
@@ -486,9 +547,8 @@ int blkdev_issue_copy(struct block_devic
 		bc->error = 1;
 		bc->pair[0] = NULL;
 		bc->pair[1] = NULL;
-		bc->private = &bb;
-		bc->first_error = &first_error;
-		bc->offset = offset;
+		bc->batch = batch;
+		bc->offset = batch->sync_copied;
 		spin_lock_init(&bc->spinlock);
 		__setup_timer(&bc->timer, blk_copy_timeout, (unsigned long)bc, TIMER_IRQSAFE);
 		mod_timer(&bc->timer, jiffies + BLK_COPY_TIMEOUT);
@@ -505,27 +565,33 @@ int blkdev_issue_copy(struct block_devic
 		write_bio->bi_bdev = dst_bdev;
 		write_bio->bi_copy = bc;
 
-		atomic_inc(&bb.done);
+		atomic_long_inc(&batch->done);
 		submit_bio(READ | REQ_COPY, read_bio);
 		submit_bio(WRITE | REQ_COPY, write_bio);
 
 		src_sector += chunk;
 		dst_sector += chunk;
 		nr_sects -= chunk;
-		offset += chunk;
+		batch->sync_copied += chunk;
 	}
 
-	/* Wait for bios in-flight */
-	if (!atomic_dec_and_test(&bb.done))
-		wait_for_completion_io(&wait);
+	if (atomic_long_dec_and_test(&batch->done))
+		blk_copy_batch_finish(batch);
 
-	if (copied)
-		*copied = min((sector_t)atomic64_read(&first_error), offset);
-
-	if (likely(!ret))
-		ret = bb.error;
+	if (callback) {
+		return 0;
+	} else {
+		wait_for_completion_io(&comp.wait);
+		return comp.error;
+	}
 
-	return ret;
+end_callback:
+	if (callback) {
+		callback(data, ret);
+		return 0;
+	} else {
+		return ret;
+	}
 }
 EXPORT_SYMBOL(blkdev_issue_copy);
 
Index: linux-3.16-rc5/include/linux/blk_types.h
===================================================================
--- linux-3.16-rc5.orig/include/linux/blk_types.h	2014-07-15 15:27:51.000000000 +0200
+++ linux-3.16-rc5/include/linux/blk_types.h	2014-07-15 15:28:46.000000000 +0200
@@ -40,6 +40,8 @@ struct bvec_iter {
 						   current bvec */
 };
 
+struct bio_copy_batch;
+
 struct bio_copy {
 	/*
 	 * error == 1 - bios are waiting to be paired
@@ -49,8 +51,7 @@ struct bio_copy {
 	int error;
 	atomic_t in_flight;
 	struct bio *pair[2];
-	void *private;
-	atomic64_t *first_error;
+	struct bio_copy_batch *batch;
 	sector_t offset;
 	spinlock_t spinlock;
 	struct timer_list timer;
Index: linux-3.16-rc5/include/linux/blkdev.h
===================================================================
--- linux-3.16-rc5.orig/include/linux/blkdev.h	2014-07-15 15:27:49.000000000 +0200
+++ linux-3.16-rc5/include/linux/blkdev.h	2014-07-15 15:28:46.000000000 +0200
@@ -1173,7 +1173,7 @@ extern int blkdev_issue_write_same(struc
 		sector_t nr_sects, gfp_t gfp_mask, struct page *page);
 extern int blkdev_issue_copy(struct block_device *, sector_t,
 		struct block_device *, sector_t, sector_t, gfp_t,
-		sector_t *);
+		void (*)(void *, int), void *, sector_t *);
 extern int blkdev_issue_zeroout(struct block_device *bdev, sector_t sector,
 			sector_t nr_sects, gfp_t gfp_mask);
 static inline int sb_issue_discard(struct super_block *sb, sector_t block,
Index: linux-3.16-rc5/block/ioctl.c
===================================================================
--- linux-3.16-rc5.orig/block/ioctl.c	2014-07-15 15:27:49.000000000 +0200
+++ linux-3.16-rc5/block/ioctl.c	2014-07-15 15:28:46.000000000 +0200
@@ -228,7 +228,7 @@ static int blk_ioctl_copy(struct block_d
 		return -EINVAL;
 
 	ret = blkdev_issue_copy(bdev, src_offset, bdev, dst_offset, len,
-				GFP_KERNEL, &copied_sec);
+				GFP_KERNEL, NULL, NULL, &copied_sec);
 
 	*copied = (uint64_t)copied_sec << 9;
 




More information about the dm-devel mailing list