libceph, rbd: new bio handling code (aka don't clone bios)
author Ilya Dryomov <idryomov@gmail.com>
Sat, 20 Jan 2018 09:30:10 +0000 (10:30 +0100)
committer Ilya Dryomov <idryomov@gmail.com>
Mon, 2 Apr 2018 08:12:38 +0000 (10:12 +0200)
The reason we clone bios is to be able to give each object request
(and consequently each ceph_osd_data/ceph_msg_data item) its own
pointer to a (list of) bio(s).  The messenger then initializes its
cursor with cloned bio's ->bi_iter, so it knows where to start reading
from/writing to.  That's all the cloned bios are used for: to determine
each object request's starting position in the provided data buffer.

Introduce ceph_bio_iter to do exactly that -- store position within bio
list (i.e. pointer to bio) + position within that bio (i.e. bvec_iter).

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
drivers/block/rbd.c
include/linux/ceph/messenger.h
include/linux/ceph/osd_client.h
net/ceph/messenger.c
net/ceph/osd_client.c

index 883f17d..8eaebf6 100644 (file)
@@ -218,7 +218,7 @@ typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
 
 enum obj_request_type {
        OBJ_REQUEST_NODATA = 1,
-       OBJ_REQUEST_BIO,
+       OBJ_REQUEST_BIO,        /* pointer into provided bio (list) */
        OBJ_REQUEST_PAGES,
 };
 
@@ -270,7 +270,7 @@ struct rbd_obj_request {
 
        enum obj_request_type   type;
        union {
-               struct bio      *bio_list;
+               struct ceph_bio_iter    bio_pos;
                struct {
                        struct page     **pages;
                        u32             page_count;
@@ -1255,6 +1255,27 @@ static u64 rbd_segment_length(struct rbd_device *rbd_dev,
        return length;
 }
 
+static void zero_bvec(struct bio_vec *bv)
+{
+       void *buf;
+       unsigned long flags;
+
+       buf = bvec_kmap_irq(bv, &flags);
+       memset(buf, 0, bv->bv_len);
+       flush_dcache_page(bv->bv_page);
+       bvec_kunmap_irq(buf, &flags);
+}
+
+static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
+{
+       struct ceph_bio_iter it = *bio_pos;
+
+       ceph_bio_iter_advance(&it, off);
+       ceph_bio_iter_advance_step(&it, bytes, ({
+               zero_bvec(&bv);
+       }));
+}
+
 /*
  * bio helpers
  */
@@ -1719,13 +1740,14 @@ rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
        rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
        if (obj_request->result == -ENOENT) {
                if (obj_request->type == OBJ_REQUEST_BIO)
-                       zero_bio_chain(obj_request->bio_list, 0);
+                       zero_bios(&obj_request->bio_pos, 0, length);
                else
                        zero_pages(obj_request->pages, 0, length);
                obj_request->result = 0;
        } else if (xferred < length && !obj_request->result) {
                if (obj_request->type == OBJ_REQUEST_BIO)
-                       zero_bio_chain(obj_request->bio_list, xferred);
+                       zero_bios(&obj_request->bio_pos, xferred,
+                                 length - xferred);
                else
                        zero_pages(obj_request->pages, xferred, length);
        }
@@ -2036,11 +2058,8 @@ static void rbd_obj_request_destroy(struct kref *kref)
        rbd_assert(obj_request_type_valid(obj_request->type));
        switch (obj_request->type) {
        case OBJ_REQUEST_NODATA:
-               break;          /* Nothing to do */
        case OBJ_REQUEST_BIO:
-               if (obj_request->bio_list)
-                       bio_chain_put(obj_request->bio_list);
-               break;
+               break;          /* Nothing to do */
        case OBJ_REQUEST_PAGES:
                /* img_data requests don't own their page array */
                if (obj_request->pages &&
@@ -2368,7 +2387,7 @@ static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
 
        if (obj_request->type == OBJ_REQUEST_BIO)
                osd_req_op_extent_osd_data_bio(osd_request, num_ops,
-                                       obj_request->bio_list, length);
+                                       &obj_request->bio_pos, length);
        else if (obj_request->type == OBJ_REQUEST_PAGES)
                osd_req_op_extent_osd_data_pages(osd_request, num_ops,
                                        obj_request->pages, length,
@@ -2396,8 +2415,7 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
        struct rbd_device *rbd_dev = img_request->rbd_dev;
        struct rbd_obj_request *obj_request = NULL;
        struct rbd_obj_request *next_obj_request;
-       struct bio *bio_list = NULL;
-       unsigned int bio_offset = 0;
+       struct ceph_bio_iter bio_it;
        struct page **pages = NULL;
        enum obj_operation_type op_type;
        u64 img_offset;
@@ -2412,9 +2430,9 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
        op_type = rbd_img_request_op_type(img_request);
 
        if (type == OBJ_REQUEST_BIO) {
-               bio_list = data_desc;
+               bio_it = *(struct ceph_bio_iter *)data_desc;
                rbd_assert(img_offset ==
-                          bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
+                          bio_it.iter.bi_sector << SECTOR_SHIFT);
        } else if (type == OBJ_REQUEST_PAGES) {
                pages = data_desc;
        }
@@ -2440,17 +2458,8 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
                rbd_img_obj_request_add(img_request, obj_request);
 
                if (type == OBJ_REQUEST_BIO) {
-                       unsigned int clone_size;
-
-                       rbd_assert(length <= (u64)UINT_MAX);
-                       clone_size = (unsigned int)length;
-                       obj_request->bio_list =
-                                       bio_chain_clone_range(&bio_list,
-                                                               &bio_offset,
-                                                               clone_size,
-                                                               GFP_NOIO);
-                       if (!obj_request->bio_list)
-                               goto out_unwind;
+                       obj_request->bio_pos = bio_it;
+                       ceph_bio_iter_advance(&bio_it, length);
                } else if (type == OBJ_REQUEST_PAGES) {
                        unsigned int page_count;
 
@@ -2980,7 +2989,7 @@ static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
 
        if (obj_request->type == OBJ_REQUEST_BIO)
                result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
-                                               obj_request->bio_list);
+                                               &obj_request->bio_pos);
        else
                result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
                                                obj_request->pages);
@@ -4093,9 +4102,13 @@ static void rbd_queue_workfn(struct work_struct *work)
        if (op_type == OBJ_OP_DISCARD)
                result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
                                              NULL);
-       else
+       else {
+               struct ceph_bio_iter bio_it = { .bio = rq->bio,
+                                               .iter = rq->bio->bi_iter };
+
                result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
-                                             rq->bio);
+                                             &bio_it);
+       }
        if (result)
                goto err_img_request;
 
index ead9d85..d7b9605 100644 (file)
@@ -93,14 +93,60 @@ static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
        }
 }
 
+#ifdef CONFIG_BLOCK
+
+struct ceph_bio_iter {
+       struct bio *bio;
+       struct bvec_iter iter;
+};
+
+#define __ceph_bio_iter_advance_step(it, n, STEP) do {                       \
+       unsigned int __n = (n), __cur_n;                                      \
+                                                                             \
+       while (__n) {                                                         \
+               BUG_ON(!(it)->iter.bi_size);                                  \
+               __cur_n = min((it)->iter.bi_size, __n);                       \
+               (void)(STEP);                                                 \
+               bio_advance_iter((it)->bio, &(it)->iter, __cur_n);            \
+               if (!(it)->iter.bi_size && (it)->bio->bi_next) {              \
+                       dout("__ceph_bio_iter_advance_step next bio\n");      \
+                       (it)->bio = (it)->bio->bi_next;                       \
+                       (it)->iter = (it)->bio->bi_iter;                      \
+               }                                                             \
+               __n -= __cur_n;                                               \
+       }                                                                     \
+} while (0)
+
+/*
+ * Advance @it by @n bytes.
+ */
+#define ceph_bio_iter_advance(it, n)                                         \
+       __ceph_bio_iter_advance_step(it, n, 0)
+
+/*
+ * Advance @it by @n bytes, executing BVEC_STEP for each bio_vec.
+ */
+#define ceph_bio_iter_advance_step(it, n, BVEC_STEP)                         \
+       __ceph_bio_iter_advance_step(it, n, ({                                \
+               struct bio_vec bv;                                            \
+               struct bvec_iter __cur_iter;                                  \
+                                                                             \
+               __cur_iter = (it)->iter;                                      \
+               __cur_iter.bi_size = __cur_n;                                 \
+               __bio_for_each_segment(bv, (it)->bio, __cur_iter, __cur_iter) \
+                       (void)(BVEC_STEP);                                    \
+       }))
+
+#endif /* CONFIG_BLOCK */
+
 struct ceph_msg_data {
        struct list_head                links;  /* ceph_msg->data */
        enum ceph_msg_data_type         type;
        union {
 #ifdef CONFIG_BLOCK
                struct {
-                       struct bio      *bio;
-                       size_t          bio_length;
+                       struct ceph_bio_iter    bio_pos;
+                       u32                     bio_length;
                };
 #endif /* CONFIG_BLOCK */
                struct {
@@ -122,10 +168,7 @@ struct ceph_msg_data_cursor {
        bool                    need_crc;       /* crc update needed */
        union {
 #ifdef CONFIG_BLOCK
-               struct {                                /* bio */
-                       struct bio      *bio;           /* bio from list */
-                       struct bvec_iter bvec_iter;
-               };
+               struct ceph_bio_iter    bio_iter;
 #endif /* CONFIG_BLOCK */
                struct {                                /* pages */
                        unsigned int    page_offset;    /* offset in page */
@@ -290,8 +333,8 @@ extern void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
 extern void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
                                struct ceph_pagelist *pagelist);
 #ifdef CONFIG_BLOCK
-extern void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
-                               size_t length);
+void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
+                          u32 length);
 #endif /* CONFIG_BLOCK */
 
 extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
index 52fb37d..3156914 100644 (file)
@@ -72,8 +72,8 @@ struct ceph_osd_data {
                struct ceph_pagelist    *pagelist;
 #ifdef CONFIG_BLOCK
                struct {
-                       struct bio      *bio;           /* list of bios */
-                       size_t          bio_length;     /* total in list */
+                       struct ceph_bio_iter    bio_pos;
+                       u32                     bio_length;
                };
 #endif /* CONFIG_BLOCK */
        };
@@ -405,9 +405,10 @@ extern void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *,
                                        unsigned int which,
                                        struct ceph_pagelist *pagelist);
 #ifdef CONFIG_BLOCK
-extern void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *,
-                                       unsigned int which,
-                                       struct bio *bio, size_t bio_length);
+void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
+                                   unsigned int which,
+                                   struct ceph_bio_iter *bio_pos,
+                                   u32 bio_length);
 #endif /* CONFIG_BLOCK */
 
 extern void osd_req_op_cls_request_data_pagelist(struct ceph_osd_request *,
index 8a4d375..b9fa8b8 100644 (file)
@@ -839,90 +839,57 @@ static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor,
                                        size_t length)
 {
        struct ceph_msg_data *data = cursor->data;
-       struct bio *bio;
+       struct ceph_bio_iter *it = &cursor->bio_iter;
 
-       BUG_ON(data->type != CEPH_MSG_DATA_BIO);
+       cursor->resid = min_t(size_t, length, data->bio_length);
+       *it = data->bio_pos;
+       if (cursor->resid < it->iter.bi_size)
+               it->iter.bi_size = cursor->resid;
 
-       bio = data->bio;
-       BUG_ON(!bio);
-
-       cursor->resid = min(length, data->bio_length);
-       cursor->bio = bio;
-       cursor->bvec_iter = bio->bi_iter;
-       cursor->last_piece =
-               cursor->resid <= bio_iter_len(bio, cursor->bvec_iter);
+       BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter));
+       cursor->last_piece = cursor->resid == bio_iter_len(it->bio, it->iter);
 }
 
 static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor,
                                                size_t *page_offset,
                                                size_t *length)
 {
-       struct ceph_msg_data *data = cursor->data;
-       struct bio *bio;
-       struct bio_vec bio_vec;
-
-       BUG_ON(data->type != CEPH_MSG_DATA_BIO);
-
-       bio = cursor->bio;
-       BUG_ON(!bio);
+       struct bio_vec bv = bio_iter_iovec(cursor->bio_iter.bio,
+                                          cursor->bio_iter.iter);
 
-       bio_vec = bio_iter_iovec(bio, cursor->bvec_iter);
-
-       *page_offset = (size_t) bio_vec.bv_offset;
-       BUG_ON(*page_offset >= PAGE_SIZE);
-       if (cursor->last_piece) /* pagelist offset is always 0 */
-               *length = cursor->resid;
-       else
-               *length = (size_t) bio_vec.bv_len;
-       BUG_ON(*length > cursor->resid);
-       BUG_ON(*page_offset + *length > PAGE_SIZE);
-
-       return bio_vec.bv_page;
+       *page_offset = bv.bv_offset;
+       *length = bv.bv_len;
+       return bv.bv_page;
 }
 
 static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
                                        size_t bytes)
 {
-       struct bio *bio;
-       struct bio_vec bio_vec;
-
-       BUG_ON(cursor->data->type != CEPH_MSG_DATA_BIO);
-
-       bio = cursor->bio;
-       BUG_ON(!bio);
-
-       bio_vec = bio_iter_iovec(bio, cursor->bvec_iter);
+       struct ceph_bio_iter *it = &cursor->bio_iter;
 
-       /* Advance the cursor offset */
-
-       BUG_ON(cursor->resid < bytes);
+       BUG_ON(bytes > cursor->resid);
+       BUG_ON(bytes > bio_iter_len(it->bio, it->iter));
        cursor->resid -= bytes;
+       bio_advance_iter(it->bio, &it->iter, bytes);
 
-       bio_advance_iter(bio, &cursor->bvec_iter, bytes);
+       if (!cursor->resid) {
+               BUG_ON(!cursor->last_piece);
+               return false;   /* no more data */
+       }
 
-       if (bytes < bio_vec.bv_len)
+       if (!bytes || (it->iter.bi_size && it->iter.bi_bvec_done))
                return false;   /* more bytes to process in this segment */
 
-       /* Move on to the next segment, and possibly the next bio */
-
-       if (!cursor->bvec_iter.bi_size) {
-               bio = bio->bi_next;
-               cursor->bio = bio;
-               if (bio)
-                       cursor->bvec_iter = bio->bi_iter;
-               else
-                       memset(&cursor->bvec_iter, 0,
-                              sizeof(cursor->bvec_iter));
-       }
-
-       if (!cursor->last_piece) {
-               BUG_ON(!cursor->resid);
-               BUG_ON(!bio);
-               /* A short read is OK, so use <= rather than == */
-               if (cursor->resid <= bio_iter_len(bio, cursor->bvec_iter))
-                       cursor->last_piece = true;
+       if (!it->iter.bi_size) {
+               it->bio = it->bio->bi_next;
+               it->iter = it->bio->bi_iter;
+               if (cursor->resid < it->iter.bi_size)
+                       it->iter.bi_size = cursor->resid;
        }
 
+       BUG_ON(cursor->last_piece);
+       BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter));
+       cursor->last_piece = cursor->resid == bio_iter_len(it->bio, it->iter);
        return true;
 }
 #endif /* CONFIG_BLOCK */
@@ -1163,9 +1130,11 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
                page = NULL;
                break;
        }
+
        BUG_ON(!page);
        BUG_ON(*page_offset + *length > PAGE_SIZE);
        BUG_ON(!*length);
+       BUG_ON(*length > cursor->resid);
        if (last_piece)
                *last_piece = cursor->last_piece;
 
@@ -3262,16 +3231,14 @@ void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
 EXPORT_SYMBOL(ceph_msg_data_add_pagelist);
 
 #ifdef CONFIG_BLOCK
-void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
-               size_t length)
+void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
+                          u32 length)
 {
        struct ceph_msg_data *data;
 
-       BUG_ON(!bio);
-
        data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
        BUG_ON(!data);
-       data->bio = bio;
+       data->bio_pos = *bio_pos;
        data->bio_length = length;
 
        list_add_tail(&data->links, &msg->data);
index 4b04854..339d877 100644 (file)
@@ -146,10 +146,11 @@ static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
 
 #ifdef CONFIG_BLOCK
 static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
-                       struct bio *bio, size_t bio_length)
+                                  struct ceph_bio_iter *bio_pos,
+                                  u32 bio_length)
 {
        osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
-       osd_data->bio = bio;
+       osd_data->bio_pos = *bio_pos;
        osd_data->bio_length = bio_length;
 }
 #endif /* CONFIG_BLOCK */
@@ -216,12 +217,14 @@ EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);
 
 #ifdef CONFIG_BLOCK
 void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
-                       unsigned int which, struct bio *bio, size_t bio_length)
+                                   unsigned int which,
+                                   struct ceph_bio_iter *bio_pos,
+                                   u32 bio_length)
 {
        struct ceph_osd_data *osd_data;
 
        osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
-       ceph_osd_data_bio_init(osd_data, bio, bio_length);
+       ceph_osd_data_bio_init(osd_data, bio_pos, bio_length);
 }
 EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
 #endif /* CONFIG_BLOCK */
@@ -826,7 +829,7 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
                ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
 #ifdef CONFIG_BLOCK
        } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
-               ceph_msg_data_add_bio(msg, osd_data->bio, length);
+               ceph_msg_data_add_bio(msg, &osd_data->bio_pos, length);
 #endif
        } else {
                BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);