drbd: Allow drbd_epoch_entries to use multiple bios.
authorLars Ellenberg <lars.ellenberg@linbit.com>
Fri, 14 May 2010 15:10:48 +0000 (17:10 +0200)
committerPhilipp Reisner <philipp.reisner@linbit.com>
Tue, 18 May 2010 00:01:23 +0000 (02:01 +0200)
This should allow for better performance if the lower level IO stack
of the peers differs in limits exposed either via the queue,
or via some merge_bvec_fn.

Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
drivers/block/drbd/drbd_int.h
drivers/block/drbd/drbd_main.c
drivers/block/drbd/drbd_nl.c
drivers/block/drbd/drbd_receiver.c
drivers/block/drbd/drbd_worker.c
drivers/block/drbd/drbd_wrappers.h

index 1bc86dd..4b97f30 100644 (file)
@@ -740,18 +740,6 @@ enum epoch_event {
        EV_CLEANUP = 32, /* used as flag */
 };
 
-struct drbd_epoch_entry {
-       struct drbd_work    w;
-       struct drbd_conf *mdev;
-       struct bio *private_bio;
-       struct hlist_node colision;
-       sector_t sector;
-       unsigned int size;
-       unsigned int flags;
-       struct drbd_epoch *epoch;
-       u64    block_id;
-};
-
 struct drbd_wq_barrier {
        struct drbd_work w;
        struct completion done;
@@ -762,17 +750,49 @@ struct digest_info {
        void *digest;
 };
 
-/* ee flag bits */
+struct drbd_epoch_entry {
+       struct drbd_work w;
+       struct hlist_node colision;
+       struct drbd_epoch *epoch;
+       struct drbd_conf *mdev;
+       struct page *pages;
+       atomic_t pending_bios;
+       unsigned int size;
+       /* see comments on ee flag bits below */
+       unsigned long flags;
+       sector_t sector;
+       u64 block_id;
+};
+
+/* ee flag bits.
+ * While corresponding bios are in flight, the only modification will be
+ * set_bit WAS_ERROR, which has to be atomic.
+ * If no bios are in flight yet, or all have been completed,
+ * non-atomic modification to ee->flags is ok.
+ */
 enum {
        __EE_CALL_AL_COMPLETE_IO,
-       __EE_CONFLICT_PENDING,
        __EE_MAY_SET_IN_SYNC,
+
+       /* This epoch entry closes an epoch using a barrier.
+        * On sucessful completion, the epoch is released,
+        * and the P_BARRIER_ACK send. */
        __EE_IS_BARRIER,
+
+       /* In case a barrier failed,
+        * we need to resubmit without the barrier flag. */
+       __EE_RESUBMITTED,
+
+       /* we may have several bios per epoch entry.
+        * if any of those fail, we set this flag atomically
+        * from the endio callback */
+       __EE_WAS_ERROR,
 };
 #define EE_CALL_AL_COMPLETE_IO (1<<__EE_CALL_AL_COMPLETE_IO)
-#define EE_CONFLICT_PENDING    (1<<__EE_CONFLICT_PENDING)
 #define EE_MAY_SET_IN_SYNC     (1<<__EE_MAY_SET_IN_SYNC)
 #define EE_IS_BARRIER          (1<<__EE_IS_BARRIER)
+#define        EE_RESUBMITTED         (1<<__EE_RESUBMITTED)
+#define EE_WAS_ERROR           (1<<__EE_WAS_ERROR)
 
 /* global flag bits */
 enum {
@@ -1441,7 +1461,8 @@ static inline void ov_oos_print(struct drbd_conf *mdev)
 }
 
 
-extern void drbd_csum(struct drbd_conf *, struct crypto_hash *, struct bio *, void *);
+extern void drbd_csum_bio(struct drbd_conf *, struct crypto_hash *, struct bio *, void *);
+extern void drbd_csum_ee(struct drbd_conf *, struct crypto_hash *, struct drbd_epoch_entry *, void *);
 /* worker callbacks */
 extern int w_req_cancel_conflict(struct drbd_conf *, struct drbd_work *, int);
 extern int w_read_retry_remote(struct drbd_conf *, struct drbd_work *, int);
@@ -1465,6 +1486,8 @@ extern int w_e_reissue(struct drbd_conf *, struct drbd_work *, int);
 extern void resync_timer_fn(unsigned long data);
 
 /* drbd_receiver.c */
+extern int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
+               const unsigned rw, const int fault_type);
 extern int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list);
 extern struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
                                            u64 id,
@@ -1620,6 +1643,41 @@ void drbd_bcast_ee(struct drbd_conf *mdev,
  * inline helper functions
  *************************/
 
+/* see also page_chain_add and friends in drbd_receiver.c */
+static inline struct page *page_chain_next(struct page *page)
+{
+       return (struct page *)page_private(page);
+}
+#define page_chain_for_each(page) \
+       for (; page && ({ prefetch(page_chain_next(page)); 1; }); \
+                       page = page_chain_next(page))
+#define page_chain_for_each_safe(page, n) \
+       for (; page && ({ n = page_chain_next(page); 1; }); page = n)
+
+static inline int drbd_bio_has_active_page(struct bio *bio)
+{
+       struct bio_vec *bvec;
+       int i;
+
+       __bio_for_each_segment(bvec, bio, i, 0) {
+               if (page_count(bvec->bv_page) > 1)
+                       return 1;
+       }
+
+       return 0;
+}
+
+static inline int drbd_ee_has_active_page(struct drbd_epoch_entry *e)
+{
+       struct page *page = e->pages;
+       page_chain_for_each(page) {
+               if (page_count(page) > 1)
+                       return 1;
+       }
+       return 0;
+}
+
+
 static inline void drbd_state_lock(struct drbd_conf *mdev)
 {
        wait_event(mdev->misc_wait,
index 3aa0add..d0fabac 100644 (file)
@@ -2354,6 +2354,19 @@ static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
        return 1;
 }
 
+static int _drbd_send_zc_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
+{
+       struct page *page = e->pages;
+       unsigned len = e->size;
+       page_chain_for_each(page) {
+               unsigned l = min_t(unsigned, len, PAGE_SIZE);
+               if (!_drbd_send_page(mdev, page, 0, l))
+                       return 0;
+               len -= l;
+       }
+       return 1;
+}
+
 static void consider_delay_probes(struct drbd_conf *mdev)
 {
        if (mdev->state.conn != C_SYNC_SOURCE || mdev->agreed_pro_version < 93)
@@ -2430,7 +2443,7 @@ int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
                drbd_send(mdev, mdev->data.socket, &p, sizeof(p), MSG_MORE));
        if (ok && dgs) {
                dgb = mdev->int_dig_out;
-               drbd_csum(mdev, mdev->integrity_w_tfm, req->master_bio, dgb);
+               drbd_csum_bio(mdev, mdev->integrity_w_tfm, req->master_bio, dgb);
                ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, MSG_MORE);
        }
        if (ok) {
@@ -2483,11 +2496,11 @@ int drbd_send_block(struct drbd_conf *mdev, enum drbd_packets cmd,
                                        sizeof(p), MSG_MORE);
        if (ok && dgs) {
                dgb = mdev->int_dig_out;
-               drbd_csum(mdev, mdev->integrity_w_tfm, e->private_bio, dgb);
+               drbd_csum_ee(mdev, mdev->integrity_w_tfm, e, dgb);
                ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, MSG_MORE);
        }
        if (ok)
-               ok = _drbd_send_zc_bio(mdev, e->private_bio);
+               ok = _drbd_send_zc_ee(mdev, e);
 
        drbd_put_data_sock(mdev);
 
index 93d1506..28ef76b 100644 (file)
@@ -2215,9 +2215,9 @@ void drbd_bcast_ee(struct drbd_conf *mdev,
 {
        struct cn_msg *cn_reply;
        struct drbd_nl_cfg_reply *reply;
-       struct bio_vec *bvec;
        unsigned short *tl;
-       int i;
+       struct page *page;
+       unsigned len;
 
        if (!e)
                return;
@@ -2255,11 +2255,15 @@ void drbd_bcast_ee(struct drbd_conf *mdev,
        put_unaligned(T_ee_data, tl++);
        put_unaligned(e->size, tl++);
 
-       __bio_for_each_segment(bvec, e->private_bio, i, 0) {
-               void *d = kmap(bvec->bv_page);
-               memcpy(tl, d + bvec->bv_offset, bvec->bv_len);
-               kunmap(bvec->bv_page);
-               tl=(unsigned short*)((char*)tl + bvec->bv_len);
+       len = e->size;
+       page = e->pages;
+       page_chain_for_each(page) {
+               void *d = kmap_atomic(page, KM_USER0);
+               unsigned l = min_t(unsigned, len, PAGE_SIZE);
+               memcpy(tl, d, l);
+               kunmap_atomic(d, KM_USER0);
+               tl = (unsigned short*)((char*)tl + l);
+               len -= l;
        }
        put_unaligned(TT_END, tl++); /* Close the tag list */
 
index fee0d24..388a3e8 100644 (file)
@@ -80,30 +80,124 @@ static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epo
 
 #define GFP_TRY        (__GFP_HIGHMEM | __GFP_NOWARN)
 
-static struct page *drbd_pp_first_page_or_try_alloc(struct drbd_conf *mdev)
+/*
+ * some helper functions to deal with single linked page lists,
+ * page->private being our "next" pointer.
+ */
+
+/* If at least n pages are linked at head, get n pages off.
+ * Otherwise, don't modify head, and return NULL.
+ * Locking is the responsibility of the caller.
+ */
+static struct page *page_chain_del(struct page **head, int n)
+{
+       struct page *page;
+       struct page *tmp;
+
+       BUG_ON(!n);
+       BUG_ON(!head);
+
+       page = *head;
+       while (page) {
+               tmp = page_chain_next(page);
+               if (--n == 0)
+                       break; /* found sufficient pages */
+               if (tmp == NULL)
+                       /* insufficient pages, don't use any of them. */
+                       return NULL;
+               page = tmp;
+       }
+
+       /* add end of list marker for the returned list */
+       set_page_private(page, 0);
+       /* actual return value, and adjustment of head */
+       page = *head;
+       *head = tmp;
+       return page;
+}
+
+/* may be used outside of locks to find the tail of a (usually short)
+ * "private" page chain, before adding it back to a global chain head
+ * with page_chain_add() under a spinlock. */
+static struct page *page_chain_tail(struct page *page, int *len)
+{
+       struct page *tmp;
+       int i = 1;
+       while ((tmp = page_chain_next(page)))
+               ++i, page = tmp;
+       if (len)
+               *len = i;
+       return page;
+}
+
+static int page_chain_free(struct page *page)
+{
+       struct page *tmp;
+       int i = 0;
+       page_chain_for_each_safe(page, tmp) {
+               put_page(page);
+               ++i;
+       }
+       return i;
+}
+
+static void page_chain_add(struct page **head,
+               struct page *chain_first, struct page *chain_last)
+{
+#if 1
+       struct page *tmp;
+       tmp = page_chain_tail(chain_first, NULL);
+       BUG_ON(tmp != chain_last);
+#endif
+
+       /* add chain to head */
+       set_page_private(chain_last, (unsigned long)*head);
+       *head = chain_first;
+}
+
+static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
 {
        struct page *page = NULL;
+       struct page *tmp = NULL;
+       int i = 0;
 
        /* Yes, testing drbd_pp_vacant outside the lock is racy.
         * So what. It saves a spin_lock. */
-       if (drbd_pp_vacant > 0) {
+       if (drbd_pp_vacant >= number) {
                spin_lock(&drbd_pp_lock);
-               page = drbd_pp_pool;
-               if (page) {
-                       drbd_pp_pool = (struct page *)page_private(page);
-                       set_page_private(page, 0); /* just to be polite */
-                       drbd_pp_vacant--;
-               }
+               page = page_chain_del(&drbd_pp_pool, number);
+               if (page)
+                       drbd_pp_vacant -= number;
                spin_unlock(&drbd_pp_lock);
+               if (page)
+                       return page;
        }
+
        /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
         * "criss-cross" setup, that might cause write-out on some other DRBD,
         * which in turn might block on the other node at this very place.  */
-       if (!page)
-               page = alloc_page(GFP_TRY);
-       if (page)
-               atomic_inc(&mdev->pp_in_use);
-       return page;
+       for (i = 0; i < number; i++) {
+               tmp = alloc_page(GFP_TRY);
+               if (!tmp)
+                       break;
+               set_page_private(tmp, (unsigned long)page);
+               page = tmp;
+       }
+
+       if (i == number)
+               return page;
+
+       /* Not enough pages immediately available this time.
+        * No need to jump around here, drbd_pp_alloc will retry this
+        * function "soon". */
+       if (page) {
+               tmp = page_chain_tail(page, NULL);
+               spin_lock(&drbd_pp_lock);
+               page_chain_add(&drbd_pp_pool, page, tmp);
+               drbd_pp_vacant += i;
+               spin_unlock(&drbd_pp_lock);
+       }
+       return NULL;
 }
 
 /* kick lower level device, if we have more than (arbitrary number)
@@ -127,7 +221,7 @@ static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed
 
        list_for_each_safe(le, tle, &mdev->net_ee) {
                e = list_entry(le, struct drbd_epoch_entry, w.list);
-               if (drbd_bio_has_active_page(e->private_bio))
+               if (drbd_ee_has_active_page(e))
                        break;
                list_move(le, to_be_freed);
        }
@@ -148,32 +242,34 @@ static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
 }
 
 /**
- * drbd_pp_alloc() - Returns a page, fails only if a signal comes in
+ * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
  * @mdev:      DRBD device.
- * @retry:     whether or not to retry allocation forever (or until signalled)
+ * @number:    number of pages requested
+ * @retry:     whether to retry, if not enough pages are available right now
+ *
+ * Tries to allocate number pages, first from our own page pool, then from
+ * the kernel, unless this allocation would exceed the max_buffers setting.
+ * Possibly retry until DRBD frees sufficient pages somewhere else.
  *
- * Tries to allocate a page, first from our own page pool, then from the
- * kernel, unless this allocation would exceed the max_buffers setting.
- * If @retry is non-zero, retry until DRBD frees a page somewhere else.
+ * Returns a page chain linked via page->private.
  */
-static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry)
+static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
 {
        struct page *page = NULL;
        DEFINE_WAIT(wait);
 
-       if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
-               page = drbd_pp_first_page_or_try_alloc(mdev);
-               if (page)
-                       return page;
-       }
+       /* Yes, we may run up to @number over max_buffers. If we
+        * follow it strictly, the admin will get it wrong anyways. */
+       if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
+               page = drbd_pp_first_pages_or_try_alloc(mdev, number);
 
-       for (;;) {
+       while (page == NULL) {
                prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
 
                drbd_kick_lo_and_reclaim_net(mdev);
 
                if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
-                       page = drbd_pp_first_page_or_try_alloc(mdev);
+                       page = drbd_pp_first_pages_or_try_alloc(mdev, number);
                        if (page)
                                break;
                }
@@ -190,62 +286,32 @@ static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry)
        }
        finish_wait(&drbd_pp_wait, &wait);
 
+       if (page)
+               atomic_add(number, &mdev->pp_in_use);
        return page;
 }
 
 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
- * Is also used from inside an other spin_lock_irq(&mdev->req_lock) */
+ * Is also used from inside an other spin_lock_irq(&mdev->req_lock);
+ * Either links the page chain back to the global pool,
+ * or returns all pages to the system. */
 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
 {
-       int free_it;
-
-       spin_lock(&drbd_pp_lock);
-       if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
-               free_it = 1;
-       } else {
-               set_page_private(page, (unsigned long)drbd_pp_pool);
-               drbd_pp_pool = page;
-               drbd_pp_vacant++;
-               free_it = 0;
-       }
-       spin_unlock(&drbd_pp_lock);
-
-       atomic_dec(&mdev->pp_in_use);
-
-       if (free_it)
-               __free_page(page);
-
-       wake_up(&drbd_pp_wait);
-}
-
-static void drbd_pp_free_bio_pages(struct drbd_conf *mdev, struct bio *bio)
-{
-       struct page *p_to_be_freed = NULL;
-       struct page *page;
-       struct bio_vec *bvec;
        int i;
-
-       spin_lock(&drbd_pp_lock);
-       __bio_for_each_segment(bvec, bio, i, 0) {
-               if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
-                       set_page_private(bvec->bv_page, (unsigned long)p_to_be_freed);
-                       p_to_be_freed = bvec->bv_page;
-               } else {
-                       set_page_private(bvec->bv_page, (unsigned long)drbd_pp_pool);
-                       drbd_pp_pool = bvec->bv_page;
-                       drbd_pp_vacant++;
-               }
-       }
-       spin_unlock(&drbd_pp_lock);
-       atomic_sub(bio->bi_vcnt, &mdev->pp_in_use);
-
-       while (p_to_be_freed) {
-               page = p_to_be_freed;
-               p_to_be_freed = (struct page *)page_private(page);
-               set_page_private(page, 0); /* just to be polite */
-               put_page(page);
+       if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count)
+               i = page_chain_free(page);
+       else {
+               struct page *tmp;
+               tmp = page_chain_tail(page, &i);
+               spin_lock(&drbd_pp_lock);
+               page_chain_add(&drbd_pp_pool, page, tmp);
+               drbd_pp_vacant += i;
+               spin_unlock(&drbd_pp_lock);
        }
-
+       atomic_sub(i, &mdev->pp_in_use);
+       i = atomic_read(&mdev->pp_in_use);
+       if (i < 0)
+               dev_warn(DEV, "ASSERTION FAILED: pp_in_use: %d < 0\n", i);
        wake_up(&drbd_pp_wait);
 }
 
@@ -270,11 +336,9 @@ struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
                                     unsigned int data_size,
                                     gfp_t gfp_mask) __must_hold(local)
 {
-       struct request_queue *q;
        struct drbd_epoch_entry *e;
        struct page *page;
-       struct bio *bio;
-       unsigned int ds;
+       unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
 
        if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
                return NULL;
@@ -286,84 +350,32 @@ struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
                return NULL;
        }
 
-       bio = bio_alloc(gfp_mask & ~__GFP_HIGHMEM, div_ceil(data_size, PAGE_SIZE));
-       if (!bio) {
-               if (!(gfp_mask & __GFP_NOWARN))
-                       dev_err(DEV, "alloc_ee: Allocation of a bio failed\n");
-               goto fail1;
-       }
-
-       bio->bi_bdev = mdev->ldev->backing_bdev;
-       bio->bi_sector = sector;
-
-       ds = data_size;
-       while (ds) {
-               page = drbd_pp_alloc(mdev, (gfp_mask & __GFP_WAIT));
-               if (!page) {
-                       if (!(gfp_mask & __GFP_NOWARN))
-                               dev_err(DEV, "alloc_ee: Allocation of a page failed\n");
-                       goto fail2;
-               }
-               if (!bio_add_page(bio, page, min_t(int, ds, PAGE_SIZE), 0)) {
-                       drbd_pp_free(mdev, page);
-                       dev_err(DEV, "alloc_ee: bio_add_page(s=%llu,"
-                           "data_size=%u,ds=%u) failed\n",
-                           (unsigned long long)sector, data_size, ds);
-
-                       q = bdev_get_queue(bio->bi_bdev);
-                       if (q->merge_bvec_fn) {
-                               struct bvec_merge_data bvm = {
-                                       .bi_bdev = bio->bi_bdev,
-                                       .bi_sector = bio->bi_sector,
-                                       .bi_size = bio->bi_size,
-                                       .bi_rw = bio->bi_rw,
-                               };
-                               int l = q->merge_bvec_fn(q, &bvm,
-                                               &bio->bi_io_vec[bio->bi_vcnt]);
-                               dev_err(DEV, "merge_bvec_fn() = %d\n", l);
-                       }
-
-                       /* dump more of the bio. */
-                       dev_err(DEV, "bio->bi_max_vecs = %d\n", bio->bi_max_vecs);
-                       dev_err(DEV, "bio->bi_vcnt = %d\n", bio->bi_vcnt);
-                       dev_err(DEV, "bio->bi_size = %d\n", bio->bi_size);
-                       dev_err(DEV, "bio->bi_phys_segments = %d\n", bio->bi_phys_segments);
-
-                       goto fail2;
-                       break;
-               }
-               ds -= min_t(int, ds, PAGE_SIZE);
-       }
-
-       D_ASSERT(data_size == bio->bi_size);
-
-       bio->bi_private = e;
-       e->mdev = mdev;
-       e->sector = sector;
-       e->size = bio->bi_size;
+       page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
+       if (!page)
+               goto fail;
 
-       e->private_bio = bio;
-       e->block_id = id;
        INIT_HLIST_NODE(&e->colision);
        e->epoch = NULL;
+       e->mdev = mdev;
+       e->pages = page;
+       atomic_set(&e->pending_bios, 0);
+       e->size = data_size;
        e->flags = 0;
+       e->sector = sector;
+       e->sector = sector;
+       e->block_id = id;
 
        return e;
 
- fail2:
-       drbd_pp_free_bio_pages(mdev, bio);
-       bio_put(bio);
- fail1:
+ fail:
        mempool_free(e, drbd_ee_mempool);
-
        return NULL;
 }
 
 void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
 {
-       struct bio *bio = e->private_bio;
-       drbd_pp_free_bio_pages(mdev, bio);
-       bio_put(bio);
+       drbd_pp_free(mdev, e->pages);
+       D_ASSERT(atomic_read(&e->pending_bios) == 0);
        D_ASSERT(hlist_unhashed(&e->colision));
        mempool_free(e, drbd_ee_mempool);
 }
@@ -1121,6 +1133,90 @@ void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo)
 }
 
 /**
+ * drbd_submit_ee()
+ * @mdev:      DRBD device.
+ * @e:         epoch entry
+ * @rw:                flag field, see bio->bi_rw
+ */
+/* TODO allocate from our own bio_set. */
+int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
+               const unsigned rw, const int fault_type)
+{
+       struct bio *bios = NULL;
+       struct bio *bio;
+       struct page *page = e->pages;
+       sector_t sector = e->sector;
+       unsigned ds = e->size;
+       unsigned n_bios = 0;
+       unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
+
+       /* In most cases, we will only need one bio.  But in case the lower
+        * level restrictions happen to be different at this offset on this
+        * side than those of the sending peer, we may need to submit the
+        * request in more than one bio. */
+next_bio:
+       bio = bio_alloc(GFP_NOIO, nr_pages);
+       if (!bio) {
+               dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
+               goto fail;
+       }
+       /* > e->sector, unless this is the first bio */
+       bio->bi_sector = sector;
+       bio->bi_bdev = mdev->ldev->backing_bdev;
+       /* we special case some flags in the multi-bio case, see below
+        * (BIO_RW_UNPLUG, BIO_RW_BARRIER) */
+       bio->bi_rw = rw;
+       bio->bi_private = e;
+       bio->bi_end_io = drbd_endio_sec;
+
+       bio->bi_next = bios;
+       bios = bio;
+       ++n_bios;
+
+       page_chain_for_each(page) {
+               unsigned len = min_t(unsigned, ds, PAGE_SIZE);
+               if (!bio_add_page(bio, page, len, 0)) {
+                       /* a single page must always be possible! */
+                       BUG_ON(bio->bi_vcnt == 0);
+                       goto next_bio;
+               }
+               ds -= len;
+               sector += len >> 9;
+               --nr_pages;
+       }
+       D_ASSERT(page == NULL);
+       D_ASSERT(ds == 0);
+
+       atomic_set(&e->pending_bios, n_bios);
+       do {
+               bio = bios;
+               bios = bios->bi_next;
+               bio->bi_next = NULL;
+
+               /* strip off BIO_RW_UNPLUG unless it is the last bio */
+               if (bios)
+                       bio->bi_rw &= ~(1<<BIO_RW_UNPLUG);
+
+               drbd_generic_make_request(mdev, fault_type, bio);
+
+               /* strip off BIO_RW_BARRIER,
+                * unless it is the first or last bio */
+               if (bios && bios->bi_next)
+                       bios->bi_rw &= ~(1<<BIO_RW_BARRIER);
+       } while (bios);
+       maybe_kick_lo(mdev);
+       return 0;
+
+fail:
+       while (bios) {
+               bio = bios;
+               bios = bios->bi_next;
+               bio_put(bio);
+       }
+       return -ENOMEM;
+}
+
+/**
  * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set
  * @mdev:      DRBD device.
  * @w:         work object.
@@ -1129,8 +1225,6 @@ void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo)
 int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
 {
        struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
-       struct bio *bio = e->private_bio;
-
        /* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
           (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
           so that we can finish that epoch in drbd_may_finish_epoch().
@@ -1144,33 +1238,17 @@ int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __relea
        if (previous_epoch(mdev, e->epoch))
                dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
 
-       /* prepare bio for re-submit,
-        * re-init volatile members */
        /* we still have a local reference,
         * get_ldev was done in receive_Data. */
-       bio->bi_bdev = mdev->ldev->backing_bdev;
-       bio->bi_sector = e->sector;
-       bio->bi_size = e->size;
-       bio->bi_idx = 0;
-
-       bio->bi_flags &= ~(BIO_POOL_MASK - 1);
-       bio->bi_flags |= 1 << BIO_UPTODATE;
-
-       /* don't know whether this is necessary: */
-       bio->bi_phys_segments = 0;
-       bio->bi_next = NULL;
-
-       /* these should be unchanged: */
-       /* bio->bi_end_io = drbd_endio_write_sec; */
-       /* bio->bi_vcnt = whatever; */
 
        e->w.cb = e_end_block;
-
-       /* This is no longer a barrier request. */
-       bio->bi_rw &= ~(1UL << BIO_RW_BARRIER);
-
-       drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, bio);
-
+       if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_DT_WR) != 0) {
+               /* drbd_submit_ee fails for one reason only:
+                * if was not able to allocate sufficient bios.
+                * requeue, try again later. */
+               e->w.cb = w_e_reissue;
+               drbd_queue_work(&mdev->data.work, &e->w);
+       }
        return 1;
 }
 
@@ -1264,10 +1342,8 @@ read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __
 {
        const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
        struct drbd_epoch_entry *e;
-       struct bio_vec *bvec;
        struct page *page;
-       struct bio *bio;
-       int dgs, ds, i, rr;
+       int dgs, ds, rr;
        void *dig_in = mdev->int_dig_in;
        void *dig_vv = mdev->int_dig_vv;
        unsigned long *data;
@@ -1304,28 +1380,29 @@ read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __
        e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
        if (!e)
                return NULL;
-       bio = e->private_bio;
+
        ds = data_size;
-       bio_for_each_segment(bvec, bio, i) {
-               page = bvec->bv_page;
+       page = e->pages;
+       page_chain_for_each(page) {
+               unsigned len = min_t(int, ds, PAGE_SIZE);
                data = kmap(page);
-               rr = drbd_recv(mdev, data, min_t(int, ds, PAGE_SIZE));
+               rr = drbd_recv(mdev, data, len);
                if (FAULT_ACTIVE(mdev, DRBD_FAULT_RECEIVE)) {
                        dev_err(DEV, "Fault injection: Corrupting data on receive\n");
                        data[0] = data[0] ^ (unsigned long)-1;
                }
                kunmap(page);
-               if (rr != min_t(int, ds, PAGE_SIZE)) {
+               if (rr != len) {
                        drbd_free_ee(mdev, e);
                        dev_warn(DEV, "short read receiving data: read %d expected %d\n",
-                            rr, min_t(int, ds, PAGE_SIZE));
+                            rr, len);
                        return NULL;
                }
                ds -= rr;
        }
 
        if (dgs) {
-               drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
+               drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
                if (memcmp(dig_in, dig_vv, dgs)) {
                        dev_err(DEV, "Digest integrity check FAILED.\n");
                        drbd_bcast_ee(mdev, "digest failed",
@@ -1350,7 +1427,7 @@ static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
        if (!data_size)
                return TRUE;
 
-       page = drbd_pp_alloc(mdev, 1);
+       page = drbd_pp_alloc(mdev, 1, 1);
 
        data = kmap(page);
        while (data_size) {
@@ -1414,7 +1491,7 @@ static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
        }
 
        if (dgs) {
-               drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
+               drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
                if (memcmp(dig_in, dig_vv, dgs)) {
                        dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
                        return 0;
@@ -1435,7 +1512,7 @@ static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int u
 
        D_ASSERT(hlist_unhashed(&e->colision));
 
-       if (likely(drbd_bio_uptodate(e->private_bio))) {
+       if (likely((e->flags & EE_WAS_ERROR) == 0)) {
                drbd_set_in_sync(mdev, sector, e->size);
                ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
        } else {
@@ -1454,30 +1531,28 @@ static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_si
        struct drbd_epoch_entry *e;
 
        e = read_in_block(mdev, ID_SYNCER, sector, data_size);
-       if (!e) {
-               put_ldev(mdev);
-               return FALSE;
-       }
+       if (!e)
+               goto fail;
 
        dec_rs_pending(mdev);
 
-       e->private_bio->bi_end_io = drbd_endio_write_sec;
-       e->private_bio->bi_rw = WRITE;
-       e->w.cb = e_end_resync_block;
-
        inc_unacked(mdev);
        /* corresponding dec_unacked() in e_end_resync_block()
         * respective _drbd_clear_done_ee */
 
+       e->w.cb = e_end_resync_block;
+
        spin_lock_irq(&mdev->req_lock);
        list_add(&e->w.list, &mdev->sync_ee);
        spin_unlock_irq(&mdev->req_lock);
 
-       drbd_generic_make_request(mdev, DRBD_FAULT_RS_WR, e->private_bio);
-       /* accounting done in endio */
+       if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
+               return TRUE;
 
-       maybe_kick_lo(mdev);
-       return TRUE;
+       drbd_free_ee(mdev, e);
+fail:
+       put_ldev(mdev);
+       return FALSE;
 }
 
 static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
@@ -1572,7 +1647,7 @@ static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
        }
 
        if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
-               if (likely(drbd_bio_uptodate(e->private_bio))) {
+               if (likely((e->flags & EE_WAS_ERROR) == 0)) {
                        pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
                                mdev->state.conn <= C_PAUSED_SYNC_T &&
                                e->flags & EE_MAY_SET_IN_SYNC) ?
@@ -1718,7 +1793,6 @@ static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
                return FALSE;
        }
 
-       e->private_bio->bi_end_io = drbd_endio_write_sec;
        e->w.cb = e_end_block;
 
        spin_lock(&mdev->epoch_lock);
@@ -1914,12 +1988,8 @@ static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
                drbd_al_begin_io(mdev, e->sector);
        }
 
-       e->private_bio->bi_rw = rw;
-       drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, e->private_bio);
-       /* accounting done in endio */
-
-       maybe_kick_lo(mdev);
-       return TRUE;
+       if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
+               return TRUE;
 
 out_interrupted:
        /* yes, the epoch_size now is imbalanced.
@@ -1977,9 +2047,6 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
                return FALSE;
        }
 
-       e->private_bio->bi_rw = READ;
-       e->private_bio->bi_end_io = drbd_endio_read_sec;
-
        switch (h->command) {
        case P_DATA_REQUEST:
                e->w.cb = w_e_end_data_req;
@@ -2073,10 +2140,8 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
 
        inc_unacked(mdev);
 
-       drbd_generic_make_request(mdev, fault_type, e->private_bio);
-       maybe_kick_lo(mdev);
-
-       return TRUE;
+       if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
+               return TRUE;
 
 out_free_e:
        kfree(di);
@@ -3837,7 +3902,7 @@ static void drbd_disconnect(struct drbd_conf *mdev)
                dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
        i = atomic_read(&mdev->pp_in_use);
        if (i)
-               dev_info(DEV, "pp_in_use = %u, expected 0\n", i);
+               dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
 
        D_ASSERT(list_empty(&mdev->read_ee));
        D_ASSERT(list_empty(&mdev->active_ee));
index 0bbecf4..d771b1e 100644 (file)
@@ -47,8 +47,7 @@ static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int ca
 
 /* defined here:
    drbd_md_io_complete
-   drbd_endio_write_sec
-   drbd_endio_read_sec
+   drbd_endio_sec
    drbd_endio_pri
 
  * more endio handlers:
@@ -85,27 +84,10 @@ void drbd_md_io_complete(struct bio *bio, int error)
 /* reads on behalf of the partner,
  * "submitted" by the receiver
  */
-void drbd_endio_read_sec(struct bio *bio, int error) __releases(local)
+void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
 {
        unsigned long flags = 0;
-       struct drbd_epoch_entry *e = NULL;
-       struct drbd_conf *mdev;
-       int uptodate = bio_flagged(bio, BIO_UPTODATE);
-
-       e = bio->bi_private;
-       mdev = e->mdev;
-
-       if (error)
-               dev_warn(DEV, "read: error=%d s=%llus\n", error,
-                               (unsigned long long)e->sector);
-       if (!error && !uptodate) {
-               dev_warn(DEV, "read: setting error to -EIO s=%llus\n",
-                               (unsigned long long)e->sector);
-               /* strange behavior of some lower level drivers...
-                * fail the request by clearing the uptodate flag,
-                * but do not return any error?! */
-               error = -EIO;
-       }
+       struct drbd_conf *mdev = e->mdev;
 
        D_ASSERT(e->block_id != ID_VACANT);
 
@@ -114,49 +96,38 @@ void drbd_endio_read_sec(struct bio *bio, int error) __releases(local)
        list_del(&e->w.list);
        if (list_empty(&mdev->read_ee))
                wake_up(&mdev->ee_wait);
+       if (test_bit(__EE_WAS_ERROR, &e->flags))
+               __drbd_chk_io_error(mdev, FALSE);
        spin_unlock_irqrestore(&mdev->req_lock, flags);
 
-       drbd_chk_io_error(mdev, error, FALSE);
        drbd_queue_work(&mdev->data.work, &e->w);
        put_ldev(mdev);
 }
 
+static int is_failed_barrier(int ee_flags)
+{
+       return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED))
+                       == (EE_IS_BARRIER|EE_WAS_ERROR);
+}
+
 /* writes on behalf of the partner, or resync writes,
- * "submitted" by the receiver.
- */
-void drbd_endio_write_sec(struct bio *bio, int error) __releases(local)
+ * "submitted" by the receiver, final stage.  */
+static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
 {
        unsigned long flags = 0;
-       struct drbd_epoch_entry *e = NULL;
-       struct drbd_conf *mdev;
+       struct drbd_conf *mdev = e->mdev;
        sector_t e_sector;
        int do_wake;
        int is_syncer_req;
        int do_al_complete_io;
-       int uptodate = bio_flagged(bio, BIO_UPTODATE);
-       int is_barrier = bio_rw_flagged(bio, BIO_RW_BARRIER);
 
-       e = bio->bi_private;
-       mdev = e->mdev;
-
-       if (error)
-               dev_warn(DEV, "write: error=%d s=%llus\n", error,
-                               (unsigned long long)e->sector);
-       if (!error && !uptodate) {
-               dev_warn(DEV, "write: setting error to -EIO s=%llus\n",
-                               (unsigned long long)e->sector);
-               /* strange behavior of some lower level drivers...
-                * fail the request by clearing the uptodate flag,
-                * but do not return any error?! */
-               error = -EIO;
-       }
-
-       /* error == -ENOTSUPP would be a better test,
-        * alas it is not reliable */
-       if (error && is_barrier && e->flags & EE_IS_BARRIER) {
+       /* if this is a failed barrier request, disable use of barriers,
+        * and schedule for resubmission */
+       if (is_failed_barrier(e->flags)) {
                drbd_bump_write_ordering(mdev, WO_bdev_flush);
                spin_lock_irqsave(&mdev->req_lock, flags);
                list_del(&e->w.list);
+               e->flags |= EE_RESUBMITTED;
                e->w.cb = w_e_reissue;
                /* put_ldev actually happens below, once we come here again. */
                __release(local);
@@ -167,17 +138,16 @@ void drbd_endio_write_sec(struct bio *bio, int error) __releases(local)
 
        D_ASSERT(e->block_id != ID_VACANT);
 
-       spin_lock_irqsave(&mdev->req_lock, flags);
-       mdev->writ_cnt += e->size >> 9;
-       is_syncer_req = is_syncer_block_id(e->block_id);
-
        /* after we moved e to done_ee,
         * we may no longer access it,
         * it may be freed/reused already!
         * (as soon as we release the req_lock) */
        e_sector = e->sector;
        do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
+       is_syncer_req = is_syncer_block_id(e->block_id);
 
+       spin_lock_irqsave(&mdev->req_lock, flags);
+       mdev->writ_cnt += e->size >> 9;
        list_del(&e->w.list); /* has been on active_ee or sync_ee */
        list_add_tail(&e->w.list, &mdev->done_ee);
 
@@ -190,7 +160,7 @@ void drbd_endio_write_sec(struct bio *bio, int error) __releases(local)
                ? list_empty(&mdev->sync_ee)
                : list_empty(&mdev->active_ee);
 
-       if (error)
+       if (test_bit(__EE_WAS_ERROR, &e->flags))
                __drbd_chk_io_error(mdev, FALSE);
        spin_unlock_irqrestore(&mdev->req_lock, flags);
 
@@ -205,7 +175,42 @@ void drbd_endio_write_sec(struct bio *bio, int error) __releases(local)
 
        wake_asender(mdev);
        put_ldev(mdev);
+}
 
+/* writes on behalf of the partner, or resync writes,
+ * "submitted" by the receiver.
+ */
+void drbd_endio_sec(struct bio *bio, int error)
+{
+       struct drbd_epoch_entry *e = bio->bi_private;
+       struct drbd_conf *mdev = e->mdev;
+       int uptodate = bio_flagged(bio, BIO_UPTODATE);
+       int is_write = bio_data_dir(bio) == WRITE;
+
+       if (error)
+               dev_warn(DEV, "%s: error=%d s=%llus\n",
+                               is_write ? "write" : "read", error,
+                               (unsigned long long)e->sector);
+       if (!error && !uptodate) {
+               dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
+                               is_write ? "write" : "read",
+                               (unsigned long long)e->sector);
+               /* strange behavior of some lower level drivers...
+                * fail the request by clearing the uptodate flag,
+                * but do not return any error?! */
+               error = -EIO;
+       }
+
+       if (error)
+               set_bit(__EE_WAS_ERROR, &e->flags);
+
+       bio_put(bio); /* no need for the bio anymore */
+       if (atomic_dec_and_test(&e->pending_bios)) {
+               if (is_write)
+                       drbd_endio_write_sec_final(e);
+               else
+                       drbd_endio_read_sec_final(e);
+       }
 }
 
 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
@@ -295,7 +300,34 @@ int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
        return 1; /* Simply ignore this! */
 }
 
-void drbd_csum(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
+void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
+{
+       struct hash_desc desc;
+       struct scatterlist sg;
+       struct page *page = e->pages;
+       struct page *tmp;
+       unsigned len;
+
+       desc.tfm = tfm;
+       desc.flags = 0;
+
+       sg_init_table(&sg, 1);
+       crypto_hash_init(&desc);
+
+       while ((tmp = page_chain_next(page))) {
+               /* all but the last page will be fully used */
+               sg_set_page(&sg, page, PAGE_SIZE, 0);
+               crypto_hash_update(&desc, &sg, sg.length);
+               page = tmp;
+       }
+       /* and now the last, possibly only partially used page */
+       len = e->size & (PAGE_SIZE - 1);
+       sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
+       crypto_hash_update(&desc, &sg, sg.length);
+       crypto_hash_final(&desc, digest);
+}
+
+void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
 {
        struct hash_desc desc;
        struct scatterlist sg;
@@ -329,11 +361,11 @@ static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel
                return 1;
        }
 
-       if (likely(drbd_bio_uptodate(e->private_bio))) {
+       if (likely((e->flags & EE_WAS_ERROR) == 0)) {
                digest_size = crypto_hash_digestsize(mdev->csums_tfm);
                digest = kmalloc(digest_size, GFP_NOIO);
                if (digest) {
-                       drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
+                       drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
 
                        inc_rs_pending(mdev);
                        ok = drbd_send_drequest_csum(mdev,
@@ -369,23 +401,21 @@ static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
        /* GFP_TRY, because if there is no memory available right now, this may
         * be rescheduled for later. It is "only" background resync, after all. */
        e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
-       if (!e) {
-               put_ldev(mdev);
-               return 2;
-       }
+       if (!e)
+               goto fail;
 
        spin_lock_irq(&mdev->req_lock);
        list_add(&e->w.list, &mdev->read_ee);
        spin_unlock_irq(&mdev->req_lock);
 
-       e->private_bio->bi_end_io = drbd_endio_read_sec;
-       e->private_bio->bi_rw = READ;
        e->w.cb = w_e_send_csum;
+       if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
+               return 1;
 
-       mdev->read_cnt += size >> 9;
-       drbd_generic_make_request(mdev, DRBD_FAULT_RS_RD, e->private_bio);
-
-       return 1;
+       drbd_free_ee(mdev, e);
+fail:
+       put_ldev(mdev);
+       return 2;
 }
 
 void resync_timer_fn(unsigned long data)
@@ -819,7 +849,7 @@ out:
 /* helper */
 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
 {
-       if (drbd_bio_has_active_page(e->private_bio)) {
+       if (drbd_ee_has_active_page(e)) {
                /* This might happen if sendpage() has not finished */
                spin_lock_irq(&mdev->req_lock);
                list_add_tail(&e->w.list, &mdev->net_ee);
@@ -845,7 +875,7 @@ int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
                return 1;
        }
 
-       if (likely(drbd_bio_uptodate(e->private_bio))) {
+       if (likely((e->flags & EE_WAS_ERROR) == 0)) {
                ok = drbd_send_block(mdev, P_DATA_REPLY, e);
        } else {
                if (__ratelimit(&drbd_ratelimit_state))
@@ -886,7 +916,7 @@ int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
                put_ldev(mdev);
        }
 
-       if (likely(drbd_bio_uptodate(e->private_bio))) {
+       if (likely((e->flags & EE_WAS_ERROR) == 0)) {
                if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
                        inc_rs_pending(mdev);
                        ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
@@ -934,7 +964,7 @@ int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 
        di = (struct digest_info *)(unsigned long)e->block_id;
 
-       if (likely(drbd_bio_uptodate(e->private_bio))) {
+       if (likely((e->flags & EE_WAS_ERROR) == 0)) {
                /* quick hack to try to avoid a race against reconfiguration.
                 * a real fix would be much more involved,
                 * introducing more locking mechanisms */
@@ -944,7 +974,7 @@ int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
                        digest = kmalloc(digest_size, GFP_NOIO);
                }
                if (digest) {
-                       drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
+                       drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
                        eq = !memcmp(digest, di->digest, digest_size);
                        kfree(digest);
                }
@@ -986,14 +1016,14 @@ int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
        if (unlikely(cancel))
                goto out;
 
-       if (unlikely(!drbd_bio_uptodate(e->private_bio)))
+       if (unlikely((e->flags & EE_WAS_ERROR) != 0))
                goto out;
 
        digest_size = crypto_hash_digestsize(mdev->verify_tfm);
        /* FIXME if this allocation fails, online verify will not terminate! */
        digest = kmalloc(digest_size, GFP_NOIO);
        if (digest) {
-               drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
+               drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
                inc_rs_pending(mdev);
                ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
                                             digest, digest_size, P_OV_REPLY);
@@ -1042,11 +1072,11 @@ int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 
        di = (struct digest_info *)(unsigned long)e->block_id;
 
-       if (likely(drbd_bio_uptodate(e->private_bio))) {
+       if (likely((e->flags & EE_WAS_ERROR) == 0)) {
                digest_size = crypto_hash_digestsize(mdev->verify_tfm);
                digest = kmalloc(digest_size, GFP_NOIO);
                if (digest) {
-                       drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
+                       drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
 
                        D_ASSERT(digest_size == di->digest_size);
                        eq = !memcmp(digest, di->digest, digest_size);
index f93fa11..defdb50 100644 (file)
@@ -18,23 +18,9 @@ static inline void drbd_set_my_capacity(struct drbd_conf *mdev,
 
 #define drbd_bio_uptodate(bio) bio_flagged(bio, BIO_UPTODATE)
 
-static inline int drbd_bio_has_active_page(struct bio *bio)
-{
-       struct bio_vec *bvec;
-       int i;
-
-       __bio_for_each_segment(bvec, bio, i, 0) {
-               if (page_count(bvec->bv_page) > 1)
-                       return 1;
-       }
-
-       return 0;
-}
-
 /* bi_end_io handlers */
 extern void drbd_md_io_complete(struct bio *bio, int error);
-extern void drbd_endio_read_sec(struct bio *bio, int error);
-extern void drbd_endio_write_sec(struct bio *bio, int error);
+extern void drbd_endio_sec(struct bio *bio, int error);
 extern void drbd_endio_pri(struct bio *bio, int error);
 
 /*