diff --git a/drivers/md/dm-core.h b/drivers/md/dm-core.h
index 688aeba248d7e..8ef780a6baaee 100644
--- a/drivers/md/dm-core.h
+++ b/drivers/md/dm-core.h
@@ -22,6 +22,8 @@
 
 #define DM_RESERVED_MAX_IOS		1024
 
+struct dm_io;
+
 struct dm_kobject_holder {
 	struct kobject kobj;
 	struct completion completion;
@@ -91,6 +93,14 @@ struct mapped_device {
 	spinlock_t deferred_lock;
 	struct bio_list deferred;
 
+	/*
+	 * requeue work context is needed for cloning one new bio
+	 * to represent the dm_io to be requeued, since each
+	 * dm_io may point to the original bio from FS.
+	 */
+	struct work_struct requeue_work;
+	struct dm_io *requeue_list;
+
 	void *interface_ptr;
 
 	/*
@@ -275,7 +285,6 @@ struct dm_io {
 	atomic_t io_count;
 	struct mapped_device *md;
 
-	struct bio *split_bio;
 	/* The three fields represent mapped part of original bio */
 	struct bio *orig_bio;
 	unsigned int sector_offset; /* offset to end of orig_bio */
@@ -303,6 +312,8 @@ static inline void dm_io_set_flag(struct dm_io *io, unsigned int bit)
 	io->flags |= (1U << bit);
 }
 
+void dm_io_rewind(struct dm_io *io, struct bio_set *bs);
+
 static inline struct completion *dm_get_completion_from_kobject(struct kobject *kobj)
 {
 	return &container_of(kobj, struct dm_kobject_holder, kobj)->completion;
@@ -319,6 +330,4 @@ extern atomic_t dm_global_event_nr;
 extern wait_queue_head_t dm_global_eventq;
 void dm_issue_global_event(void);
 
-void dm_bio_rewind(struct bio *bio, unsigned bytes);
-
 #endif
diff --git a/drivers/md/dm-io-rewind.c b/drivers/md/dm-io-rewind.c
index fa59ced30faf0..0db53ccb94ba7 100644
--- a/drivers/md/dm-io-rewind.c
+++ b/drivers/md/dm-io-rewind.c
@@ -131,7 +131,7 @@ static inline void dm_bio_rewind_iter(const struct bio *bio,
  * rewinding from end of bio and restoring its original position.
  * Caller is also responsibile for restoring bio's size.
  */
-void dm_bio_rewind(struct bio *bio, unsigned bytes)
+static void dm_bio_rewind(struct bio *bio, unsigned bytes)
 {
 	if (bio_integrity(bio))
 		dm_bio_integrity_rewind(bio, bytes);
@@ -141,3 +141,26 @@ void dm_bio_rewind(struct bio *bio, unsigned bytes)
 
 	dm_bio_rewind_iter(bio, &bio->bi_iter, bytes);
 }
+
+void dm_io_rewind(struct dm_io *io, struct bio_set *bs)
+{
+	struct bio *orig = io->orig_bio;
+	struct bio *new_orig = bio_alloc_clone(orig->bi_bdev, orig,
+					       GFP_NOIO, bs);
+	/*
+	 * dm_bio_rewind can restore to previous position since the
+	 * end sector is fixed for original bio, but we still need
+	 * to restore bio's size manually (using io->sectors).
+	 */
+	dm_bio_rewind(new_orig, ((io->sector_offset << 9) -
+				 orig->bi_iter.bi_size));
+	bio_trim(new_orig, 0, io->sectors);
+
+	bio_chain(new_orig, orig);
+	/*
+	 * __bi_remaining was increased (by dm_split_and_process_bio),
+	 * so must drop the one added in bio_chain.
+	 */
+	atomic_dec(&orig->__bi_remaining);
+	io->orig_bio = new_orig;
+}
diff --git a/drivers/md/dm.c b/drivers/md/dm.c
index c987f9ad24a40..fa68391411186 100644
--- a/drivers/md/dm.c
+++ b/drivers/md/dm.c
@@ -590,7 +590,6 @@ static struct dm_io *alloc_io(struct mapped_device *md, struct bio *bio)
 	atomic_set(&io->io_count, 2);
 	this_cpu_inc(*md->pending_io);
 	io->orig_bio = bio;
-	io->split_bio = NULL;
 	io->md = md;
 	spin_lock_init(&io->lock);
 	io->start_time = jiffies;
@@ -880,13 +879,35 @@ static int __noflush_suspending(struct mapped_device *md)
 	return test_bit(DMF_NOFLUSH_SUSPENDING, &md->flags);
 }
 
+static void dm_requeue_add_io(struct dm_io *io, bool first_stage)
+{
+	struct mapped_device *md = io->md;
+
+	if (first_stage) {
+		struct dm_io *next = md->requeue_list;
+
+		md->requeue_list = io;
+		io->next = next;
+	} else {
+		bio_list_add_head(&md->deferred, io->orig_bio);
+	}
+}
+
+static void dm_kick_requeue(struct mapped_device *md, bool first_stage)
+{
+	if (first_stage)
+		queue_work(md->wq, &md->requeue_work);
+	else
+		queue_work(md->wq, &md->work);
+}
+
 /*
  * Return true if the dm_io's original bio is requeued.
  * io->status is updated with error if requeue disallowed.
  */
-static bool dm_handle_requeue(struct dm_io *io)
+static bool dm_handle_requeue(struct dm_io *io, bool first_stage)
 {
-	struct bio *bio = io->split_bio ? io->split_bio : io->orig_bio;
+	struct bio *bio = io->orig_bio;
 	bool handle_requeue = (io->status == BLK_STS_DM_REQUEUE);
 	bool handle_polled_eagain = ((io->status == BLK_STS_AGAIN) &&
 				     (bio->bi_opf & REQ_POLLED));
@@ -912,8 +933,8 @@ static bool dm_handle_requeue(struct dm_io *io)
 		spin_lock_irqsave(&md->deferred_lock, flags);
 		if ((__noflush_suspending(md) &&
 		     !WARN_ON_ONCE(dm_is_zone_write(md, bio))) ||
-		    handle_polled_eagain) {
-			bio_list_add_head(&md->deferred, bio);
+		    handle_polled_eagain || first_stage) {
+			dm_requeue_add_io(io, first_stage);
 			requeued = true;
 		} else {
 			/*
@@ -926,19 +947,21 @@ static bool dm_handle_requeue(struct dm_io *io)
 	}
 
 	if (requeued)
-		queue_work(md->wq, &md->work);
+		dm_kick_requeue(md, first_stage);
 
 	return requeued;
 }
 
-static void dm_io_complete(struct dm_io *io)
+static void __dm_io_complete(struct dm_io *io, bool first_stage)
 {
-	struct bio *bio = io->split_bio ? io->split_bio : io->orig_bio;
+	struct bio *bio = io->orig_bio;
 	struct mapped_device *md = io->md;
 	blk_status_t io_error;
 	bool requeued;
 
-	requeued = dm_handle_requeue(io);
+	requeued = dm_handle_requeue(io, first_stage);
+	if (requeued && first_stage)
+		return;
 
 	io_error = io->status;
 	if (dm_io_flagged(io, DM_IO_ACCOUNTED))
@@ -978,6 +1001,58 @@ static void dm_io_complete(struct dm_io *io)
 	}
 }
 
+static void dm_wq_requeue_work(struct work_struct *work)
+{
+	struct mapped_device *md = container_of(work, struct mapped_device,
+						requeue_work);
+	unsigned long flags;
+	struct dm_io *io;
+
+	/* reuse deferred lock to simplify dm_handle_requeue */
+	spin_lock_irqsave(&md->deferred_lock, flags);
+	io = md->requeue_list;
+	md->requeue_list = NULL;
+	spin_unlock_irqrestore(&md->deferred_lock, flags);
+
+	while (io) {
+		struct dm_io *next = io->next;
+
+		dm_io_rewind(io, &md->queue->bio_split);
+
+		io->next = NULL;
+		__dm_io_complete(io, false);
+		io = next;
+	}
+}
+
+/*
+ * Two staged requeue:
+ *
+ * 1) io->orig_bio points to the real original bio, and the part mapped to
+ *    this io must be requeued, instead of other parts of the original bio.
+ *
+ * 2) io->orig_bio points to new cloned bio which matches the requeued dm_io.
+ */
+static void dm_io_complete(struct dm_io *io)
+{
+	bool first_requeue;
+
+	/*
+	 * Only dm_io that has been split needs two stage requeue, otherwise
+	 * we may run into long bio clone chain during suspend and OOM could
+	 * be triggered.
+	 *
+	 * Also flush data dm_io won't be marked as DM_IO_WAS_SPLIT, so they
+	 * also aren't handled via the first stage requeue.
+	 */
+	if (dm_io_flagged(io, DM_IO_WAS_SPLIT))
+		first_requeue = true;
+	else
+		first_requeue = false;
+
+	__dm_io_complete(io, first_requeue);
+}
+
 /*
  * Decrements the number of outstanding ios that a bio has been
  * cloned into, completing the original io if necc.
@@ -1256,6 +1331,7 @@ static size_t dm_dax_recovery_write(struct dax_device *dax_dev, pgoff_t pgoff,
 void dm_accept_partial_bio(struct bio *bio, unsigned n_sectors)
 {
 	struct dm_target_io *tio = clone_to_tio(bio);
+	struct dm_io *io = tio->io;
 	unsigned bio_sectors = bio_sectors(bio);
 
 	BUG_ON(dm_tio_flagged(tio, DM_TIO_IS_DUPLICATE_BIO));
@@ -1271,8 +1347,9 @@ void dm_accept_partial_bio(struct bio *bio, unsigned n_sectors)
 	 * __split_and_process_bio() may have already saved mapped part
 	 * for accounting but it is being reduced so update accordingly.
 	 */
-	dm_io_set_flag(tio->io, DM_IO_WAS_SPLIT);
-	tio->io->sectors = n_sectors;
+	dm_io_set_flag(io, DM_IO_WAS_SPLIT);
+	io->sectors = n_sectors;
+	io->sector_offset = bio_sectors(io->orig_bio);
 }
 EXPORT_SYMBOL_GPL(dm_accept_partial_bio);
 
@@ -1395,17 +1472,7 @@ static void setup_split_accounting(struct clone_info *ci, unsigned len)
 		 */
 		dm_io_set_flag(io, DM_IO_WAS_SPLIT);
 		io->sectors = len;
-	}
-
-	if (static_branch_unlikely(&stats_enabled) &&
-	    unlikely(dm_stats_used(&io->md->stats))) {
-		/*
-		 * Save bi_sector in terms of its offset from end of
-		 * original bio, only needed for DM-stats' benefit.
-		 * - saved regardless of whether split needed so that
-		 *   dm_accept_partial_bio() doesn't need to.
-		 */
-		io->sector_offset = bio_end_sector(ci->bio) - ci->sector;
+		io->sector_offset = bio_sectors(ci->bio);
 	}
 }
 
@@ -1705,11 +1772,9 @@ static void dm_split_and_process_bio(struct mapped_device *md,
 	 * Remainder must be passed to submit_bio_noacct() so it gets handled
 	 * *after* bios already submitted have been completely processed.
 	 */
-	WARN_ON_ONCE(!dm_io_flagged(io, DM_IO_WAS_SPLIT));
-	io->split_bio = bio_split(bio, io->sectors, GFP_NOIO,
-				  &md->queue->bio_split);
-	bio_chain(io->split_bio, bio);
-	trace_block_split(io->split_bio, bio->bi_iter.bi_sector);
+	bio_trim(bio, io->sectors, ci.sector_count);
+	trace_block_split(bio, bio->bi_iter.bi_sector);
+	bio_inc_remaining(bio);
 	submit_bio_noacct(bio);
 out:
 	/*
@@ -1985,9 +2050,11 @@ static struct mapped_device *alloc_dev(int minor)
 
 	init_waitqueue_head(&md->wait);
 	INIT_WORK(&md->work, dm_wq_work);
+	INIT_WORK(&md->requeue_work, dm_wq_requeue_work);
 	init_waitqueue_head(&md->eventq);
 	init_completion(&md->kobj_holder.completion);
 
+	md->requeue_list = NULL;
 	md->swap_bios = get_swap_bios();
 	sema_init(&md->swap_bios_semaphore, md->swap_bios);
 	mutex_init(&md->swap_bios_lock);