drbd: fix wrong assert in completion/retry path of failed local reads
drivers/block/drbd/drbd_req.c
1 /*
2    drbd_req.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27
28 #include <linux/slab.h>
29 #include <linux/drbd.h>
30 #include "drbd_int.h"
31 #include "drbd_req.h"
32
33
34 static bool drbd_may_do_local_read(struct drbd_conf *mdev, sector_t sector, int size);
35
36 /* Update disk stats at start of I/O request */
37 static void _drbd_start_io_acct(struct drbd_conf *mdev, struct drbd_request *req, struct bio *bio)
38 {
39         const int rw = bio_data_dir(bio);
40         int cpu;
41         cpu = part_stat_lock();
42         part_round_stats(cpu, &mdev->vdisk->part0);
43         part_stat_inc(cpu, &mdev->vdisk->part0, ios[rw]);
44         part_stat_add(cpu, &mdev->vdisk->part0, sectors[rw], bio_sectors(bio));
45         (void) cpu; /* The macro invocations above want the cpu argument; without this,
46                        the compiler warns that cpu is assigned but never used. */
47         part_inc_in_flight(&mdev->vdisk->part0, rw);
48         part_stat_unlock();
49 }
50
51 /* Update disk stats when completing request upwards */
52 static void _drbd_end_io_acct(struct drbd_conf *mdev, struct drbd_request *req)
53 {
54         int rw = bio_data_dir(req->master_bio);
55         unsigned long duration = jiffies - req->start_time;
56         int cpu;
57         cpu = part_stat_lock();
58         part_stat_add(cpu, &mdev->vdisk->part0, ticks[rw], duration);
59         part_round_stats(cpu, &mdev->vdisk->part0);
60         part_dec_in_flight(&mdev->vdisk->part0, rw);
61         part_stat_unlock();
62 }
63
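/* Allocate a drbd_request for the master bio from the request mempool,
 * set up the private bio clone and the request interval, and initialize
 * the list heads.  Returns NULL if the allocation fails. */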
64 static struct drbd_request *drbd_req_new(struct drbd_conf *mdev,
65                                                struct bio *bio_src)
66 {
67         struct drbd_request *req;
68
69         req = mempool_alloc(drbd_request_mempool, GFP_NOIO);
70         if (!req)
71                 return NULL;
72
73         drbd_req_make_private_bio(req, bio_src);
74         req->rq_state    = bio_data_dir(bio_src) == WRITE ? RQ_WRITE : 0;
75         req->w.mdev      = mdev;
76         req->master_bio  = bio_src;
77         req->epoch       = 0;
78
79         drbd_clear_interval(&req->i);
80         req->i.sector     = bio_src->bi_sector;
81         req->i.size      = bio_src->bi_size;
82         req->i.local = true;
83         req->i.waiting = false;
84
85         INIT_LIST_HEAD(&req->tl_requests);
86         INIT_LIST_HEAD(&req->w.list);
87
88         return req;
89 }
90
91 static void drbd_req_free(struct drbd_request *req)
92 {
93         mempool_free(req, drbd_request_mempool);
94 }
95
96 /* rw is bio_data_dir(), only READ or WRITE */
97 static void _req_is_done(struct drbd_conf *mdev, struct drbd_request *req, const int rw)
98 {
99         const unsigned long s = req->rq_state;
100
101         /* remove it from the transfer log.
102          * well, only if it had been there in the first
103          * place... if it had not (local only or conflicting
104          * and never sent), it should still be "empty" as
105          * initialized in drbd_req_new(), so we can list_del() it
106          * here unconditionally */
107         list_del_init(&req->tl_requests);
108
109         /* if it was a write, we may have to set the corresponding
110          * bit(s) out-of-sync first. If it had a local part, we need to
111          * release the reference to the activity log. */
112         if (rw == WRITE) {
113                 /* Set out-of-sync unless both OK flags are set
114                  * (local only or remote failed).
115                  * Other places where we set out-of-sync:
116                  * READ with local io-error */
117                 if (!(s & RQ_NET_OK) || !(s & RQ_LOCAL_OK))
118                         drbd_set_out_of_sync(mdev, req->i.sector, req->i.size);
119
120                 if ((s & RQ_NET_OK) && (s & RQ_LOCAL_OK) && (s & RQ_NET_SIS))
121                         drbd_set_in_sync(mdev, req->i.sector, req->i.size);
122
123                 /* one might be tempted to move the drbd_al_complete_io
124                  * to the local io completion callback drbd_request_endio.
125                  * but, if this was a mirror write, we may only
126                  * drbd_al_complete_io after this is RQ_NET_DONE,
127                  * otherwise the extent could be dropped from the al
128                  * before it has actually been written on the peer.
129                  * if we crash before our peer knows about the request,
130                  * but after the extent has been dropped from the al,
131                  * we would forget to resync the corresponding extent.
132                  */
133                 if (s & RQ_LOCAL_MASK) {
134                         if (get_ldev_if_state(mdev, D_FAILED)) {
135                                 if (s & RQ_IN_ACT_LOG)
136                                         drbd_al_complete_io(mdev, &req->i);
137                                 put_ldev(mdev);
138                         } else if (__ratelimit(&drbd_ratelimit_state)) {
139                                 dev_warn(DEV, "Should have called drbd_al_complete_io(, %llu, %u), "
140                                          "but my Disk seems to have failed :(\n",
141                                          (unsigned long long) req->i.sector, req->i.size);
142                         }
143                 }
144         }
145
146         if (s & RQ_POSTPONED)
147                 drbd_restart_write(req);
148         else
149                 drbd_req_free(req);
150 }
151
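/* Queue the P_BARRIER for the current (newest) epoch for sending,
 * unless it has already been queued (CREATE_BARRIER set). */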
152 static void queue_barrier(struct drbd_conf *mdev)
153 {
154         struct drbd_tl_epoch *b;
155         struct drbd_tconn *tconn = mdev->tconn;
156
157         /* We are within the req_lock. Once we queued the barrier for sending,
158          * we set the CREATE_BARRIER bit. It is cleared as soon as a new
159          * barrier/epoch object is added. This is the only place this bit is
160          * set. It indicates that the barrier for this epoch is already queued,
161          * and no new epoch has been created yet. */
162         if (test_bit(CREATE_BARRIER, &tconn->flags))
163                 return;
164
165         b = tconn->newest_tle;
166         b->w.cb = w_send_barrier;
167         b->w.mdev = mdev;
168         /* inc_ap_pending done here, so we won't
169          * get imbalanced on connection loss.
170          * dec_ap_pending will be done in got_BarrierAck
171          * or (on connection loss) in tl_clear.  */
172         inc_ap_pending(mdev);
173         drbd_queue_work(&tconn->data.work, &b->w);
174         set_bit(CREATE_BARRIER, &tconn->flags);
175 }
176
177 static void _about_to_complete_local_write(struct drbd_conf *mdev,
178         struct drbd_request *req)
179 {
180         const unsigned long s = req->rq_state;
181
182         /* Before we can signal completion to the upper layers,
183          * we may need to close the current epoch.
184          * We can skip this, if this request has not even been sent, because we
185          * did not have a fully established connection yet/anymore, during
186          * bitmap exchange, or while we are C_AHEAD due to congestion policy.
187          */
188         if (mdev->state.conn >= C_CONNECTED &&
189             (s & RQ_NET_SENT) != 0 &&
190             req->epoch == mdev->tconn->newest_tle->br_number)
191                 queue_barrier(mdev);
192 }
193
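/* Complete the master bio towards the upper layers and drop the
 * ap_bio reference that was taken when the bio entered DRBD. */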
194 void complete_master_bio(struct drbd_conf *mdev,
195                 struct bio_and_error *m)
196 {
197         bio_endio(m->bio, m->error);
198         dec_ap_bio(mdev);
199 }
200
201
202 static void drbd_remove_request_interval(struct rb_root *root,
203                                          struct drbd_request *req)
204 {
205         struct drbd_conf *mdev = req->w.mdev;
206         struct drbd_interval *i = &req->i;
207
208         drbd_remove_interval(root, i);
209
210         /* Wake up any processes waiting for this request to complete.  */
211         if (i->waiting)
212                 wake_up(&mdev->misc_wait);
213 }
214
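/* Once local I/O for this request can no longer make progress
 * (no longer pending, or aborted), wake up any peer requests that
 * are waiting on the conflicting interval. */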
215 static void maybe_wakeup_conflicting_requests(struct drbd_request *req)
216 {
217         const unsigned long s = req->rq_state;
218         if (s & RQ_LOCAL_PENDING && !(s & RQ_LOCAL_ABORTED))
219                 return;
220         if (req->i.waiting)
221                 /* Retry all conflicting peer requests.  */
222                 wake_up(&req->w.mdev->misc_wait);
223 }
224
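/* Destroy the request (via _req_is_done) once the master bio has been
 * completed (or the request was postponed), local I/O is no longer
 * pending, and the network part is either unused or done. */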
225 static
226 void req_may_be_done(struct drbd_request *req)
227 {
228         const unsigned long s = req->rq_state;
229         struct drbd_conf *mdev = req->w.mdev;
230         int rw = req->rq_state & RQ_WRITE ? WRITE : READ;
231
232         /* req->master_bio still present means: Not yet completed.
233          *
234          * Unless this is RQ_POSTPONED, which will cause _req_is_done() to
235          * queue it on the retry workqueue instead of destroying it.
236          */
237         if (req->master_bio && !(s & RQ_POSTPONED))
238                 return;
239
240         /* Local still pending, even though master_bio is already completed?
241          * may happen for RQ_LOCAL_ABORTED requests. */
242         if (s & RQ_LOCAL_PENDING)
243                 return;
244
245         if ((s & RQ_NET_MASK) == 0 || (s & RQ_NET_DONE)) {
246                 /* this is disconnected (local only) operation,
247                  * or protocol A, B, or C P_BARRIER_ACK,
248                  * or killed from the transfer log due to connection loss. */
249                 _req_is_done(mdev, req, rw);
250         }
251         /* else: network part and not DONE yet. that is
252          * protocol A, B, or C, barrier ack still pending... */
253 }
254
255 /* Helper for __req_mod().
256  * Set m->bio to the master bio, if it is fit to be completed,
257  * or leave it alone (it is initialized to NULL in __req_mod),
258  * if it has already been completed, or cannot be completed yet.
259  * If m->bio is set, the error status to be returned is placed in m->error.
260  */
261 static
262 void req_may_be_completed(struct drbd_request *req, struct bio_and_error *m)
263 {
264         const unsigned long s = req->rq_state;
265         struct drbd_conf *mdev = req->w.mdev;
266
267         /* we must not complete the master bio, while it is
268          *      still being processed by _drbd_send_zc_bio (drbd_send_dblock)
269          *      not yet acknowledged by the peer
270          *      not yet completed by the local io subsystem
271          * these flags may get cleared in any order by
272          *      the worker,
273          *      the receiver,
274          *      the bio_endio completion callbacks.
275          */
276         if (s & RQ_LOCAL_PENDING && !(s & RQ_LOCAL_ABORTED))
277                 return;
278         if (s & RQ_NET_QUEUED)
279                 return;
280         if (s & RQ_NET_PENDING)
281                 return;
282
283         if (req->master_bio) {
284                 int rw = bio_rw(req->master_bio);
285
286                 /* this is DATA_RECEIVED (remote read)
287                  * or protocol C P_WRITE_ACK
288                  * or protocol B P_RECV_ACK
289                  * or protocol A "HANDED_OVER_TO_NETWORK" (SendAck)
290                  * or canceled or failed,
291                  * or killed from the transfer log due to connection loss.
292                  */
293
294                 /*
295                  * figure out whether to report success or failure.
296                  *
297                  * report success when at least one of the operations succeeded.
298                  * or, to put the other way,
299                  * only report failure, when both operations failed.
300                  *
301                  * what to do about the failures is handled elsewhere.
302                  * what we need to do here is just: complete the master_bio.
303                  *
304                  * local completion error, if any, has been stored as ERR_PTR
305                  * in private_bio within drbd_request_endio.
306                  */
307                 int ok = (s & RQ_LOCAL_OK) || (s & RQ_NET_OK);
308                 int error = PTR_ERR(req->private_bio);
309
310                 /* remove the request from the conflict detection
311                  * and block_id verification interval tree */
312                 if (!drbd_interval_empty(&req->i)) {
313                         struct rb_root *root;
314
315                         if (rw == WRITE)
316                                 root = &mdev->write_requests;
317                         else
318                                 root = &mdev->read_requests;
319                         drbd_remove_request_interval(root, req);
320                 } else if (!(s & RQ_POSTPONED))
321                         D_ASSERT((s & (RQ_NET_MASK & ~RQ_NET_DONE)) == 0);
322
323                 /* for writes we need to do some extra housekeeping */
324                 if (rw == WRITE)
325                         _about_to_complete_local_write(mdev, req);
326
327                 /* Update disk stats */
328                 _drbd_end_io_acct(mdev, req);
329
330                 /* if READ failed,
331                  * have it be pushed back to the retry work queue,
332                  * so it will re-enter __drbd_make_request,
333                  * and be re-assigned to a suitable local or remote path,
334                  * or failed if we do not have access to good data anymore.
335                  * READA may fail.
336                  * WRITE should have used all available paths already.
337                  */
338                 if (!ok && rw == READ)
339                         req->rq_state |= RQ_POSTPONED;
340
341                 if (!(req->rq_state & RQ_POSTPONED)) {
342                         m->error = ok ? 0 : (error ?: -EIO);
343                         m->bio = req->master_bio;
344                         req->master_bio = NULL;
345                 } else {
346                         /* Assert that this will be _req_is_done()
347                          * with this very invocation. */
348                         /* FIXME:
349                          * what about (RQ_LOCAL_PENDING | RQ_LOCAL_ABORTED)?
350                          */
351                         D_ASSERT(!(s & RQ_LOCAL_PENDING));
352                         D_ASSERT((s & RQ_NET_MASK) == 0 || (s & RQ_NET_DONE));
353                 }
354         }
355         req_may_be_done(req);
356 }
357
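/* Like req_may_be_completed(), but only while I/O on this device is
 * not suspended. */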
358 static void req_may_be_completed_not_susp(struct drbd_request *req, struct bio_and_error *m)
359 {
360         struct drbd_conf *mdev = req->w.mdev;
361
362         if (!drbd_suspended(mdev))
363                 req_may_be_completed(req, m);
364 }
365
366 /* obviously this could be coded as many single functions
367  * instead of one huge switch,
368  * or by putting the code directly in the respective locations
369  * (as it has been before).
370  *
371  * but having it this way
372  *  enforces that it is all in this one place, where it is easier to audit,
373  *  it makes it obvious that whatever "event" "happens" to a request should
374  *  happen "atomically" within the req_lock,
375  *  and it enforces that we have to think in a very structured manner
376  *  about the "events" that may happen to a request during its life time ...
377  */
378 int __req_mod(struct drbd_request *req, enum drbd_req_event what,
379                 struct bio_and_error *m)
380 {
381         struct drbd_conf *mdev = req->w.mdev;
382         struct net_conf *nc;
383         int p, rv = 0;
384
385         if (m)
386                 m->bio = NULL;
387
388         switch (what) {
389         default:
390                 dev_err(DEV, "LOGIC BUG in %s:%u\n", __FILE__ , __LINE__);
391                 break;
392
393         /* does not happen...
394          * initialization done in drbd_req_new
395         case CREATED:
396                 break;
397                 */
398
399         case TO_BE_SENT: /* via network */
400                 /* reached via __drbd_make_request
401                  * and from w_read_retry_remote */
402                 D_ASSERT(!(req->rq_state & RQ_NET_MASK));
403                 req->rq_state |= RQ_NET_PENDING;
404                 rcu_read_lock();
405                 nc = rcu_dereference(mdev->tconn->net_conf);
406                 p = nc->wire_protocol;
407                 rcu_read_unlock();
408                 req->rq_state |=
409                         p == DRBD_PROT_C ? RQ_EXP_WRITE_ACK :
410                         p == DRBD_PROT_B ? RQ_EXP_RECEIVE_ACK : 0;
411                 inc_ap_pending(mdev);
412                 break;
413
414         case TO_BE_SUBMITTED: /* locally */
415                 /* reached via __drbd_make_request */
416                 D_ASSERT(!(req->rq_state & RQ_LOCAL_MASK));
417                 req->rq_state |= RQ_LOCAL_PENDING;
418                 break;
419
420         case COMPLETED_OK:
421                 if (req->rq_state & RQ_WRITE)
422                         mdev->writ_cnt += req->i.size >> 9;
423                 else
424                         mdev->read_cnt += req->i.size >> 9;
425
426                 req->rq_state |= (RQ_LOCAL_COMPLETED|RQ_LOCAL_OK);
427                 req->rq_state &= ~RQ_LOCAL_PENDING;
428
429                 maybe_wakeup_conflicting_requests(req);
430                 req_may_be_completed_not_susp(req, m);
431                 break;
432
433         case ABORT_DISK_IO:
434                 req->rq_state |= RQ_LOCAL_ABORTED;
435                 req_may_be_completed_not_susp(req, m);
436                 break;
437
438         case WRITE_COMPLETED_WITH_ERROR:
439                 req->rq_state |= RQ_LOCAL_COMPLETED;
440                 req->rq_state &= ~RQ_LOCAL_PENDING;
441
442                 __drbd_chk_io_error(mdev, false);
443                 maybe_wakeup_conflicting_requests(req);
444                 req_may_be_completed_not_susp(req, m);
445                 break;
446
447         case READ_AHEAD_COMPLETED_WITH_ERROR:
448                 /* it is legal to fail READA */
449                 req->rq_state |= RQ_LOCAL_COMPLETED;
450                 req->rq_state &= ~RQ_LOCAL_PENDING;
451                 req_may_be_completed_not_susp(req, m);
452                 break;
453
454         case READ_COMPLETED_WITH_ERROR:
455                 drbd_set_out_of_sync(mdev, req->i.sector, req->i.size);
456
457                 req->rq_state |= RQ_LOCAL_COMPLETED;
458                 req->rq_state &= ~RQ_LOCAL_PENDING;
459
460                 D_ASSERT(!(req->rq_state & RQ_NET_MASK));
461
462                 __drbd_chk_io_error(mdev, false);
463                 req_may_be_completed_not_susp(req, m);
464                 break;
465
466         case QUEUE_FOR_NET_READ:
467                 /* READ or READA, and
468                  * no local disk,
469                  * or target area marked as invalid,
470                  * or just got an io-error. */
471                 /* from __drbd_make_request
472                  * or from bio_endio during read io-error recovery */
473
474                 /* So we can verify the handle in the answer packet.
475                  * Corresponding drbd_remove_request_interval is in
476                  * req_may_be_completed() */
477                 D_ASSERT(drbd_interval_empty(&req->i));
478                 drbd_insert_interval(&mdev->read_requests, &req->i);
479
480                 set_bit(UNPLUG_REMOTE, &mdev->flags);
481
482                 D_ASSERT(req->rq_state & RQ_NET_PENDING);
483                 D_ASSERT((req->rq_state & RQ_LOCAL_MASK) == 0);
484                 req->rq_state |= RQ_NET_QUEUED;
485                 req->w.cb = w_send_read_req;
486                 drbd_queue_work(&mdev->tconn->data.work, &req->w);
487                 break;
488
489         case QUEUE_FOR_NET_WRITE:
490                 /* assert something? */
491                 /* from __drbd_make_request only */
492
493                 /* Corresponding drbd_remove_request_interval is in
494                  * req_may_be_completed() */
495                 D_ASSERT(drbd_interval_empty(&req->i));
496                 drbd_insert_interval(&mdev->write_requests, &req->i);
497
498                 /* NOTE
499                  * In case the req ended up on the transfer log before being
500                  * queued on the worker, it could lead to this request being
501                  * missed during cleanup after connection loss.
502                  * So we have to do both operations here,
503                  * within the same lock that protects the transfer log.
504                  *
505                  * _req_add_to_epoch(req); this has to be after the
506                  * _maybe_start_new_epoch(req); which happened in
507                  * __drbd_make_request, because we now may set the bit
508                  * again ourselves to close the current epoch.
509                  *
510                  * Add req to the (now) current epoch (barrier). */
511
512                 /* otherwise we may lose an unplug, which may cause some remote
513                  * io-scheduler timeout to expire, increasing maximum latency,
514                  * hurting performance. */
515                 set_bit(UNPLUG_REMOTE, &mdev->flags);
516
517                 /* see __drbd_make_request,
518                  * just after it grabs the req_lock */
519                 D_ASSERT(test_bit(CREATE_BARRIER, &mdev->tconn->flags) == 0);
520
521                 req->epoch = mdev->tconn->newest_tle->br_number;
522
523                 /* increment size of current epoch */
524                 mdev->tconn->newest_tle->n_writes++;
525
526                 /* queue work item to send data */
527                 D_ASSERT(req->rq_state & RQ_NET_PENDING);
528                 req->rq_state |= RQ_NET_QUEUED;
529                 req->w.cb =  w_send_dblock;
530                 drbd_queue_work(&mdev->tconn->data.work, &req->w);
531
532                 /* close the epoch, in case it outgrew the limit */
533                 rcu_read_lock();
534                 nc = rcu_dereference(mdev->tconn->net_conf);
535                 p = nc->max_epoch_size;
536                 rcu_read_unlock();
537                 if (mdev->tconn->newest_tle->n_writes >= p)
538                         queue_barrier(mdev);
539
540                 break;
541
542         case QUEUE_FOR_SEND_OOS:
543                 req->rq_state |= RQ_NET_QUEUED;
544                 req->w.cb =  w_send_out_of_sync;
545                 drbd_queue_work(&mdev->tconn->data.work, &req->w);
546                 break;
547
548         case READ_RETRY_REMOTE_CANCELED:
549         case SEND_CANCELED:
550         case SEND_FAILED:
551                 /* real cleanup will be done from tl_clear.  just update flags
552                  * so it is no longer marked as on the worker queue */
553                 req->rq_state &= ~RQ_NET_QUEUED;
554                 /* if we did it right, tl_clear should be scheduled only after
555                  * this, so this should not be necessary! */
556                 req_may_be_completed_not_susp(req, m);
557                 break;
558
559         case HANDED_OVER_TO_NETWORK:
560                 /* assert something? */
561                 if (bio_data_dir(req->master_bio) == WRITE)
562                         atomic_add(req->i.size >> 9, &mdev->ap_in_flight);
563
564                 if (bio_data_dir(req->master_bio) == WRITE &&
565                     !(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK))) {
566                         /* this is what is dangerous about protocol A:
567                          * pretend it was successfully written on the peer. */
568                         if (req->rq_state & RQ_NET_PENDING) {
569                                 dec_ap_pending(mdev);
570                                 req->rq_state &= ~RQ_NET_PENDING;
571                                 req->rq_state |= RQ_NET_OK;
572                         } /* else: neg-ack was faster... */
573                         /* it is still not yet RQ_NET_DONE until the
574                          * corresponding epoch barrier got acked as well,
575                          * so we know what to dirty on connection loss */
576                 }
577                 req->rq_state &= ~RQ_NET_QUEUED;
578                 req->rq_state |= RQ_NET_SENT;
579                 req_may_be_completed_not_susp(req, m);
580                 break;
581
582         case OOS_HANDED_TO_NETWORK:
583                 /* Was not set PENDING, no longer QUEUED, so is now DONE
584                  * as far as this connection is concerned. */
585                 req->rq_state &= ~RQ_NET_QUEUED;
586                 req->rq_state |= RQ_NET_DONE;
587                 req_may_be_completed_not_susp(req, m);
588                 break;
589
590         case CONNECTION_LOST_WHILE_PENDING:
591                 /* transfer log cleanup after connection loss */
592                 /* assert something? */
593                 if (req->rq_state & RQ_NET_PENDING)
594                         dec_ap_pending(mdev);
595
596                 p = !(req->rq_state & RQ_WRITE) && req->rq_state & RQ_NET_PENDING;
597
598                 req->rq_state &= ~(RQ_NET_OK|RQ_NET_PENDING);
599                 req->rq_state |= RQ_NET_DONE;
600                 if (req->rq_state & RQ_NET_SENT && req->rq_state & RQ_WRITE)
601                         atomic_sub(req->i.size >> 9, &mdev->ap_in_flight);
602
603                 req_may_be_completed(req, m); /* Allowed while state.susp */
604                 break;
605
606         case DISCARD_WRITE:
607                 /* for discarded conflicting writes of multiple primaries,
608                  * there is no need to keep anything in the tl, potential
609                  * node crashes are covered by the activity log. */
610                 req->rq_state |= RQ_NET_DONE;
611                 /* fall through */
612         case WRITE_ACKED_BY_PEER_AND_SIS:
613         case WRITE_ACKED_BY_PEER:
614                 if (what == WRITE_ACKED_BY_PEER_AND_SIS)
615                         req->rq_state |= RQ_NET_SIS;
616                 D_ASSERT(req->rq_state & RQ_EXP_WRITE_ACK);
617                 /* protocol C; successfully written on peer.
618                  * Nothing more to do here.
619                  * We want to keep the tl in place for all protocols, to cater
620                  * for volatile write-back caches on lower level devices. */
621
622                 goto ack_common;
623         case RECV_ACKED_BY_PEER:
624                 D_ASSERT(req->rq_state & RQ_EXP_RECEIVE_ACK);
625                 /* protocol B; pretends to be successfully written on peer.
626                  * see also notes above in HANDED_OVER_TO_NETWORK about
627                  * protocol != C */
628         ack_common:
629                 req->rq_state |= RQ_NET_OK;
630                 D_ASSERT(req->rq_state & RQ_NET_PENDING);
631                 dec_ap_pending(mdev);
632                 atomic_sub(req->i.size >> 9, &mdev->ap_in_flight);
633                 req->rq_state &= ~RQ_NET_PENDING;
634                 maybe_wakeup_conflicting_requests(req);
635                 req_may_be_completed_not_susp(req, m);
636                 break;
637
638         case POSTPONE_WRITE:
639                 D_ASSERT(req->rq_state & RQ_EXP_WRITE_ACK);
640                 /* If this node has already detected the write conflict, the
641                  * worker will be waiting on misc_wait.  Wake it up once this
642                  * request has completed locally.
643                  */
644                 D_ASSERT(req->rq_state & RQ_NET_PENDING);
645                 req->rq_state |= RQ_POSTPONED;
646                 maybe_wakeup_conflicting_requests(req);
647                 req_may_be_completed_not_susp(req, m);
648                 break;
649
650         case NEG_ACKED:
651                 /* assert something? */
652                 if (req->rq_state & RQ_NET_PENDING) {
653                         dec_ap_pending(mdev);
654                         if (req->rq_state & RQ_WRITE)
655                                 atomic_sub(req->i.size >> 9, &mdev->ap_in_flight);
656                 }
657                 req->rq_state &= ~(RQ_NET_OK|RQ_NET_PENDING);
658
659                 req->rq_state |= RQ_NET_DONE;
660
661                 maybe_wakeup_conflicting_requests(req);
662                 req_may_be_completed_not_susp(req, m);
663                 /* else: done by HANDED_OVER_TO_NETWORK */
664                 break;
665
666         case FAIL_FROZEN_DISK_IO:
667                 if (!(req->rq_state & RQ_LOCAL_COMPLETED))
668                         break;
669
670                 req_may_be_completed(req, m); /* Allowed while state.susp */
671                 break;
672
673         case RESTART_FROZEN_DISK_IO:
674                 if (!(req->rq_state & RQ_LOCAL_COMPLETED))
675                         break;
676
677                 req->rq_state &= ~RQ_LOCAL_COMPLETED;
678
679                 rv = MR_READ;
680                 if (bio_data_dir(req->master_bio) == WRITE)
681                         rv = MR_WRITE;
682
683                 get_ldev(mdev);
684                 req->w.cb = w_restart_disk_io;
685                 drbd_queue_work(&mdev->tconn->data.work, &req->w);
686                 break;
687
688         case RESEND:
689                 /* If RQ_NET_OK is already set, we got a P_WRITE_ACK or P_RECV_ACK
690                    before the connection loss (B&C only); only P_BARRIER_ACK was missing.
691                    Throwing them out of the TL here by pretending we got a BARRIER_ACK.
692                    During connection handshake, we ensure that the peer was not rebooted. */
693                 if (!(req->rq_state & RQ_NET_OK)) {
694                         if (req->w.cb) {
695                                 drbd_queue_work(&mdev->tconn->data.work, &req->w);
696                                 rv = req->rq_state & RQ_WRITE ? MR_WRITE : MR_READ;
697                         }
698                         break;
699                 }
700                 /* else, fall through to BARRIER_ACKED */
701
702         case BARRIER_ACKED:
703                 if (!(req->rq_state & RQ_WRITE))
704                         break;
705
706                 if (req->rq_state & RQ_NET_PENDING) {
707                         /* barrier came in before all requests were acked.
708                          * this is bad, because if the connection is lost now,
709                          * we won't be able to clean them up... */
710                         dev_err(DEV, "FIXME (BARRIER_ACKED but pending)\n");
711                         list_move(&req->tl_requests, &mdev->tconn->out_of_sequence_requests);
712                 }
713                 if ((req->rq_state & RQ_NET_MASK) != 0) {
714                         req->rq_state |= RQ_NET_DONE;
715                         if (!(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK)))
716                                 atomic_sub(req->i.size>>9, &mdev->ap_in_flight);
717                 }
718                 req_may_be_done(req); /* Allowed while state.susp */
719                 break;
720
721         case DATA_RECEIVED:
722                 D_ASSERT(req->rq_state & RQ_NET_PENDING);
723                 dec_ap_pending(mdev);
724                 req->rq_state &= ~RQ_NET_PENDING;
725                 req->rq_state |= (RQ_NET_OK|RQ_NET_DONE);
726                 req_may_be_completed_not_susp(req, m);
727                 break;
728         }
729
730         return rv;
731 }
732
733 /* we may do a local read if:
734  * - we are consistent (of course),
735  * - or we are generally inconsistent,
736  *   BUT we are still/already IN SYNC for this area.
737  *   since size may be bigger than BM_BLOCK_SIZE,
738  *   we may need to check several bits.
739  */
740 static bool drbd_may_do_local_read(struct drbd_conf *mdev, sector_t sector, int size)
741 {
742         unsigned long sbnr, ebnr;
743         sector_t esector, nr_sectors;
744
745         if (mdev->state.disk == D_UP_TO_DATE)
746                 return true;
747         if (mdev->state.disk != D_INCONSISTENT)
748                 return false;
749         esector = sector + (size >> 9) - 1;
750         nr_sectors = drbd_get_capacity(mdev->this_bdev);
751         D_ASSERT(sector  < nr_sectors);
752         D_ASSERT(esector < nr_sectors);
753
754         sbnr = BM_SECT_TO_BIT(sector);
755         ebnr = BM_SECT_TO_BIT(esector);
756
757         return drbd_bm_count_bits(mdev, sbnr, ebnr) == 0;
758 }
759
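/* Decide, according to the configured read-balancing policy, whether a
 * read should be sent to the peer even though it could be served locally. */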
760 static bool remote_due_to_read_balancing(struct drbd_conf *mdev, sector_t sector)
761 {
762         enum drbd_read_balancing rbm;
763         struct backing_dev_info *bdi;
764         int stripe_shift;
765
766         if (mdev->state.pdsk < D_UP_TO_DATE)
767                 return false;
768
769         rcu_read_lock();
770         rbm = rcu_dereference(mdev->ldev->disk_conf)->read_balancing;
771         rcu_read_unlock();
772
773         switch (rbm) {
774         case RB_CONGESTED_REMOTE:
775                 bdi = &mdev->ldev->backing_bdev->bd_disk->queue->backing_dev_info;
776                 return bdi_read_congested(bdi);
777         case RB_LEAST_PENDING:
778                 return atomic_read(&mdev->local_cnt) >
779                         atomic_read(&mdev->ap_pending_cnt) + atomic_read(&mdev->rs_pending_cnt);
780         case RB_32K_STRIPING:  /* stripe_shift = 15 */
781         case RB_64K_STRIPING:
782         case RB_128K_STRIPING:
783         case RB_256K_STRIPING:
784         case RB_512K_STRIPING:
785         case RB_1M_STRIPING:   /* stripe_shift = 20 */
786                 stripe_shift = (rbm - RB_32K_STRIPING + 15);
787                 return (sector >> (stripe_shift - 9)) & 1;
788         case RB_ROUND_ROBIN:
789                 return test_and_change_bit(READ_BALANCE_RR, &mdev->flags);
790         case RB_PREFER_REMOTE:
791                 return true;
792         case RB_PREFER_LOCAL:
793         default:
794                 return false;
795         }
796 }
797
798 /*
799  * complete_conflicting_writes  -  wait for any conflicting write requests
800  *
801  * The write_requests tree contains all active write requests which we
802  * currently know about.  Wait for any requests to complete which conflict with
803  * the new one.
804  *
805  * Only way out: remove the conflicting intervals from the tree.
806  */
807 static void complete_conflicting_writes(struct drbd_request *req)
808 {
809         DEFINE_WAIT(wait);
810         struct drbd_conf *mdev = req->w.mdev;
811         struct drbd_interval *i;
812         sector_t sector = req->i.sector;
813         int size = req->i.size;
814
815         i = drbd_find_overlap(&mdev->write_requests, sector, size);
816         if (!i)
817                 return;
818
819         for (;;) {
820                 prepare_to_wait(&mdev->misc_wait, &wait, TASK_UNINTERRUPTIBLE);
821                 i = drbd_find_overlap(&mdev->write_requests, sector, size);
822                 if (!i)
823                         break;
824                 /* Indicate to wake up mdev->misc_wait on progress.  */
825                 i->waiting = true;
826                 spin_unlock_irq(&mdev->tconn->req_lock);
827                 schedule();
828                 spin_lock_irq(&mdev->tconn->req_lock);
829         }
830         finish_wait(&mdev->misc_wait, &wait);
831 }
832
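/* Set up a drbd_request for the master bio, decide whether it is to be
 * served locally and/or remotely, queue the network work items and
 * submit the private bio to the backing device.
 * Returns 0 when the bio has been taken care of (or failed), and 1 to
 * ask drbd_make_request() to retry after I/O was suspended. */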
833 int __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long start_time)
834 {
835         const int rw = bio_rw(bio);
836         const int size = bio->bi_size;
837         const sector_t sector = bio->bi_sector;
838         struct drbd_tl_epoch *b = NULL;
839         struct drbd_request *req;
840         struct net_conf *nc;
841         int local, remote, send_oos = 0;
842         int err = 0;
843         int ret = 0;
844         union drbd_dev_state s;
845
846         /* allocate outside of all locks; */
847         req = drbd_req_new(mdev, bio);
848         if (!req) {
849                 dec_ap_bio(mdev);
850                 /* only pass the error to the upper layers.
851                  * if user cannot handle io errors, that's not our business. */
852                 dev_err(DEV, "could not kmalloc() req\n");
853                 bio_endio(bio, -ENOMEM);
854                 return 0;
855         }
856         req->start_time = start_time;
857
858         local = get_ldev(mdev);
859         if (!local) {
860                 bio_put(req->private_bio); /* or we get a bio leak */
861                 req->private_bio = NULL;
862         }
863         if (rw == WRITE) {
864                 remote = 1;
865         } else {
866                 /* READ || READA */
867                 if (local) {
868                         if (!drbd_may_do_local_read(mdev, sector, size) ||
869                             remote_due_to_read_balancing(mdev, sector)) {
870                                 /* we could kick the syncer to
871                                  * sync this extent asap, wait for
872                                  * it, then continue locally.
873                                  * Or just issue the request remotely.
874                                  */
875                                 local = 0;
876                                 bio_put(req->private_bio);
877                                 req->private_bio = NULL;
878                                 put_ldev(mdev);
879                         }
880                 }
881                 remote = !local && mdev->state.pdsk >= D_UP_TO_DATE;
882         }
883
884         /* If we have a disk, but a READA request is mapped to remote,
885          * we are R_PRIMARY, D_INCONSISTENT, SyncTarget.
886          * Just fail that READA request right here.
887          *
888          * THINK: maybe fail all READA when not local?
889          *        or make this configurable...
890          *        if network is slow, READA won't do any good.
891          */
892         if (rw == READA && mdev->state.disk >= D_INCONSISTENT && !local) {
893                 err = -EWOULDBLOCK;
894                 goto fail_and_free_req;
895         }
896
897         /* For WRITES going to the local disk, grab a reference on the target
898          * extent.  This waits for any resync activity in the corresponding
899          * resync extent to finish, and, if necessary, pulls in the target
900          * extent into the activity log, which involves further disk io because
901          * of transactional on-disk meta data updates. */
902         if (rw == WRITE && local && !test_bit(AL_SUSPENDED, &mdev->flags)) {
903                 req->rq_state |= RQ_IN_ACT_LOG;
904                 drbd_al_begin_io(mdev, &req->i);
905         }
906
907         s = mdev->state;
908         remote = remote && drbd_should_do_remote(s);
909         send_oos = rw == WRITE && drbd_should_send_out_of_sync(s);
910         D_ASSERT(!(remote && send_oos));
911
912         if (!(local || remote) && !drbd_suspended(mdev)) {
913                 if (__ratelimit(&drbd_ratelimit_state))
914                         dev_err(DEV, "IO ERROR: neither local nor remote disk\n");
915                 err = -EIO;
916                 goto fail_free_complete;
917         }
918
919         /* For WRITE request, we have to make sure that we have an
920          * unused_spare_tle, in case we need to start a new epoch.
921          * I try to be smart and avoid always pre-allocating "just in case",
922          * but there is a race between testing the bit and pointer outside the
923          * spinlock, and grabbing the spinlock.
924          * if we lost that race, we retry.  */
925         if (rw == WRITE && (remote || send_oos) &&
926             mdev->tconn->unused_spare_tle == NULL &&
927             test_bit(CREATE_BARRIER, &mdev->tconn->flags)) {
928 allocate_barrier:
929                 b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_NOIO);
930                 if (!b) {
931                         dev_err(DEV, "Failed to alloc barrier.\n");
932                         err = -ENOMEM;
933                         goto fail_free_complete;
934                 }
935         }
936
937         /* GOOD, everything prepared, grab the spin_lock */
938         spin_lock_irq(&mdev->tconn->req_lock);
939
940         if (rw == WRITE) {
941                 /* This may temporarily give up the req_lock,
942                  * but will re-acquire it before it returns here.
943                  * Needs to be before the check on drbd_suspended() */
944                 complete_conflicting_writes(req);
945         }
946
947         if (drbd_suspended(mdev)) {
948                 /* If we got suspended, use the retry mechanism in
949                    drbd_make_request() to restart processing of this
950                    bio. In the next call to drbd_make_request
951                    we sleep in inc_ap_bio() */
952                 ret = 1;
953                 spin_unlock_irq(&mdev->tconn->req_lock);
954                 goto fail_free_complete;
955         }
956
957         if (remote || send_oos) {
958                 remote = drbd_should_do_remote(mdev->state);
959                 send_oos = rw == WRITE && drbd_should_send_out_of_sync(mdev->state);
960                 D_ASSERT(!(remote && send_oos));
961
962                 if (!(remote || send_oos))
963                         dev_warn(DEV, "lost connection while grabbing the req_lock!\n");
964                 if (!(local || remote)) {
965                         dev_err(DEV, "IO ERROR: neither local nor remote disk\n");
966                         spin_unlock_irq(&mdev->tconn->req_lock);
967                         err = -EIO;
968                         goto fail_free_complete;
969                 }
970         }
971
972         if (b && mdev->tconn->unused_spare_tle == NULL) {
973                 mdev->tconn->unused_spare_tle = b;
974                 b = NULL;
975         }
976         if (rw == WRITE && (remote || send_oos) &&
977             mdev->tconn->unused_spare_tle == NULL &&
978             test_bit(CREATE_BARRIER, &mdev->tconn->flags)) {
979                 /* someone closed the current epoch
980                  * while we were grabbing the spinlock */
981                 spin_unlock_irq(&mdev->tconn->req_lock);
982                 goto allocate_barrier;
983         }
984
985
986         /* Update disk stats */
987         _drbd_start_io_acct(mdev, req, bio);
988
989         /* _maybe_start_new_epoch(mdev);
990          * If we need to generate a write barrier packet, we have to add the
991          * new epoch (barrier) object, and queue the barrier packet for sending,
992          * and queue the req's data after it _within the same lock_, otherwise
993          * we have race conditions where the reorder domains could be mixed up.
994          *
995          * Even read requests may start a new epoch and queue the corresponding
996          * barrier packet.  To get the write ordering right, we only have to
997          * make sure that, if this is a write request and it triggered a
998          * barrier packet, this request is queued within the same spinlock. */
999         if ((remote || send_oos) && mdev->tconn->unused_spare_tle &&
1000             test_and_clear_bit(CREATE_BARRIER, &mdev->tconn->flags)) {
1001                 _tl_add_barrier(mdev->tconn, mdev->tconn->unused_spare_tle);
1002                 mdev->tconn->unused_spare_tle = NULL;
1003         } else {
1004                 D_ASSERT(!(remote && rw == WRITE &&
1005                            test_bit(CREATE_BARRIER, &mdev->tconn->flags)));
1006         }
1007
1008         /* NOTE
1009          * Actually, 'local' may be wrong here already, since we may have failed
1010          * to write to the meta data, and may become wrong anytime because of
1011          * local io-error for some other request, which would lead to us
1012          * "detaching" the local disk.
1013          *
1014          * 'remote' may become wrong any time because the network could fail.
1015          *
1016          * This is a harmless race condition, though, since it is handled
1017          * correctly at the appropriate places; so it just defers the failure
1018          * of the respective operation.
1019          */
1020
1021         /* mark them early for readability.
1022          * this just sets some state flags. */
1023         if (remote)
1024                 _req_mod(req, TO_BE_SENT);
1025         if (local)
1026                 _req_mod(req, TO_BE_SUBMITTED);
1027
1028         list_add_tail(&req->tl_requests, &mdev->tconn->newest_tle->requests);
1029
1030         /* NOTE remote first: to get the concurrent write detection right,
1031          * we must register the request before start of local IO.  */
1032         if (remote) {
1033                 /* either WRITE and C_CONNECTED,
1034                  * or READ, and no local disk,
1035                  * or READ, but not in sync.
1036                  */
1037                 _req_mod(req, (rw == WRITE)
1038                                 ? QUEUE_FOR_NET_WRITE
1039                                 : QUEUE_FOR_NET_READ);
1040         }
1041         if (send_oos && drbd_set_out_of_sync(mdev, sector, size))
1042                 _req_mod(req, QUEUE_FOR_SEND_OOS);
1043
1044         rcu_read_lock();
1045         nc = rcu_dereference(mdev->tconn->net_conf);
1046         if (remote &&
1047             nc->on_congestion != OC_BLOCK && mdev->tconn->agreed_pro_version >= 96) {
1048                 int congested = 0;
1049
1050                 if (nc->cong_fill &&
1051                     atomic_read(&mdev->ap_in_flight) >= nc->cong_fill) {
1052                         dev_info(DEV, "Congestion-fill threshold reached\n");
1053                         congested = 1;
1054                 }
1055
1056                 if (mdev->act_log->used >= nc->cong_extents) {
1057                         dev_info(DEV, "Congestion-extents threshold reached\n");
1058                         congested = 1;
1059                 }
1060
1061                 if (congested) {
1062                         queue_barrier(mdev); /* last barrier, after mirrored writes */
1063
1064                         if (nc->on_congestion == OC_PULL_AHEAD)
1065                                 _drbd_set_state(_NS(mdev, conn, C_AHEAD), 0, NULL);
1066                         else  /*nc->on_congestion == OC_DISCONNECT */
1067                                 _drbd_set_state(_NS(mdev, conn, C_DISCONNECTING), 0, NULL);
1068                 }
1069         }
1070         rcu_read_unlock();
1071
1072         spin_unlock_irq(&mdev->tconn->req_lock);
1073         kfree(b); /* if someone else has beaten us to it... */
1074
1075         if (local) {
1076                 req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1077
1078                 /* State may have changed since we grabbed our reference on the
1079                  * mdev->ldev member. Double check, and short-circuit to endio.
1080                  * In case the last activity log transaction failed to get on
1081                  * stable storage, and this is a WRITE, we may not even submit
1082                  * this bio. */
1083                 if (get_ldev(mdev)) {
1084                         if (drbd_insert_fault(mdev,   rw == WRITE ? DRBD_FAULT_DT_WR
1085                                                     : rw == READ  ? DRBD_FAULT_DT_RD
1086                                                     :               DRBD_FAULT_DT_RA))
1087                                 bio_endio(req->private_bio, -EIO);
1088                         else
1089                                 generic_make_request(req->private_bio);
1090                         put_ldev(mdev);
1091                 } else
1092                         bio_endio(req->private_bio, -EIO);
1093         }
1094
1095         return 0;
1096
1097 fail_free_complete:
1098         if (req->rq_state & RQ_IN_ACT_LOG)
1099                 drbd_al_complete_io(mdev, &req->i);
1100 fail_and_free_req:
1101         if (local) {
1102                 bio_put(req->private_bio);
1103                 req->private_bio = NULL;
1104                 put_ldev(mdev);
1105         }
1106         if (!ret)
1107                 bio_endio(bio, err);
1108
1109         drbd_req_free(req);
1110         dec_ap_bio(mdev);
1111         kfree(b);
1112
1113         return ret;
1114 }
1115
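/* make_request function of the DRBD block device: grab an ap_bio
 * reference and hand the bio to __drbd_make_request(), retrying for as
 * long as it asks for a retry (I/O suspended). */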
1116 int drbd_make_request(struct request_queue *q, struct bio *bio)
1117 {
1118         struct drbd_conf *mdev = (struct drbd_conf *) q->queuedata;
1119         unsigned long start_time;
1120
1121         start_time = jiffies;
1122
1123         /*
1124          * what we "blindly" assume:
1125          */
1126         D_ASSERT(bio->bi_size > 0);
1127         D_ASSERT(IS_ALIGNED(bio->bi_size, 512));
1128
1129         do {
1130                 inc_ap_bio(mdev);
1131         } while (__drbd_make_request(mdev, bio, start_time));
1132
1133         return 0;
1134 }
1135
1136 /* This is called by bio_add_page().
1137  *
1138  * q->max_hw_sectors and other global limits are already enforced there.
1139  *
1140  * We need to call down to our lower level device,
1141  * in case it has special restrictions.
1142  *
1143  * We also may need to enforce configured max-bio-bvecs limits.
1144  *
1145  * As long as the BIO is empty we have to allow at least one bvec,
1146  * regardless of size and offset, so no need to ask lower levels.
1147  */
1148 int drbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bvm, struct bio_vec *bvec)
1149 {
1150         struct drbd_conf *mdev = (struct drbd_conf *) q->queuedata;
1151         unsigned int bio_size = bvm->bi_size;
1152         int limit = DRBD_MAX_BIO_SIZE;
1153         int backing_limit;
1154
1155         if (bio_size && get_ldev(mdev)) {
1156                 struct request_queue * const b =
1157                         mdev->ldev->backing_bdev->bd_disk->queue;
1158                 if (b->merge_bvec_fn) {
1159                         backing_limit = b->merge_bvec_fn(b, bvm, bvec);
1160                         limit = min(limit, backing_limit);
1161                 }
1162                 put_ldev(mdev);
1163         }
1164         return limit;
1165 }
1166
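/* Recurring per-device timer: if the oldest request in the transfer log
 * has exceeded the effective network timeout (ko-count * timeout) or the
 * disk-timeout, escalate to C_TIMEOUT or a local I/O error, respectively. */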
1167 void request_timer_fn(unsigned long data)
1168 {
1169         struct drbd_conf *mdev = (struct drbd_conf *) data;
1170         struct drbd_tconn *tconn = mdev->tconn;
1171         struct drbd_request *req; /* oldest request */
1172         struct list_head *le;
1173         struct net_conf *nc;
1174         unsigned long ent = 0, dt = 0, et, nt; /* effective timeout = ko_count * timeout */
1175         unsigned long now;
1176
1177         rcu_read_lock();
1178         nc = rcu_dereference(tconn->net_conf);
1179         if (nc && mdev->state.conn >= C_WF_REPORT_PARAMS)
1180                 ent = nc->timeout * HZ/10 * nc->ko_count;
1181
1182         if (get_ldev(mdev)) { /* implicit state.disk >= D_INCONSISTENT */
1183                 dt = rcu_dereference(mdev->ldev->disk_conf)->disk_timeout * HZ / 10;
1184                 put_ldev(mdev);
1185         }
1186         rcu_read_unlock();
1187
1188         et = min_not_zero(dt, ent);
1189
1190         if (!et)
1191                 return; /* Recurring timer stopped */
1192
1193         now = jiffies;
1194
1195         spin_lock_irq(&tconn->req_lock);
1196         le = &tconn->oldest_tle->requests;
1197         if (list_empty(le)) {
1198                 spin_unlock_irq(&tconn->req_lock);
1199                 mod_timer(&mdev->request_timer, now + et);
1200                 return;
1201         }
1202
1203         le = le->prev;
1204         req = list_entry(le, struct drbd_request, tl_requests);
1205
1206         /* The request is considered timed out, if
1207          * - we have some effective timeout from the configuration,
1208          *   with above state restrictions applied,
1209          * - the oldest request is waiting for a response from the network
1210          *   resp. the local disk,
1211          * - the oldest request is in fact older than the effective timeout,
1212          * - the connection was established (resp. disk was attached)
1213          *   for longer than the timeout already.
1214          * Note that for 32bit jiffies and very stable connections/disks,
1215          * we may have a wrap-around, which is caught by
1216          *   !time_in_range(now, last_..._jif, last_..._jif + timeout).
1217          *
1218          * Side effect: once per 32bit wrap-around interval, which means every
1219          * ~198 days with 250 HZ, we have a window where the timeout would need
1220          * to expire twice (worst case) to become effective. Good enough.
1221          */
1222         if (ent && req->rq_state & RQ_NET_PENDING &&
1223                  time_after(now, req->start_time + ent) &&
1224                 !time_in_range(now, tconn->last_reconnect_jif, tconn->last_reconnect_jif + ent)) {
1225                 dev_warn(DEV, "Remote failed to finish a request within ko-count * timeout\n");
1226                 _drbd_set_state(_NS(mdev, conn, C_TIMEOUT), CS_VERBOSE | CS_HARD, NULL);
1227         }
1228         if (dt && req->rq_state & RQ_LOCAL_PENDING && req->w.mdev == mdev &&
1229                  time_after(now, req->start_time + dt) &&
1230                 !time_in_range(now, mdev->last_reattach_jif, mdev->last_reattach_jif + dt)) {
1231                 dev_warn(DEV, "Local backing device failed to meet the disk-timeout\n");
1232                 __drbd_chk_io_error(mdev, 1);
1233         }
1234         nt = (time_after(now, req->start_time + et) ? now : req->start_time) + et;
1235         spin_unlock_irq(&tconn->req_lock);
1236         mod_timer(&mdev->request_timer, nt);
1237 }