diff --git a/fs/ceph/Makefile b/fs/ceph/Makefile
index a699e320393f2afba3785d2469da36b2c32350fd..c1da294418d14b86ea77feaa7027ea880b605ed3 100644
--- a/fs/ceph/Makefile
+++ b/fs/ceph/Makefile
@@ -6,7 +6,7 @@
 obj-$(CONFIG_CEPH_FS) += ceph.o
 
 ceph-y := super.o inode.o dir.o file.o locks.o addr.o ioctl.o \
-	export.o caps.o snap.o xattr.o quota.o \
+	export.o caps.o snap.o xattr.o quota.o io.o \
 	mds_client.o mdsmap.o strings.o ceph_frag.o \
 	debugfs.o
 
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 5182e1a49d6fa3cbf3ee35d6dd6fc1cabfd1a9ff..ff17a81bf2e2a03d899c55b15b5a6760eee750c4 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -15,6 +15,7 @@
 #include "super.h"
 #include "mds_client.h"
 #include "cache.h"
+#include "io.h"
 
 static __le32 ceph_flags_sys2wire(u32 flags)
 {
@@ -930,7 +931,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
 	struct ceph_aio_request *aio_req = NULL;
 	int num_pages = 0;
 	int flags;
-	int ret;
+	int ret = 0;
 	struct timespec64 mtime = current_time(inode);
 	size_t count = iov_iter_count(iter);
 	loff_t pos = iocb->ki_pos;
@@ -944,11 +945,6 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
 	     (write ? "write" : "read"), file, pos, (unsigned)count,
 	     snapc, snapc ? snapc->seq : 0);
 
-	ret = filemap_write_and_wait_range(inode->i_mapping,
-					   pos, pos + count - 1);
-	if (ret < 0)
-		return ret;
-
 	if (write) {
 		int ret2 = invalidate_inode_pages2_range(inode->i_mapping,
 					pos >> PAGE_SHIFT,
@@ -1284,12 +1280,16 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
 
 		if (ci->i_inline_version == CEPH_INLINE_NONE) {
 			if (!retry_op && (iocb->ki_flags & IOCB_DIRECT)) {
+				ceph_start_io_direct(inode);
 				ret = ceph_direct_read_write(iocb, to,
 							     NULL, NULL);
+				ceph_end_io_direct(inode);
 				if (ret >= 0 && ret < len)
 					retry_op = CHECK_EOF;
 			} else {
+				ceph_start_io_read(inode);
 				ret = ceph_sync_read(iocb, to, &retry_op);
+				ceph_end_io_read(inode);
 			}
 		} else {
 			retry_op = READ_INLINE;
@@ -1300,7 +1300,9 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
 		     inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
 		     ceph_cap_string(got));
 		ceph_add_rw_context(fi, &rw_ctx);
+		ceph_start_io_read(inode);
 		ret = generic_file_read_iter(iocb, to);
+		ceph_end_io_read(inode);
 		ceph_del_rw_context(fi, &rw_ctx);
 	}
 	dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
@@ -1409,7 +1411,10 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
 		return -ENOMEM;
 
 retry_snap:
-	inode_lock(inode);
+	if (iocb->ki_flags & IOCB_DIRECT)
+		ceph_start_io_direct(inode);
+	else
+		ceph_start_io_write(inode);
 
 	/* We can write back this queue in page reclaim */
 	current->backing_dev_info = inode_to_bdi(inode);
@@ -1480,7 +1485,6 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
 	    (ci->i_ceph_flags & CEPH_I_ERROR_WRITE)) {
 		struct ceph_snap_context *snapc;
 		struct iov_iter data;
-		inode_unlock(inode);
 
 		spin_lock(&ci->i_ceph_lock);
 		if (__ceph_have_pending_cap_snap(ci)) {
@@ -1497,11 +1501,14 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
 
 		/* we might need to revert back to that point */
 		data = *from;
-		if (iocb->ki_flags & IOCB_DIRECT)
+		if (iocb->ki_flags & IOCB_DIRECT) {
 			written = ceph_direct_read_write(iocb, &data, snapc,
 							 &prealloc_cf);
-		else
+			ceph_end_io_direct(inode);
+		} else {
 			written = ceph_sync_write(iocb, &data, pos, snapc);
+			ceph_end_io_write(inode);
+		}
 		if (written > 0)
 			iov_iter_advance(from, written);
 		ceph_put_snap_context(snapc);
@@ -1516,7 +1523,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
 		written = generic_perform_write(file, from, pos);
 		if (likely(written >= 0))
 			iocb->ki_pos = pos + written;
-		inode_unlock(inode);
+		ceph_end_io_write(inode);
 	}
 
 	if (written >= 0) {
@@ -1551,9 +1558,11 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
 	}
 
 	goto out_unlocked;
-
 out:
-	inode_unlock(inode);
+	if (iocb->ki_flags & IOCB_DIRECT)
+		ceph_end_io_direct(inode);
+	else
+		ceph_end_io_write(inode);
 out_unlocked:
 	ceph_free_cap_flush(prealloc_cf);
 	current->backing_dev_info = NULL;
diff --git a/fs/ceph/io.c b/fs/ceph/io.c
new file mode 100644
index 0000000000000000000000000000000000000000..97602ea92ff439d671742de30af72bf637e929b1
--- /dev/null
+++ b/fs/ceph/io.c
@@ -0,0 +1,163 @@
+// SPDX-License-Identifier: GPL-2.0
+/*
+ * Copyright (c) 2016 Trond Myklebust
+ * Copyright (c) 2019 Jeff Layton
+ *
+ * I/O and data path helper functionality.
+ *
+ * Heavily borrowed from equivalent code in fs/nfs/io.c
+ */
+
+#include <linux/ceph/ceph_debug.h>
+
+#include <linux/types.h>
+#include <linux/kernel.h>
+#include <linux/rwsem.h>
+#include <linux/fs.h>
+
+#include "super.h"
+#include "io.h"
+
+/* Call with exclusively locked inode->i_rwsem */
+static void ceph_block_o_direct(struct ceph_inode_info *ci, struct inode *inode)
+{
+	lockdep_assert_held_write(&inode->i_rwsem);
+
+	if (READ_ONCE(ci->i_ceph_flags) & CEPH_I_ODIRECT) {
+		spin_lock(&ci->i_ceph_lock);
+		ci->i_ceph_flags &= ~CEPH_I_ODIRECT;
+		spin_unlock(&ci->i_ceph_lock);
+		inode_dio_wait(inode);
+	}
+}
+
+/**
+ * ceph_start_io_read - declare the file is being used for buffered reads
+ * @inode: file inode
+ *
+ * Declare that a buffered read operation is about to start, and ensure
+ * that we block all direct I/O.
+ * On exit, the function ensures that the CEPH_I_ODIRECT flag is unset,
+ * and holds a shared lock on inode->i_rwsem to ensure that the flag
+ * cannot be changed.
+ * In practice, this means that buffered read operations are allowed to
+ * execute in parallel, thanks to the shared lock, whereas direct I/O
+ * operations need to wait to grab an exclusive lock in order to set
+ * CEPH_I_ODIRECT.
+ * Note that buffered writes and truncates both take a write lock on
+ * inode->i_rwsem, meaning that those are serialised w.r.t. the reads.
+ */
+void
+ceph_start_io_read(struct inode *inode)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
+
+	/* Be an optimist! */
+	down_read(&inode->i_rwsem);
+	if (!(READ_ONCE(ci->i_ceph_flags) & CEPH_I_ODIRECT))
+		return;
+	up_read(&inode->i_rwsem);
+	/* Slow path.... */
+	down_write(&inode->i_rwsem);
+	ceph_block_o_direct(ci, inode);
+	downgrade_write(&inode->i_rwsem);
+}
+
+/**
+ * ceph_end_io_read - declare that the buffered read operation is done
+ * @inode: file inode
+ *
+ * Declare that a buffered read operation is done, and release the shared
+ * lock on inode->i_rwsem.
+ */
+void
+ceph_end_io_read(struct inode *inode)
+{
+	up_read(&inode->i_rwsem);
+}
+
+/**
+ * ceph_start_io_write - declare the file is being used for buffered writes
+ * @inode: file inode
+ *
+ * Declare that a buffered write operation is about to start, and ensure
+ * that we block all direct I/O.
+ */
+void
+ceph_start_io_write(struct inode *inode)
+{
+	down_write(&inode->i_rwsem);
+	ceph_block_o_direct(ceph_inode(inode), inode);
+}
+
+/**
+ * ceph_end_io_write - declare that the buffered write operation is done
+ * @inode: file inode
+ *
+ * Declare that a buffered write operation is done, and release the
+ * lock on inode->i_rwsem.
+ */
+void
+ceph_end_io_write(struct inode *inode)
+{
+	up_write(&inode->i_rwsem);
+}
+
+/* Call with exclusively locked inode->i_rwsem */
+static void ceph_block_buffered(struct ceph_inode_info *ci, struct inode *inode)
+{
+	lockdep_assert_held_write(&inode->i_rwsem);
+
+	if (!(READ_ONCE(ci->i_ceph_flags) & CEPH_I_ODIRECT)) {
+		spin_lock(&ci->i_ceph_lock);
+		ci->i_ceph_flags |= CEPH_I_ODIRECT;
+		spin_unlock(&ci->i_ceph_lock);
+		/* FIXME: unmap_mapping_range? */
+		filemap_write_and_wait(inode->i_mapping);
+	}
+}
+
+/**
+ * ceph_end_io_direct - declare the file is being used for direct i/o
+ * @inode: file inode
+ *
+ * Declare that a direct I/O operation is about to start, and ensure
+ * that we block all buffered I/O.
+ * On exit, the function ensures that the CEPH_I_ODIRECT flag is set,
+ * and holds a shared lock on inode->i_rwsem to ensure that the flag
+ * cannot be changed.
+ * In practice, this means that direct I/O operations are allowed to
+ * execute in parallel, thanks to the shared lock, whereas buffered I/O
+ * operations need to wait to grab an exclusive lock in order to clear
+ * CEPH_I_ODIRECT.
+ * Note that buffered writes and truncates both take a write lock on
+ * inode->i_rwsem, meaning that those are serialised w.r.t. O_DIRECT.
+ */
+void
+ceph_start_io_direct(struct inode *inode)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
+
+	/* Be an optimist! */
+	down_read(&inode->i_rwsem);
+	if (READ_ONCE(ci->i_ceph_flags) & CEPH_I_ODIRECT)
+		return;
+	up_read(&inode->i_rwsem);
+	/* Slow path.... */
+	down_write(&inode->i_rwsem);
+	ceph_block_buffered(ci, inode);
+	downgrade_write(&inode->i_rwsem);
+}
+
+/**
+ * ceph_end_io_direct - declare that the direct i/o operation is done
+ * @inode: file inode
+ *
+ * Declare that a direct I/O operation is done, and release the shared
+ * lock on inode->i_rwsem.
+ */
+void
+ceph_end_io_direct(struct inode *inode)
+{
+	up_read(&inode->i_rwsem);
+}
diff --git a/fs/ceph/io.h b/fs/ceph/io.h
new file mode 100644
index 0000000000000000000000000000000000000000..fa594cd77348ac7bc2acdc8ddc37b8d1c7ce9b22
--- /dev/null
+++ b/fs/ceph/io.h
@@ -0,0 +1,12 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef _FS_CEPH_IO_H
+#define _FS_CEPH_IO_H
+
+void ceph_start_io_read(struct inode *inode);
+void ceph_end_io_read(struct inode *inode);
+void ceph_start_io_write(struct inode *inode);
+void ceph_end_io_write(struct inode *inode);
+void ceph_start_io_direct(struct inode *inode);
+void ceph_end_io_direct(struct inode *inode);
+
+#endif /* FS_CEPH_IO_H */
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 03e4828c76359e595fb13f7450f965c92b63147b..98d7190289c847e7c9cf1169a2eab2f6ffc88e26 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -513,10 +513,10 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
 #define CEPH_I_SEC_INITED	(1 << 6)  /* security initialized */
 #define CEPH_I_CAP_DROPPED	(1 << 7)  /* caps were forcibly dropped */
 #define CEPH_I_KICK_FLUSH	(1 << 8)  /* kick flushing caps */
-#define CEPH_I_FLUSH_SNAPS	(1 << 9) /* need flush snapss */
+#define CEPH_I_FLUSH_SNAPS	(1 << 9)  /* need flush snapss */
 #define CEPH_I_ERROR_WRITE	(1 << 10) /* have seen write errors */
 #define CEPH_I_ERROR_FILELOCK	(1 << 11) /* have seen file lock errors */
-
+#define CEPH_I_ODIRECT		(1 << 12) /* inode in direct I/O mode */
 
 /*
  * Masks of ceph inode work.