struct bio *private_bio;
struct drbd_interval i;
- unsigned int epoch; /* barrier_nr */
- /* barrier_nr: used to check on "completion" whether this req was in
+ /* epoch: used to check on "completion" whether this req was in
* the current epoch, and we therefore have to close it,
- * starting a new epoch...
+ * causing a p_barrier packet to be send, starting a new epoch.
+ *
+ * This corresponds to "barrier" in struct p_barrier[_ack],
+ * and to "barrier_nr" in struct drbd_epoch (and various
+ * comments/function parameters/local variable names).
*/
+ unsigned int epoch;
struct list_head tl_requests; /* ring list in the transfer log */
struct bio *master_bio; /* master bio pointer */
unsigned long start_time;
};
-struct drbd_tl_epoch {
- struct drbd_work w;
- struct list_head requests; /* requests before */
- struct drbd_tl_epoch *next; /* pointer to the next barrier */
- unsigned int br_number; /* the barriers identifier. */
- int n_writes; /* number of requests attached before this barrier */
-};
-
struct drbd_epoch {
struct drbd_tconn *tconn;
struct list_head list;
unsigned int ko_count;
spinlock_t req_lock;
- struct drbd_tl_epoch *unused_spare_tle; /* for pre-allocation */
- struct drbd_tl_epoch *newest_tle;
- struct drbd_tl_epoch *oldest_tle;
- struct list_head out_of_sequence_requests;
- struct list_head barrier_acked_requests;
+
+ struct list_head transfer_log; /* all requests not yet fully processed */
struct crypto_hash *cram_hmac_tfm;
struct crypto_hash *integrity_tfm; /* checksums we compute, updates protected by tconn->data->mutex */
void *int_dig_in;
void *int_dig_vv;
+ /* receiver side */
struct drbd_epoch *current_epoch;
spinlock_t epoch_lock;
unsigned int epochs;
enum write_ordering_e write_ordering;
atomic_t current_tle_nr; /* transfer log epoch number */
+ unsigned current_tle_writes; /* writes seen within this tl epoch */
unsigned long last_reconnect_jif;
struct drbd_thread receiver;
struct drbd_thread worker;
struct drbd_thread asender;
cpumask_var_t cpu_mask;
+
+ /* sender side */
struct drbd_work_queue sender_work;
+
+ struct {
+ /* whether this sender thread
+ * has processed a single write yet. */
+ bool seen_any_write_yet;
+
+ /* Which barrier number to send with the next P_BARRIER */
+ int current_epoch_nr;
+
+ /* how many write requests have been sent
+ * with req->epoch == current_epoch_nr.
+ * If none, no P_BARRIER will be sent. */
+ unsigned current_epoch_writes;
+ } send;
};
struct drbd_conf {
extern void tl_release(struct drbd_tconn *, unsigned int barrier_nr,
unsigned int set_size);
extern void tl_clear(struct drbd_tconn *);
-extern void _tl_add_barrier(struct drbd_tconn *, struct drbd_tl_epoch *);
extern void drbd_free_sock(struct drbd_tconn *tconn);
extern int drbd_send(struct drbd_tconn *tconn, struct socket *sock,
void *buf, size_t size, unsigned msg_flags);
extern int w_send_write_hint(struct drbd_work *, int);
extern int w_make_resync_request(struct drbd_work *, int);
extern int w_send_dblock(struct drbd_work *, int);
-extern int w_send_barrier(struct drbd_work *, int);
extern int w_send_read_req(struct drbd_work *, int);
extern int w_prev_work_done(struct drbd_work *, int);
extern int w_e_reissue(struct drbd_work *, int);
#endif
/**
- * DOC: The transfer log
- *
- * The transfer log is a single linked list of &struct drbd_tl_epoch objects.
- * mdev->tconn->newest_tle points to the head, mdev->tconn->oldest_tle points to the tail
- * of the list. There is always at least one &struct drbd_tl_epoch object.
- *
- * Each &struct drbd_tl_epoch has a circular double linked list of requests
- * attached.
- */
-static int tl_init(struct drbd_tconn *tconn)
-{
- struct drbd_tl_epoch *b;
-
- /* during device minor initialization, we may well use GFP_KERNEL */
- b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
- if (!b)
- return 0;
- INIT_LIST_HEAD(&b->requests);
- INIT_LIST_HEAD(&b->w.list);
- b->next = NULL;
- b->br_number = atomic_inc_return(&tconn->current_tle_nr);
- b->n_writes = 0;
- b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
-
- tconn->oldest_tle = b;
- tconn->newest_tle = b;
- INIT_LIST_HEAD(&tconn->out_of_sequence_requests);
- INIT_LIST_HEAD(&tconn->barrier_acked_requests);
-
- return 1;
-}
-
-static void tl_cleanup(struct drbd_tconn *tconn)
-{
- if (tconn->oldest_tle != tconn->newest_tle)
- conn_err(tconn, "ASSERT FAILED: oldest_tle == newest_tle\n");
- if (!list_empty(&tconn->out_of_sequence_requests))
- conn_err(tconn, "ASSERT FAILED: list_empty(out_of_sequence_requests)\n");
- kfree(tconn->oldest_tle);
- tconn->oldest_tle = NULL;
- kfree(tconn->unused_spare_tle);
- tconn->unused_spare_tle = NULL;
-}
-
-/**
- * _tl_add_barrier() - Adds a barrier to the transfer log
- * @mdev: DRBD device.
- * @new: Barrier to be added before the current head of the TL.
- *
- * The caller must hold the req_lock.
- */
-void _tl_add_barrier(struct drbd_tconn *tconn, struct drbd_tl_epoch *new)
-{
- INIT_LIST_HEAD(&new->requests);
- INIT_LIST_HEAD(&new->w.list);
- new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
- new->next = NULL;
- new->n_writes = 0;
-
- new->br_number = atomic_inc_return(&tconn->current_tle_nr);
- if (tconn->newest_tle != new) {
- tconn->newest_tle->next = new;
- tconn->newest_tle = new;
- }
-}
-
-/**
- * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
- * @mdev: DRBD device.
+ * tl_release() - mark as BARRIER_ACKED all requests in the corresponding transfer log epoch
+ * @tconn: DRBD connection.
* @barrier_nr: Expected identifier of the DRBD write barrier packet.
* @set_size: Expected number of requests before that barrier.
*
* In case the passed barrier_nr or set_size does not match the oldest
- * &struct drbd_tl_epoch objects this function will cause a termination
- * of the connection.
+ * epoch of not yet barrier-acked requests, this function will cause a
+ * termination of the connection.
*/
void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr,
unsigned int set_size)
{
- struct drbd_conf *mdev;
- struct drbd_tl_epoch *b, *nob; /* next old barrier */
- struct list_head *le, *tle;
struct drbd_request *r;
+ struct drbd_request *req = NULL;
+ int expect_epoch = 0;
+ int expect_size = 0;
spin_lock_irq(&tconn->req_lock);
- b = tconn->oldest_tle;
+ /* find latest not yet barrier-acked write request,
+ * count writes in its epoch. */
+ list_for_each_entry(r, &tconn->transfer_log, tl_requests) {
+ const unsigned long s = r->rq_state;
+ if (!req) {
+ if (!(s & RQ_WRITE))
+ continue;
+ if (!(s & RQ_NET_MASK))
+ continue;
+ if (s & RQ_NET_DONE)
+ continue;
+ req = r;
+ expect_epoch = req->epoch;
+ expect_size ++;
+ } else {
+ if (r->epoch != expect_epoch)
+ break;
+ if (!(s & RQ_WRITE))
+ continue;
+ /* if (s & RQ_DONE): not expected */
+ /* if (!(s & RQ_NET_MASK)): not expected */
+ expect_size++;
+ }
+ }
/* first some paranoia code */
- if (b == NULL) {
+ if (req == NULL) {
conn_err(tconn, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
barrier_nr);
goto bail;
}
- if (b->br_number != barrier_nr) {
+ if (expect_epoch != barrier_nr) {
conn_err(tconn, "BAD! BarrierAck #%u received, expected #%u!\n",
- barrier_nr, b->br_number);
+ barrier_nr, expect_epoch);
goto bail;
}
- if (b->n_writes != set_size) {
+
+ if (expect_size != set_size) {
conn_err(tconn, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
- barrier_nr, set_size, b->n_writes);
+ barrier_nr, set_size, expect_size);
goto bail;
}
/* Clean up list of requests processed during current epoch */
- list_for_each_safe(le, tle, &b->requests) {
- r = list_entry(le, struct drbd_request, tl_requests);
- _req_mod(r, BARRIER_ACKED);
- }
- /* There could be requests on the list waiting for completion
- of the write to the local disk. To avoid corruptions of
- slab's data structures we have to remove the lists head.
-
- Also there could have been a barrier ack out of sequence, overtaking
- the write acks - which would be a bug and violating write ordering.
- To not deadlock in case we lose connection while such requests are
- still pending, we need some way to find them for the
- _req_mode(CONNECTION_LOST_WHILE_PENDING).
-
- These have been list_move'd to the out_of_sequence_requests list in
- _req_mod(, BARRIER_ACKED) above.
- */
- list_splice_init(&b->requests, &tconn->barrier_acked_requests);
- mdev = b->w.mdev;
-
- nob = b->next;
- if (test_and_clear_bit(CREATE_BARRIER, &tconn->flags)) {
- _tl_add_barrier(tconn, b);
- if (nob)
- tconn->oldest_tle = nob;
- /* if nob == NULL b was the only barrier, and becomes the new
- barrier. Therefore tconn->oldest_tle points already to b */
- } else {
- D_ASSERT(nob != NULL);
- tconn->oldest_tle = nob;
- kfree(b);
+ list_for_each_entry_safe(req, r, &tconn->transfer_log, tl_requests) {
+ if (req->epoch != expect_epoch)
+ break;
+ _req_mod(req, BARRIER_ACKED);
}
-
spin_unlock_irq(&tconn->req_lock);
- dec_ap_pending(mdev);
return;
* @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
* RESTART_FROZEN_DISK_IO.
*/
+/* must hold resource->req_lock */
void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
{
- struct drbd_tl_epoch *b, *tmp, **pn;
- struct list_head *le, *tle, carry_reads;
- struct drbd_request *req;
- int rv, n_writes, n_reads;
-
- b = tconn->oldest_tle;
- pn = &tconn->oldest_tle;
- while (b) {
- n_writes = 0;
- n_reads = 0;
- INIT_LIST_HEAD(&carry_reads);
- list_for_each_safe(le, tle, &b->requests) {
- req = list_entry(le, struct drbd_request, tl_requests);
- rv = _req_mod(req, what);
-
- if (rv & MR_WRITE)
- n_writes++;
- if (rv & MR_READ)
- n_reads++;
- }
- tmp = b->next;
-
- if (n_writes) {
- if (what == RESEND) {
- b->n_writes = n_writes;
- if (b->w.cb == NULL) {
- b->w.cb = w_send_barrier;
- inc_ap_pending(b->w.mdev);
- set_bit(CREATE_BARRIER, &tconn->flags);
- }
-
- drbd_queue_work(&tconn->sender_work, &b->w);
- }
- pn = &b->next;
- } else {
- if (n_reads)
- list_add(&carry_reads, &b->requests);
- /* there could still be requests on that ring list,
- * in case local io is still pending */
- list_del(&b->requests);
-
- /* dec_ap_pending corresponding to queue_barrier.
- * the newest barrier may not have been queued yet,
- * in which case w.cb is still NULL. */
- if (b->w.cb != NULL)
- dec_ap_pending(b->w.mdev);
-
- if (b == tconn->newest_tle) {
- /* recycle, but reinit! */
- if (tmp != NULL)
- conn_err(tconn, "ASSERT FAILED tmp == NULL");
- INIT_LIST_HEAD(&b->requests);
- list_splice(&carry_reads, &b->requests);
- INIT_LIST_HEAD(&b->w.list);
- b->w.cb = NULL;
- b->br_number = atomic_inc_return(&tconn->current_tle_nr);
- b->n_writes = 0;
-
- *pn = b;
- break;
- }
- *pn = tmp;
- kfree(b);
- }
- b = tmp;
- list_splice(&carry_reads, &b->requests);
- }
-
- /* Actions operating on the disk state, also want to work on
- requests that got barrier acked. */
- switch (what) {
- case FAIL_FROZEN_DISK_IO:
- case RESTART_FROZEN_DISK_IO:
- list_for_each_safe(le, tle, &tconn->barrier_acked_requests) {
- req = list_entry(le, struct drbd_request, tl_requests);
- _req_mod(req, what);
- }
- case CONNECTION_LOST_WHILE_PENDING:
- case RESEND:
- break;
- default:
- conn_err(tconn, "what = %d in _tl_restart()\n", what);
- }
+ struct drbd_request *req, *r;
+
+ list_for_each_entry_safe(req, r, &tconn->transfer_log, tl_requests)
+ _req_mod(req, what);
+}
+
+void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
+{
+ spin_lock_irq(&tconn->req_lock);
+ _tl_restart(tconn, what);
+ spin_unlock_irq(&tconn->req_lock);
}
/**
*/
void tl_clear(struct drbd_tconn *tconn)
{
- struct list_head *le, *tle;
- struct drbd_request *r;
-
- spin_lock_irq(&tconn->req_lock);
-
- _tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
-
- /* we expect this list to be empty. */
- if (!list_empty(&tconn->out_of_sequence_requests))
- conn_err(tconn, "ASSERT FAILED list_empty(&out_of_sequence_requests)\n");
-
- /* but just in case, clean it up anyways! */
- list_for_each_safe(le, tle, &tconn->out_of_sequence_requests) {
- r = list_entry(le, struct drbd_request, tl_requests);
- /* It would be nice to complete outside of spinlock.
- * But this is easier for now. */
- _req_mod(r, CONNECTION_LOST_WHILE_PENDING);
- }
-
- /* ensure bit indicating barrier is required is clear */
- clear_bit(CREATE_BARRIER, &tconn->flags);
-
- spin_unlock_irq(&tconn->req_lock);
-}
-
-void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
-{
- spin_lock_irq(&tconn->req_lock);
- _tl_restart(tconn, what);
- spin_unlock_irq(&tconn->req_lock);
+ tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
}
/**
void tl_abort_disk_io(struct drbd_conf *mdev)
{
struct drbd_tconn *tconn = mdev->tconn;
- struct drbd_tl_epoch *b;
- struct list_head *le, *tle;
- struct drbd_request *req;
+ struct drbd_request *req, *r;
spin_lock_irq(&tconn->req_lock);
- b = tconn->oldest_tle;
- while (b) {
- list_for_each_safe(le, tle, &b->requests) {
- req = list_entry(le, struct drbd_request, tl_requests);
- if (!(req->rq_state & RQ_LOCAL_PENDING))
- continue;
- if (req->w.mdev == mdev)
- _req_mod(req, ABORT_DISK_IO);
- }
- b = b->next;
- }
-
- list_for_each_safe(le, tle, &tconn->barrier_acked_requests) {
- req = list_entry(le, struct drbd_request, tl_requests);
+ list_for_each_entry_safe(req, r, &tconn->transfer_log, tl_requests) {
if (!(req->rq_state & RQ_LOCAL_PENDING))
continue;
- if (req->w.mdev == mdev)
- _req_mod(req, ABORT_DISK_IO);
+ if (req->w.mdev != mdev)
+ continue;
+ _req_mod(req, ABORT_DISK_IO);
}
-
spin_unlock_irq(&tconn->req_lock);
}
if (set_resource_options(tconn, res_opts))
goto fail;
- if (!tl_init(tconn))
- goto fail;
-
tconn->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
if (!tconn->current_epoch)
goto fail;
+
+ INIT_LIST_HEAD(&tconn->transfer_log);
+
INIT_LIST_HEAD(&tconn->current_epoch->list);
tconn->epochs = 1;
spin_lock_init(&tconn->epoch_lock);
tconn->write_ordering = WO_bdev_flush;
+ tconn->send.seen_any_write_yet = false;
+ tconn->send.current_epoch_nr = 0;
+ tconn->send.current_epoch_writes = 0;
+
tconn->cstate = C_STANDALONE;
mutex_init(&tconn->cstate_mutex);
spin_lock_init(&tconn->req_lock);
fail:
kfree(tconn->current_epoch);
- tl_cleanup(tconn);
free_cpumask_var(tconn->cpu_mask);
drbd_free_socket(&tconn->meta);
drbd_free_socket(&tconn->data);
/* Wait until nothing is on the fly :) */
wait_event(mdev->misc_wait, atomic_read(&mdev->ap_pending_cnt) == 0);
+ /* FIXME also wait for all pending P_BARRIER_ACK? */
+
if (new_role == R_SECONDARY) {
set_disk_ro(mdev->vdisk, true);
if (get_ldev(mdev)) {
drbd_suspend_io(mdev);
/* also wait for the last barrier ack. */
+ /* FIXME see also https://daiquiri.linbit/cgi-bin/bugzilla/show_bug.cgi?id=171
+ * We need a way to either ignore barrier acks for barriers sent before a device
+ * was attached, or a way to wait for all pending barrier acks to come in.
+ * As barriers are counted per resource,
+ * we'd need to suspend io on all devices of a resource.
+ */
wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_pending_cnt) || drbd_suspended(mdev));
/* and for any other previously queued work */
drbd_flush_workqueue(mdev);
conn_err(tconn, "ASSERTION FAILED: tconn->current_epoch->list not empty\n");
/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
atomic_set(&tconn->current_epoch->epoch_size, 0);
+ tconn->send.seen_any_write_yet = false;
conn_info(tconn, "Connection closed\n");
drbd_req_free(req);
}
-static void queue_barrier(struct drbd_conf *mdev)
-{
- struct drbd_tl_epoch *b;
- struct drbd_tconn *tconn = mdev->tconn;
-
- /* We are within the req_lock. Once we queued the barrier for sending,
- * we set the CREATE_BARRIER bit. It is cleared as soon as a new
- * barrier/epoch object is added. This is the only place this bit is
- * set. It indicates that the barrier for this epoch is already queued,
- * and no new epoch has been created yet. */
- if (test_bit(CREATE_BARRIER, &tconn->flags))
- return;
-
- b = tconn->newest_tle;
- b->w.cb = w_send_barrier;
- b->w.mdev = mdev;
- /* inc_ap_pending done here, so we won't
- * get imbalanced on connection loss.
- * dec_ap_pending will be done in got_BarrierAck
- * or (on connection loss) in tl_clear. */
- inc_ap_pending(mdev);
- drbd_queue_work(&tconn->sender_work, &b->w);
- set_bit(CREATE_BARRIER, &tconn->flags);
+static void wake_all_senders(struct drbd_tconn *tconn) {
+ wake_up(&tconn->sender_work.q_wait);
}
-static void _about_to_complete_local_write(struct drbd_conf *mdev,
- struct drbd_request *req)
+/* must hold resource->req_lock */
+static void start_new_tl_epoch(struct drbd_tconn *tconn)
{
- const unsigned long s = req->rq_state;
-
- /* Before we can signal completion to the upper layers,
- * we may need to close the current epoch.
- * We can skip this, if this request has not even been sent, because we
- * did not have a fully established connection yet/anymore, during
- * bitmap exchange, or while we are C_AHEAD due to congestion policy.
- */
- if (mdev->state.conn >= C_CONNECTED &&
- (s & RQ_NET_SENT) != 0 &&
- req->epoch == atomic_read(&mdev->tconn->current_tle_nr))
- queue_barrier(mdev);
+ tconn->current_tle_writes = 0;
+ atomic_inc(&tconn->current_tle_nr);
+ wake_all_senders(tconn);
}
void complete_master_bio(struct drbd_conf *mdev,
} else if (!(s & RQ_POSTPONED))
D_ASSERT((s & (RQ_NET_MASK & ~RQ_NET_DONE)) == 0);
- /* for writes we need to do some extra housekeeping */
- if (rw == WRITE)
- _about_to_complete_local_write(mdev, req);
+ /* Before we can signal completion to the upper layers,
+ * we may need to close the current transfer log epoch.
+ * We are within the request lock, so we can simply compare
+ * the request epoch number with the current transfer log
+ * epoch number. If they match, increase the current_tle_nr,
+ * and reset the transfer log epoch write_cnt.
+ */
+ if (rw == WRITE &&
+ req->epoch == atomic_read(&mdev->tconn->current_tle_nr))
+ start_new_tl_epoch(mdev->tconn);
/* Update disk stats */
_drbd_end_io_acct(mdev, req);
* hurting performance. */
set_bit(UNPLUG_REMOTE, &mdev->flags);
- /* see __drbd_make_request,
- * just after it grabs the req_lock */
- D_ASSERT(test_bit(CREATE_BARRIER, &mdev->tconn->flags) == 0);
-
- req->epoch = atomic_read(&mdev->tconn->current_tle_nr);
-
- /* increment size of current epoch */
- mdev->tconn->newest_tle->n_writes++;
-
/* queue work item to send data */
D_ASSERT(req->rq_state & RQ_NET_PENDING);
req->rq_state |= RQ_NET_QUEUED;
nc = rcu_dereference(mdev->tconn->net_conf);
p = nc->max_epoch_size;
rcu_read_unlock();
- if (mdev->tconn->newest_tle->n_writes >= p)
- queue_barrier(mdev);
+ if (mdev->tconn->current_tle_writes >= p)
+ start_new_tl_epoch(mdev->tconn);
break;
During connection handshake, we ensure that the peer was not rebooted. */
if (!(req->rq_state & RQ_NET_OK)) {
if (req->w.cb) {
+ /* w.cb expected to be w_send_dblock, or w_send_read_req */
drbd_queue_work(&mdev->tconn->sender_work, &req->w);
rv = req->rq_state & RQ_WRITE ? MR_WRITE : MR_READ;
}
* this is bad, because if the connection is lost now,
* we won't be able to clean them up... */
dev_err(DEV, "FIXME (BARRIER_ACKED but pending)\n");
- list_move(&req->tl_requests, &mdev->tconn->out_of_sequence_requests);
}
if ((req->rq_state & RQ_NET_MASK) != 0) {
req->rq_state |= RQ_NET_DONE;
const int rw = bio_rw(bio);
const int size = bio->bi_size;
const sector_t sector = bio->bi_sector;
- struct drbd_tl_epoch *b = NULL;
struct drbd_request *req;
struct net_conf *nc;
int local, remote, send_oos = 0;
goto fail_free_complete;
}
- /* For WRITE request, we have to make sure that we have an
- * unused_spare_tle, in case we need to start a new epoch.
- * I try to be smart and avoid to pre-allocate always "just in case",
- * but there is a race between testing the bit and pointer outside the
- * spinlock, and grabbing the spinlock.
- * if we lost that race, we retry. */
- if (rw == WRITE && (remote || send_oos) &&
- mdev->tconn->unused_spare_tle == NULL &&
- test_bit(CREATE_BARRIER, &mdev->tconn->flags)) {
-allocate_barrier:
- b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_NOIO);
- if (!b) {
- dev_err(DEV, "Failed to alloc barrier.\n");
- err = -ENOMEM;
- goto fail_free_complete;
- }
- }
-
/* GOOD, everything prepared, grab the spin_lock */
spin_lock_irq(&mdev->tconn->req_lock);
}
}
- if (b && mdev->tconn->unused_spare_tle == NULL) {
- mdev->tconn->unused_spare_tle = b;
- b = NULL;
- }
- if (rw == WRITE && (remote || send_oos) &&
- mdev->tconn->unused_spare_tle == NULL &&
- test_bit(CREATE_BARRIER, &mdev->tconn->flags)) {
- /* someone closed the current epoch
- * while we were grabbing the spinlock */
- spin_unlock_irq(&mdev->tconn->req_lock);
- goto allocate_barrier;
- }
-
-
/* Update disk stats */
_drbd_start_io_acct(mdev, req, bio);
- /* _maybe_start_new_epoch(mdev);
- * If we need to generate a write barrier packet, we have to add the
- * new epoch (barrier) object, and queue the barrier packet for sending,
- * and queue the req's data after it _within the same lock_, otherwise
- * we have race conditions were the reorder domains could be mixed up.
- *
- * Even read requests may start a new epoch and queue the corresponding
- * barrier packet. To get the write ordering right, we only have to
- * make sure that, if this is a write request and it triggered a
- * barrier packet, this request is queued within the same spinlock. */
- if ((remote || send_oos) && mdev->tconn->unused_spare_tle &&
- test_and_clear_bit(CREATE_BARRIER, &mdev->tconn->flags)) {
- _tl_add_barrier(mdev->tconn, mdev->tconn->unused_spare_tle);
- mdev->tconn->unused_spare_tle = NULL;
- } else {
- D_ASSERT(!(remote && rw == WRITE &&
- test_bit(CREATE_BARRIER, &mdev->tconn->flags)));
- }
-
/* NOTE
* Actually, 'local' may be wrong here already, since we may have failed
* to write to the meta data, and may become wrong anytime because of
if (local)
_req_mod(req, TO_BE_SUBMITTED);
- list_add_tail(&req->tl_requests, &mdev->tconn->newest_tle->requests);
+ /* which transfer log epoch does this belong to? */
+ req->epoch = atomic_read(&mdev->tconn->current_tle_nr);
+ if (rw == WRITE)
+ mdev->tconn->current_tle_writes++;
+
+ list_add_tail(&req->tl_requests, &mdev->tconn->transfer_log);
/* NOTE remote first: to get the concurrent write detection right,
* we must register the request before start of local IO. */
}
if (congested) {
- queue_barrier(mdev); /* last barrier, after mirrored writes */
+ if (mdev->tconn->current_tle_writes)
+ /* start a new epoch for non-mirrored writes */
+ start_new_tl_epoch(mdev->tconn);
if (nc->on_congestion == OC_PULL_AHEAD)
_drbd_set_state(_NS(mdev, conn, C_AHEAD), 0, NULL);
rcu_read_unlock();
spin_unlock_irq(&mdev->tconn->req_lock);
- kfree(b); /* if someone else has beaten us to it... */
if (local) {
req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
drbd_req_free(req);
dec_ap_bio(mdev);
- kfree(b);
return ret;
}
return limit;
}
+struct drbd_request *find_oldest_request(struct drbd_tconn *tconn)
+{
+ /* Walk the transfer log,
+ * and find the oldest not yet completed request */
+ struct drbd_request *r;
+ list_for_each_entry(r, &tconn->transfer_log, tl_requests) {
+ if (r->rq_state & (RQ_NET_PENDING|RQ_LOCAL_PENDING))
+ return r;
+ }
+ return NULL;
+}
+
void request_timer_fn(unsigned long data)
{
struct drbd_conf *mdev = (struct drbd_conf *) data;
struct drbd_tconn *tconn = mdev->tconn;
struct drbd_request *req; /* oldest request */
- struct list_head *le;
struct net_conf *nc;
unsigned long ent = 0, dt = 0, et, nt; /* effective timeout = ko_count * timeout */
unsigned long now;
now = jiffies;
spin_lock_irq(&tconn->req_lock);
- le = &tconn->oldest_tle->requests;
- if (list_empty(le)) {
+ req = find_oldest_request(tconn);
+ if (!req) {
spin_unlock_irq(&tconn->req_lock);
mod_timer(&mdev->request_timer, now + et);
return;
}
- le = le->prev;
- req = list_entry(le, struct drbd_request, tl_requests);
-
/* The request is considered timed out, if
* - we have some effective timeout from the configuration,
* with above state restrictions applied,
return 0;
}
-int w_send_barrier(struct drbd_work *w, int cancel)
+/* FIXME
+ * We need to track the number of pending barrier acks,
+ * and to be able to wait for them.
+ * See also comment in drbd_adm_attach before drbd_suspend_io.
+ */
+int drbd_send_barrier(struct drbd_tconn *tconn)
{
- struct drbd_socket *sock;
- struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
- struct drbd_conf *mdev = w->mdev;
struct p_barrier *p;
+ struct drbd_socket *sock;
- /* really avoid racing with tl_clear. w.cb may have been referenced
- * just before it was reassigned and re-queued, so double check that.
- * actually, this race was harmless, since we only try to send the
- * barrier packet here, and otherwise do nothing with the object.
- * but compare with the head of w_clear_epoch */
- spin_lock_irq(&mdev->tconn->req_lock);
- if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
- cancel = 1;
- spin_unlock_irq(&mdev->tconn->req_lock);
- if (cancel)
- return 0;
-
- sock = &mdev->tconn->data;
- p = drbd_prepare_command(mdev, sock);
+ sock = &tconn->data;
+ p = conn_prepare_command(tconn, sock);
if (!p)
return -EIO;
- p->barrier = b->br_number;
- /* inc_ap_pending was done where this was queued.
- * dec_ap_pending will be done in got_BarrierAck
- * or (on connection loss) in w_clear_epoch. */
- return drbd_send_command(mdev, sock, P_BARRIER, sizeof(*p), NULL, 0);
+ p->barrier = tconn->send.current_epoch_nr;
+ p->pad = 0;
+ tconn->send.current_epoch_writes = 0;
+
+ return conn_send_command(tconn, sock, P_BARRIER, sizeof(*p), NULL, 0);
}
int w_send_write_hint(struct drbd_work *w, int cancel)
{
struct drbd_request *req = container_of(w, struct drbd_request, w);
struct drbd_conf *mdev = w->mdev;
+ struct drbd_tconn *tconn = mdev->tconn;
int err;
if (unlikely(cancel)) {
return 0;
}
+ if (!tconn->send.seen_any_write_yet) {
+ tconn->send.seen_any_write_yet = true;
+ tconn->send.current_epoch_nr = req->epoch;
+ }
+ if (tconn->send.current_epoch_nr != req->epoch) {
+ if (tconn->send.current_epoch_writes)
+ drbd_send_barrier(tconn);
+ tconn->send.current_epoch_nr = req->epoch;
+ }
+ /* this time, no tconn->send.current_epoch_writes++;
+ * If it was sent, it was the closing barrier for the last
+ * replicated epoch, before we went into AHEAD mode.
+ * No more barriers will be sent, until we leave AHEAD mode again. */
+
err = drbd_send_out_of_sync(mdev, req);
req_mod(req, OOS_HANDED_TO_NETWORK);
{
struct drbd_request *req = container_of(w, struct drbd_request, w);
struct drbd_conf *mdev = w->mdev;
+ struct drbd_tconn *tconn = mdev->tconn;
int err;
if (unlikely(cancel)) {
return 0;
}
+ if (!tconn->send.seen_any_write_yet) {
+ tconn->send.seen_any_write_yet = true;
+ tconn->send.current_epoch_nr = req->epoch;
+ }
+ if (tconn->send.current_epoch_nr != req->epoch) {
+ if (tconn->send.current_epoch_writes)
+ drbd_send_barrier(tconn);
+ tconn->send.current_epoch_nr = req->epoch;
+ }
+ tconn->send.current_epoch_writes++;
+
err = drbd_send_dblock(mdev, req);
req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
{
struct drbd_request *req = container_of(w, struct drbd_request, w);
struct drbd_conf *mdev = w->mdev;
+ struct drbd_tconn *tconn = mdev->tconn;
int err;
if (unlikely(cancel)) {
return 0;
}
+ /* Even read requests may close a write epoch,
+ * if there was any yet. */
+ if (tconn->send.seen_any_write_yet &&
+ tconn->send.current_epoch_nr != req->epoch) {
+ if (tconn->send.current_epoch_writes)
+ drbd_send_barrier(tconn);
+ tconn->send.current_epoch_nr = req->epoch;
+ }
+
err = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
(unsigned long)req);
mutex_unlock(mdev->state_mutex);
}
+/* If the resource already closed the current epoch, but we did not
+ * (because we have not yet seen new requests), we should send the
+ * corresponding barrier now. Must be checked within the same spinlock
+ * that is used to check for new requests. */
+bool need_to_send_barrier(struct drbd_tconn *connection)
+{
+ if (!connection->send.seen_any_write_yet)
+ return false;
+
+ /* Skip barriers that do not contain any writes.
+ * This may happen during AHEAD mode. */
+ if (!connection->send.current_epoch_writes)
+ return false;
+
+ /* ->req_lock is held when requests are queued on
+ * connection->sender_work, and put into ->transfer_log.
+ * It is also held when ->current_tle_nr is increased.
+ * So either there are already new requests queued,
+ * and corresponding barriers will be send there.
+ * Or nothing new is queued yet, so the difference will be 1.
+ */
+ if (atomic_read(&connection->current_tle_nr) !=
+ connection->send.current_epoch_nr + 1)
+ return false;
+
+ return true;
+}
+
bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
{
spin_lock_irq(&queue->q_lock);
return !list_empty(work_list);
}
+void wait_for_work(struct drbd_tconn *connection, struct list_head *work_list)
+{
+ DEFINE_WAIT(wait);
+ struct net_conf *nc;
+ int uncork, cork;
+
+ dequeue_work_item(&connection->sender_work, work_list);
+ if (!list_empty(work_list))
+ return;
+
+ /* Still nothing to do?
+ * Maybe we still need to close the current epoch,
+ * even if no new requests are queued yet.
+ *
+ * Also, poke TCP, just in case.
+ * Then wait for new work (or signal). */
+ rcu_read_lock();
+ nc = rcu_dereference(connection->net_conf);
+ uncork = nc ? nc->tcp_cork : 0;
+ rcu_read_unlock();
+ if (uncork) {
+ mutex_lock(&connection->data.mutex);
+ if (connection->data.socket)
+ drbd_tcp_uncork(connection->data.socket);
+ mutex_unlock(&connection->data.mutex);
+ }
+
+ for (;;) {
+ int send_barrier;
+ prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
+ spin_lock_irq(&connection->req_lock);
+ spin_lock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
+ list_splice_init(&connection->sender_work.q, work_list);
+ spin_unlock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
+ if (!list_empty(work_list) || signal_pending(current)) {
+ spin_unlock_irq(&connection->req_lock);
+ break;
+ }
+ send_barrier = need_to_send_barrier(connection);
+ spin_unlock_irq(&connection->req_lock);
+ if (send_barrier) {
+ drbd_send_barrier(connection);
+ connection->send.current_epoch_nr++;
+ }
+ schedule();
+ /* may be woken up for other things but new work, too,
+ * e.g. if the current epoch got closed.
+ * In which case we send the barrier above. */
+ }
+ finish_wait(&connection->sender_work.q_wait, &wait);
+
+ /* someone may have changed the config while we have been waiting above. */
+ rcu_read_lock();
+ nc = rcu_dereference(connection->net_conf);
+ cork = nc ? nc->tcp_cork : 0;
+ rcu_read_unlock();
+ mutex_lock(&connection->data.mutex);
+ if (connection->data.socket) {
+ if (cork)
+ drbd_tcp_cork(connection->data.socket);
+ else if (!uncork)
+ drbd_tcp_uncork(connection->data.socket);
+ }
+ mutex_unlock(&connection->data.mutex);
+}
+
int drbd_worker(struct drbd_thread *thi)
{
struct drbd_tconn *tconn = thi->tconn;
struct drbd_work *w = NULL;
struct drbd_conf *mdev;
- struct net_conf *nc;
LIST_HEAD(work_list);
int vnr;
- int cork;
while (get_t_state(thi) == RUNNING) {
drbd_thread_current_set_cpu(thi);
/* as long as we use drbd_queue_work_front(),
* we may only dequeue single work items here, not batches. */
if (list_empty(&work_list))
- dequeue_work_item(&tconn->sender_work, &work_list);
-
- /* Still nothing to do? Poke TCP, just in case,
- * then wait for new work (or signal). */
- if (list_empty(&work_list)) {
- mutex_lock(&tconn->data.mutex);
- rcu_read_lock();
- nc = rcu_dereference(tconn->net_conf);
- cork = nc ? nc->tcp_cork : 0;
- rcu_read_unlock();
-
- if (tconn->data.socket && cork)
- drbd_tcp_uncork(tconn->data.socket);
- mutex_unlock(&tconn->data.mutex);
-
- wait_event_interruptible(tconn->sender_work.q_wait,
- dequeue_work_item(&tconn->sender_work, &work_list));
-
- mutex_lock(&tconn->data.mutex);
- if (tconn->data.socket && cork)
- drbd_tcp_cork(tconn->data.socket);
- mutex_unlock(&tconn->data.mutex);
- }
+ wait_for_work(tconn, &work_list);
if (signal_pending(current)) {
flush_signals(current);