diff --git a/Documentation/ABI/testing/sysfs-bus-rbd b/Documentation/ABI/testing/sysfs-bus-rbd
index 501adc2a9ec723841c5246b19875a9bc14471ada..2ddd680929d8f83dc2147592978b37a6e33dc9a5 100644
--- a/Documentation/ABI/testing/sysfs-bus-rbd
+++ b/Documentation/ABI/testing/sysfs-bus-rbd
@@ -94,5 +94,5 @@ current_snap
 
 parent
 
-	Information identifying the pool, image, and snapshot id for
-	the parent image in a layered rbd image (format 2 only).
+	Information identifying the chain of parent images in a layered rbd
+	image.  Entries are separated by empty lines.
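For illustration, reading this attribute back on a clone with two ancestors might produce something like the following (hypothetical values; the field list matches rbd_parent_show() in the rbd.c changes below):

	pool_id 2
	pool_name mypool
	image_id 1f3c62a9deb1
	image_name parent1
	snap_id 4
	snap_name snap1
	overlap 4194304

	pool_id 2
	pool_name mypool
	image_id 8fe1b2c4aa01
	image_name base
	snap_id 9
	snap_name base_snap
	overlap 4194304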
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index b2c98c1bc037e8c7674722cba6b56f5e4a7383aa..623c84145b792b9ddaa852e45c2cdbf80b1f08d5 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -42,6 +42,7 @@
 #include <linux/blkdev.h>
 #include <linux/slab.h>
 #include <linux/idr.h>
+#include <linux/workqueue.h>
 
 #include "rbd_types.h"
 
@@ -332,7 +333,10 @@ struct rbd_device {
 
 	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
 
+	struct list_head	rq_queue;	/* incoming rq queue */
 	spinlock_t		lock;		/* queue, flags, open_count */
+	struct workqueue_struct	*rq_wq;
+	struct work_struct	rq_work;
 
 	struct rbd_image_header	header;
 	unsigned long		flags;		/* possibly lock protected */
@@ -514,7 +518,8 @@ static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
 
 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
-static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
+static int rbd_dev_header_info(struct rbd_device *rbd_dev);
+static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
 					u64 snap_id);
 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
@@ -971,12 +976,6 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev,
 	header->snap_names = snap_names;
 	header->snap_sizes = snap_sizes;
 
-	/* Make sure mapping size is consistent with header info */
-
-	if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
-		if (rbd_dev->mapping.size != header->image_size)
-			rbd_dev->mapping.size = header->image_size;
-
 	return 0;
 out_2big:
 	ret = -EIO;
@@ -1139,6 +1138,13 @@ static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
 	rbd_dev->mapping.features = 0;
 }
 
+static void rbd_segment_name_free(const char *name)
+{
+	/* The explicit cast here is needed to drop the const qualifier */
+
+	kmem_cache_free(rbd_segment_name_cache, (void *)name);
+}
+
 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
 {
 	char *name;
@@ -1158,20 +1164,13 @@ static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
 	if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) {
 		pr_err("error formatting segment name for #%llu (%d)\n",
 			segment, ret);
-		kfree(name);
+		rbd_segment_name_free(name);
 		name = NULL;
 	}
 
 	return name;
 }
 
-static void rbd_segment_name_free(const char *name)
-{
-	/* The explicit cast here is needed to drop the const qualifier */
-
-	kmem_cache_free(rbd_segment_name_cache, (void *)name);
-}
-
 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
 {
 	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
@@ -1371,7 +1370,7 @@ static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
 		struct rbd_device *rbd_dev;
 
 		rbd_dev = obj_request->img_request->rbd_dev;
-		rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
+		rbd_warn(rbd_dev, "obj_request %p already marked img_data",
 			obj_request);
 	}
 }
@@ -1389,7 +1388,7 @@ static void obj_request_done_set(struct rbd_obj_request *obj_request)
 
 		if (obj_request_img_data_test(obj_request))
 			rbd_dev = obj_request->img_request->rbd_dev;
-		rbd_warn(rbd_dev, "obj_request %p already marked done\n",
+		rbd_warn(rbd_dev, "obj_request %p already marked done",
 			obj_request);
 	}
 }
@@ -1527,11 +1526,37 @@ static bool obj_request_type_valid(enum obj_request_type type)
 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
 				struct rbd_obj_request *obj_request)
 {
-	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
-
+	dout("%s %p\n", __func__, obj_request);
 	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
 }
 
+static void rbd_obj_request_end(struct rbd_obj_request *obj_request)
+{
+	dout("%s %p\n", __func__, obj_request);
+	ceph_osdc_cancel_request(obj_request->osd_req);
+}
+
+/*
+ * Wait for an object request to complete.  If interrupted, cancel the
+ * underlying osd request.
+ */
+static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
+{
+	int ret;
+
+	dout("%s %p\n", __func__, obj_request);
+
+	ret = wait_for_completion_interruptible(&obj_request->completion);
+	if (ret < 0) {
+		dout("%s %p interrupted\n", __func__, obj_request);
+		rbd_obj_request_end(obj_request);
+		return ret;
+	}
+
+	dout("%s %p done\n", __func__, obj_request);
+	return 0;
+}
+
 static void rbd_img_request_complete(struct rbd_img_request *img_request)
 {
 
@@ -1558,15 +1583,6 @@ static void rbd_img_request_complete(struct rbd_img_request *img_request)
 		rbd_img_request_put(img_request);
 }
 
-/* Caller is responsible for rbd_obj_request_destroy(obj_request) */
-
-static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
-{
-	dout("%s: obj %p\n", __func__, obj_request);
-
-	return wait_for_completion_interruptible(&obj_request->completion);
-}
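The rbd_obj_request_wait() added above replaces the plain interruptible wait removed here: on a signal it cancels the in-flight osd request before returning, so the request cannot complete against state the caller is about to tear down.  A condensed sketch of the calling pattern, mirroring the submit paths elsewhere in this patch (hypothetical caller):

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;

	ret = rbd_obj_request_wait(obj_request);  /* on signal: osd req canceled */
	if (ret)
		goto out;                         /* nothing left in flight */

	ret = obj_request->result;                /* the osd's result for the op */
out:
	rbd_obj_request_put(obj_request);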
-
 /*
  * The default/initial value for all image request flags is 0.  Each
  * is conditionally set to 1 at image request initialization time
@@ -1763,7 +1779,7 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
 		rbd_osd_trivial_callback(obj_request);
 		break;
 	default:
-		rbd_warn(NULL, "%s: unsupported op %hu\n",
+		rbd_warn(NULL, "%s: unsupported op %hu",
 			obj_request->object_name, (unsigned short) opcode);
 		break;
 	}
@@ -1998,7 +2014,7 @@ static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
 	if (!counter)
 		rbd_dev_unparent(rbd_dev);
 	else
-		rbd_warn(rbd_dev, "parent reference underflow\n");
+		rbd_warn(rbd_dev, "parent reference underflow");
 }
 
 /*
@@ -2028,7 +2044,7 @@ static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
 	/* Image was flattened, but parent is not yet torn down */
 
 	if (counter < 0)
-		rbd_warn(rbd_dev, "parent reference overflow\n");
+		rbd_warn(rbd_dev, "parent reference overflow");
 
 	return false;
 }
@@ -2045,7 +2061,7 @@ static struct rbd_img_request *rbd_img_request_create(
 {
 	struct rbd_img_request *img_request;
 
-	img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
+	img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
 	if (!img_request)
 		return NULL;
 
@@ -2161,11 +2177,11 @@ static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
 	if (result) {
 		struct rbd_device *rbd_dev = img_request->rbd_dev;
 
-		rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
+		rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
 			img_request_write_test(img_request) ? "write" : "read",
 			obj_request->length, obj_request->img_offset,
 			obj_request->offset);
-		rbd_warn(rbd_dev, "  result %d xferred %x\n",
+		rbd_warn(rbd_dev, "  result %d xferred %x",
 			result, xferred);
 		if (!img_request->result)
 			img_request->result = result;
@@ -2946,154 +2962,135 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
 	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
 		rbd_dev->header_name, (unsigned long long)notify_id,
 		(unsigned int)opcode);
+
+	/*
+	 * Until adequate refresh error handling is in place, there is
+	 * not much we can do here, except warn.
+	 *
+	 * See http://tracker.ceph.com/issues/5040
+	 */
 	ret = rbd_dev_refresh(rbd_dev);
 	if (ret)
-		rbd_warn(rbd_dev, "header refresh error (%d)\n", ret);
+		rbd_warn(rbd_dev, "refresh failed: %d", ret);
 
-	rbd_obj_notify_ack_sync(rbd_dev, notify_id);
+	ret = rbd_obj_notify_ack_sync(rbd_dev, notify_id);
+	if (ret)
+		rbd_warn(rbd_dev, "notify_ack ret %d", ret);
 }
 
 /*
- * Initiate a watch request, synchronously.
+ * Send a (un)watch request and wait for the ack.  Return a request
+ * with a ref held on success, or an ERR_PTR on error.
  */
-static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
+static struct rbd_obj_request *rbd_obj_watch_request_helper(
+						struct rbd_device *rbd_dev,
+						bool watch)
 {
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
 	struct rbd_obj_request *obj_request;
 	int ret;
 
-	rbd_assert(!rbd_dev->watch_event);
-	rbd_assert(!rbd_dev->watch_request);
-
-	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
-				     &rbd_dev->watch_event);
-	if (ret < 0)
-		return ret;
-
-	rbd_assert(rbd_dev->watch_event);
-
 	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
 					     OBJ_REQUEST_NODATA);
-	if (!obj_request) {
-		ret = -ENOMEM;
-		goto out_cancel;
-	}
+	if (!obj_request)
+		return ERR_PTR(-ENOMEM);
 
 	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1,
 						  obj_request);
 	if (!obj_request->osd_req) {
 		ret = -ENOMEM;
-		goto out_put;
+		goto out;
 	}
 
-	ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
-
 	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
-			      rbd_dev->watch_event->cookie, 0, 1);
+			      rbd_dev->watch_event->cookie, 0, watch);
 	rbd_osd_req_format_write(obj_request);
 
+	if (watch)
+		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
+
 	ret = rbd_obj_request_submit(osdc, obj_request);
 	if (ret)
-		goto out_linger;
+		goto out;
 
 	ret = rbd_obj_request_wait(obj_request);
 	if (ret)
-		goto out_linger;
+		goto out;
 
 	ret = obj_request->result;
-	if (ret)
-		goto out_linger;
-
-	/*
-	 * A watch request is set to linger, so the underlying osd
-	 * request won't go away until we unregister it.  We retain
-	 * a pointer to the object request during that time (in
-	 * rbd_dev->watch_request), so we'll keep a reference to
-	 * it.  We'll drop that reference (below) after we've
-	 * unregistered it.
-	 */
-	rbd_dev->watch_request = obj_request;
+	if (ret) {
+		if (watch)
+			rbd_obj_request_end(obj_request);
+		goto out;
+	}
 
-	return 0;
+	return obj_request;
 
-out_linger:
-	ceph_osdc_unregister_linger_request(osdc, obj_request->osd_req);
-out_put:
+out:
 	rbd_obj_request_put(obj_request);
-out_cancel:
-	ceph_osdc_cancel_event(rbd_dev->watch_event);
-	rbd_dev->watch_event = NULL;
-
-	return ret;
+	return ERR_PTR(ret);
 }
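Both callers below reduce to this helper; the watch argument selects between registering (flag 1, request set to linger) and unregistering (flag 0, one-shot ack).  A condensed usage sketch, assuming rbd_dev->watch_event is already set up:

	/* register: keep the ref until unwatch time */
	obj_request = rbd_obj_watch_request_helper(rbd_dev, true);
	if (IS_ERR(obj_request))
		return PTR_ERR(obj_request);
	rbd_dev->watch_request = obj_request;

	/* unregister: the ack request is transient, drop it immediately */
	obj_request = rbd_obj_watch_request_helper(rbd_dev, false);
	if (!IS_ERR(obj_request))
		rbd_obj_request_put(obj_request);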
 
 /*
- * Tear down a watch request, synchronously.
+ * Initiate a watch request, synchronously.
  */
-static int __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
+static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
 {
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
 	struct rbd_obj_request *obj_request;
 	int ret;
 
-	rbd_assert(rbd_dev->watch_event);
-	rbd_assert(rbd_dev->watch_request);
+	rbd_assert(!rbd_dev->watch_event);
+	rbd_assert(!rbd_dev->watch_request);
 
-	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
-					     OBJ_REQUEST_NODATA);
-	if (!obj_request) {
-		ret = -ENOMEM;
-		goto out_cancel;
-	}
+	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
+				     &rbd_dev->watch_event);
+	if (ret < 0)
+		return ret;
 
-	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1,
-						  obj_request);
-	if (!obj_request->osd_req) {
-		ret = -ENOMEM;
-		goto out_put;
+	obj_request = rbd_obj_watch_request_helper(rbd_dev, true);
+	if (IS_ERR(obj_request)) {
+		ceph_osdc_cancel_event(rbd_dev->watch_event);
+		rbd_dev->watch_event = NULL;
+		return PTR_ERR(obj_request);
 	}
 
-	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
-			      rbd_dev->watch_event->cookie, 0, 0);
-	rbd_osd_req_format_write(obj_request);
-
-	ret = rbd_obj_request_submit(osdc, obj_request);
-	if (ret)
-		goto out_put;
+	/*
+	 * A watch request is set to linger, so the underlying osd
+	 * request won't go away until we unregister it.  We retain
+	 * a pointer to the object request during that time (in
+	 * rbd_dev->watch_request), so we'll keep a reference to it.
+	 * We'll drop that reference after we've unregistered it in
+	 * rbd_dev_header_unwatch_sync().
+	 */
+	rbd_dev->watch_request = obj_request;
 
-	ret = rbd_obj_request_wait(obj_request);
-	if (ret)
-		goto out_put;
+	return 0;
+}
 
-	ret = obj_request->result;
-	if (ret)
-		goto out_put;
+/*
+ * Tear down a watch request, synchronously.
+ */
+static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
+{
+	struct rbd_obj_request *obj_request;
 
-	/* We have successfully torn down the watch request */
+	rbd_assert(rbd_dev->watch_event);
+	rbd_assert(rbd_dev->watch_request);
 
-	ceph_osdc_unregister_linger_request(osdc,
-					    rbd_dev->watch_request->osd_req);
+	rbd_obj_request_end(rbd_dev->watch_request);
 	rbd_obj_request_put(rbd_dev->watch_request);
 	rbd_dev->watch_request = NULL;
 
-out_put:
-	rbd_obj_request_put(obj_request);
-out_cancel:
+	obj_request = rbd_obj_watch_request_helper(rbd_dev, false);
+	if (!IS_ERR(obj_request))
+		rbd_obj_request_put(obj_request);
+	else
+		rbd_warn(rbd_dev, "unable to tear down watch request (%ld)",
+			 PTR_ERR(obj_request));
+
 	ceph_osdc_cancel_event(rbd_dev->watch_event);
 	rbd_dev->watch_event = NULL;
-
-	return ret;
-}
-
-static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
-{
-	int ret;
-
-	ret = __rbd_dev_header_unwatch_sync(rbd_dev);
-	if (ret) {
-		rbd_warn(rbd_dev, "unable to tear down watch request: %d\n",
-			 ret);
-	}
 }
 
 /*
@@ -3183,102 +3180,129 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
 	return ret;
 }
 
-static void rbd_request_fn(struct request_queue *q)
-		__releases(q->queue_lock) __acquires(q->queue_lock)
+static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
 {
-	struct rbd_device *rbd_dev = q->queuedata;
-	struct request *rq;
+	struct rbd_img_request *img_request;
+	u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
+	u64 length = blk_rq_bytes(rq);
+	bool wr = rq_data_dir(rq) == WRITE;
 	int result;
 
-	while ((rq = blk_fetch_request(q))) {
-		bool write_request = rq_data_dir(rq) == WRITE;
-		struct rbd_img_request *img_request;
-		u64 offset;
-		u64 length;
+	/* Ignore/skip any zero-length requests */
 
-		/* Ignore any non-FS requests that filter through. */
+	if (!length) {
+		dout("%s: zero-length request\n", __func__);
+		result = 0;
+		goto err_rq;
+	}
 
-		if (rq->cmd_type != REQ_TYPE_FS) {
-			dout("%s: non-fs request type %d\n", __func__,
-				(int) rq->cmd_type);
-			__blk_end_request_all(rq, 0);
-			continue;
+	/* Disallow writes to a read-only device */
+
+	if (wr) {
+		if (rbd_dev->mapping.read_only) {
+			result = -EROFS;
+			goto err_rq;
 		}
+		rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
+	}
 
-		/* Ignore/skip any zero-length requests */
+	/*
+	 * Quit early if the mapped snapshot no longer exists.  It's
+	 * still possible the snapshot will have disappeared by the
+	 * time our request arrives at the osd, but there's no sense in
+	 * sending it if we already know.
+	 */
+	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
+		dout("request for non-existent snapshot");
+		rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
+		result = -ENXIO;
+		goto err_rq;
+	}
 
-		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
-		length = (u64) blk_rq_bytes(rq);
+	if (offset && length > U64_MAX - offset + 1) {
+		rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
+			 length);
+		result = -EINVAL;
+		goto err_rq;	/* Shouldn't happen */
+	}
 
-		if (!length) {
-			dout("%s: zero-length request\n", __func__);
-			__blk_end_request_all(rq, 0);
-			continue;
-		}
+	if (offset + length > rbd_dev->mapping.size) {
+		rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
+			 length, rbd_dev->mapping.size);
+		result = -EIO;
+		goto err_rq;
+	}
 
-		spin_unlock_irq(q->queue_lock);
+	img_request = rbd_img_request_create(rbd_dev, offset, length, wr);
+	if (!img_request) {
+		result = -ENOMEM;
+		goto err_rq;
+	}
+	img_request->rq = rq;
 
-		/* Disallow writes to a read-only device */
+	result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, rq->bio);
+	if (result)
+		goto err_img_request;
 
-		if (write_request) {
-			result = -EROFS;
-			if (rbd_dev->mapping.read_only)
-				goto end_request;
-			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
-		}
+	result = rbd_img_request_submit(img_request);
+	if (result)
+		goto err_img_request;
 
-		/*
-		 * Quit early if the mapped snapshot no longer
-		 * exists.  It's still possible the snapshot will
-		 * have disappeared by the time our request arrives
-		 * at the osd, but there's no sense in sending it if
-		 * we already know.
-		 */
-		if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
-			dout("request for non-existent snapshot");
-			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
-			result = -ENXIO;
-			goto end_request;
-		}
+	return;
 
-		result = -EINVAL;
-		if (offset && length > U64_MAX - offset + 1) {
-			rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
-				offset, length);
-			goto end_request;	/* Shouldn't happen */
-		}
+err_img_request:
+	rbd_img_request_put(img_request);
+err_rq:
+	if (result)
+		rbd_warn(rbd_dev, "%s %llx at %llx result %d",
+			 wr ? "write" : "read", length, offset, result);
+	blk_end_request_all(rq, result);
+}
 
-		result = -EIO;
-		if (offset + length > rbd_dev->mapping.size) {
-			rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
-				offset, length, rbd_dev->mapping.size);
-			goto end_request;
-		}
+static void rbd_request_workfn(struct work_struct *work)
+{
+	struct rbd_device *rbd_dev =
+	    container_of(work, struct rbd_device, rq_work);
+	struct request *rq, *next;
+	LIST_HEAD(requests);
 
-		result = -ENOMEM;
-		img_request = rbd_img_request_create(rbd_dev, offset, length,
-							write_request);
-		if (!img_request)
-			goto end_request;
+	spin_lock_irq(&rbd_dev->lock); /* rq->q->queue_lock */
+	list_splice_init(&rbd_dev->rq_queue, &requests);
+	spin_unlock_irq(&rbd_dev->lock);
 
-		img_request->rq = rq;
+	list_for_each_entry_safe(rq, next, &requests, queuelist) {
+		list_del_init(&rq->queuelist);
+		rbd_handle_request(rbd_dev, rq);
+	}
+}
 
-		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
-						rq->bio);
-		if (!result)
-			result = rbd_img_request_submit(img_request);
-		if (result)
-			rbd_img_request_put(img_request);
-end_request:
-		spin_lock_irq(q->queue_lock);
-		if (result < 0) {
-			rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
-				write_request ? "write" : "read",
-				length, offset, result);
-
-			__blk_end_request_all(rq, result);
+/*
+ * Called with q->queue_lock held and interrupts disabled, possibly on
+ * the way to schedule().  Do not sleep here!
+ */
+static void rbd_request_fn(struct request_queue *q)
+{
+	struct rbd_device *rbd_dev = q->queuedata;
+	struct request *rq;
+	int queued = 0;
+
+	rbd_assert(rbd_dev);
+
+	while ((rq = blk_fetch_request(q))) {
+		/* Ignore any non-FS requests that filter through. */
+		if (rq->cmd_type != REQ_TYPE_FS) {
+			dout("%s: non-fs request type %d\n", __func__,
+				(int) rq->cmd_type);
+			__blk_end_request_all(rq, 0);
+			continue;
 		}
+
+		list_add_tail(&rq->queuelist, &rbd_dev->rq_queue);
+		queued++;
 	}
+
+	if (queued)
+		queue_work(rbd_dev->rq_wq, &rbd_dev->rq_work);
 }
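rbd_request_fn() is now a thin producer: it shuttles requests onto rbd_dev->rq_queue under q->queue_lock and kicks the per-device workqueue, while everything that may block (the GFP_NOIO allocation, filling and submitting the image request) runs in rbd_request_workfn().  The hand-off is the usual list-splice pattern, schematically (names from this patch):

	/* producer: atomic context, q->queue_lock held */
	list_add_tail(&rq->queuelist, &rbd_dev->rq_queue);
	queue_work(rbd_dev->rq_wq, &rbd_dev->rq_work);

	/* consumer: process context, drains the whole queue per lock hold */
	spin_lock_irq(&rbd_dev->lock);
	list_splice_init(&rbd_dev->rq_queue, &requests);
	spin_unlock_irq(&rbd_dev->lock);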
 
 /*
@@ -3517,24 +3541,37 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev)
 	u64 mapping_size;
 	int ret;
 
-	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
 	down_write(&rbd_dev->header_rwsem);
 	mapping_size = rbd_dev->mapping.size;
-	if (rbd_dev->image_format == 1)
-		ret = rbd_dev_v1_header_info(rbd_dev);
-	else
-		ret = rbd_dev_v2_header_info(rbd_dev);
 
-	/* If it's a mapped snapshot, validate its EXISTS flag */
+	ret = rbd_dev_header_info(rbd_dev);
+	if (ret)
+		goto out;
+
+	/*
+	 * If there is a parent, see if it has disappeared due to the
+	 * mapped image getting flattened.
+	 */
+	if (rbd_dev->parent) {
+		ret = rbd_dev_v2_parent_info(rbd_dev);
+		if (ret)
+			goto out;
+	}
+
+	if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
+		if (rbd_dev->mapping.size != rbd_dev->header.image_size)
+			rbd_dev->mapping.size = rbd_dev->header.image_size;
+	} else {
+		/* validate mapped snapshot's EXISTS flag */
+		rbd_exists_validate(rbd_dev);
+	}
 
-	rbd_exists_validate(rbd_dev);
+out:
 	up_write(&rbd_dev->header_rwsem);
 
-	if (mapping_size != rbd_dev->mapping.size) {
+	if (!ret && mapping_size != rbd_dev->mapping.size)
 		rbd_dev_update_size(rbd_dev);
-	}
 
 	return ret;
 }
 
 static int rbd_init_disk(struct rbd_device *rbd_dev)
@@ -3696,46 +3733,36 @@ static ssize_t rbd_snap_show(struct device *dev,
 }
 
 /*
- * For an rbd v2 image, shows the pool id, image id, and snapshot id
- * for the parent image.  If there is no parent, simply shows
- * "(no parent image)".
+ * For a v2 image, shows the chain of parent images, separated by empty
+ * lines.  For v1 images or if there is no parent, shows "(no parent
+ * image)".
  */
 static ssize_t rbd_parent_show(struct device *dev,
-			     struct device_attribute *attr,
-			     char *buf)
+			       struct device_attribute *attr,
+			       char *buf)
 {
 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
-	struct rbd_spec *spec = rbd_dev->parent_spec;
-	int count;
-	char *bufp = buf;
+	ssize_t count = 0;
 
-	if (!spec)
+	if (!rbd_dev->parent)
 		return sprintf(buf, "(no parent image)\n");
 
-	count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
-			(unsigned long long) spec->pool_id, spec->pool_name);
-	if (count < 0)
-		return count;
-	bufp += count;
-
-	count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
-			spec->image_name ? spec->image_name : "(unknown)");
-	if (count < 0)
-		return count;
-	bufp += count;
-
-	count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
-			(unsigned long long) spec->snap_id, spec->snap_name);
-	if (count < 0)
-		return count;
-	bufp += count;
-
-	count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
-	if (count < 0)
-		return count;
-	bufp += count;
+	for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
+		struct rbd_spec *spec = rbd_dev->parent_spec;
+
+		count += sprintf(&buf[count], "%s"
+			    "pool_id %llu\npool_name %s\n"
+			    "image_id %s\nimage_name %s\n"
+			    "snap_id %llu\nsnap_name %s\n"
+			    "overlap %llu\n",
+			    !count ? "" : "\n", /* first? */
+			    spec->pool_id, spec->pool_name,
+			    spec->image_id, spec->image_name ?: "(unknown)",
+			    spec->snap_id, spec->snap_name,
+			    rbd_dev->parent_overlap);
+	}
 
-	return (ssize_t) (bufp - buf);
+	return count;
 }
 
 static ssize_t rbd_image_refresh(struct device *dev,
@@ -3748,9 +3775,9 @@ static ssize_t rbd_image_refresh(struct device *dev,
 
 	ret = rbd_dev_refresh(rbd_dev);
 	if (ret)
-		rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);
+		return ret;
 
-	return ret < 0 ? ret : size;
+	return size;
 }
 
 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
@@ -3822,6 +3849,9 @@ static struct rbd_spec *rbd_spec_alloc(void)
 	spec = kzalloc(sizeof (*spec), GFP_KERNEL);
 	if (!spec)
 		return NULL;
+
+	spec->pool_id = CEPH_NOPOOL;
+	spec->snap_id = CEPH_NOSNAP;
 	kref_init(&spec->kref);
 
 	return spec;
@@ -3848,6 +3878,8 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
 		return NULL;
 
 	spin_lock_init(&rbd_dev->lock);
+	INIT_LIST_HEAD(&rbd_dev->rq_queue);
+	INIT_WORK(&rbd_dev->rq_work, rbd_request_workfn);
 	rbd_dev->flags = 0;
 	atomic_set(&rbd_dev->parent_ref, 0);
 	INIT_LIST_HEAD(&rbd_dev->node);
@@ -4021,7 +4053,7 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
 		goto out_err;
 	}
 
-	snapid = cpu_to_le64(CEPH_NOSNAP);
+	snapid = cpu_to_le64(rbd_dev->spec->snap_id);
 	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 				"rbd", "get_parent",
 				&snapid, sizeof (snapid),
@@ -4059,7 +4091,7 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
 
 	ret = -EIO;
 	if (pool_id > (u64)U32_MAX) {
-		rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
+		rbd_warn(NULL, "parent pool id too large (%llu > %u)",
 			(unsigned long long)pool_id, U32_MAX);
 		goto out_err;
 	}
@@ -4083,6 +4115,8 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
 		parent_spec->snap_id = snap_id;
 		rbd_dev->parent_spec = parent_spec;
 		parent_spec = NULL;	/* rbd_dev now owns this */
+	} else {
+		kfree(image_id);
 	}
 
 	/*
@@ -4110,8 +4144,7 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
 			 * overlap is zero we just pretend there was
 			 * no parent image.
 			 */
-			rbd_warn(rbd_dev, "ignoring parent of "
-						"clone with overlap 0\n");
+			rbd_warn(rbd_dev, "ignoring parent with overlap 0");
 		}
 	}
 out:
@@ -4279,18 +4312,38 @@ static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
 }
 
 /*
- * When an rbd image has a parent image, it is identified by the
- * pool, image, and snapshot ids (not names).  This function fills
- * in the names for those ids.  (It's OK if we can't figure out the
- * name for an image id, but the pool and snapshot ids should always
- * exist and have names.)  All names in an rbd spec are dynamically
- * allocated.
+ * An image being mapped will have everything but the snap id.
+ */
+static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
+{
+	struct rbd_spec *spec = rbd_dev->spec;
+
+	rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
+	rbd_assert(spec->image_id && spec->image_name);
+	rbd_assert(spec->snap_name);
+
+	if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
+		u64 snap_id;
+
+		snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
+		if (snap_id == CEPH_NOSNAP)
+			return -ENOENT;
+
+		spec->snap_id = snap_id;
+	} else {
+		spec->snap_id = CEPH_NOSNAP;
+	}
+
+	return 0;
+}
+
+/*
+ * A parent image will have all ids but none of the names.
  *
- * When an image being mapped (not a parent) is probed, we have the
- * pool name and pool id, image name and image id, and the snapshot
- * name.  The only thing we're missing is the snapshot id.
+ * All names in an rbd spec are dynamically allocated.  It's OK if we
+ * can't figure out the name for an image id.
  */
-static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
+static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
 {
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
 	struct rbd_spec *spec = rbd_dev->spec;
@@ -4299,24 +4352,9 @@ static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
 	const char *snap_name;
 	int ret;
 
-	/*
-	 * An image being mapped will have the pool name (etc.), but
-	 * we need to look up the snapshot id.
-	 */
-	if (spec->pool_name) {
-		if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
-			u64 snap_id;
-
-			snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
-			if (snap_id == CEPH_NOSNAP)
-				return -ENOENT;
-			spec->snap_id = snap_id;
-		} else {
-			spec->snap_id = CEPH_NOSNAP;
-		}
-
-		return 0;
-	}
+	rbd_assert(spec->pool_id != CEPH_NOPOOL);
+	rbd_assert(spec->image_id);
+	rbd_assert(spec->snap_id != CEPH_NOSNAP);
 
 	/* Get the pool name; we have to make our own copy of this */
 
@@ -4335,7 +4373,7 @@ static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
 	if (!image_name)
 		rbd_warn(rbd_dev, "unable to get image name");
 
-	/* Look up the snapshot name, and make a copy */
+	/* Fetch the snapshot name */
 
 	snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
 	if (IS_ERR(snap_name)) {
@@ -4348,10 +4386,10 @@ static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
 	spec->snap_name = snap_name;
 
 	return 0;
+
 out_err:
 	kfree(image_name);
 	kfree(pool_name);
-
 	return ret;
 }
 
@@ -4483,43 +4521,22 @@ static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
 			return ret;
 	}
 
-	/*
-	 * If the image supports layering, get the parent info.  We
-	 * need to probe the first time regardless.  Thereafter we
-	 * only need to if there's a parent, to see if it has
-	 * disappeared due to the mapped image getting flattened.
-	 */
-	if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
-			(first_time || rbd_dev->parent_spec)) {
-		bool warn;
-
-		ret = rbd_dev_v2_parent_info(rbd_dev);
-		if (ret)
-			return ret;
-
-		/*
-		 * Print a warning if this is the initial probe and
-		 * the image has a parent.  Don't print it if the
-		 * image now being probed is itself a parent.  We
-		 * can tell at this point because we won't know its
-		 * pool name yet (just its pool id).
-		 */
-		warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
-		if (first_time && warn)
-			rbd_warn(rbd_dev, "WARNING: kernel layering "
-					"is EXPERIMENTAL!");
-	}
-
-	if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
-		if (rbd_dev->mapping.size != rbd_dev->header.image_size)
-			rbd_dev->mapping.size = rbd_dev->header.image_size;
-
 	ret = rbd_dev_v2_snap_context(rbd_dev);
 	dout("rbd_dev_v2_snap_context returned %d\n", ret);
 
 	return ret;
 }
 
+static int rbd_dev_header_info(struct rbd_device *rbd_dev)
+{
+	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+
+	if (rbd_dev->image_format == 1)
+		return rbd_dev_v1_header_info(rbd_dev);
+
+	return rbd_dev_v2_header_info(rbd_dev);
+}
+
 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
 {
 	struct device *dev;
@@ -5066,12 +5083,17 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
 	ret = rbd_dev_mapping_set(rbd_dev);
 	if (ret)
 		goto err_out_disk;
+
 	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
 	set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
 
+	rbd_dev->rq_wq = alloc_workqueue("%s", 0, 0, rbd_dev->disk->disk_name);
+	if (!rbd_dev->rq_wq) {
+		ret = -ENOMEM;
+		goto err_out_mapping;
+	}
+
 	ret = rbd_bus_add_dev(rbd_dev);
 	if (ret)
-		goto err_out_mapping;
+		goto err_out_workqueue;
 
 	/* Everything's ready.  Announce the disk to the world. */
 
@@ -5083,6 +5105,9 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
 
 	return ret;
 
+err_out_workqueue:
+	destroy_workqueue(rbd_dev->rq_wq);
+	rbd_dev->rq_wq = NULL;
 err_out_mapping:
 	rbd_dev_mapping_clear(rbd_dev);
 err_out_disk:
@@ -5155,8 +5180,6 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
 	ret = rbd_dev_image_id(rbd_dev);
 	if (ret)
 		return ret;
-	rbd_assert(rbd_dev->spec->image_id);
-	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
 
 	ret = rbd_dev_header_name(rbd_dev);
 	if (ret)
@@ -5168,25 +5191,45 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
 			goto out_header_name;
 	}
 
-	if (rbd_dev->image_format == 1)
-		ret = rbd_dev_v1_header_info(rbd_dev);
-	else
-		ret = rbd_dev_v2_header_info(rbd_dev);
+	ret = rbd_dev_header_info(rbd_dev);
 	if (ret)
 		goto err_out_watch;
 
-	ret = rbd_dev_spec_update(rbd_dev);
+	/*
+	 * If this image is the one being mapped, we have pool name and
+	 * id, image name and id, and snap name - need to fill snap id.
+	 * Otherwise this is a parent image, identified by pool, image
+	 * and snap ids - need to fill in names for those ids.
+	 */
+	if (mapping)
+		ret = rbd_spec_fill_snap_id(rbd_dev);
+	else
+		ret = rbd_spec_fill_names(rbd_dev);
 	if (ret)
 		goto err_out_probe;
 
+	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
+		ret = rbd_dev_v2_parent_info(rbd_dev);
+		if (ret)
+			goto err_out_probe;
+
+		/*
+		 * Need to warn users if this image is the one being
+		 * mapped and has a parent.
+		 */
+		if (mapping && rbd_dev->parent_spec)
+			rbd_warn(rbd_dev,
+				 "WARNING: kernel layering is EXPERIMENTAL!");
+	}
+
 	ret = rbd_dev_probe_parent(rbd_dev);
 	if (ret)
 		goto err_out_probe;
 
 	dout("discovered format %u image, header name is %s\n",
 		rbd_dev->image_format, rbd_dev->header_name);
-
 	return 0;
+
 err_out_probe:
 	rbd_dev_unprobe(rbd_dev);
 err_out_watch:
@@ -5199,9 +5242,6 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
 	rbd_dev->image_format = 0;
 	kfree(rbd_dev->spec->image_id);
 	rbd_dev->spec->image_id = NULL;
-
-	dout("probe failed, returning %d\n", ret);
-
 	return ret;
 }
 
@@ -5243,7 +5283,7 @@ static ssize_t do_rbd_add(struct bus_type *bus,
 	/* The ceph file layout needs to fit pool id in 32 bits */
 
 	if (spec->pool_id > (u64)U32_MAX) {
-		rbd_warn(NULL, "pool id too large (%llu > %u)\n",
+		rbd_warn(NULL, "pool id too large (%llu > %u)",
 				(unsigned long long)spec->pool_id, U32_MAX);
 		rc = -EIO;
 		goto err_out_client;
@@ -5314,6 +5354,7 @@ static void rbd_dev_device_release(struct device *dev)
 {
 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
+	destroy_workqueue(rbd_dev->rq_wq);
 	rbd_free_disk(rbd_dev);
 	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
 	rbd_dev_mapping_clear(rbd_dev);
diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c
index 469f2e8657e8426bfb3ad94fb12dde25e06ccb18..cebf2ebefb55dfeb79fe7b23bddd1165e4d4f344 100644
--- a/fs/ceph/acl.c
+++ b/fs/ceph/acl.c
@@ -172,14 +172,24 @@ int ceph_set_acl(struct inode *inode, struct posix_acl *acl, int type)
 int ceph_init_acl(struct dentry *dentry, struct inode *inode, struct inode *dir)
 {
 	struct posix_acl *default_acl, *acl;
+	umode_t new_mode = inode->i_mode;
 	int error;
 
-	error = posix_acl_create(dir, &inode->i_mode, &default_acl, &acl);
+	error = posix_acl_create(dir, &new_mode, &default_acl, &acl);
 	if (error)
 		return error;
 
-	if (!default_acl && !acl)
+	if (!default_acl && !acl) {
 		cache_no_acl(inode);
+		if (new_mode != inode->i_mode) {
+			struct iattr newattrs = {
+				.ia_mode = new_mode,
+				.ia_valid = ATTR_MODE,
+			};
+			error = ceph_setattr(dentry, &newattrs);
+		}
+		return error;
+	}
 
 	if (default_acl) {
 		error = ceph_set_acl(inode, default_acl, ACL_TYPE_DEFAULT);
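Background for this hunk: with POSIX ACLs enabled the VFS does not apply the umask itself, it expects the filesystem to do so via posix_acl_create().  When the parent directory has no default ACL, posix_acl_create() applies the umask and may hand back a new_mode differing from the mode the inode was created with, which the added branch pushes to the MDS.  A worked illustration (hypothetical values):

	/* open(..., O_CREAT, 0666) with umask 022, no default ACL on parent:
	 *   inode->i_mode       = 0666   (umask not applied by the VFS)
	 *   posix_acl_create() -> new_mode = 0644, default_acl = acl = NULL
	 *   ceph_setattr(ATTR_MODE) then fixes the inode up to 0644
	 */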
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 1fde164b74b54a258cdff8e7c89f52191713c986..6d1cd45dca890f9ab51a7087dbba207035792f20 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -3277,7 +3277,7 @@ int ceph_encode_inode_release(void **p, struct inode *inode,
 			rel->ino = cpu_to_le64(ceph_ino(inode));
 			rel->cap_id = cpu_to_le64(cap->cap_id);
 			rel->seq = cpu_to_le32(cap->seq);
-			rel->issue_seq = cpu_to_le32(cap->issue_seq),
+			rel->issue_seq = cpu_to_le32(cap->issue_seq);
 			rel->mseq = cpu_to_le32(cap->mseq);
 			rel->caps = cpu_to_le32(cap->implemented);
 			rel->wanted = cpu_to_le32(cap->mds_wanted);
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 302085100c28af1a2ed67269e955b3d0839539be..2eb02f80a0ab05de9071bc5d076f6c8fe952cbf5 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -423,6 +423,9 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
 	dout("sync_read on file %p %llu~%u %s\n", file, off,
 	     (unsigned)len,
 	     (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
+
+	if (!len)
+		return 0;
 	/*
 	 * flush any page cache pages in this range.  this
 	 * will make concurrent normal and sync io slow,
@@ -470,8 +473,11 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
 			size_t left = ret;
 
 			while (left) {
-				int copy = min_t(size_t, PAGE_SIZE, left);
-				l = copy_page_to_iter(pages[k++], 0, copy, i);
+				size_t page_off = off & ~PAGE_MASK;
+				size_t copy = min_t(size_t,
+						    PAGE_SIZE - page_off, left);
+				l = copy_page_to_iter(pages[k++], page_off,
+						      copy, i);
 				off += l;
 				left -= l;
 				if (l < copy)
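The old loop always copied from offset 0 of each page, which is only correct when off is page-aligned.  A worked example of the corrected arithmetic, assuming 4 KiB pages and hypothetical values:

	/* read of 6000 bytes at off = 1000:
	 *   page_off = 1000 & ~PAGE_MASK           = 1000
	 *   copy     = min(PAGE_SIZE - 1000, 6000) = 3096   (from page 0)
	 * next iteration: off = 4096, left = 2904, page_off = 0,
	 *   copy     = min(4096, 2904)             = 2904   (from page 1)
	 */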
@@ -531,7 +537,7 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
  * objects, rollback on failure, etc.)
  */
 static ssize_t
-ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from)
+ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
 {
 	struct file *file = iocb->ki_filp;
 	struct inode *inode = file_inode(file);
@@ -547,7 +553,6 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from)
 	int check_caps = 0;
 	int ret;
 	struct timespec mtime = CURRENT_TIME;
-	loff_t pos = iocb->ki_pos;
 	size_t count = iov_iter_count(from);
 
 	if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
@@ -646,7 +651,8 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from)
  * correct atomic write, we should e.g. take write locks on all
  * objects, rollback on failure, etc.)
  */
-static ssize_t ceph_sync_write(struct kiocb *iocb, struct iov_iter *from)
+static ssize_t
+ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
 {
 	struct file *file = iocb->ki_filp;
 	struct inode *inode = file_inode(file);
@@ -663,7 +669,6 @@ static ssize_t ceph_sync_write(struct kiocb *iocb, struct iov_iter *from)
 	int check_caps = 0;
 	int ret;
 	struct timespec mtime = CURRENT_TIME;
-	loff_t pos = iocb->ki_pos;
 	size_t count = iov_iter_count(from);
 
 	if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
@@ -918,9 +923,9 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
 		/* we might need to revert back to that point */
 		data = *from;
 		if (file->f_flags & O_DIRECT)
-			written = ceph_sync_direct_write(iocb, &data);
+			written = ceph_sync_direct_write(iocb, &data, pos);
 		else
-			written = ceph_sync_write(iocb, &data);
+			written = ceph_sync_write(iocb, &data, pos);
 		if (written == -EOLDSNAPC) {
 			dout("aio_write %p %llx.%llx %llu~%u"
 				"got EOLDSNAPC, retrying\n",
@@ -1177,6 +1182,9 @@ static long ceph_fallocate(struct file *file, int mode,
 	loff_t endoff = 0;
 	loff_t size;
 
+	if (mode & ~(FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE))
+		return -EOPNOTSUPP;
+
 	if (!S_ISREG(inode->i_mode))
 		return -EOPNOTSUPP;
 
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 92a2548278fca0c52609d120db33cf070db621c6..bad07c09f91ead03419fe8e25fb1df18b4ac439e 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -1904,6 +1904,7 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
 	     req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
 
 	if (req->r_got_unsafe) {
+		void *p;
 		/*
 		 * Replay.  Do not regenerate message (and rebuild
 		 * paths, etc.); just use the original message.
@@ -1924,8 +1925,13 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
 
 		/* remove cap/dentry releases from message */
 		rhead->num_releases = 0;
-		msg->hdr.front_len = cpu_to_le32(req->r_request_release_offset);
-		msg->front.iov_len = req->r_request_release_offset;
+
+		/* time stamp */
+		p = msg->front.iov_base + req->r_request_release_offset;
+		ceph_encode_copy(&p, &req->r_stamp, sizeof(req->r_stamp));
+
+		msg->front.iov_len = p - msg->front.iov_base;
+		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
 		return 0;
 	}
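On replay the cap/dentry releases are stripped from the original message, but the r_stamp that was encoded after them has to be re-encoded, otherwise truncating front.iov_len to the release offset would silently drop the timestamp.  The resulting message front, schematically:

	[ ceph_mds_request_head | paths, args ... | r_stamp ]
	                                           ^ req->r_request_release_offset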
 
@@ -2061,11 +2067,12 @@ static void __wake_requests(struct ceph_mds_client *mdsc,
 static void kick_requests(struct ceph_mds_client *mdsc, int mds)
 {
 	struct ceph_mds_request *req;
-	struct rb_node *p;
+	struct rb_node *p = rb_first(&mdsc->request_tree);
 
 	dout("kick_requests mds%d\n", mds);
-	for (p = rb_first(&mdsc->request_tree); p; p = rb_next(p)) {
+	while (p) {
 		req = rb_entry(p, struct ceph_mds_request, r_node);
+		p = rb_next(p);
 		if (req->r_got_unsafe)
 			continue;
 		if (req->r_session &&
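The iterator is advanced before the request is kicked because kicking can unregister the request and unlink r_node from the tree, after which rb_next() on it is unsafe.  The general safe-walk idiom (generic sketch with a hypothetical struct foo):

	struct rb_node *p = rb_first(root);

	while (p) {
		struct foo *f = rb_entry(p, struct foo, node);

		p = rb_next(p);		/* advance first: f may leave the tree */
		process(f);
	}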
@@ -2248,6 +2255,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
 	 */
 	if (result == -ESTALE) {
 		dout("got ESTALE on request %llu", req->r_tid);
+		req->r_resend_mds = -1;
 		if (req->r_direct_mode != USE_AUTH_MDS) {
 			dout("not using auth, setting for that now");
 			req->r_direct_mode = USE_AUTH_MDS;
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index c9c2b887381ec2504ee48c673cf113e396f3b04e..12f58d22e01798844d17d9b526cf19d1ba112a36 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -592,12 +592,12 @@ static int __build_xattrs(struct inode *inode)
 		xattr_version = ci->i_xattrs.version;
 		spin_unlock(&ci->i_ceph_lock);
 
-		xattrs = kcalloc(numattr, sizeof(struct ceph_xattr *),
+		xattrs = kcalloc(numattr, sizeof(struct ceph_inode_xattr *),
 				 GFP_NOFS);
 		err = -ENOMEM;
 		if (!xattrs)
 			goto bad_lock;
-		memset(xattrs, 0, numattr*sizeof(struct ceph_xattr *));
+
 		for (i = 0; i < numattr; i++) {
 			xattrs[i] = kmalloc(sizeof(struct ceph_inode_xattr),
 					    GFP_NOFS);
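Two separate cleanups above: the memset was redundant (kcalloc() returns zeroed memory), and the sizeof operand was the actual bug.  It compiled anyway because C only needs a type's size for values, not for pointers to it — sizeof a pointer to an undeclared struct tag is legal, and all struct pointers are the same size here, which is why the mixup was harmless in practice but misleading.  A minimal illustration:

	struct ceph_xattr;                        /* incomplete, defined nowhere */
	size_t n = sizeof(struct ceph_xattr *);   /* fine: pointer size is known */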
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index d21f2dba07314c48dce2414c4be23d2191180c81..40ae58e3e9db67d5adbfac4c6207ee4af8b1bebc 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -285,19 +285,9 @@ extern void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
 
 extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
 				     bool can_fail);
-extern void ceph_msg_kfree(struct ceph_msg *m);
 
-
-static inline struct ceph_msg *ceph_msg_get(struct ceph_msg *msg)
-{
-	kref_get(&msg->kref);
-	return msg;
-}
-extern void ceph_msg_last_put(struct kref *kref);
-static inline void ceph_msg_put(struct ceph_msg *msg)
-{
-	kref_put(&msg->kref, ceph_msg_last_put);
-}
+extern struct ceph_msg *ceph_msg_get(struct ceph_msg *msg);
+extern void ceph_msg_put(struct ceph_msg *msg);
 
 extern void ceph_msg_dump(struct ceph_msg *msg);
 
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 94ec69672164c9dd84b41c1ab3a7c995a761c1fc..03aeb27fcc69d74484de4db06550abd05b891905 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -117,7 +117,7 @@ struct ceph_osd_request {
 	struct list_head r_req_lru_item;
 	struct list_head r_osd_item;
 	struct list_head r_linger_item;
-	struct list_head r_linger_osd;
+	struct list_head r_linger_osd_item;
 	struct ceph_osd *r_osd;
 	struct ceph_pg   r_pgid;
 	int              r_pg_osds[CEPH_PG_MAX_SIZE];
@@ -325,22 +325,14 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
 
 extern void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
 					 struct ceph_osd_request *req);
-extern void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
-						struct ceph_osd_request *req);
-
-static inline void ceph_osdc_get_request(struct ceph_osd_request *req)
-{
-	kref_get(&req->r_kref);
-}
-extern void ceph_osdc_release_request(struct kref *kref);
-static inline void ceph_osdc_put_request(struct ceph_osd_request *req)
-{
-	kref_put(&req->r_kref, ceph_osdc_release_request);
-}
+
+extern void ceph_osdc_get_request(struct ceph_osd_request *req);
+extern void ceph_osdc_put_request(struct ceph_osd_request *req);
 
 extern int ceph_osdc_start_request(struct ceph_osd_client *osdc,
 				   struct ceph_osd_request *req,
 				   bool nofail);
+extern void ceph_osdc_cancel_request(struct ceph_osd_request *req);
 extern int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
 				  struct ceph_osd_request *req);
 extern void ceph_osdc_sync(struct ceph_osd_client *osdc);
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 1948d592aa54c7a1831df546702904898cd68da4..b2f571dd933dde47dd8887c392b0f13c7bfef101 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -174,6 +174,7 @@ static struct lock_class_key socket_class;
 #define SKIP_BUF_SIZE	1024
 
 static void queue_con(struct ceph_connection *con);
+static void cancel_con(struct ceph_connection *con);
 static void con_work(struct work_struct *);
 static void con_fault(struct ceph_connection *con);
 
@@ -680,7 +681,7 @@ void ceph_con_close(struct ceph_connection *con)
 
 	reset_connection(con);
 	con->peer_global_seq = 0;
-	cancel_delayed_work(&con->work);
+	cancel_con(con);
 	con_close_socket(con);
 	mutex_unlock(&con->mutex);
 }
@@ -900,7 +901,7 @@ static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
 	BUG_ON(page_count > (int)USHRT_MAX);
 	cursor->page_count = (unsigned short)page_count;
 	BUG_ON(length > SIZE_MAX - cursor->page_offset);
-	cursor->last_piece = (size_t)cursor->page_offset + length <= PAGE_SIZE;
+	cursor->last_piece = cursor->page_offset + cursor->resid <= PAGE_SIZE;
 }
 
 static struct page *
@@ -2667,19 +2668,16 @@ static int queue_con_delay(struct ceph_connection *con, unsigned long delay)
 {
 	if (!con->ops->get(con)) {
 		dout("%s %p ref count 0\n", __func__, con);
-
 		return -ENOENT;
 	}
 
 	if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) {
 		dout("%s %p - already queued\n", __func__, con);
 		con->ops->put(con);
-
 		return -EBUSY;
 	}
 
 	dout("%s %p %lu\n", __func__, con, delay);
-
 	return 0;
 }
 
@@ -2688,6 +2686,14 @@ static void queue_con(struct ceph_connection *con)
 	(void) queue_con_delay(con, 0);
 }
 
+static void cancel_con(struct ceph_connection *con)
+{
+	if (cancel_delayed_work(&con->work)) {
+		dout("%s %p\n", __func__, con);
+		con->ops->put(con);
+	}
+}
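cancel_con() pairs with the connection ref that queue_con_delay() takes on behalf of the queued work: if cancel_delayed_work() actually dequeues the item, con_work() never runs and so never drops that ref, which cancel_con() must then do itself.  The ref lifecycle, schematically:

	queue_con_delay():  con->ops->get(con);  queue_delayed_work(...);
	con_work():         ...  con->ops->put(con);    /* work ran normally */
	cancel_con():       if (cancel_delayed_work(&con->work))
	                            con->ops->put(con); /* dequeued before running */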
+
 static bool con_sock_closed(struct ceph_connection *con)
 {
 	if (!con_flag_test_and_clear(con, CON_FLAG_SOCK_CLOSED))
@@ -3269,24 +3275,21 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
 /*
  * Free a generically kmalloc'd message.
  */
-void ceph_msg_kfree(struct ceph_msg *m)
+static void ceph_msg_free(struct ceph_msg *m)
 {
-	dout("msg_kfree %p\n", m);
+	dout("%s %p\n", __func__, m);
 	ceph_kvfree(m->front.iov_base);
 	kmem_cache_free(ceph_msg_cache, m);
 }
 
-/*
- * Drop a msg ref.  Destroy as needed.
- */
-void ceph_msg_last_put(struct kref *kref)
+static void ceph_msg_release(struct kref *kref)
 {
 	struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
 	LIST_HEAD(data);
 	struct list_head *links;
 	struct list_head *next;
 
-	dout("ceph_msg_put last one on %p\n", m);
+	dout("%s %p\n", __func__, m);
 	WARN_ON(!list_empty(&m->list_head));
 
 	/* drop middle, data, if any */
@@ -3308,9 +3311,25 @@ void ceph_msg_last_put(struct kref *kref)
 	if (m->pool)
 		ceph_msgpool_put(m->pool, m);
 	else
-		ceph_msg_kfree(m);
+		ceph_msg_free(m);
+}
+
+struct ceph_msg *ceph_msg_get(struct ceph_msg *msg)
+{
+	dout("%s %p (was %d)\n", __func__, msg,
+	     atomic_read(&msg->kref.refcount));
+	kref_get(&msg->kref);
+	return msg;
+}
+EXPORT_SYMBOL(ceph_msg_get);
+
+void ceph_msg_put(struct ceph_msg *msg)
+{
+	dout("%s %p (was %d)\n", __func__, msg,
+	     atomic_read(&msg->kref.refcount));
+	kref_put(&msg->kref, ceph_msg_release);
 }
-EXPORT_SYMBOL(ceph_msg_last_put);
+EXPORT_SYMBOL(ceph_msg_put);
 
 void ceph_msg_dump(struct ceph_msg *msg)
 {
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 05be0c1816958b0d0db6b2c319631d41f273e3d0..30f6faf3584fb529ffdb5b6f9fae5041acf55166 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -297,12 +297,21 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
 /*
  * requests
  */
-void ceph_osdc_release_request(struct kref *kref)
+static void ceph_osdc_release_request(struct kref *kref)
 {
-	struct ceph_osd_request *req;
+	struct ceph_osd_request *req = container_of(kref,
+					    struct ceph_osd_request, r_kref);
 	unsigned int which;
 
-	req = container_of(kref, struct ceph_osd_request, r_kref);
+	dout("%s %p (r_request %p r_reply %p)\n", __func__, req,
+	     req->r_request, req->r_reply);
+	WARN_ON(!RB_EMPTY_NODE(&req->r_node));
+	WARN_ON(!list_empty(&req->r_req_lru_item));
+	WARN_ON(!list_empty(&req->r_osd_item));
+	WARN_ON(!list_empty(&req->r_linger_item));
+	WARN_ON(!list_empty(&req->r_linger_osd_item));
+	WARN_ON(req->r_osd);
+
 	if (req->r_request)
 		ceph_msg_put(req->r_request);
 	if (req->r_reply) {
@@ -320,7 +329,22 @@ void ceph_osdc_release_request(struct kref *kref)
 		kmem_cache_free(ceph_osd_request_cache, req);
 
 }
-EXPORT_SYMBOL(ceph_osdc_release_request);
+
+void ceph_osdc_get_request(struct ceph_osd_request *req)
+{
+	dout("%s %p (was %d)\n", __func__, req,
+	     atomic_read(&req->r_kref.refcount));
+	kref_get(&req->r_kref);
+}
+EXPORT_SYMBOL(ceph_osdc_get_request);
+
+void ceph_osdc_put_request(struct ceph_osd_request *req)
+{
+	dout("%s %p (was %d)\n", __func__, req,
+	     atomic_read(&req->r_kref.refcount));
+	kref_put(&req->r_kref, ceph_osdc_release_request);
+}
+EXPORT_SYMBOL(ceph_osdc_put_request);
 
 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 					       struct ceph_snap_context *snapc,
@@ -364,7 +388,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 	RB_CLEAR_NODE(&req->r_node);
 	INIT_LIST_HEAD(&req->r_unsafe_item);
 	INIT_LIST_HEAD(&req->r_linger_item);
-	INIT_LIST_HEAD(&req->r_linger_osd);
+	INIT_LIST_HEAD(&req->r_linger_osd_item);
 	INIT_LIST_HEAD(&req->r_req_lru_item);
 	INIT_LIST_HEAD(&req->r_osd_item);
 
@@ -916,7 +940,7 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
 	 * list at the end to keep things in tid order.
 	 */
 	list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
-				 r_linger_osd) {
+				 r_linger_osd_item) {
 		/*
 		 * reregister request prior to unregistering linger so
 		 * that r_osd is preserved.
@@ -1008,6 +1032,8 @@ static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 {
 	dout("__remove_osd %p\n", osd);
 	BUG_ON(!list_empty(&osd->o_requests));
+	BUG_ON(!list_empty(&osd->o_linger_requests));
+
 	rb_erase(&osd->o_node, &osdc->osds);
 	list_del_init(&osd->o_osd_lru);
 	ceph_con_close(&osd->o_con);
@@ -1029,12 +1055,23 @@ static void remove_all_osds(struct ceph_osd_client *osdc)
 static void __move_osd_to_lru(struct ceph_osd_client *osdc,
 			      struct ceph_osd *osd)
 {
-	dout("__move_osd_to_lru %p\n", osd);
+	dout("%s %p\n", __func__, osd);
 	BUG_ON(!list_empty(&osd->o_osd_lru));
+
 	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
 	osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ;
 }
 
+static void maybe_move_osd_to_lru(struct ceph_osd_client *osdc,
+				  struct ceph_osd *osd)
+{
+	dout("%s %p\n", __func__, osd);
+
+	if (list_empty(&osd->o_requests) &&
+	    list_empty(&osd->o_linger_requests))
+		__move_osd_to_lru(osdc, osd);
+}
+
 static void __remove_osd_from_lru(struct ceph_osd *osd)
 {
 	dout("__remove_osd_from_lru %p\n", osd);
@@ -1175,6 +1212,7 @@ static void __unregister_request(struct ceph_osd_client *osdc,
 
 	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
 	rb_erase(&req->r_node, &osdc->requests);
+	RB_CLEAR_NODE(&req->r_node);
 	osdc->num_requests--;
 
 	if (req->r_osd) {
@@ -1182,12 +1220,8 @@ static void __unregister_request(struct ceph_osd_client *osdc,
 		ceph_msg_revoke(req->r_request);
 
 		list_del_init(&req->r_osd_item);
-		if (list_empty(&req->r_osd->o_requests) &&
-		    list_empty(&req->r_osd->o_linger_requests)) {
-			dout("moving osd to %p lru\n", req->r_osd);
-			__move_osd_to_lru(osdc, req->r_osd);
-		}
-		if (list_empty(&req->r_linger_item))
+		maybe_move_osd_to_lru(osdc, req->r_osd);
+		if (list_empty(&req->r_linger_osd_item))
 			req->r_osd = NULL;
 	}
 
@@ -1214,45 +1248,39 @@ static void __cancel_request(struct ceph_osd_request *req)
 static void __register_linger_request(struct ceph_osd_client *osdc,
 				    struct ceph_osd_request *req)
 {
-	dout("__register_linger_request %p\n", req);
+	dout("%s %p tid %llu\n", __func__, req, req->r_tid);
+	WARN_ON(!req->r_linger);
+
 	ceph_osdc_get_request(req);
 	list_add_tail(&req->r_linger_item, &osdc->req_linger);
 	if (req->r_osd)
-		list_add_tail(&req->r_linger_osd,
+		list_add_tail(&req->r_linger_osd_item,
 			      &req->r_osd->o_linger_requests);
 }
 
 static void __unregister_linger_request(struct ceph_osd_client *osdc,
 					struct ceph_osd_request *req)
 {
-	dout("__unregister_linger_request %p\n", req);
+	WARN_ON(!req->r_linger);
+
+	if (list_empty(&req->r_linger_item)) {
+		dout("%s %p tid %llu not registered\n", __func__, req,
+		     req->r_tid);
+		return;
+	}
+
+	dout("%s %p tid %llu\n", __func__, req, req->r_tid);
 	list_del_init(&req->r_linger_item);
-	if (req->r_osd) {
-		list_del_init(&req->r_linger_osd);
 
-		if (list_empty(&req->r_osd->o_requests) &&
-		    list_empty(&req->r_osd->o_linger_requests)) {
-			dout("moving osd to %p lru\n", req->r_osd);
-			__move_osd_to_lru(osdc, req->r_osd);
-		}
+	if (req->r_osd) {
+		list_del_init(&req->r_linger_osd_item);
+		maybe_move_osd_to_lru(osdc, req->r_osd);
 		if (list_empty(&req->r_osd_item))
 			req->r_osd = NULL;
 	}
 	ceph_osdc_put_request(req);
 }
 
-void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
-					 struct ceph_osd_request *req)
-{
-	mutex_lock(&osdc->request_mutex);
-	if (req->r_linger) {
-		req->r_linger = 0;
-		__unregister_linger_request(osdc, req);
-	}
-	mutex_unlock(&osdc->request_mutex);
-}
-EXPORT_SYMBOL(ceph_osdc_unregister_linger_request);
-
 void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
 				  struct ceph_osd_request *req)
 {
@@ -2429,6 +2457,25 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
 }
 EXPORT_SYMBOL(ceph_osdc_start_request);
 
+/*
+ * Unregister a registered request.  The request is not completed (i.e.
+ * no callbacks or wakeups) - higher layers are supposed to know what
+ * they are canceling.
+ */
+void ceph_osdc_cancel_request(struct ceph_osd_request *req)
+{
+	struct ceph_osd_client *osdc = req->r_osdc;
+
+	mutex_lock(&osdc->request_mutex);
+	if (req->r_linger)
+		__unregister_linger_request(osdc, req);
+	__unregister_request(osdc, req);
+	mutex_unlock(&osdc->request_mutex);
+
+	dout("%s %p tid %llu canceled\n", __func__, req, req->r_tid);
+}
+EXPORT_SYMBOL(ceph_osdc_cancel_request);
+
 /*
  * wait for a request to complete
  */
@@ -2437,18 +2484,18 @@ int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
 {
 	int rc;
 
+	dout("%s %p tid %llu\n", __func__, req, req->r_tid);
+
 	rc = wait_for_completion_interruptible(&req->r_completion);
 	if (rc < 0) {
-		mutex_lock(&osdc->request_mutex);
-		__cancel_request(req);
-		__unregister_request(osdc, req);
-		mutex_unlock(&osdc->request_mutex);
+		dout("%s %p tid %llu interrupted\n", __func__, req, req->r_tid);
+		ceph_osdc_cancel_request(req);
 		complete_request(req);
-		dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
 		return rc;
 	}
 
-	dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
+	dout("%s %p tid %llu result %d\n", __func__, req, req->r_tid,
+	     req->r_result);
 	return req->r_result;
 }
 EXPORT_SYMBOL(ceph_osdc_wait_request);