drbd: fix READ_RETRY_REMOTE_CANCELED to not complete if device is suspended
1 /*
2    drbd_req.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27
28 #include <linux/slab.h>
29 #include <linux/drbd.h>
30 #include "drbd_int.h"
31 #include "drbd_req.h"
32
33
34 static bool drbd_may_do_local_read(struct drbd_conf *mdev, sector_t sector, int size);
35
36 /* Update disk stats at start of I/O request */
37 static void _drbd_start_io_acct(struct drbd_conf *mdev, struct drbd_request *req, struct bio *bio)
38 {
39         const int rw = bio_data_dir(bio);
40         int cpu;
41         cpu = part_stat_lock();
42         part_round_stats(cpu, &mdev->vdisk->part0);
43         part_stat_inc(cpu, &mdev->vdisk->part0, ios[rw]);
44         part_stat_add(cpu, &mdev->vdisk->part0, sectors[rw], bio_sectors(bio));
45         (void) cpu; /* The macro invocations above want the cpu argument; I do not like
46                        the compiler warning about cpu being assigned but never used... */
47         part_inc_in_flight(&mdev->vdisk->part0, rw);
48         part_stat_unlock();
49 }
50
51 /* Update disk stats when completing request upwards */
52 static void _drbd_end_io_acct(struct drbd_conf *mdev, struct drbd_request *req)
53 {
54         int rw = bio_data_dir(req->master_bio);
55         unsigned long duration = jiffies - req->start_time;
56         int cpu;
57         cpu = part_stat_lock();
58         part_stat_add(cpu, &mdev->vdisk->part0, ticks[rw], duration);
59         part_round_stats(cpu, &mdev->vdisk->part0);
60         part_dec_in_flight(&mdev->vdisk->part0, rw);
61         part_stat_unlock();
62 }
63
64 static struct drbd_request *drbd_req_new(struct drbd_conf *mdev,
65                                                struct bio *bio_src)
66 {
67         struct drbd_request *req;
68
69         req = mempool_alloc(drbd_request_mempool, GFP_NOIO);
70         if (!req)
71                 return NULL;
72
73         drbd_req_make_private_bio(req, bio_src);
74         req->rq_state    = bio_data_dir(bio_src) == WRITE ? RQ_WRITE : 0;
75         req->w.mdev      = mdev;
76         req->master_bio  = bio_src;
77         req->epoch       = 0;
78
79         drbd_clear_interval(&req->i);
80         req->i.sector     = bio_src->bi_sector;
81         req->i.size      = bio_src->bi_size;
82         req->i.local = true;
83         req->i.waiting = false;
84
85         INIT_LIST_HEAD(&req->tl_requests);
86         INIT_LIST_HEAD(&req->w.list);
87
88         return req;
89 }
90
91 static void drbd_req_free(struct drbd_request *req)
92 {
93         mempool_free(req, drbd_request_mempool);
94 }
95
96 /* rw is bio_data_dir(), only READ or WRITE */
97 static void _req_is_done(struct drbd_conf *mdev, struct drbd_request *req, const int rw)
98 {
99         const unsigned long s = req->rq_state;
100
101         /* remove it from the transfer log.
102          * well, only if it had been there in the first
103          * place... if it had not (local only or conflicting
104          * and never sent), it should still be "empty" as
105          * initialized in drbd_req_new(), so we can list_del() it
106          * here unconditionally */
107         list_del_init(&req->tl_requests);
108
109         /* if it was a write, we may have to set the corresponding
110          * bit(s) out-of-sync first. If it had a local part, we need to
111          * release the reference to the activity log. */
112         if (rw == WRITE) {
113                 /* Set out-of-sync unless both OK flags are set
114                  * (local only or remote failed).
115                  * Other places where we set out-of-sync:
116                  * READ with local io-error */
117                 if (!(s & RQ_NET_OK) || !(s & RQ_LOCAL_OK))
118                         drbd_set_out_of_sync(mdev, req->i.sector, req->i.size);
119
120                 if ((s & RQ_NET_OK) && (s & RQ_LOCAL_OK) && (s & RQ_NET_SIS))
121                         drbd_set_in_sync(mdev, req->i.sector, req->i.size);
122
123                 /* one might be tempted to move the drbd_al_complete_io
124                  * to the local io completion callback drbd_request_endio.
125                  * but, if this was a mirror write, we may only
126                  * drbd_al_complete_io after this is RQ_NET_DONE,
127                  * otherwise the extent could be dropped from the al
128                  * before it has actually been written on the peer.
129                  * if we crash before our peer knows about the request,
130                  * but after the extent has been dropped from the al,
131                  * we would forget to resync the corresponding extent.
132                  */
133                 if (s & RQ_LOCAL_MASK) {
134                         if (get_ldev_if_state(mdev, D_FAILED)) {
135                                 if (s & RQ_IN_ACT_LOG)
136                                         drbd_al_complete_io(mdev, &req->i);
137                                 put_ldev(mdev);
138                         } else if (__ratelimit(&drbd_ratelimit_state)) {
139                                 dev_warn(DEV, "Should have called drbd_al_complete_io(, %llu, %u), "
140                                          "but my Disk seems to have failed :(\n",
141                                          (unsigned long long) req->i.sector, req->i.size);
142                         }
143                 }
144         }
145
146         if (s & RQ_POSTPONED)
147                 drbd_restart_write(req);
148         else
149                 drbd_req_free(req);
150 }
151
152 static void queue_barrier(struct drbd_conf *mdev)
153 {
154         struct drbd_tl_epoch *b;
155         struct drbd_tconn *tconn = mdev->tconn;
156
157         /* We are within the req_lock. Once we queued the barrier for sending,
158          * we set the CREATE_BARRIER bit. It is cleared as soon as a new
159          * barrier/epoch object is added. This is the only place this bit is
160          * set. It indicates that the barrier for this epoch is already queued,
161          * and no new epoch has been created yet. */
162         if (test_bit(CREATE_BARRIER, &tconn->flags))
163                 return;
164
165         b = tconn->newest_tle;
166         b->w.cb = w_send_barrier;
167         b->w.mdev = mdev;
168         /* inc_ap_pending done here, so we won't
169          * get imbalanced on connection loss.
170          * dec_ap_pending will be done in got_BarrierAck
171          * or (on connection loss) in tl_clear.  */
172         inc_ap_pending(mdev);
173         drbd_queue_work(&tconn->data.work, &b->w);
174         set_bit(CREATE_BARRIER, &tconn->flags);
175 }
176
177 static void _about_to_complete_local_write(struct drbd_conf *mdev,
178         struct drbd_request *req)
179 {
180         const unsigned long s = req->rq_state;
181
182         /* Before we can signal completion to the upper layers,
183          * we may need to close the current epoch.
184          * We can skip this, if this request has not even been sent, because we
185          * did not have a fully established connection yet/anymore, during
186          * bitmap exchange, or while we are C_AHEAD due to congestion policy.
187          */
188         if (mdev->state.conn >= C_CONNECTED &&
189             (s & RQ_NET_SENT) != 0 &&
190             req->epoch == mdev->tconn->newest_tle->br_number)
191                 queue_barrier(mdev);
192 }
193
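/* Complete the master bio towards the upper layers and drop the
 * corresponding ap_bio reference.  Callers typically gather bio and error
 * in a struct bio_and_error under the req_lock and call this only after
 * dropping the lock. */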
194 void complete_master_bio(struct drbd_conf *mdev,
195                 struct bio_and_error *m)
196 {
197         bio_endio(m->bio, m->error);
198         dec_ap_bio(mdev);
199 }
200
201
202 static void drbd_remove_request_interval(struct rb_root *root,
203                                          struct drbd_request *req)
204 {
205         struct drbd_conf *mdev = req->w.mdev;
206         struct drbd_interval *i = &req->i;
207
208         drbd_remove_interval(root, i);
209
210         /* Wake up any processes waiting for this request to complete.  */
211         if (i->waiting)
212                 wake_up(&mdev->misc_wait);
213 }
214
215 /* Helper for __req_mod().
216  * Set m->bio to the master bio, if it is fit to be completed,
217  * or leave it alone (it is initialized to NULL in __req_mod),
218  * if it has already been completed, or cannot be completed yet.
219  * If m->bio is set, the error status to be returned is placed in m->error.
220  */
221 void _req_may_be_done(struct drbd_request *req, struct bio_and_error *m)
222 {
223         const unsigned long s = req->rq_state;
224         struct drbd_conf *mdev = req->w.mdev;
225         int rw = req->rq_state & RQ_WRITE ? WRITE : READ;
226
227         /* we must not complete the master bio, while it is
228          *      still being processed by _drbd_send_zc_bio (drbd_send_dblock)
229          *      not yet acknowledged by the peer
230          *      not yet completed by the local io subsystem
231          * these flags may get cleared in any order by
232          *      the worker,
233          *      the receiver,
234          *      the bio_endio completion callbacks.
235          */
236         if (s & RQ_LOCAL_PENDING && !(s & RQ_LOCAL_ABORTED))
237                 return;
238         if (req->i.waiting) {
239                 /* Retry all conflicting peer requests.  */
240                 wake_up(&mdev->misc_wait);
241         }
242         if (s & RQ_NET_QUEUED)
243                 return;
244         if (s & RQ_NET_PENDING)
245                 return;
246
247         if (req->master_bio) {
248                 /* this is DATA_RECEIVED (remote read)
249                  * or protocol C P_WRITE_ACK
250                  * or protocol B P_RECV_ACK
251                  * or protocol A "HANDED_OVER_TO_NETWORK" (SendAck)
252                  * or canceled or failed,
253                  * or killed from the transfer log due to connection loss.
254                  */
255
256                 /*
257                  * figure out whether to report success or failure.
258                  *
259                  * report success when at least one of the operations succeeded.
260                  * or, to put it the other way,
261                  * only report failure, when both operations failed.
262                  *
263                  * what to do about the failures is handled elsewhere.
264                  * what we need to do here is just: complete the master_bio.
265                  *
266                  * local completion error, if any, has been stored as ERR_PTR
267                  * in private_bio within drbd_request_endio.
268                  */
269                 int ok = (s & RQ_LOCAL_OK) || (s & RQ_NET_OK);
270                 int error = PTR_ERR(req->private_bio);
271
272                 /* remove the request from the conflict detection
273                  * respective block_id verification hash */
274                 if (!drbd_interval_empty(&req->i)) {
275                         struct rb_root *root;
276
277                         if (rw == WRITE)
278                                 root = &mdev->write_requests;
279                         else
280                                 root = &mdev->read_requests;
281                         drbd_remove_request_interval(root, req);
282                 } else if (!(s & RQ_POSTPONED))
283                         D_ASSERT((s & (RQ_NET_MASK & ~RQ_NET_DONE)) == 0);
284
285                 /* for writes we need to do some extra housekeeping */
286                 if (rw == WRITE)
287                         _about_to_complete_local_write(mdev, req);
288
289                 /* Update disk stats */
290                 _drbd_end_io_acct(mdev, req);
291
292                 if (!(s & RQ_POSTPONED)) {
293                         m->error = ok ? 0 : (error ?: -EIO);
294                         m->bio = req->master_bio;
295                         req->master_bio = NULL;
296                 } else {
297                         /* Assert that this will be _req_is_done()
298                          * with this very invocation. */
299                         /* FIXME:
300                          * what about (RQ_LOCAL_PENDING | RQ_LOCAL_ABORTED)?
301                          */
302                         D_ASSERT(!(s & RQ_LOCAL_PENDING));
303                         D_ASSERT(s & RQ_NET_DONE);
304                 }
305         }
306
307         if (s & RQ_LOCAL_PENDING)
308                 return;
309
310         if ((s & RQ_NET_MASK) == 0 || (s & RQ_NET_DONE)) {
311                 /* this is a disconnected (local only) operation,
312                  * or protocol A, B, or C P_BARRIER_ACK,
313                  * or killed from the transfer log due to connection loss. */
314                 _req_is_done(mdev, req, rw);
315         }
316         /* else: network part and not DONE yet. that is
317          * protocol A, B, or C, barrier ack still pending... */
318 }
319
320 static void _req_may_be_done_not_susp(struct drbd_request *req, struct bio_and_error *m)
321 {
322         struct drbd_conf *mdev = req->w.mdev;
323
324         if (!drbd_suspended(mdev))
325                 _req_may_be_done(req, m);
326 }
327
328 /* obviously this could be coded as many single functions
329  * instead of one huge switch,
330  * or by putting the code directly in the respective locations
331  * (as it has been before).
332  *
333  * but having it this way
334  *  enforces that it is all in this one place, where it is easier to audit,
335  *  it makes it obvious that whatever "event" "happens" to a request should
336  *  happen "atomically" within the req_lock,
337  *  and it enforces that we have to think in a very structured manner
338  *  about the "events" that may happen to a request during its life time ...
339  */
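/* A minimal sketch of how callers typically drive these state transitions
 * (roughly what the req_mod()/_req_mod() helpers in drbd_req.h do; exact
 * details may differ):
 *
 *	struct bio_and_error m;
 *	unsigned long flags;
 *
 *	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
 *	__req_mod(req, what, &m);
 *	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
 *
 *	if (m.bio)
 *		complete_master_bio(mdev, &m);
 */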
340 int __req_mod(struct drbd_request *req, enum drbd_req_event what,
341                 struct bio_and_error *m)
342 {
343         struct drbd_conf *mdev = req->w.mdev;
344         struct net_conf *nc;
345         int p, rv = 0;
346
347         if (m)
348                 m->bio = NULL;
349
350         switch (what) {
351         default:
352                 dev_err(DEV, "LOGIC BUG in %s:%u\n", __FILE__ , __LINE__);
353                 break;
354
355         /* does not happen...
356          * initialization done in drbd_req_new
357         case CREATED:
358                 break;
359                 */
360
361         case TO_BE_SENT: /* via network */
362                 /* reached via __drbd_make_request
363                  * and from w_read_retry_remote */
364                 D_ASSERT(!(req->rq_state & RQ_NET_MASK));
365                 req->rq_state |= RQ_NET_PENDING;
366                 rcu_read_lock();
367                 nc = rcu_dereference(mdev->tconn->net_conf);
368                 p = nc->wire_protocol;
369                 rcu_read_unlock();
370                 req->rq_state |=
371                         p == DRBD_PROT_C ? RQ_EXP_WRITE_ACK :
372                         p == DRBD_PROT_B ? RQ_EXP_RECEIVE_ACK : 0;
373                 inc_ap_pending(mdev);
374                 break;
375
376         case TO_BE_SUBMITTED: /* locally */
377                 /* reached via __drbd_make_request */
378                 D_ASSERT(!(req->rq_state & RQ_LOCAL_MASK));
379                 req->rq_state |= RQ_LOCAL_PENDING;
380                 break;
381
382         case COMPLETED_OK:
383                 if (req->rq_state & RQ_WRITE)
384                         mdev->writ_cnt += req->i.size >> 9;
385                 else
386                         mdev->read_cnt += req->i.size >> 9;
387
388                 req->rq_state |= (RQ_LOCAL_COMPLETED|RQ_LOCAL_OK);
389                 req->rq_state &= ~RQ_LOCAL_PENDING;
390
391                 _req_may_be_done_not_susp(req, m);
392                 put_ldev(mdev);
393                 break;
394
395         case ABORT_DISK_IO:
396                 req->rq_state |= RQ_LOCAL_ABORTED;
397                 if (req->rq_state & RQ_WRITE)
398                         _req_may_be_done_not_susp(req, m);
399                 else
400                         goto goto_queue_for_net_read;
401                 break;
402
403         case WRITE_COMPLETED_WITH_ERROR:
404                 req->rq_state |= RQ_LOCAL_COMPLETED;
405                 req->rq_state &= ~RQ_LOCAL_PENDING;
406
407                 __drbd_chk_io_error(mdev, false);
408                 _req_may_be_done_not_susp(req, m);
409                 put_ldev(mdev);
410                 break;
411
412         case READ_AHEAD_COMPLETED_WITH_ERROR:
413                 /* it is legal to fail READA */
414                 req->rq_state |= RQ_LOCAL_COMPLETED;
415                 req->rq_state &= ~RQ_LOCAL_PENDING;
416                 _req_may_be_done_not_susp(req, m);
417                 put_ldev(mdev);
418                 break;
419
420         case READ_COMPLETED_WITH_ERROR:
421                 drbd_set_out_of_sync(mdev, req->i.sector, req->i.size);
422
423                 req->rq_state |= RQ_LOCAL_COMPLETED;
424                 req->rq_state &= ~RQ_LOCAL_PENDING;
425
426                 D_ASSERT(!(req->rq_state & RQ_NET_MASK));
427
428                 __drbd_chk_io_error(mdev, false);
429                 put_ldev(mdev);
430
431         goto_queue_for_net_read:
432
433                 /* no point in retrying if there is no good remote data,
434                  * or we have no connection. */
435                 if (mdev->state.pdsk != D_UP_TO_DATE) {
436                         _req_may_be_done_not_susp(req, m);
437                         break;
438                 }
439
440                 /* _req_mod(req,TO_BE_SENT); oops, recursion... */
441                 req->rq_state |= RQ_NET_PENDING;
442                 inc_ap_pending(mdev);
443                 /* fall through: _req_mod(req,QUEUE_FOR_NET_READ); */
444
445         case QUEUE_FOR_NET_READ:
446                 /* READ or READA, and
447                  * no local disk,
448                  * or target area marked as invalid,
449                  * or just got an io-error. */
450                 /* from __drbd_make_request
451                  * or from bio_endio during read io-error recovery */
452
453                 /* so we can verify the handle in the answer packet
454                  * corresponding drbd_remove_request_interval() is in _req_may_be_done() */
455                 D_ASSERT(drbd_interval_empty(&req->i));
456                 drbd_insert_interval(&mdev->read_requests, &req->i);
457
458                 set_bit(UNPLUG_REMOTE, &mdev->flags);
459
460                 D_ASSERT(req->rq_state & RQ_NET_PENDING);
461                 req->rq_state |= RQ_NET_QUEUED;
462                 req->w.cb = (req->rq_state & RQ_LOCAL_MASK)
463                         ? w_read_retry_remote
464                         : w_send_read_req;
465                 drbd_queue_work(&mdev->tconn->data.work, &req->w);
466                 break;
467
468         case QUEUE_FOR_NET_WRITE:
469                 /* assert something? */
470                 /* from __drbd_make_request only */
471
472                 /* corresponding drbd_remove_request_interval() is in _req_may_be_done() */
473                 D_ASSERT(drbd_interval_empty(&req->i));
474                 drbd_insert_interval(&mdev->write_requests, &req->i);
475
476                 /* NOTE
477                  * In case the req ended up on the transfer log before being
478                  * queued on the worker, it could lead to this request being
479                  * missed during cleanup after connection loss.
480                  * So we have to do both operations here,
481                  * within the same lock that protects the transfer log.
482                  *
483                  * _req_add_to_epoch(req); this has to be after the
484                  * _maybe_start_new_epoch(req); which happened in
485                  * __drbd_make_request, because we now may set the bit
486                  * again ourselves to close the current epoch.
487                  *
488                  * Add req to the (now) current epoch (barrier). */
489
490                 /* otherwise we may lose an unplug, which may cause some remote
491                  * io-scheduler timeout to expire, increasing maximum latency,
492                  * hurting performance. */
493                 set_bit(UNPLUG_REMOTE, &mdev->flags);
494
495                 /* see __drbd_make_request,
496                  * just after it grabs the req_lock */
497                 D_ASSERT(test_bit(CREATE_BARRIER, &mdev->tconn->flags) == 0);
498
499                 req->epoch = mdev->tconn->newest_tle->br_number;
500
501                 /* increment size of current epoch */
502                 mdev->tconn->newest_tle->n_writes++;
503
504                 /* queue work item to send data */
505                 D_ASSERT(req->rq_state & RQ_NET_PENDING);
506                 req->rq_state |= RQ_NET_QUEUED;
507                 req->w.cb =  w_send_dblock;
508                 drbd_queue_work(&mdev->tconn->data.work, &req->w);
509
510                 /* close the epoch, in case it outgrew the limit */
511                 rcu_read_lock();
512                 nc = rcu_dereference(mdev->tconn->net_conf);
513                 p = nc->max_epoch_size;
514                 rcu_read_unlock();
515                 if (mdev->tconn->newest_tle->n_writes >= p)
516                         queue_barrier(mdev);
517
518                 break;
519
520         case QUEUE_FOR_SEND_OOS:
521                 req->rq_state |= RQ_NET_QUEUED;
522                 req->w.cb =  w_send_out_of_sync;
523                 drbd_queue_work(&mdev->tconn->data.work, &req->w);
524                 break;
525
526         case READ_RETRY_REMOTE_CANCELED:
527         case SEND_CANCELED:
528         case SEND_FAILED:
529                 /* real cleanup will be done from tl_clear.  just update flags
530                  * so it is no longer marked as on the worker queue */
531                 req->rq_state &= ~RQ_NET_QUEUED;
532                 /* if we did it right, tl_clear should be scheduled only after
533                  * this, so this should not be necessary! */
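                /* Note the _not_susp variant: while I/O is suspended we must not
                 * complete the request here, so that it can still be requeued on
                 * RESEND once the suspend condition is resolved. */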
534                 _req_may_be_done_not_susp(req, m);
535                 break;
536
537         case HANDED_OVER_TO_NETWORK:
538                 /* assert something? */
539                 if (bio_data_dir(req->master_bio) == WRITE)
540                         atomic_add(req->i.size >> 9, &mdev->ap_in_flight);
541
542                 if (bio_data_dir(req->master_bio) == WRITE &&
543                     !(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK))) {
544                         /* this is what is dangerous about protocol A:
545                          * pretend it was successfully written on the peer. */
546                         if (req->rq_state & RQ_NET_PENDING) {
547                                 dec_ap_pending(mdev);
548                                 req->rq_state &= ~RQ_NET_PENDING;
549                                 req->rq_state |= RQ_NET_OK;
550                         } /* else: neg-ack was faster... */
551                         /* it is still not yet RQ_NET_DONE until the
552                          * corresponding epoch barrier got acked as well,
553                          * so we know what to dirty on connection loss */
554                 }
555                 req->rq_state &= ~RQ_NET_QUEUED;
556                 req->rq_state |= RQ_NET_SENT;
557                 _req_may_be_done_not_susp(req, m);
558                 break;
559
560         case OOS_HANDED_TO_NETWORK:
561                 /* Was not set PENDING, no longer QUEUED, so is now DONE
562                  * as far as this connection is concerned. */
563                 req->rq_state &= ~RQ_NET_QUEUED;
564                 req->rq_state |= RQ_NET_DONE;
565                 _req_may_be_done_not_susp(req, m);
566                 break;
567
568         case CONNECTION_LOST_WHILE_PENDING:
569                 /* transfer log cleanup after connection loss */
570                 /* assert something? */
571                 if (req->rq_state & RQ_NET_PENDING)
572                         dec_ap_pending(mdev);
573
574                 p = !(req->rq_state & RQ_WRITE) && req->rq_state & RQ_NET_PENDING;
575
576                 req->rq_state &= ~(RQ_NET_OK|RQ_NET_PENDING);
577                 req->rq_state |= RQ_NET_DONE;
578                 if (req->rq_state & RQ_NET_SENT && req->rq_state & RQ_WRITE)
579                         atomic_sub(req->i.size >> 9, &mdev->ap_in_flight);
580
581                 /* if it is still queued, we may not complete it here.
582                  * it will be canceled soon. */
583                 if (!(req->rq_state & RQ_NET_QUEUED)) {
584                         if (p)
585                                 goto goto_read_retry_local;
586                         _req_may_be_done(req, m); /* Allowed while state.susp */
587                 }
588                 break;
589
590         case WRITE_ACKED_BY_PEER_AND_SIS:
591                 req->rq_state |= RQ_NET_SIS;
592         case DISCARD_WRITE:
593                 /* for discarded conflicting writes of multiple primaries,
594                  * there is no need to keep anything in the tl, potential
595                  * node crashes are covered by the activity log. */
596                 req->rq_state |= RQ_NET_DONE;
597                 /* fall through */
598         case WRITE_ACKED_BY_PEER:
599                 D_ASSERT(req->rq_state & RQ_EXP_WRITE_ACK);
600                 /* protocol C; successfully written on peer.
601                  * Nothing to do here.
602                  * We want to keep the tl in place for all protocols, to cater
603                  * for volatile write-back caches on lower level devices.
604                  *
605                  * A barrier request is expected to have forced all prior
606                  * requests onto stable storage, so completion of a barrier
607                  * request could set NET_DONE right here, and not wait for the
608                  * P_BARRIER_ACK, but that is an unnecessary optimization. */
609
610                 goto ack_common;
611                 /* this makes it effectively the same as for: */
612         case RECV_ACKED_BY_PEER:
613                 D_ASSERT(req->rq_state & RQ_EXP_RECEIVE_ACK);
614                 /* protocol B; pretends to be successfully written on peer.
615                  * see also notes above in HANDED_OVER_TO_NETWORK about
616                  * protocol != C */
617         ack_common:
618                 req->rq_state |= RQ_NET_OK;
619                 D_ASSERT(req->rq_state & RQ_NET_PENDING);
620                 dec_ap_pending(mdev);
621                 atomic_sub(req->i.size >> 9, &mdev->ap_in_flight);
622                 req->rq_state &= ~RQ_NET_PENDING;
623                 _req_may_be_done_not_susp(req, m);
624                 break;
625
626         case POSTPONE_WRITE:
627                 D_ASSERT(req->rq_state & RQ_EXP_WRITE_ACK);
628                 /* If this node has already detected the write conflict, the
629                  * worker will be waiting on misc_wait.  Wake it up once this
630                  * request has completed locally.
631                  */
632                 D_ASSERT(req->rq_state & RQ_NET_PENDING);
633                 req->rq_state |= RQ_POSTPONED;
634                 _req_may_be_done_not_susp(req, m);
635                 break;
636
637         case NEG_ACKED:
638                 /* assert something? */
639                 if (req->rq_state & RQ_NET_PENDING) {
640                         dec_ap_pending(mdev);
641                         if (req->rq_state & RQ_WRITE)
642                                 atomic_sub(req->i.size >> 9, &mdev->ap_in_flight);
643                 }
644                 req->rq_state &= ~(RQ_NET_OK|RQ_NET_PENDING);
645
646                 req->rq_state |= RQ_NET_DONE;
647
648                 if (!(req->rq_state & RQ_WRITE))
649                         goto goto_read_retry_local;
650
651                 _req_may_be_done_not_susp(req, m);
652                 /* else: done by HANDED_OVER_TO_NETWORK */
653                 break;
654
655         goto_read_retry_local:
656                 if (!drbd_may_do_local_read(mdev, req->i.sector, req->i.size)) {
657                         _req_may_be_done_not_susp(req, m);
658                         break;
659                 }
660                 D_ASSERT(!(req->rq_state & RQ_LOCAL_PENDING));
661                 req->rq_state |= RQ_LOCAL_PENDING;
662
663                 get_ldev(mdev);
664                 req->w.cb = w_restart_disk_io;
665                 drbd_queue_work(&mdev->tconn->data.work, &req->w);
666                 break;
667
668         case FAIL_FROZEN_DISK_IO:
669                 if (!(req->rq_state & RQ_LOCAL_COMPLETED))
670                         break;
671
672                 _req_may_be_done(req, m); /* Allowed while state.susp */
673                 break;
674
675         case RESTART_FROZEN_DISK_IO:
676                 if (!(req->rq_state & RQ_LOCAL_COMPLETED))
677                         break;
678
679                 req->rq_state &= ~RQ_LOCAL_COMPLETED;
680
681                 rv = MR_READ;
682                 if (bio_data_dir(req->master_bio) == WRITE)
683                         rv = MR_WRITE;
684
685                 get_ldev(mdev);
686                 req->w.cb = w_restart_disk_io;
687                 drbd_queue_work(&mdev->tconn->data.work, &req->w);
688                 break;
689
690         case RESEND:
691                 /* If RQ_NET_OK is already set, we got a P_WRITE_ACK or P_RECV_ACK
692                    before the connection loss (B&C only); only P_BARRIER_ACK was missing.
693                    Throwing them out of the TL here by pretending we got a BARRIER_ACK;
694                    we ensure that the peer was not rebooted. */
695                 if (!(req->rq_state & RQ_NET_OK)) {
696                         if (req->w.cb) {
697                                 drbd_queue_work(&mdev->tconn->data.work, &req->w);
698                                 rv = req->rq_state & RQ_WRITE ? MR_WRITE : MR_READ;
699                         }
700                         break;
701                 }
702                 /* else, fall through to BARRIER_ACKED */
703
704         case BARRIER_ACKED:
705                 if (!(req->rq_state & RQ_WRITE))
706                         break;
707
708                 if (req->rq_state & RQ_NET_PENDING) {
709                         /* barrier came in before all requests were acked.
710                          * this is bad, because if the connection is lost now,
711                          * we won't be able to clean them up... */
712                         dev_err(DEV, "FIXME (BARRIER_ACKED but pending)\n");
713                         list_move(&req->tl_requests, &mdev->tconn->out_of_sequence_requests);
714                 }
715                 if ((req->rq_state & RQ_NET_MASK) != 0) {
716                         req->rq_state |= RQ_NET_DONE;
717                         if (!(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK)))
718                                 atomic_sub(req->i.size>>9, &mdev->ap_in_flight);
719                 }
720                 _req_may_be_done(req, m); /* Allowed while state.susp */
721                 break;
722
723         case DATA_RECEIVED:
724                 D_ASSERT(req->rq_state & RQ_NET_PENDING);
725                 dec_ap_pending(mdev);
726                 req->rq_state &= ~RQ_NET_PENDING;
727                 req->rq_state |= (RQ_NET_OK|RQ_NET_DONE);
728                 _req_may_be_done_not_susp(req, m);
729                 break;
730         }
731
732         return rv;
733 }
734
735 /* we may do a local read if:
736  * - we are consistent (of course),
737  * - or we are generally inconsistent,
738  *   BUT we are still/already IN SYNC for this area.
739  *   since size may be bigger than BM_BLOCK_SIZE,
740  *   we may need to check several bits.
741  */
742 static bool drbd_may_do_local_read(struct drbd_conf *mdev, sector_t sector, int size)
743 {
744         unsigned long sbnr, ebnr;
745         sector_t esector, nr_sectors;
746
747         if (mdev->state.disk == D_UP_TO_DATE)
748                 return true;
749         if (mdev->state.disk != D_INCONSISTENT)
750                 return false;
751         esector = sector + (size >> 9) - 1;
752         nr_sectors = drbd_get_capacity(mdev->this_bdev);
753         D_ASSERT(sector  < nr_sectors);
754         D_ASSERT(esector < nr_sectors);
755
756         sbnr = BM_SECT_TO_BIT(sector);
757         ebnr = BM_SECT_TO_BIT(esector);
758
759         return drbd_bm_count_bits(mdev, sbnr, ebnr) == 0;
760 }
761
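/* Decide whether a read we could serve locally should be shipped to the
 * peer instead, according to the configured read_balancing policy.  Only
 * relevant while the peer's disk is D_UP_TO_DATE. */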
762 static bool remote_due_to_read_balancing(struct drbd_conf *mdev, sector_t sector)
763 {
764         enum drbd_read_balancing rbm;
765         struct backing_dev_info *bdi;
766         int stripe_shift;
767
768         if (mdev->state.pdsk < D_UP_TO_DATE)
769                 return false;
770
771         rcu_read_lock();
772         rbm = rcu_dereference(mdev->ldev->disk_conf)->read_balancing;
773         rcu_read_unlock();
774
775         switch (rbm) {
776         case RB_CONGESTED_REMOTE:
777                 bdi = &mdev->ldev->backing_bdev->bd_disk->queue->backing_dev_info;
778                 return bdi_read_congested(bdi);
779         case RB_LEAST_PENDING:
780                 return atomic_read(&mdev->local_cnt) >
781                         atomic_read(&mdev->ap_pending_cnt) + atomic_read(&mdev->rs_pending_cnt);
782         case RB_32K_STRIPING:  /* stripe_shift = 15 */
783         case RB_64K_STRIPING:
784         case RB_128K_STRIPING:
785         case RB_256K_STRIPING:
786         case RB_512K_STRIPING:
787         case RB_1M_STRIPING:   /* stripe_shift = 20 */
788                 stripe_shift = (rbm - RB_32K_STRIPING + 15);
789                 return (sector >> (stripe_shift - 9)) & 1;
790         case RB_ROUND_ROBIN:
791                 return test_and_change_bit(READ_BALANCE_RR, &mdev->flags);
792         case RB_PREFER_REMOTE:
793                 return true;
794         case RB_PREFER_LOCAL:
795         default:
796                 return false;
797         }
798 }
799
800 /*
801  * complete_conflicting_writes  -  wait for any conflicting write requests
802  *
803  * The write_requests tree contains all active write requests which we
804  * currently know about.  Wait for any requests to complete which conflict with
805  * the new one.
806  */
807 static int complete_conflicting_writes(struct drbd_conf *mdev,
808                                        sector_t sector, int size)
809 {
810         for(;;) {
811                 struct drbd_interval *i;
812                 int err;
813
814                 i = drbd_find_overlap(&mdev->write_requests, sector, size);
815                 if (!i)
816                         return 0;
817                 err = drbd_wait_misc(mdev, i);
818                 if (err)
819                         return err;
820         }
821 }
822
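/* Central request path: allocate the drbd_request, decide on local and/or
 * remote handling (or out-of-sync only), hook the request into the current
 * epoch under the req_lock, queue the network work, and finally submit the
 * private bio to the backing device.  Returns nonzero if the caller should
 * retry because I/O got suspended in the meantime. */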
823 int __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long start_time)
824 {
825         const int rw = bio_rw(bio);
826         const int size = bio->bi_size;
827         const sector_t sector = bio->bi_sector;
828         struct drbd_tl_epoch *b = NULL;
829         struct drbd_request *req;
830         struct net_conf *nc;
831         int local, remote, send_oos = 0;
832         int err;
833         int ret = 0;
834         union drbd_dev_state s;
835
836         /* allocate outside of all locks; */
837         req = drbd_req_new(mdev, bio);
838         if (!req) {
839                 dec_ap_bio(mdev);
840                 /* only pass the error to the upper layers.
841                  * if user cannot handle io errors, that's not our business. */
842                 dev_err(DEV, "could not kmalloc() req\n");
843                 bio_endio(bio, -ENOMEM);
844                 return 0;
845         }
846         req->start_time = start_time;
847
848         local = get_ldev(mdev);
849         if (!local) {
850                 bio_put(req->private_bio); /* or we get a bio leak */
851                 req->private_bio = NULL;
852         }
853         if (rw == WRITE) {
854                 remote = 1;
855         } else {
856                 /* READ || READA */
857                 if (local) {
858                         if (!drbd_may_do_local_read(mdev, sector, size) ||
859                             remote_due_to_read_balancing(mdev, sector)) {
860                                 /* we could kick the syncer to
861                                  * sync this extent asap, wait for
862                                  * it, then continue locally.
863                                  * Or just issue the request remotely.
864                                  */
865                                 local = 0;
866                                 bio_put(req->private_bio);
867                                 req->private_bio = NULL;
868                                 put_ldev(mdev);
869                         }
870                 }
871                 remote = !local && mdev->state.pdsk >= D_UP_TO_DATE;
872         }
873
874         /* If we have a disk, but a READA request is mapped to remote,
875          * we are R_PRIMARY, D_INCONSISTENT, SyncTarget.
876          * Just fail that READA request right here.
877          *
878          * THINK: maybe fail all READA when not local?
879          *        or make this configurable...
880          *        if network is slow, READA won't do any good.
881          */
882         if (rw == READA && mdev->state.disk >= D_INCONSISTENT && !local) {
883                 err = -EWOULDBLOCK;
884                 goto fail_and_free_req;
885         }
886
887         /* For WRITES going to the local disk, grab a reference on the target
888          * extent.  This waits for any resync activity in the corresponding
889          * resync extent to finish, and, if necessary, pulls in the target
890          * extent into the activity log, which involves further disk io because
891          * of transactional on-disk meta data updates. */
892         if (rw == WRITE && local && !test_bit(AL_SUSPENDED, &mdev->flags)) {
893                 req->rq_state |= RQ_IN_ACT_LOG;
894                 drbd_al_begin_io(mdev, &req->i);
895         }
896
897         s = mdev->state;
898         remote = remote && drbd_should_do_remote(s);
899         send_oos = rw == WRITE && drbd_should_send_out_of_sync(s);
900         D_ASSERT(!(remote && send_oos));
901
902         if (!(local || remote) && !drbd_suspended(mdev)) {
903                 if (__ratelimit(&drbd_ratelimit_state))
904                         dev_err(DEV, "IO ERROR: neither local nor remote disk\n");
905                 err = -EIO;
906                 goto fail_free_complete;
907         }
908
909         /* For WRITE requests, we have to make sure that we have an
910          * unused_spare_tle, in case we need to start a new epoch.
911          * I try to be smart and avoid always pre-allocating "just in case",
912          * but there is a race between testing the bit and pointer outside the
913          * spinlock, and grabbing the spinlock.
914          * If we lose that race, we retry.  */
915         if (rw == WRITE && (remote || send_oos) &&
916             mdev->tconn->unused_spare_tle == NULL &&
917             test_bit(CREATE_BARRIER, &mdev->tconn->flags)) {
918 allocate_barrier:
919                 b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_NOIO);
920                 if (!b) {
921                         dev_err(DEV, "Failed to alloc barrier.\n");
922                         err = -ENOMEM;
923                         goto fail_free_complete;
924                 }
925         }
926
927         /* GOOD, everything prepared, grab the spin_lock */
928         spin_lock_irq(&mdev->tconn->req_lock);
929
930         if (rw == WRITE) {
931                 err = complete_conflicting_writes(mdev, sector, size);
932                 if (err) {
933                         if (err != -ERESTARTSYS)
934                                 _conn_request_state(mdev->tconn,
935                                                     NS(conn, C_TIMEOUT),
936                                                     CS_HARD);
937                         spin_unlock_irq(&mdev->tconn->req_lock);
938                         err = -EIO;
939                         goto fail_free_complete;
940                 }
941         }
942
943         if (drbd_suspended(mdev)) {
944                 /* If we got suspended, use the retry mechanism in
945                    drbd_make_request() to restart processing of this
946                    bio. In the next call to drbd_make_request
947                    we sleep in inc_ap_bio() */
948                 ret = 1;
949                 spin_unlock_irq(&mdev->tconn->req_lock);
950                 goto fail_free_complete;
951         }
952
953         if (remote || send_oos) {
954                 remote = drbd_should_do_remote(mdev->state);
955                 send_oos = rw == WRITE && drbd_should_send_out_of_sync(mdev->state);
956                 D_ASSERT(!(remote && send_oos));
957
958                 if (!(remote || send_oos))
959                         dev_warn(DEV, "lost connection while grabbing the req_lock!\n");
960                 if (!(local || remote)) {
961                         dev_err(DEV, "IO ERROR: neither local nor remote disk\n");
962                         spin_unlock_irq(&mdev->tconn->req_lock);
963                         err = -EIO;
964                         goto fail_free_complete;
965                 }
966         }
967
968         if (b && mdev->tconn->unused_spare_tle == NULL) {
969                 mdev->tconn->unused_spare_tle = b;
970                 b = NULL;
971         }
972         if (rw == WRITE && (remote || send_oos) &&
973             mdev->tconn->unused_spare_tle == NULL &&
974             test_bit(CREATE_BARRIER, &mdev->tconn->flags)) {
975                 /* someone closed the current epoch
976                  * while we were grabbing the spinlock */
977                 spin_unlock_irq(&mdev->tconn->req_lock);
978                 goto allocate_barrier;
979         }
980
981
982         /* Update disk stats */
983         _drbd_start_io_acct(mdev, req, bio);
984
985         /* _maybe_start_new_epoch(mdev);
986          * If we need to generate a write barrier packet, we have to add the
987          * new epoch (barrier) object, and queue the barrier packet for sending,
988          * and queue the req's data after it _within the same lock_, otherwise
989          * we have race conditions where the reorder domains could be mixed up.
990          *
991          * Even read requests may start a new epoch and queue the corresponding
992          * barrier packet.  To get the write ordering right, we only have to
993          * make sure that, if this is a write request and it triggered a
994          * barrier packet, this request is queued within the same spinlock. */
995         if ((remote || send_oos) && mdev->tconn->unused_spare_tle &&
996             test_and_clear_bit(CREATE_BARRIER, &mdev->tconn->flags)) {
997                 _tl_add_barrier(mdev->tconn, mdev->tconn->unused_spare_tle);
998                 mdev->tconn->unused_spare_tle = NULL;
999         } else {
1000                 D_ASSERT(!(remote && rw == WRITE &&
1001                            test_bit(CREATE_BARRIER, &mdev->tconn->flags)));
1002         }
1003
1004         /* NOTE
1005          * Actually, 'local' may be wrong here already, since we may have failed
1006          * to write to the meta data, and may become wrong anytime because of
1007          * local io-error for some other request, which would lead to us
1008          * "detaching" the local disk.
1009          *
1010          * 'remote' may become wrong any time because the network could fail.
1011          *
1012          * This is a harmless race condition, though, since it is handled
1013          * correctly at the appropriate places; so it just defers the failure
1014          * of the respective operation.
1015          */
1016
1017         /* mark them early for readability.
1018          * this just sets some state flags. */
1019         if (remote)
1020                 _req_mod(req, TO_BE_SENT);
1021         if (local)
1022                 _req_mod(req, TO_BE_SUBMITTED);
1023
1024         list_add_tail(&req->tl_requests, &mdev->tconn->newest_tle->requests);
1025
1026         /* NOTE remote first: to get the concurrent write detection right,
1027          * we must register the request before start of local IO.  */
1028         if (remote) {
1029                 /* either WRITE and C_CONNECTED,
1030                  * or READ, and no local disk,
1031                  * or READ, but not in sync.
1032                  */
1033                 _req_mod(req, (rw == WRITE)
1034                                 ? QUEUE_FOR_NET_WRITE
1035                                 : QUEUE_FOR_NET_READ);
1036         }
1037         if (send_oos && drbd_set_out_of_sync(mdev, sector, size))
1038                 _req_mod(req, QUEUE_FOR_SEND_OOS);
1039
1040         rcu_read_lock();
1041         nc = rcu_dereference(mdev->tconn->net_conf);
1042         if (remote &&
1043             nc->on_congestion != OC_BLOCK && mdev->tconn->agreed_pro_version >= 96) {
1044                 int congested = 0;
1045
1046                 if (nc->cong_fill &&
1047                     atomic_read(&mdev->ap_in_flight) >= nc->cong_fill) {
1048                         dev_info(DEV, "Congestion-fill threshold reached\n");
1049                         congested = 1;
1050                 }
1051
1052                 if (mdev->act_log->used >= nc->cong_extents) {
1053                         dev_info(DEV, "Congestion-extents threshold reached\n");
1054                         congested = 1;
1055                 }
1056
1057                 if (congested) {
1058                         queue_barrier(mdev); /* last barrier, after mirrored writes */
1059
1060                         if (nc->on_congestion == OC_PULL_AHEAD)
1061                                 _drbd_set_state(_NS(mdev, conn, C_AHEAD), 0, NULL);
1062                         else  /*nc->on_congestion == OC_DISCONNECT */
1063                                 _drbd_set_state(_NS(mdev, conn, C_DISCONNECTING), 0, NULL);
1064                 }
1065         }
1066         rcu_read_unlock();
1067
1068         spin_unlock_irq(&mdev->tconn->req_lock);
1069         kfree(b); /* if someone else has beaten us to it... */
1070
1071         if (local) {
1072                 req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1073
1074                 /* State may have changed since we grabbed our reference on the
1075                  * mdev->ldev member. Double check, and short-circuit to endio.
1076                  * In case the last activity log transaction failed to get on
1077                  * stable storage, and this is a WRITE, we may not even submit
1078                  * this bio. */
1079                 if (get_ldev(mdev)) {
1080                         if (drbd_insert_fault(mdev,   rw == WRITE ? DRBD_FAULT_DT_WR
1081                                                     : rw == READ  ? DRBD_FAULT_DT_RD
1082                                                     :               DRBD_FAULT_DT_RA))
1083                                 bio_endio(req->private_bio, -EIO);
1084                         else
1085                                 generic_make_request(req->private_bio);
1086                         put_ldev(mdev);
1087                 } else
1088                         bio_endio(req->private_bio, -EIO);
1089         }
1090
1091         return 0;
1092
1093 fail_free_complete:
1094         if (req->rq_state & RQ_IN_ACT_LOG)
1095                 drbd_al_complete_io(mdev, &req->i);
1096 fail_and_free_req:
1097         if (local) {
1098                 bio_put(req->private_bio);
1099                 req->private_bio = NULL;
1100                 put_ldev(mdev);
1101         }
1102         if (!ret)
1103                 bio_endio(bio, err);
1104
1105         drbd_req_free(req);
1106         dec_ap_bio(mdev);
1107         kfree(b);
1108
1109         return ret;
1110 }
1111
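/* make_request callback for the DRBD block device.  inc_ap_bio() may block
 * while I/O is suspended; the loop implements the retry mechanism referred
 * to in __drbd_make_request(). */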
1112 int drbd_make_request(struct request_queue *q, struct bio *bio)
1113 {
1114         struct drbd_conf *mdev = (struct drbd_conf *) q->queuedata;
1115         unsigned long start_time;
1116
1117         start_time = jiffies;
1118
1119         /*
1120          * what we "blindly" assume:
1121          */
1122         D_ASSERT(bio->bi_size > 0);
1123         D_ASSERT(IS_ALIGNED(bio->bi_size, 512));
1124
1125         do {
1126                 inc_ap_bio(mdev);
1127         } while (__drbd_make_request(mdev, bio, start_time));
1128
1129         return 0;
1130 }
1131
1132 /* This is called by bio_add_page().
1133  *
1134  * q->max_hw_sectors and other global limits are already enforced there.
1135  *
1136  * We need to call down to our lower level device,
1137  * in case it has special restrictions.
1138  *
1139  * We also may need to enforce configured max-bio-bvecs limits.
1140  *
1141  * As long as the BIO is empty we have to allow at least one bvec,
1142  * regardless of size and offset, so no need to ask lower levels.
1143  */
1144 int drbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bvm, struct bio_vec *bvec)
1145 {
1146         struct drbd_conf *mdev = (struct drbd_conf *) q->queuedata;
1147         unsigned int bio_size = bvm->bi_size;
1148         int limit = DRBD_MAX_BIO_SIZE;
1149         int backing_limit;
1150
1151         if (bio_size && get_ldev(mdev)) {
1152                 struct request_queue * const b =
1153                         mdev->ldev->backing_bdev->bd_disk->queue;
1154                 if (b->merge_bvec_fn) {
1155                         backing_limit = b->merge_bvec_fn(b, bvm, bvec);
1156                         limit = min(limit, backing_limit);
1157                 }
1158                 put_ldev(mdev);
1159         }
1160         return limit;
1161 }
1162
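/* Recurring per-device timer.  Looks at the oldest request in the transfer
 * log and escalates when the peer exceeds ko-count * timeout (network
 * timeout -> C_TIMEOUT) or the local backing device exceeds its configured
 * disk-timeout (-> local io-error handling). */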
1163 void request_timer_fn(unsigned long data)
1164 {
1165         struct drbd_conf *mdev = (struct drbd_conf *) data;
1166         struct drbd_tconn *tconn = mdev->tconn;
1167         struct drbd_request *req; /* oldest request */
1168         struct list_head *le;
1169         struct net_conf *nc;
1170         unsigned long ent = 0, dt = 0, et, nt; /* effective timeout = ko_count * timeout */
1171
1172         rcu_read_lock();
1173         nc = rcu_dereference(tconn->net_conf);
1174         ent = nc ? nc->timeout * HZ/10 * nc->ko_count : 0;
1175
1176         if (get_ldev(mdev)) {
1177                 dt = rcu_dereference(mdev->ldev->disk_conf)->disk_timeout * HZ / 10;
1178                 put_ldev(mdev);
1179         }
1180         rcu_read_unlock();
1181
1182         et = min_not_zero(dt, ent);
1183
1184         if (!et || (mdev->state.conn < C_WF_REPORT_PARAMS && mdev->state.disk <= D_FAILED))
1185                 return; /* Recurring timer stopped */
1186
1187         spin_lock_irq(&tconn->req_lock);
1188         le = &tconn->oldest_tle->requests;
1189         if (list_empty(le)) {
1190                 spin_unlock_irq(&tconn->req_lock);
1191                 mod_timer(&mdev->request_timer, jiffies + et);
1192                 return;
1193         }
1194
1195         le = le->prev;
1196         req = list_entry(le, struct drbd_request, tl_requests);
1197         if (ent && req->rq_state & RQ_NET_PENDING) {
1198                 if (time_is_before_eq_jiffies(req->start_time + ent)) {
1199                         dev_warn(DEV, "Remote failed to finish a request within ko-count * timeout\n");
1200                         _drbd_set_state(_NS(mdev, conn, C_TIMEOUT), CS_VERBOSE | CS_HARD, NULL);
1201                 }
1202         }
1203         if (dt && req->rq_state & RQ_LOCAL_PENDING && req->w.mdev == mdev) {
1204                 if (time_is_before_eq_jiffies(req->start_time + dt)) {
1205                         dev_warn(DEV, "Local backing device failed to meet the disk-timeout\n");
1206                         __drbd_chk_io_error(mdev, 1);
1207                 }
1208         }
1209         nt = (time_is_before_eq_jiffies(req->start_time + et) ? jiffies : req->start_time) + et;
1210         spin_unlock_irq(&tconn->req_lock);
1211         mod_timer(&mdev->request_timer, nt);
1212 }