#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
-static struct page *drbd_pp_first_page_or_try_alloc(struct drbd_conf *mdev)
+/*
+ * some helper functions to deal with single linked page lists,
+ * page->private being our "next" pointer.
+ */
+
+/* If at least n pages are linked at head, get n pages off.
+ * Otherwise, don't modify head, and return NULL.
+ * Locking is the responsibility of the caller.
+ */
+static struct page *page_chain_del(struct page **head, int n)
+{
+ struct page *page;
+ struct page *tmp;
+
+ BUG_ON(!n);
+ BUG_ON(!head);
+
+ page = *head;
+ while (page) {
+ tmp = page_chain_next(page);
+ if (--n == 0)
+ break; /* found sufficient pages */
+ if (tmp == NULL)
+ /* insufficient pages, don't use any of them. */
+ return NULL;
+ page = tmp;
+ }
+
+ /* add end of list marker for the returned list */
+ set_page_private(page, 0);
+ /* actual return value, and adjustment of head */
+ page = *head;
+ *head = tmp;
+ return page;
+}
+
+/* may be used outside of locks to find the tail of a (usually short)
+ * "private" page chain, before adding it back to a global chain head
+ * with page_chain_add() under a spinlock. */
+static struct page *page_chain_tail(struct page *page, int *len)
+{
+ struct page *tmp;
+ int i = 1;
+ while ((tmp = page_chain_next(page)))
+ ++i, page = tmp;
+ if (len)
+ *len = i;
+ return page;
+}
+
+static int page_chain_free(struct page *page)
+{
+ struct page *tmp;
+ int i = 0;
+ page_chain_for_each_safe(page, tmp) {
+ put_page(page);
+ ++i;
+ }
+ return i;
+}
+
+static void page_chain_add(struct page **head,
+ struct page *chain_first, struct page *chain_last)
+{
+#if 1
+ struct page *tmp;
+ tmp = page_chain_tail(chain_first, NULL);
+ BUG_ON(tmp != chain_last);
+#endif
+
+ /* add chain to head */
+ set_page_private(chain_last, (unsigned long)*head);
+ *head = chain_first;
+}
+
+static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
{
struct page *page = NULL;
+ struct page *tmp = NULL;
+ int i = 0;
/* Yes, testing drbd_pp_vacant outside the lock is racy.
* So what. It saves a spin_lock. */
- if (drbd_pp_vacant > 0) {
+ if (drbd_pp_vacant >= number) {
spin_lock(&drbd_pp_lock);
- page = drbd_pp_pool;
- if (page) {
- drbd_pp_pool = (struct page *)page_private(page);
- set_page_private(page, 0); /* just to be polite */
- drbd_pp_vacant--;
- }
+ page = page_chain_del(&drbd_pp_pool, number);
+ if (page)
+ drbd_pp_vacant -= number;
spin_unlock(&drbd_pp_lock);
+ if (page)
+ return page;
}
+
/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
* "criss-cross" setup, that might cause write-out on some other DRBD,
* which in turn might block on the other node at this very place. */
- if (!page)
- page = alloc_page(GFP_TRY);
- if (page)
- atomic_inc(&mdev->pp_in_use);
- return page;
+ for (i = 0; i < number; i++) {
+ tmp = alloc_page(GFP_TRY);
+ if (!tmp)
+ break;
+ set_page_private(tmp, (unsigned long)page);
+ page = tmp;
+ }
+
+ if (i == number)
+ return page;
+
+ /* Not enough pages immediately available this time.
+ * No need to jump around here, drbd_pp_alloc will retry this
+ * function "soon". */
+ if (page) {
+ tmp = page_chain_tail(page, NULL);
+ spin_lock(&drbd_pp_lock);
+ page_chain_add(&drbd_pp_pool, page, tmp);
+ drbd_pp_vacant += i;
+ spin_unlock(&drbd_pp_lock);
+ }
+ return NULL;
}
/* kick lower level device, if we have more than (arbitrary number)
list_for_each_safe(le, tle, &mdev->net_ee) {
e = list_entry(le, struct drbd_epoch_entry, w.list);
- if (drbd_bio_has_active_page(e->private_bio))
+ if (drbd_ee_has_active_page(e))
break;
list_move(le, to_be_freed);
}
}
/**
- * drbd_pp_alloc() - Returns a page, fails only if a signal comes in
+ * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
* @mdev: DRBD device.
- * @retry: whether or not to retry allocation forever (or until signalled)
+ * @number: number of pages requested
+ * @retry: whether to retry, if not enough pages are available right now
+ *
+ * Tries to allocate number pages, first from our own page pool, then from
+ * the kernel, unless this allocation would exceed the max_buffers setting.
+ * Possibly retry until DRBD frees sufficient pages somewhere else.
*
- * Tries to allocate a page, first from our own page pool, then from the
- * kernel, unless this allocation would exceed the max_buffers setting.
- * If @retry is non-zero, retry until DRBD frees a page somewhere else.
+ * Returns a page chain linked via page->private.
*/
-static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry)
+static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
{
struct page *page = NULL;
DEFINE_WAIT(wait);
- if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
- page = drbd_pp_first_page_or_try_alloc(mdev);
- if (page)
- return page;
- }
+ /* Yes, we may run up to @number over max_buffers. If we
+ * follow it strictly, the admin will get it wrong anyways. */
+ if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
+ page = drbd_pp_first_pages_or_try_alloc(mdev, number);
- for (;;) {
+ while (page == NULL) {
prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
drbd_kick_lo_and_reclaim_net(mdev);
if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
- page = drbd_pp_first_page_or_try_alloc(mdev);
+ page = drbd_pp_first_pages_or_try_alloc(mdev, number);
if (page)
break;
}
}
finish_wait(&drbd_pp_wait, &wait);
+ if (page)
+ atomic_add(number, &mdev->pp_in_use);
return page;
}
/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
- * Is also used from inside an other spin_lock_irq(&mdev->req_lock) */
+ * Is also used from inside an other spin_lock_irq(&mdev->req_lock);
+ * Either links the page chain back to the global pool,
+ * or returns all pages to the system. */
static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
{
- int free_it;
-
- spin_lock(&drbd_pp_lock);
- if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
- free_it = 1;
- } else {
- set_page_private(page, (unsigned long)drbd_pp_pool);
- drbd_pp_pool = page;
- drbd_pp_vacant++;
- free_it = 0;
- }
- spin_unlock(&drbd_pp_lock);
-
- atomic_dec(&mdev->pp_in_use);
-
- if (free_it)
- __free_page(page);
-
- wake_up(&drbd_pp_wait);
-}
-
-static void drbd_pp_free_bio_pages(struct drbd_conf *mdev, struct bio *bio)
-{
- struct page *p_to_be_freed = NULL;
- struct page *page;
- struct bio_vec *bvec;
int i;
-
- spin_lock(&drbd_pp_lock);
- __bio_for_each_segment(bvec, bio, i, 0) {
- if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
- set_page_private(bvec->bv_page, (unsigned long)p_to_be_freed);
- p_to_be_freed = bvec->bv_page;
- } else {
- set_page_private(bvec->bv_page, (unsigned long)drbd_pp_pool);
- drbd_pp_pool = bvec->bv_page;
- drbd_pp_vacant++;
- }
- }
- spin_unlock(&drbd_pp_lock);
- atomic_sub(bio->bi_vcnt, &mdev->pp_in_use);
-
- while (p_to_be_freed) {
- page = p_to_be_freed;
- p_to_be_freed = (struct page *)page_private(page);
- set_page_private(page, 0); /* just to be polite */
- put_page(page);
+ if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count)
+ i = page_chain_free(page);
+ else {
+ struct page *tmp;
+ tmp = page_chain_tail(page, &i);
+ spin_lock(&drbd_pp_lock);
+ page_chain_add(&drbd_pp_pool, page, tmp);
+ drbd_pp_vacant += i;
+ spin_unlock(&drbd_pp_lock);
}
-
+ atomic_sub(i, &mdev->pp_in_use);
+ i = atomic_read(&mdev->pp_in_use);
+ if (i < 0)
+ dev_warn(DEV, "ASSERTION FAILED: pp_in_use: %d < 0\n", i);
wake_up(&drbd_pp_wait);
}
unsigned int data_size,
gfp_t gfp_mask) __must_hold(local)
{
- struct request_queue *q;
struct drbd_epoch_entry *e;
struct page *page;
- struct bio *bio;
- unsigned int ds;
+ unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
return NULL;
return NULL;
}
- bio = bio_alloc(gfp_mask & ~__GFP_HIGHMEM, div_ceil(data_size, PAGE_SIZE));
- if (!bio) {
- if (!(gfp_mask & __GFP_NOWARN))
- dev_err(DEV, "alloc_ee: Allocation of a bio failed\n");
- goto fail1;
- }
-
- bio->bi_bdev = mdev->ldev->backing_bdev;
- bio->bi_sector = sector;
-
- ds = data_size;
- while (ds) {
- page = drbd_pp_alloc(mdev, (gfp_mask & __GFP_WAIT));
- if (!page) {
- if (!(gfp_mask & __GFP_NOWARN))
- dev_err(DEV, "alloc_ee: Allocation of a page failed\n");
- goto fail2;
- }
- if (!bio_add_page(bio, page, min_t(int, ds, PAGE_SIZE), 0)) {
- drbd_pp_free(mdev, page);
- dev_err(DEV, "alloc_ee: bio_add_page(s=%llu,"
- "data_size=%u,ds=%u) failed\n",
- (unsigned long long)sector, data_size, ds);
-
- q = bdev_get_queue(bio->bi_bdev);
- if (q->merge_bvec_fn) {
- struct bvec_merge_data bvm = {
- .bi_bdev = bio->bi_bdev,
- .bi_sector = bio->bi_sector,
- .bi_size = bio->bi_size,
- .bi_rw = bio->bi_rw,
- };
- int l = q->merge_bvec_fn(q, &bvm,
- &bio->bi_io_vec[bio->bi_vcnt]);
- dev_err(DEV, "merge_bvec_fn() = %d\n", l);
- }
-
- /* dump more of the bio. */
- dev_err(DEV, "bio->bi_max_vecs = %d\n", bio->bi_max_vecs);
- dev_err(DEV, "bio->bi_vcnt = %d\n", bio->bi_vcnt);
- dev_err(DEV, "bio->bi_size = %d\n", bio->bi_size);
- dev_err(DEV, "bio->bi_phys_segments = %d\n", bio->bi_phys_segments);
-
- goto fail2;
- break;
- }
- ds -= min_t(int, ds, PAGE_SIZE);
- }
-
- D_ASSERT(data_size == bio->bi_size);
-
- bio->bi_private = e;
- e->mdev = mdev;
- e->sector = sector;
- e->size = bio->bi_size;
+ page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
+ if (!page)
+ goto fail;
- e->private_bio = bio;
- e->block_id = id;
INIT_HLIST_NODE(&e->colision);
e->epoch = NULL;
+ e->mdev = mdev;
+ e->pages = page;
+ atomic_set(&e->pending_bios, 0);
+ e->size = data_size;
e->flags = 0;
+ e->sector = sector;
+ e->sector = sector;
+ e->block_id = id;
return e;
- fail2:
- drbd_pp_free_bio_pages(mdev, bio);
- bio_put(bio);
- fail1:
+ fail:
mempool_free(e, drbd_ee_mempool);
-
return NULL;
}
void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
{
- struct bio *bio = e->private_bio;
- drbd_pp_free_bio_pages(mdev, bio);
- bio_put(bio);
+ drbd_pp_free(mdev, e->pages);
+ D_ASSERT(atomic_read(&e->pending_bios) == 0);
D_ASSERT(hlist_unhashed(&e->colision));
mempool_free(e, drbd_ee_mempool);
}
}
/**
+ * drbd_submit_ee()
+ * @mdev: DRBD device.
+ * @e: epoch entry
+ * @rw: flag field, see bio->bi_rw
+ */
+/* TODO allocate from our own bio_set. */
+int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
+ const unsigned rw, const int fault_type)
+{
+ struct bio *bios = NULL;
+ struct bio *bio;
+ struct page *page = e->pages;
+ sector_t sector = e->sector;
+ unsigned ds = e->size;
+ unsigned n_bios = 0;
+ unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
+
+ /* In most cases, we will only need one bio. But in case the lower
+ * level restrictions happen to be different at this offset on this
+ * side than those of the sending peer, we may need to submit the
+ * request in more than one bio. */
+next_bio:
+ bio = bio_alloc(GFP_NOIO, nr_pages);
+ if (!bio) {
+ dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
+ goto fail;
+ }
+ /* > e->sector, unless this is the first bio */
+ bio->bi_sector = sector;
+ bio->bi_bdev = mdev->ldev->backing_bdev;
+ /* we special case some flags in the multi-bio case, see below
+ * (BIO_RW_UNPLUG, BIO_RW_BARRIER) */
+ bio->bi_rw = rw;
+ bio->bi_private = e;
+ bio->bi_end_io = drbd_endio_sec;
+
+ bio->bi_next = bios;
+ bios = bio;
+ ++n_bios;
+
+ page_chain_for_each(page) {
+ unsigned len = min_t(unsigned, ds, PAGE_SIZE);
+ if (!bio_add_page(bio, page, len, 0)) {
+ /* a single page must always be possible! */
+ BUG_ON(bio->bi_vcnt == 0);
+ goto next_bio;
+ }
+ ds -= len;
+ sector += len >> 9;
+ --nr_pages;
+ }
+ D_ASSERT(page == NULL);
+ D_ASSERT(ds == 0);
+
+ atomic_set(&e->pending_bios, n_bios);
+ do {
+ bio = bios;
+ bios = bios->bi_next;
+ bio->bi_next = NULL;
+
+ /* strip off BIO_RW_UNPLUG unless it is the last bio */
+ if (bios)
+ bio->bi_rw &= ~(1<<BIO_RW_UNPLUG);
+
+ drbd_generic_make_request(mdev, fault_type, bio);
+
+ /* strip off BIO_RW_BARRIER,
+ * unless it is the first or last bio */
+ if (bios && bios->bi_next)
+ bios->bi_rw &= ~(1<<BIO_RW_BARRIER);
+ } while (bios);
+ maybe_kick_lo(mdev);
+ return 0;
+
+fail:
+ while (bios) {
+ bio = bios;
+ bios = bios->bi_next;
+ bio_put(bio);
+ }
+ return -ENOMEM;
+}
+
+/**
* w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set
* @mdev: DRBD device.
* @w: work object.
int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
{
struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
- struct bio *bio = e->private_bio;
-
/* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
(and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
so that we can finish that epoch in drbd_may_finish_epoch().
if (previous_epoch(mdev, e->epoch))
dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
- /* prepare bio for re-submit,
- * re-init volatile members */
/* we still have a local reference,
* get_ldev was done in receive_Data. */
- bio->bi_bdev = mdev->ldev->backing_bdev;
- bio->bi_sector = e->sector;
- bio->bi_size = e->size;
- bio->bi_idx = 0;
-
- bio->bi_flags &= ~(BIO_POOL_MASK - 1);
- bio->bi_flags |= 1 << BIO_UPTODATE;
-
- /* don't know whether this is necessary: */
- bio->bi_phys_segments = 0;
- bio->bi_next = NULL;
-
- /* these should be unchanged: */
- /* bio->bi_end_io = drbd_endio_write_sec; */
- /* bio->bi_vcnt = whatever; */
e->w.cb = e_end_block;
-
- /* This is no longer a barrier request. */
- bio->bi_rw &= ~(1UL << BIO_RW_BARRIER);
-
- drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, bio);
-
+ if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_DT_WR) != 0) {
+ /* drbd_submit_ee fails for one reason only:
+ * if was not able to allocate sufficient bios.
+ * requeue, try again later. */
+ e->w.cb = w_e_reissue;
+ drbd_queue_work(&mdev->data.work, &e->w);
+ }
return 1;
}
{
const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
struct drbd_epoch_entry *e;
- struct bio_vec *bvec;
struct page *page;
- struct bio *bio;
- int dgs, ds, i, rr;
+ int dgs, ds, rr;
void *dig_in = mdev->int_dig_in;
void *dig_vv = mdev->int_dig_vv;
unsigned long *data;
e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
if (!e)
return NULL;
- bio = e->private_bio;
+
ds = data_size;
- bio_for_each_segment(bvec, bio, i) {
- page = bvec->bv_page;
+ page = e->pages;
+ page_chain_for_each(page) {
+ unsigned len = min_t(int, ds, PAGE_SIZE);
data = kmap(page);
- rr = drbd_recv(mdev, data, min_t(int, ds, PAGE_SIZE));
+ rr = drbd_recv(mdev, data, len);
if (FAULT_ACTIVE(mdev, DRBD_FAULT_RECEIVE)) {
dev_err(DEV, "Fault injection: Corrupting data on receive\n");
data[0] = data[0] ^ (unsigned long)-1;
}
kunmap(page);
- if (rr != min_t(int, ds, PAGE_SIZE)) {
+ if (rr != len) {
drbd_free_ee(mdev, e);
dev_warn(DEV, "short read receiving data: read %d expected %d\n",
- rr, min_t(int, ds, PAGE_SIZE));
+ rr, len);
return NULL;
}
ds -= rr;
}
if (dgs) {
- drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
+ drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
if (memcmp(dig_in, dig_vv, dgs)) {
dev_err(DEV, "Digest integrity check FAILED.\n");
drbd_bcast_ee(mdev, "digest failed",
if (!data_size)
return TRUE;
- page = drbd_pp_alloc(mdev, 1);
+ page = drbd_pp_alloc(mdev, 1, 1);
data = kmap(page);
while (data_size) {
}
if (dgs) {
- drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
+ drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
if (memcmp(dig_in, dig_vv, dgs)) {
dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
return 0;
D_ASSERT(hlist_unhashed(&e->colision));
- if (likely(drbd_bio_uptodate(e->private_bio))) {
+ if (likely((e->flags & EE_WAS_ERROR) == 0)) {
drbd_set_in_sync(mdev, sector, e->size);
ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
} else {
struct drbd_epoch_entry *e;
e = read_in_block(mdev, ID_SYNCER, sector, data_size);
- if (!e) {
- put_ldev(mdev);
- return FALSE;
- }
+ if (!e)
+ goto fail;
dec_rs_pending(mdev);
- e->private_bio->bi_end_io = drbd_endio_write_sec;
- e->private_bio->bi_rw = WRITE;
- e->w.cb = e_end_resync_block;
-
inc_unacked(mdev);
/* corresponding dec_unacked() in e_end_resync_block()
* respective _drbd_clear_done_ee */
+ e->w.cb = e_end_resync_block;
+
spin_lock_irq(&mdev->req_lock);
list_add(&e->w.list, &mdev->sync_ee);
spin_unlock_irq(&mdev->req_lock);
- drbd_generic_make_request(mdev, DRBD_FAULT_RS_WR, e->private_bio);
- /* accounting done in endio */
+ if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
+ return TRUE;
- maybe_kick_lo(mdev);
- return TRUE;
+ drbd_free_ee(mdev, e);
+fail:
+ put_ldev(mdev);
+ return FALSE;
}
static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
}
if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
- if (likely(drbd_bio_uptodate(e->private_bio))) {
+ if (likely((e->flags & EE_WAS_ERROR) == 0)) {
pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
mdev->state.conn <= C_PAUSED_SYNC_T &&
e->flags & EE_MAY_SET_IN_SYNC) ?
return FALSE;
}
- e->private_bio->bi_end_io = drbd_endio_write_sec;
e->w.cb = e_end_block;
spin_lock(&mdev->epoch_lock);
drbd_al_begin_io(mdev, e->sector);
}
- e->private_bio->bi_rw = rw;
- drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, e->private_bio);
- /* accounting done in endio */
-
- maybe_kick_lo(mdev);
- return TRUE;
+ if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
+ return TRUE;
out_interrupted:
/* yes, the epoch_size now is imbalanced.
return FALSE;
}
- e->private_bio->bi_rw = READ;
- e->private_bio->bi_end_io = drbd_endio_read_sec;
-
switch (h->command) {
case P_DATA_REQUEST:
e->w.cb = w_e_end_data_req;
inc_unacked(mdev);
- drbd_generic_make_request(mdev, fault_type, e->private_bio);
- maybe_kick_lo(mdev);
-
- return TRUE;
+ if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
+ return TRUE;
out_free_e:
kfree(di);
dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
i = atomic_read(&mdev->pp_in_use);
if (i)
- dev_info(DEV, "pp_in_use = %u, expected 0\n", i);
+ dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
D_ASSERT(list_empty(&mdev->read_ee));
D_ASSERT(list_empty(&mdev->active_ee));
/* defined here:
drbd_md_io_complete
- drbd_endio_write_sec
- drbd_endio_read_sec
+ drbd_endio_sec
drbd_endio_pri
* more endio handlers:
/* reads on behalf of the partner,
* "submitted" by the receiver
*/
-void drbd_endio_read_sec(struct bio *bio, int error) __releases(local)
+void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
{
unsigned long flags = 0;
- struct drbd_epoch_entry *e = NULL;
- struct drbd_conf *mdev;
- int uptodate = bio_flagged(bio, BIO_UPTODATE);
-
- e = bio->bi_private;
- mdev = e->mdev;
-
- if (error)
- dev_warn(DEV, "read: error=%d s=%llus\n", error,
- (unsigned long long)e->sector);
- if (!error && !uptodate) {
- dev_warn(DEV, "read: setting error to -EIO s=%llus\n",
- (unsigned long long)e->sector);
- /* strange behavior of some lower level drivers...
- * fail the request by clearing the uptodate flag,
- * but do not return any error?! */
- error = -EIO;
- }
+ struct drbd_conf *mdev = e->mdev;
D_ASSERT(e->block_id != ID_VACANT);
list_del(&e->w.list);
if (list_empty(&mdev->read_ee))
wake_up(&mdev->ee_wait);
+ if (test_bit(__EE_WAS_ERROR, &e->flags))
+ __drbd_chk_io_error(mdev, FALSE);
spin_unlock_irqrestore(&mdev->req_lock, flags);
- drbd_chk_io_error(mdev, error, FALSE);
drbd_queue_work(&mdev->data.work, &e->w);
put_ldev(mdev);
}
+static int is_failed_barrier(int ee_flags)
+{
+ return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED))
+ == (EE_IS_BARRIER|EE_WAS_ERROR);
+}
+
/* writes on behalf of the partner, or resync writes,
- * "submitted" by the receiver.
- */
-void drbd_endio_write_sec(struct bio *bio, int error) __releases(local)
+ * "submitted" by the receiver, final stage. */
+static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
{
unsigned long flags = 0;
- struct drbd_epoch_entry *e = NULL;
- struct drbd_conf *mdev;
+ struct drbd_conf *mdev = e->mdev;
sector_t e_sector;
int do_wake;
int is_syncer_req;
int do_al_complete_io;
- int uptodate = bio_flagged(bio, BIO_UPTODATE);
- int is_barrier = bio_rw_flagged(bio, BIO_RW_BARRIER);
- e = bio->bi_private;
- mdev = e->mdev;
-
- if (error)
- dev_warn(DEV, "write: error=%d s=%llus\n", error,
- (unsigned long long)e->sector);
- if (!error && !uptodate) {
- dev_warn(DEV, "write: setting error to -EIO s=%llus\n",
- (unsigned long long)e->sector);
- /* strange behavior of some lower level drivers...
- * fail the request by clearing the uptodate flag,
- * but do not return any error?! */
- error = -EIO;
- }
-
- /* error == -ENOTSUPP would be a better test,
- * alas it is not reliable */
- if (error && is_barrier && e->flags & EE_IS_BARRIER) {
+ /* if this is a failed barrier request, disable use of barriers,
+ * and schedule for resubmission */
+ if (is_failed_barrier(e->flags)) {
drbd_bump_write_ordering(mdev, WO_bdev_flush);
spin_lock_irqsave(&mdev->req_lock, flags);
list_del(&e->w.list);
+ e->flags |= EE_RESUBMITTED;
e->w.cb = w_e_reissue;
/* put_ldev actually happens below, once we come here again. */
__release(local);
D_ASSERT(e->block_id != ID_VACANT);
- spin_lock_irqsave(&mdev->req_lock, flags);
- mdev->writ_cnt += e->size >> 9;
- is_syncer_req = is_syncer_block_id(e->block_id);
-
/* after we moved e to done_ee,
* we may no longer access it,
* it may be freed/reused already!
* (as soon as we release the req_lock) */
e_sector = e->sector;
do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
+ is_syncer_req = is_syncer_block_id(e->block_id);
+ spin_lock_irqsave(&mdev->req_lock, flags);
+ mdev->writ_cnt += e->size >> 9;
list_del(&e->w.list); /* has been on active_ee or sync_ee */
list_add_tail(&e->w.list, &mdev->done_ee);
? list_empty(&mdev->sync_ee)
: list_empty(&mdev->active_ee);
- if (error)
+ if (test_bit(__EE_WAS_ERROR, &e->flags))
__drbd_chk_io_error(mdev, FALSE);
spin_unlock_irqrestore(&mdev->req_lock, flags);
wake_asender(mdev);
put_ldev(mdev);
+}
+/* writes on behalf of the partner, or resync writes,
+ * "submitted" by the receiver.
+ */
+void drbd_endio_sec(struct bio *bio, int error)
+{
+ struct drbd_epoch_entry *e = bio->bi_private;
+ struct drbd_conf *mdev = e->mdev;
+ int uptodate = bio_flagged(bio, BIO_UPTODATE);
+ int is_write = bio_data_dir(bio) == WRITE;
+
+ if (error)
+ dev_warn(DEV, "%s: error=%d s=%llus\n",
+ is_write ? "write" : "read", error,
+ (unsigned long long)e->sector);
+ if (!error && !uptodate) {
+ dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
+ is_write ? "write" : "read",
+ (unsigned long long)e->sector);
+ /* strange behavior of some lower level drivers...
+ * fail the request by clearing the uptodate flag,
+ * but do not return any error?! */
+ error = -EIO;
+ }
+
+ if (error)
+ set_bit(__EE_WAS_ERROR, &e->flags);
+
+ bio_put(bio); /* no need for the bio anymore */
+ if (atomic_dec_and_test(&e->pending_bios)) {
+ if (is_write)
+ drbd_endio_write_sec_final(e);
+ else
+ drbd_endio_read_sec_final(e);
+ }
}
/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
return 1; /* Simply ignore this! */
}
-void drbd_csum(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
+void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
+{
+ struct hash_desc desc;
+ struct scatterlist sg;
+ struct page *page = e->pages;
+ struct page *tmp;
+ unsigned len;
+
+ desc.tfm = tfm;
+ desc.flags = 0;
+
+ sg_init_table(&sg, 1);
+ crypto_hash_init(&desc);
+
+ while ((tmp = page_chain_next(page))) {
+ /* all but the last page will be fully used */
+ sg_set_page(&sg, page, PAGE_SIZE, 0);
+ crypto_hash_update(&desc, &sg, sg.length);
+ page = tmp;
+ }
+ /* and now the last, possibly only partially used page */
+ len = e->size & (PAGE_SIZE - 1);
+ sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
+ crypto_hash_update(&desc, &sg, sg.length);
+ crypto_hash_final(&desc, digest);
+}
+
+void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
{
struct hash_desc desc;
struct scatterlist sg;
return 1;
}
- if (likely(drbd_bio_uptodate(e->private_bio))) {
+ if (likely((e->flags & EE_WAS_ERROR) == 0)) {
digest_size = crypto_hash_digestsize(mdev->csums_tfm);
digest = kmalloc(digest_size, GFP_NOIO);
if (digest) {
- drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
+ drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
inc_rs_pending(mdev);
ok = drbd_send_drequest_csum(mdev,
/* GFP_TRY, because if there is no memory available right now, this may
* be rescheduled for later. It is "only" background resync, after all. */
e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
- if (!e) {
- put_ldev(mdev);
- return 2;
- }
+ if (!e)
+ goto fail;
spin_lock_irq(&mdev->req_lock);
list_add(&e->w.list, &mdev->read_ee);
spin_unlock_irq(&mdev->req_lock);
- e->private_bio->bi_end_io = drbd_endio_read_sec;
- e->private_bio->bi_rw = READ;
e->w.cb = w_e_send_csum;
+ if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
+ return 1;
- mdev->read_cnt += size >> 9;
- drbd_generic_make_request(mdev, DRBD_FAULT_RS_RD, e->private_bio);
-
- return 1;
+ drbd_free_ee(mdev, e);
+fail:
+ put_ldev(mdev);
+ return 2;
}
void resync_timer_fn(unsigned long data)
/* helper */
static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
{
- if (drbd_bio_has_active_page(e->private_bio)) {
+ if (drbd_ee_has_active_page(e)) {
/* This might happen if sendpage() has not finished */
spin_lock_irq(&mdev->req_lock);
list_add_tail(&e->w.list, &mdev->net_ee);
return 1;
}
- if (likely(drbd_bio_uptodate(e->private_bio))) {
+ if (likely((e->flags & EE_WAS_ERROR) == 0)) {
ok = drbd_send_block(mdev, P_DATA_REPLY, e);
} else {
if (__ratelimit(&drbd_ratelimit_state))
put_ldev(mdev);
}
- if (likely(drbd_bio_uptodate(e->private_bio))) {
+ if (likely((e->flags & EE_WAS_ERROR) == 0)) {
if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
inc_rs_pending(mdev);
ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
di = (struct digest_info *)(unsigned long)e->block_id;
- if (likely(drbd_bio_uptodate(e->private_bio))) {
+ if (likely((e->flags & EE_WAS_ERROR) == 0)) {
/* quick hack to try to avoid a race against reconfiguration.
* a real fix would be much more involved,
* introducing more locking mechanisms */
digest = kmalloc(digest_size, GFP_NOIO);
}
if (digest) {
- drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
+ drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
eq = !memcmp(digest, di->digest, digest_size);
kfree(digest);
}
if (unlikely(cancel))
goto out;
- if (unlikely(!drbd_bio_uptodate(e->private_bio)))
+ if (unlikely((e->flags & EE_WAS_ERROR) != 0))
goto out;
digest_size = crypto_hash_digestsize(mdev->verify_tfm);
/* FIXME if this allocation fails, online verify will not terminate! */
digest = kmalloc(digest_size, GFP_NOIO);
if (digest) {
- drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
+ drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
inc_rs_pending(mdev);
ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
digest, digest_size, P_OV_REPLY);
di = (struct digest_info *)(unsigned long)e->block_id;
- if (likely(drbd_bio_uptodate(e->private_bio))) {
+ if (likely((e->flags & EE_WAS_ERROR) == 0)) {
digest_size = crypto_hash_digestsize(mdev->verify_tfm);
digest = kmalloc(digest_size, GFP_NOIO);
if (digest) {
- drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
+ drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
D_ASSERT(digest_size == di->digest_size);
eq = !memcmp(digest, di->digest, digest_size);