rbd: implement full object parent reads
authorAlex Elder <elder@inktank.com>
Fri, 19 Apr 2013 20:34:50 +0000 (15:34 -0500)
committerSage Weil <sage@inktank.com>
Thu, 2 May 2013 04:19:13 +0000 (21:19 -0700)
As a step toward implementing layered writes, implement reading the
data for a target object from the parent image for a write request
whose target object is known to not exist.  Add a copyup_pages field
to an image request to track the page array used (only) for such a
request.

Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
drivers/block/rbd.c

index b2819de..639dd91 100644 (file)
@@ -250,6 +250,7 @@ struct rbd_img_request {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
+       struct page             **copyup_pages;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
@@ -350,6 +351,8 @@ static DEFINE_SPINLOCK(rbd_dev_list_lock);
 static LIST_HEAD(rbd_client_list);             /* clients */
 static DEFINE_SPINLOCK(rbd_client_list_lock);
 
+static int rbd_img_request_submit(struct rbd_img_request *img_request);
+
 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
 
@@ -1956,6 +1959,133 @@ out_unwind:
        return -ENOMEM;
 }
 
+static void
+rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
+{
+       struct rbd_obj_request *orig_request;
+       struct page **pages;
+       u32 page_count;
+       int result;
+       u64 obj_size;
+       u64 xferred;
+
+       rbd_assert(img_request_child_test(img_request));
+
+       /* First get what we need from the image request */
+
+       pages = img_request->copyup_pages;
+       rbd_assert(pages != NULL);
+       img_request->copyup_pages = NULL;
+
+       orig_request = img_request->obj_request;
+       rbd_assert(orig_request != NULL);
+
+       result = img_request->result;
+       obj_size = img_request->length;
+       xferred = img_request->xferred;
+
+       rbd_img_request_put(img_request);
+
+       obj_request_existence_set(orig_request, true);
+
+       page_count = (u32)calc_pages_for(0, obj_size);
+       ceph_release_page_vector(pages, page_count);
+
+       /* Resubmit the original request (for now). */
+
+       orig_request->result = rbd_img_obj_request_submit(orig_request);
+       if (orig_request->result) {
+               obj_request_done_set(orig_request);
+               rbd_obj_request_complete(orig_request);
+       }
+}
+
+/*
+ * Read from the parent image the range of data that covers the
+ * entire target of the given object request.  This is used for
+ * satisfying a layered image write request when the target of an
+ * object request from the image request does not exist.
+ *
+ * A page array big enough to hold the returned data is allocated
+ * and supplied to rbd_img_request_fill() as the "data descriptor."
+ * When the read completes, this page array will be transferred to
+ * the original object request for the copyup operation.
+ *
+ * If an error occurs, record it as the result of the original
+ * object request and mark it done so it gets completed.
+ */
+static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
+{
+       struct rbd_img_request *img_request = NULL;
+       struct rbd_img_request *parent_request = NULL;
+       struct rbd_device *rbd_dev;
+       u64 img_offset;
+       u64 length;
+       struct page **pages = NULL;
+       u32 page_count;
+       int result;
+
+       rbd_assert(obj_request_img_data_test(obj_request));
+       rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
+
+       img_request = obj_request->img_request;
+       rbd_assert(img_request != NULL);
+       rbd_dev = img_request->rbd_dev;
+       rbd_assert(rbd_dev->parent != NULL);
+
+       /*
+        * Determine the byte range covered by the object in the
+        * child image to which the original request was to be sent.
+        */
+       img_offset = obj_request->img_offset - obj_request->offset;
+       length = (u64)1 << rbd_dev->header.obj_order;
+
+       /*
+        * Allocate a page array big enough to receive the data read
+        * from the parent.
+        */
+       page_count = (u32)calc_pages_for(0, length);
+       pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
+       if (IS_ERR(pages)) {
+               result = PTR_ERR(pages);
+               pages = NULL;
+               goto out_err;
+       }
+
+       result = -ENOMEM;
+       parent_request = rbd_img_request_create(rbd_dev->parent,
+                                               img_offset, length,
+                                               false, true);
+       if (!parent_request)
+               goto out_err;
+       rbd_obj_request_get(obj_request);
+       parent_request->obj_request = obj_request;
+
+       result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
+       if (result)
+               goto out_err;
+       parent_request->copyup_pages = pages;
+
+       parent_request->callback = rbd_img_obj_parent_read_full_callback;
+       result = rbd_img_request_submit(parent_request);
+       if (!result)
+               return 0;
+
+       parent_request->copyup_pages = NULL;
+       parent_request->obj_request = NULL;
+       rbd_obj_request_put(obj_request);
+out_err:
+       if (pages)
+               ceph_release_page_vector(pages, page_count);
+       if (parent_request)
+               rbd_img_request_put(parent_request);
+       obj_request->result = result;
+       obj_request->xferred = 0;
+       obj_request_done_set(obj_request);
+
+       return result;
+}
+
 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
 {
        struct rbd_obj_request *orig_request;
@@ -1996,7 +2126,7 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
                obj_request_existence_set(orig_request, false);
        } else if (result) {
                orig_request->result = result;
-               goto out_err;
+               goto out;
        }
 
        /*
@@ -2004,7 +2134,7 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
         * whether the target object exists.
         */
        orig_request->result = rbd_img_obj_request_submit(orig_request);
-out_err:
+out:
        if (orig_request->result)
                rbd_obj_request_complete(orig_request);
        rbd_obj_request_put(orig_request);
@@ -2070,15 +2200,13 @@ out:
 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
 {
        struct rbd_img_request *img_request;
+       bool known;
 
        rbd_assert(obj_request_img_data_test(obj_request));
 
        img_request = obj_request->img_request;
        rbd_assert(img_request);
 
-       /* (At the moment we don't care whether it exists or not...) */
-       (void) obj_request_exists_test;
-
        /*
         * Only layered writes need special handling.  If it's not a
         * layered write, or it is a layered write but we know the
@@ -2087,7 +2215,8 @@ static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
         */
        if (!img_request_write_test(img_request) ||
                !img_request_layered_test(img_request) ||
-               obj_request_known_test(obj_request)) {
+               ((known = obj_request_known_test(obj_request)) &&
+                       obj_request_exists_test(obj_request))) {
 
                struct rbd_device *rbd_dev;
                struct ceph_osd_client *osdc;
@@ -2099,10 +2228,15 @@ static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
        }
 
        /*
-        * It's a layered write and we don't know whether the target
-        * exists.  Issue existence check; once that completes the
-        * original request will be submitted again.
+        * It's a layered write.  The target object might exist but
+        * we may not know that yet.  If we know it doesn't exist,
+        * start by reading the data for the full target object from
+        * the parent so we can use it for a copyup to the target.
         */
+       if (known)
+               return rbd_img_obj_parent_read_full(obj_request);
+
+       /* We don't know whether the target exists.  Go find out. */
 
        return rbd_img_obj_exists_submit(obj_request);
 }