drbd: detach from frozen backing device
drivers/block/drbd/drbd_req.c
1 /*
2    drbd_req.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27
28 #include <linux/slab.h>
29 #include <linux/drbd.h>
30 #include "drbd_int.h"
31 #include "drbd_req.h"
32
33
34 /* Update disk stats at start of I/O request */
35 static void _drbd_start_io_acct(struct drbd_conf *mdev, struct drbd_request *req, struct bio *bio)
36 {
37         const int rw = bio_data_dir(bio);
38         int cpu;
39         cpu = part_stat_lock();
40         part_stat_inc(cpu, &mdev->vdisk->part0, ios[rw]);
41         part_stat_add(cpu, &mdev->vdisk->part0, sectors[rw], bio_sectors(bio));
42         part_inc_in_flight(&mdev->vdisk->part0, rw);
43         part_stat_unlock();
44 }
45
46 /* Update disk stats when completing request upwards */
47 static void _drbd_end_io_acct(struct drbd_conf *mdev, struct drbd_request *req)
48 {
49         int rw = bio_data_dir(req->master_bio);
50         unsigned long duration = jiffies - req->start_time;
51         int cpu;
52         cpu = part_stat_lock();
53         part_stat_add(cpu, &mdev->vdisk->part0, ticks[rw], duration);
54         part_round_stats(cpu, &mdev->vdisk->part0);
55         part_dec_in_flight(&mdev->vdisk->part0, rw);
56         part_stat_unlock();
57 }
58
59 static struct drbd_request *drbd_req_new(struct drbd_conf *mdev,
60                                                struct bio *bio_src)
61 {
62         struct drbd_request *req;
63
64         req = mempool_alloc(drbd_request_mempool, GFP_NOIO);
65         if (!req)
66                 return NULL;
67
68         drbd_req_make_private_bio(req, bio_src);
69         req->rq_state    = bio_data_dir(bio_src) == WRITE ? RQ_WRITE : 0;
70         req->w.mdev      = mdev;
71         req->master_bio  = bio_src;
72         req->epoch       = 0;
73
74         drbd_clear_interval(&req->i);
75         req->i.sector     = bio_src->bi_sector;
76         req->i.size      = bio_src->bi_size;
77         req->i.local = true;
78         req->i.waiting = false;
79
80         INIT_LIST_HEAD(&req->tl_requests);
81         INIT_LIST_HEAD(&req->w.list);
82
83         return req;
84 }
85
86 static void drbd_req_free(struct drbd_request *req)
87 {
88         mempool_free(req, drbd_request_mempool);
89 }
90
91 /* rw is bio_data_dir(), only READ or WRITE */
92 static void _req_is_done(struct drbd_conf *mdev, struct drbd_request *req, const int rw)
93 {
94         const unsigned long s = req->rq_state;
95
96         /* remove it from the transfer log.
97          * well, only if it had been there in the first
98          * place... if it had not (local only or conflicting
99          * and never sent), it should still be "empty" as
100          * initialized in drbd_req_new(), so we can list_del() it
101          * here unconditionally */
102         list_del(&req->tl_requests);
103
104         /* if it was a write, we may have to set the corresponding
105          * bit(s) out-of-sync first. If it had a local part, we need to
106          * release the reference to the activity log. */
107         if (rw == WRITE) {
108                 /* Set out-of-sync unless both OK flags are set
109                  * (local only or remote failed).
110                  * Other places where we set out-of-sync:
111                  * READ with local io-error */
112                 if (!(s & RQ_NET_OK) || !(s & RQ_LOCAL_OK))
113                         drbd_set_out_of_sync(mdev, req->i.sector, req->i.size);
114
115                 if ((s & RQ_NET_OK) && (s & RQ_LOCAL_OK) && (s & RQ_NET_SIS))
116                         drbd_set_in_sync(mdev, req->i.sector, req->i.size);
117
118                 /* one might be tempted to move the drbd_al_complete_io
119                  * to the local io completion callback drbd_request_endio.
120                  * but, if this was a mirror write, we may only
121                  * drbd_al_complete_io after this is RQ_NET_DONE,
122                  * otherwise the extent could be dropped from the al
123                  * before it has actually been written on the peer.
124                  * if we crash before our peer knows about the request,
125                  * but after the extent has been dropped from the al,
126                  * we would forget to resync the corresponding extent.
127                  */
128                 if (s & RQ_LOCAL_MASK) {
129                         if (get_ldev_if_state(mdev, D_FAILED)) {
130                                 if (s & RQ_IN_ACT_LOG)
131                                         drbd_al_complete_io(mdev, &req->i);
132                                 put_ldev(mdev);
133                         } else if (__ratelimit(&drbd_ratelimit_state)) {
134                                 dev_warn(DEV, "Should have called drbd_al_complete_io(, %llu, %u), "
135                                          "but my Disk seems to have failed :(\n",
136                                          (unsigned long long) req->i.sector, req->i.size);
137                         }
138                 }
139         }
140
141         drbd_req_free(req);
142 }
143
144 static void queue_barrier(struct drbd_conf *mdev)
145 {
146         struct drbd_tl_epoch *b;
147
148         /* We are within the req_lock. Once we queued the barrier for sending,
149          * we set the CREATE_BARRIER bit. It is cleared as soon as a new
150          * barrier/epoch object is added. This is the only place this bit is
151          * set. It indicates that the barrier for this epoch is already queued,
152          * and no new epoch has been created yet. */
153         if (test_bit(CREATE_BARRIER, &mdev->flags))
154                 return;
155
156         b = mdev->tconn->newest_tle;
157         b->w.cb = w_send_barrier;
158         b->w.mdev = mdev;
159         /* inc_ap_pending done here, so we won't
160          * get imbalanced on connection loss.
161          * dec_ap_pending will be done in got_BarrierAck
162          * or (on connection loss) in tl_clear.  */
163         inc_ap_pending(mdev);
164         drbd_queue_work(&mdev->tconn->data.work, &b->w);
165         set_bit(CREATE_BARRIER, &mdev->flags);
166 }
167
168 static void _about_to_complete_local_write(struct drbd_conf *mdev,
169         struct drbd_request *req)
170 {
171         const unsigned long s = req->rq_state;
172
173         /* Before we can signal completion to the upper layers,
174          * we may need to close the current epoch.
175          * We can skip this, if this request has not even been sent, because we
176          * did not have a fully established connection yet/anymore, during
177          * bitmap exchange, or while we are C_AHEAD due to congestion policy.
178          */
179         if (mdev->state.conn >= C_CONNECTED &&
180             (s & RQ_NET_SENT) != 0 &&
181             req->epoch == mdev->tconn->newest_tle->br_number)
182                 queue_barrier(mdev);
183 }
184
185 void complete_master_bio(struct drbd_conf *mdev,
186                 struct bio_and_error *m)
187 {
188         bio_endio(m->bio, m->error);
189         dec_ap_bio(mdev);
190 }
191
192
193 static void drbd_remove_request_interval(struct rb_root *root,
194                                          struct drbd_request *req)
195 {
196         struct drbd_conf *mdev = req->w.mdev;
197         struct drbd_interval *i = &req->i;
198
199         drbd_remove_interval(root, i);
200
201         /* Wake up any processes waiting for this request to complete.  */
202         if (i->waiting)
203                 wake_up(&mdev->misc_wait);
204 }
205
206 /* Helper for __req_mod().
207  * Set m->bio to the master bio, if it is fit to be completed,
208  * or leave it alone (it is initialized to NULL in __req_mod),
209  * if it has already been completed, or cannot be completed yet.
210  * If m->bio is set, the error status to be returned is placed in m->error.
211  */
212 void _req_may_be_done(struct drbd_request *req, struct bio_and_error *m)
213 {
214         const unsigned long s = req->rq_state;
215         struct drbd_conf *mdev = req->w.mdev;
216         int rw = req->rq_state & RQ_WRITE ? WRITE : READ;
217
218         /* we must not complete the master bio, while it is
219          *      still being processed by _drbd_send_zc_bio (drbd_send_dblock)
220          *      not yet acknowledged by the peer
221          *      not yet completed by the local io subsystem
222          * these flags may get cleared in any order by
223          *      the worker,
224          *      the receiver,
225          *      the bio_endio completion callbacks.
226          */
227         if (s & RQ_LOCAL_PENDING && !(s & RQ_LOCAL_ABORTED))
228                 return;
229         if (req->i.waiting) {
230                 /* Retry all conflicting peer requests.  */
231                 wake_up(&mdev->misc_wait);
232         }
233         if (s & RQ_NET_QUEUED)
234                 return;
235         if (s & RQ_NET_PENDING)
236                 return;
237
238         if (req->master_bio) {
239                 /* this is DATA_RECEIVED (remote read)
240                  * or protocol C P_WRITE_ACK
241                  * or protocol B P_RECV_ACK
242                  * or protocol A "HANDED_OVER_TO_NETWORK" (SendAck)
243                  * or canceled or failed,
244                  * or killed from the transfer log due to connection loss.
245                  */
246
247                 /*
248                  * figure out whether to report success or failure.
249                  *
250                  * report success when at least one of the operations succeeded.
251                  * or, to put it the other way,
252                  * only report failure, when both operations failed.
253                  *
254                  * what to do about the failures is handled elsewhere.
255                  * what we need to do here is just: complete the master_bio.
256                  *
257                  * local completion error, if any, has been stored as ERR_PTR
258                  * in private_bio within drbd_request_endio.
259                  */
260                 int ok = (s & RQ_LOCAL_OK) || (s & RQ_NET_OK);
261                 int error = PTR_ERR(req->private_bio);
262
263                 /* remove the request from the conflict detection
264                  * respective block_id verification interval tree */
265                 if (!drbd_interval_empty(&req->i)) {
266                         struct rb_root *root;
267
268                         if (rw == WRITE)
269                                 root = &mdev->write_requests;
270                         else
271                                 root = &mdev->read_requests;
272                         drbd_remove_request_interval(root, req);
273                 } else if (!(s & RQ_POSTPONED))
274                         D_ASSERT((s & (RQ_NET_MASK & ~RQ_NET_DONE)) == 0);
275
276                 /* for writes we need to do some extra housekeeping */
277                 if (rw == WRITE)
278                         _about_to_complete_local_write(mdev, req);
279
280                 /* Update disk stats */
281                 _drbd_end_io_acct(mdev, req);
282
283                 if (!(s & RQ_POSTPONED)) {
284                         m->error = ok ? 0 : (error ?: -EIO);
285                         m->bio = req->master_bio;
286                 }
287                 req->master_bio = NULL;
288         }
289
290         if (s & RQ_LOCAL_PENDING)
291                 return;
292
293         if ((s & RQ_NET_MASK) == 0 || (s & RQ_NET_DONE)) {
294                 /* this is disconnected (local only) operation,
295                  * or protocol C P_WRITE_ACK,
296                  * or protocol A or B P_BARRIER_ACK,
297                  * or killed from the transfer log due to connection loss. */
298                 _req_is_done(mdev, req, rw);
299         }
300         /* else: network part and not DONE yet. that is
301          * protocol A or B, barrier ack still pending... */
302 }
303
304 static void _req_may_be_done_not_susp(struct drbd_request *req, struct bio_and_error *m)
305 {
306         struct drbd_conf *mdev = req->w.mdev;
307
308         if (!drbd_suspended(mdev))
309                 _req_may_be_done(req, m);
310 }
311
312 /* obviously this could be coded as many single functions
313  * instead of one huge switch,
314  * or by putting the code directly in the respective locations
315  * (as it has been before).
316  *
317  * but having it this way
318  *  enforces that it is all in this one place, where it is easier to audit,
319  *  it makes it obvious that whatever "event" "happens" to a request should
320  *  happen "atomically" within the req_lock,
321  *  and it enforces that we have to think in a very structured manner
322  *  about the "events" that may happen to a request during its life time ...
323  */
324 int __req_mod(struct drbd_request *req, enum drbd_req_event what,
325                 struct bio_and_error *m)
326 {
327         struct drbd_conf *mdev = req->w.mdev;
328         struct net_conf *nc;
329         int p, rv = 0;
330
331         if (m)
332                 m->bio = NULL;
333
334         switch (what) {
335         default:
336                 dev_err(DEV, "LOGIC BUG in %s:%u\n", __FILE__ , __LINE__);
337                 break;
338
339         /* does not happen...
340          * initialization done in drbd_req_new
341         case CREATED:
342                 break;
343                 */
344
345         case TO_BE_SENT: /* via network */
346                 /* reached via __drbd_make_request
347                  * and from w_read_retry_remote */
348                 D_ASSERT(!(req->rq_state & RQ_NET_MASK));
349                 req->rq_state |= RQ_NET_PENDING;
350                 rcu_read_lock();
351                 nc = rcu_dereference(mdev->tconn->net_conf);
352                 p = nc->wire_protocol;
353                 rcu_read_unlock();
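                /* The configured wire protocol determines which write ack we
                 * expect from the peer: protocol C sends P_WRITE_ACK, protocol B
                 * sends P_RECV_ACK, protocol A sends no per-request ack. */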
354                 req->rq_state |=
355                         p == DRBD_PROT_C ? RQ_EXP_WRITE_ACK :
356                         p == DRBD_PROT_B ? RQ_EXP_RECEIVE_ACK : 0;
357                 inc_ap_pending(mdev);
358                 break;
359
360         case TO_BE_SUBMITTED: /* locally */
361                 /* reached via __drbd_make_request */
362                 D_ASSERT(!(req->rq_state & RQ_LOCAL_MASK));
363                 req->rq_state |= RQ_LOCAL_PENDING;
364                 break;
365
366         case COMPLETED_OK:
367                 if (req->rq_state & RQ_WRITE)
368                         mdev->writ_cnt += req->i.size >> 9;
369                 else
370                         mdev->read_cnt += req->i.size >> 9;
371
372                 req->rq_state |= (RQ_LOCAL_COMPLETED|RQ_LOCAL_OK);
373                 req->rq_state &= ~RQ_LOCAL_PENDING;
374
375                 _req_may_be_done_not_susp(req, m);
376                 put_ldev(mdev);
377                 break;
378
379         case ABORT_DISK_IO:
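                /* Local disk I/O can no longer be waited for (e.g. forced
                 * detach from a frozen backing device).  Mark the request
                 * aborted; a write may still be completed towards the upper
                 * layers once its network part is done, a read is re-queued
                 * below to be served by the peer. */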
380                 req->rq_state |= RQ_LOCAL_ABORTED;
381                 if (req->rq_state & RQ_WRITE)
382                         _req_may_be_done_not_susp(req, m);
383                 else
384                         goto goto_queue_for_net_read;
385                 break;
386
387         case WRITE_COMPLETED_WITH_ERROR:
388                 req->rq_state |= RQ_LOCAL_COMPLETED;
389                 req->rq_state &= ~RQ_LOCAL_PENDING;
390
391                 __drbd_chk_io_error(mdev, false);
392                 _req_may_be_done_not_susp(req, m);
393                 put_ldev(mdev);
394                 break;
395
396         case READ_AHEAD_COMPLETED_WITH_ERROR:
397                 /* it is legal to fail READA */
398                 req->rq_state |= RQ_LOCAL_COMPLETED;
399                 req->rq_state &= ~RQ_LOCAL_PENDING;
400                 _req_may_be_done_not_susp(req, m);
401                 put_ldev(mdev);
402                 break;
403
404         case READ_COMPLETED_WITH_ERROR:
405                 drbd_set_out_of_sync(mdev, req->i.sector, req->i.size);
406
407                 req->rq_state |= RQ_LOCAL_COMPLETED;
408                 req->rq_state &= ~RQ_LOCAL_PENDING;
409
410                 D_ASSERT(!(req->rq_state & RQ_NET_MASK));
411
412                 __drbd_chk_io_error(mdev, false);
413                 put_ldev(mdev);
414
415         goto_queue_for_net_read:
416
417                 /* no point in retrying if there is no good remote data,
418                  * or we have no connection. */
419                 if (mdev->state.pdsk != D_UP_TO_DATE) {
420                         _req_may_be_done_not_susp(req, m);
421                         break;
422                 }
423
424                 /* _req_mod(req,TO_BE_SENT); oops, recursion... */
425                 req->rq_state |= RQ_NET_PENDING;
426                 inc_ap_pending(mdev);
427                 /* fall through: _req_mod(req,QUEUE_FOR_NET_READ); */
428
429         case QUEUE_FOR_NET_READ:
430                 /* READ or READA, and
431                  * no local disk,
432                  * or target area marked as invalid,
433                  * or just got an io-error. */
434                 /* from __drbd_make_request
435                  * or from bio_endio during read io-error recovery */
436
437                 /* so we can verify the handle in the answer packet;
438                  * the corresponding interval tree removal is in _req_may_be_done() */
439                 drbd_insert_interval(&mdev->read_requests, &req->i);
440
441                 set_bit(UNPLUG_REMOTE, &mdev->flags);
442
443                 D_ASSERT(req->rq_state & RQ_NET_PENDING);
444                 req->rq_state |= RQ_NET_QUEUED;
445                 req->w.cb = (req->rq_state & RQ_LOCAL_MASK)
446                         ? w_read_retry_remote
447                         : w_send_read_req;
448                 drbd_queue_work(&mdev->tconn->data.work, &req->w);
449                 break;
450
451         case QUEUE_FOR_NET_WRITE:
452                 /* assert something? */
453                 /* from __drbd_make_request only */
454
455                 /* corresponding interval tree removal is in _req_may_be_done() */
456                 drbd_insert_interval(&mdev->write_requests, &req->i);
457
458                 /* NOTE
459                  * In case the req ended up on the transfer log before being
460                  * queued on the worker, it could lead to this request being
461                  * missed during cleanup after connection loss.
462                  * So we have to do both operations here,
463                  * within the same lock that protects the transfer log.
464                  *
465                  * _req_add_to_epoch(req); this has to be after the
466                  * _maybe_start_new_epoch(req); which happened in
467                  * __drbd_make_request, because we now may set the bit
468                  * again ourselves to close the current epoch.
469                  *
470                  * Add req to the (now) current epoch (barrier). */
471
472                 /* otherwise we may lose an unplug, which may cause some remote
473                  * io-scheduler timeout to expire, increasing maximum latency,
474                  * hurting performance. */
475                 set_bit(UNPLUG_REMOTE, &mdev->flags);
476
477                 /* see __drbd_make_request,
478                  * just after it grabs the req_lock */
479                 D_ASSERT(test_bit(CREATE_BARRIER, &mdev->flags) == 0);
480
481                 req->epoch = mdev->tconn->newest_tle->br_number;
482
483                 /* increment size of current epoch */
484                 mdev->tconn->newest_tle->n_writes++;
485
486                 /* queue work item to send data */
487                 D_ASSERT(req->rq_state & RQ_NET_PENDING);
488                 req->rq_state |= RQ_NET_QUEUED;
489                 req->w.cb =  w_send_dblock;
490                 drbd_queue_work(&mdev->tconn->data.work, &req->w);
491
492                 /* close the epoch, in case it outgrew the limit */
493                 rcu_read_lock();
494                 nc = rcu_dereference(mdev->tconn->net_conf);
495                 p = nc->max_epoch_size;
496                 rcu_read_unlock();
497                 if (mdev->tconn->newest_tle->n_writes >= p)
498                         queue_barrier(mdev);
499
500                 break;
501
502         case QUEUE_FOR_SEND_OOS:
503                 req->rq_state |= RQ_NET_QUEUED;
504                 req->w.cb =  w_send_out_of_sync;
505                 drbd_queue_work(&mdev->tconn->data.work, &req->w);
506                 break;
507
508         case OOS_HANDED_TO_NETWORK:
509                 /* actually the same */
510         case SEND_CANCELED:
511                 /* treat it the same */
512         case SEND_FAILED:
513                 /* real cleanup will be done from tl_clear.  just update flags
514                  * so it is no longer marked as on the worker queue */
515                 req->rq_state &= ~RQ_NET_QUEUED;
516                 /* if we did it right, tl_clear should be scheduled only after
517                  * this, so this should not be necessary! */
518                 _req_may_be_done_not_susp(req, m);
519                 break;
520
521         case HANDED_OVER_TO_NETWORK:
522                 /* assert something? */
523                 if (bio_data_dir(req->master_bio) == WRITE)
524                         atomic_add(req->i.size >> 9, &mdev->ap_in_flight);
525
526                 if (bio_data_dir(req->master_bio) == WRITE &&
527                     !(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK))) {
528                         /* this is what is dangerous about protocol A:
529                          * pretend it was successfully written on the peer. */
530                         if (req->rq_state & RQ_NET_PENDING) {
531                                 dec_ap_pending(mdev);
532                                 req->rq_state &= ~RQ_NET_PENDING;
533                                 req->rq_state |= RQ_NET_OK;
534                         } /* else: neg-ack was faster... */
535                         /* it is still not yet RQ_NET_DONE until the
536                          * corresponding epoch barrier got acked as well,
537                          * so we know what to dirty on connection loss */
538                 }
539                 req->rq_state &= ~RQ_NET_QUEUED;
540                 req->rq_state |= RQ_NET_SENT;
541                 /* because _drbd_send_zc_bio could sleep, and may want to
542                  * dereference the bio even after the "WRITE_ACKED_BY_PEER" and
543                  * "COMPLETED_OK" events came in, once we return from
544                  * _drbd_send_zc_bio (drbd_send_dblock), we have to check
545                  * whether it is done already, and end it.  */
546                 _req_may_be_done_not_susp(req, m);
547                 break;
548
549         case READ_RETRY_REMOTE_CANCELED:
550                 req->rq_state &= ~RQ_NET_QUEUED;
551                 /* fall through, in case we raced with drbd_disconnect */
552         case CONNECTION_LOST_WHILE_PENDING:
553                 /* transfer log cleanup after connection loss */
554                 /* assert something? */
555                 if (req->rq_state & RQ_NET_PENDING)
556                         dec_ap_pending(mdev);
557                 req->rq_state &= ~(RQ_NET_OK|RQ_NET_PENDING);
558                 req->rq_state |= RQ_NET_DONE;
559                 if (req->rq_state & RQ_NET_SENT && req->rq_state & RQ_WRITE)
560                         atomic_sub(req->i.size >> 9, &mdev->ap_in_flight);
561
562                 /* if it is still queued, we may not complete it here.
563                  * it will be canceled soon. */
564                 if (!(req->rq_state & RQ_NET_QUEUED))
565                         _req_may_be_done(req, m); /* Allowed while state.susp */
566                 break;
567
568         case WRITE_ACKED_BY_PEER_AND_SIS:
569                 req->rq_state |= RQ_NET_SIS;
570         case DISCARD_WRITE:
571                 /* for discarded conflicting writes of multiple primaries,
572                  * there is no need to keep anything in the tl; potential
573                  * node crashes are covered by the activity log. */
574                 req->rq_state |= RQ_NET_DONE;
575                 /* fall through */
576         case WRITE_ACKED_BY_PEER:
577                 D_ASSERT(req->rq_state & RQ_EXP_WRITE_ACK);
578                 /* protocol C; successfully written on peer.
579                  * Nothing to do here.
580                  * We want to keep the tl in place for all protocols, to cater
581                  * for volatile write-back caches on lower level devices.
582                  *
583                  * A barrier request is expected to have forced all prior
584                  * requests onto stable storage, so completion of a barrier
585                  * request could set NET_DONE right here, and not wait for the
586                  * P_BARRIER_ACK, but that is an unnecessary optimization. */
587
588                 goto ack_common;
589                 /* this makes it effectively the same as for: */
590         case RECV_ACKED_BY_PEER:
591                 D_ASSERT(req->rq_state & RQ_EXP_RECEIVE_ACK);
592                 /* protocol B; pretends to be successfully written on peer.
593                  * see also notes above in HANDED_OVER_TO_NETWORK about
594                  * protocol != C */
595         ack_common:
596                 req->rq_state |= RQ_NET_OK;
597                 D_ASSERT(req->rq_state & RQ_NET_PENDING);
598                 dec_ap_pending(mdev);
599                 atomic_sub(req->i.size >> 9, &mdev->ap_in_flight);
600                 req->rq_state &= ~RQ_NET_PENDING;
601                 _req_may_be_done_not_susp(req, m);
602                 break;
603
604         case POSTPONE_WRITE:
605                 D_ASSERT(req->rq_state & RQ_EXP_WRITE_ACK);
606                 /* If this node has already detected the write conflict, the
607                  * worker will be waiting on misc_wait.  Wake it up once this
608                  * request has completed locally.
609                  */
610                 D_ASSERT(req->rq_state & RQ_NET_PENDING);
611                 req->rq_state |= RQ_POSTPONED;
612                 _req_may_be_done_not_susp(req, m);
613                 break;
614
615         case NEG_ACKED:
616                 /* assert something? */
617                 if (req->rq_state & RQ_NET_PENDING) {
618                         dec_ap_pending(mdev);
619                         atomic_sub(req->i.size >> 9, &mdev->ap_in_flight);
620                 }
621                 req->rq_state &= ~(RQ_NET_OK|RQ_NET_PENDING);
622
623                 req->rq_state |= RQ_NET_DONE;
624                 _req_may_be_done_not_susp(req, m);
625                 /* else: done by HANDED_OVER_TO_NETWORK */
626                 break;
627
628         case FAIL_FROZEN_DISK_IO:
629                 if (!(req->rq_state & RQ_LOCAL_COMPLETED))
630                         break;
631
632                 _req_may_be_done(req, m); /* Allowed while state.susp */
633                 break;
634
635         case RESTART_FROZEN_DISK_IO:
636                 if (!(req->rq_state & RQ_LOCAL_COMPLETED))
637                         break;
638
639                 req->rq_state &= ~RQ_LOCAL_COMPLETED;
640
641                 rv = MR_READ;
642                 if (bio_data_dir(req->master_bio) == WRITE)
643                         rv = MR_WRITE;
644
645                 get_ldev(mdev);
646                 req->w.cb = w_restart_disk_io;
647                 drbd_queue_work(&mdev->tconn->data.work, &req->w);
648                 break;
649
650         case RESEND:
651                 /* If RQ_NET_OK is already set, we got a P_WRITE_ACK or P_RECV_ACK
652                    before the connection loss (B&C only); only P_BARRIER_ACK was missing.
653                    Throwing them out of the TL here by pretending we got a BARRIER_ACK.
654                    We ensure that the peer was not rebooted. */
655                 if (!(req->rq_state & RQ_NET_OK)) {
656                         if (req->w.cb) {
657                                 drbd_queue_work(&mdev->tconn->data.work, &req->w);
658                                 rv = req->rq_state & RQ_WRITE ? MR_WRITE : MR_READ;
659                         }
660                         break;
661                 }
662                 /* else, fall through to BARRIER_ACKED */
663
664         case BARRIER_ACKED:
665                 if (!(req->rq_state & RQ_WRITE))
666                         break;
667
668                 if (req->rq_state & RQ_NET_PENDING) {
669                         /* barrier came in before all requests have been acked.
670                          * this is bad, because if the connection is lost now,
671                          * we won't be able to clean them up... */
672                         dev_err(DEV, "FIXME (BARRIER_ACKED but pending)\n");
673                         list_move(&req->tl_requests, &mdev->tconn->out_of_sequence_requests);
674                 }
675                 if ((req->rq_state & RQ_NET_MASK) != 0) {
676                         req->rq_state |= RQ_NET_DONE;
677                         if (!(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK)))
678                                 atomic_sub(req->i.size>>9, &mdev->ap_in_flight);
679                 }
680                 _req_may_be_done(req, m); /* Allowed while state.susp */
681                 break;
682
683         case DATA_RECEIVED:
684                 D_ASSERT(req->rq_state & RQ_NET_PENDING);
685                 dec_ap_pending(mdev);
686                 req->rq_state &= ~RQ_NET_PENDING;
687                 req->rq_state |= (RQ_NET_OK|RQ_NET_DONE);
688                 _req_may_be_done_not_susp(req, m);
689                 break;
690         }
691
692         return rv;
693 }
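
/*
 * Note (illustrative sketch, not called by the code above): __req_mod() must
 * run under the req_lock, and the returned bio_and_error is acted upon only
 * after the lock has been dropped.  The req_mod() wrapper in drbd_req.h
 * follows roughly this pattern:
 *
 *	struct bio_and_error m;
 *	unsigned long flags;
 *
 *	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
 *	__req_mod(req, what, &m);
 *	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
 *
 *	if (m.bio)
 *		complete_master_bio(mdev, &m);
 */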
694
695 /* we may do a local read if:
696  * - we are consistent (of course),
697  * - or we are generally inconsistent,
698  *   BUT we are still/already IN SYNC for this area.
699  *   since size may be bigger than BM_BLOCK_SIZE,
700  *   we may need to check several bits.
701  */
702 static bool drbd_may_do_local_read(struct drbd_conf *mdev, sector_t sector, int size)
703 {
704         unsigned long sbnr, ebnr;
705         sector_t esector, nr_sectors;
706
707         if (mdev->state.disk == D_UP_TO_DATE)
708                 return true;
709         if (mdev->state.disk != D_INCONSISTENT)
710                 return false;
711         esector = sector + (size >> 9) - 1;
712         nr_sectors = drbd_get_capacity(mdev->this_bdev);
713         D_ASSERT(sector  < nr_sectors);
714         D_ASSERT(esector < nr_sectors);
715
716         sbnr = BM_SECT_TO_BIT(sector);
717         ebnr = BM_SECT_TO_BIT(esector);
718
719         return drbd_bm_count_bits(mdev, sbnr, ebnr) == 0;
720 }
721
722 /*
723  * complete_conflicting_writes  -  wait for any conflicting write requests
724  *
725  * The write_requests tree contains all active write requests which we
726  * currently know about.  Wait for any requests to complete which conflict with
727  * the new one.
728  */
729 static int complete_conflicting_writes(struct drbd_conf *mdev,
730                                        sector_t sector, int size)
731 {
732         for(;;) {
733                 struct drbd_interval *i;
734                 int err;
735
736                 i = drbd_find_overlap(&mdev->write_requests, sector, size);
737                 if (!i)
738                         return 0;
739                 err = drbd_wait_misc(mdev, i);
740                 if (err)
741                         return err;
742         }
743 }
744
745 int __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long start_time)
746 {
747         const int rw = bio_rw(bio);
748         const int size = bio->bi_size;
749         const sector_t sector = bio->bi_sector;
750         struct drbd_tl_epoch *b = NULL;
751         struct drbd_request *req;
752         struct net_conf *nc;
753         int local, remote, send_oos = 0;
754         int err;
755         int ret = 0;
756
757         /* allocate outside of all locks; */
758         req = drbd_req_new(mdev, bio);
759         if (!req) {
760                 dec_ap_bio(mdev);
761                 /* only pass the error to the upper layers.
762                  * if user cannot handle io errors, that's not our business. */
763                 dev_err(DEV, "could not kmalloc() req\n");
764                 bio_endio(bio, -ENOMEM);
765                 return 0;
766         }
767         req->start_time = start_time;
768
769         local = get_ldev(mdev);
770         if (!local) {
771                 bio_put(req->private_bio); /* or we get a bio leak */
772                 req->private_bio = NULL;
773         }
774         if (rw == WRITE) {
775                 remote = 1;
776         } else {
777                 /* READ || READA */
778                 if (local) {
779                         if (!drbd_may_do_local_read(mdev, sector, size)) {
780                                 /* we could kick the syncer to
781                                  * sync this extent asap, wait for
782                                  * it, then continue locally.
783                                  * Or just issue the request remotely.
784                                  */
785                                 local = 0;
786                                 bio_put(req->private_bio);
787                                 req->private_bio = NULL;
788                                 put_ldev(mdev);
789                         }
790                 }
791                 remote = !local && mdev->state.pdsk >= D_UP_TO_DATE;
792         }
793
794         /* If we have a disk, but a READA request is mapped to remote,
795          * we are R_PRIMARY, D_INCONSISTENT, SyncTarget.
796          * Just fail that READA request right here.
797          *
798          * THINK: maybe fail all READA when not local?
799          *        or make this configurable...
800          *        if network is slow, READA won't do any good.
801          */
802         if (rw == READA && mdev->state.disk >= D_INCONSISTENT && !local) {
803                 err = -EWOULDBLOCK;
804                 goto fail_and_free_req;
805         }
806
807         /* For WRITES going to the local disk, grab a reference on the target
808          * extent.  This waits for any resync activity in the corresponding
809          * resync extent to finish, and, if necessary, pulls in the target
810          * extent into the activity log, which involves further disk io because
811          * of transactional on-disk meta data updates. */
812         if (rw == WRITE && local && !test_bit(AL_SUSPENDED, &mdev->flags)) {
813                 req->rq_state |= RQ_IN_ACT_LOG;
814                 drbd_al_begin_io(mdev, &req->i);
815         }
816
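        /* "send_oos": when we should not mirror the data itself (e.g. while
         * C_AHEAD due to the congestion policy), we only tell the peer which
         * blocks are now out of sync (P_OUT_OF_SYNC); they are resynced later. */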
817         remote = remote && drbd_should_do_remote(mdev->state);
818         send_oos = rw == WRITE && drbd_should_send_out_of_sync(mdev->state);
819         D_ASSERT(!(remote && send_oos));
820
821         if (!(local || remote) && !drbd_suspended(mdev)) {
822                 if (__ratelimit(&drbd_ratelimit_state))
823                         dev_err(DEV, "IO ERROR: neither local nor remote disk\n");
824                 err = -EIO;
825                 goto fail_free_complete;
826         }
827
828         /* For a WRITE request, we have to make sure that we have an
829          * unused_spare_tle, in case we need to start a new epoch.
830          * I try to be smart and avoid always pre-allocating "just in case",
831          * but there is a race between testing the bit and pointer outside the
832          * spinlock, and grabbing the spinlock.
833          * If we lose that race, we retry.  */
834         if (rw == WRITE && (remote || send_oos) &&
835             mdev->tconn->unused_spare_tle == NULL &&
836             test_bit(CREATE_BARRIER, &mdev->flags)) {
837 allocate_barrier:
838                 b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_NOIO);
839                 if (!b) {
840                         dev_err(DEV, "Failed to alloc barrier.\n");
841                         err = -ENOMEM;
842                         goto fail_free_complete;
843                 }
844         }
845
846         /* GOOD, everything prepared, grab the spin_lock */
847         spin_lock_irq(&mdev->tconn->req_lock);
848
849         if (rw == WRITE) {
850                 err = complete_conflicting_writes(mdev, sector, size);
851                 if (err) {
852                         if (err != -ERESTARTSYS)
853                                 _conn_request_state(mdev->tconn,
854                                                     NS(conn, C_TIMEOUT),
855                                                     CS_HARD);
856                         spin_unlock_irq(&mdev->tconn->req_lock);
857                         err = -EIO;
858                         goto fail_free_complete;
859                 }
860         }
861
862         if (drbd_suspended(mdev)) {
863                 /* If we got suspended, use the retry mechanism of
864                    generic_make_request() to restart processing of this
865                    bio. In the next call to drbd_make_request
866                    we sleep in inc_ap_bio() */
867                 ret = 1;
868                 spin_unlock_irq(&mdev->tconn->req_lock);
869                 goto fail_free_complete;
870         }
871
872         if (remote || send_oos) {
873                 remote = drbd_should_do_remote(mdev->state);
874                 send_oos = rw == WRITE && drbd_should_send_out_of_sync(mdev->state);
875                 D_ASSERT(!(remote && send_oos));
876
877                 if (!(remote || send_oos))
878                         dev_warn(DEV, "lost connection while grabbing the req_lock!\n");
879                 if (!(local || remote)) {
880                         dev_err(DEV, "IO ERROR: neither local nor remote disk\n");
881                         spin_unlock_irq(&mdev->tconn->req_lock);
882                         err = -EIO;
883                         goto fail_free_complete;
884                 }
885         }
886
887         if (b && mdev->tconn->unused_spare_tle == NULL) {
888                 mdev->tconn->unused_spare_tle = b;
889                 b = NULL;
890         }
891         if (rw == WRITE && (remote || send_oos) &&
892             mdev->tconn->unused_spare_tle == NULL &&
893             test_bit(CREATE_BARRIER, &mdev->flags)) {
894                 /* someone closed the current epoch
895                  * while we were grabbing the spinlock */
896                 spin_unlock_irq(&mdev->tconn->req_lock);
897                 goto allocate_barrier;
898         }
899
900
901         /* Update disk stats */
902         _drbd_start_io_acct(mdev, req, bio);
903
904         /* _maybe_start_new_epoch(mdev);
905          * If we need to generate a write barrier packet, we have to add the
906          * new epoch (barrier) object, and queue the barrier packet for sending,
907          * and queue the req's data after it _within the same lock_, otherwise
908          * we have race conditions where the reorder domains could be mixed up.
909          *
910          * Even read requests may start a new epoch and queue the corresponding
911          * barrier packet.  To get the write ordering right, we only have to
912          * make sure that, if this is a write request and it triggered a
913          * barrier packet, this request is queued within the same spinlock. */
914         if ((remote || send_oos) && mdev->tconn->unused_spare_tle &&
915             test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
916                 _tl_add_barrier(mdev->tconn, mdev->tconn->unused_spare_tle);
917                 mdev->tconn->unused_spare_tle = NULL;
918         } else {
919                 D_ASSERT(!(remote && rw == WRITE &&
920                            test_bit(CREATE_BARRIER, &mdev->flags)));
921         }
922
923         /* NOTE
924          * Actually, 'local' may be wrong here already, since we may have failed
925          * to write to the meta data, and may become wrong anytime because of
926          * local io-error for some other request, which would lead to us
927          * "detaching" the local disk.
928          *
929          * 'remote' may become wrong any time because the network could fail.
930          *
931          * This is a harmless race condition, though, since it is handled
932          * correctly at the appropriate places; so it just defers the failure
933          * of the respective operation.
934          */
935
936         /* mark them early for readability.
937          * this just sets some state flags. */
938         if (remote)
939                 _req_mod(req, TO_BE_SENT);
940         if (local)
941                 _req_mod(req, TO_BE_SUBMITTED);
942
943         list_add_tail(&req->tl_requests, &mdev->tconn->newest_tle->requests);
944
945         /* NOTE remote first: to get the concurrent write detection right,
946          * we must register the request before start of local IO.  */
947         if (remote) {
948                 /* either WRITE and C_CONNECTED,
949                  * or READ, and no local disk,
950                  * or READ, but not in sync.
951                  */
952                 _req_mod(req, (rw == WRITE)
953                                 ? QUEUE_FOR_NET_WRITE
954                                 : QUEUE_FOR_NET_READ);
955         }
956         if (send_oos && drbd_set_out_of_sync(mdev, sector, size))
957                 _req_mod(req, QUEUE_FOR_SEND_OOS);
958
959         rcu_read_lock();
960         nc = rcu_dereference(mdev->tconn->net_conf);
961         if (remote &&
962             nc->on_congestion != OC_BLOCK && mdev->tconn->agreed_pro_version >= 96) {
963                 int congested = 0;
964
965                 if (nc->cong_fill &&
966                     atomic_read(&mdev->ap_in_flight) >= nc->cong_fill) {
967                         dev_info(DEV, "Congestion-fill threshold reached\n");
968                         congested = 1;
969                 }
970
971                 if (mdev->act_log->used >= nc->cong_extents) {
972                         dev_info(DEV, "Congestion-extents threshold reached\n");
973                         congested = 1;
974                 }
975
976                 if (congested) {
977                         queue_barrier(mdev); /* last barrier, after mirrored writes */
978
979                         if (nc->on_congestion == OC_PULL_AHEAD)
980                                 _drbd_set_state(_NS(mdev, conn, C_AHEAD), 0, NULL);
981                         else  /*nc->on_congestion == OC_DISCONNECT */
982                                 _drbd_set_state(_NS(mdev, conn, C_DISCONNECTING), 0, NULL);
983                 }
984         }
985         rcu_read_unlock();
986
987         spin_unlock_irq(&mdev->tconn->req_lock);
988         kfree(b); /* if someone else has beaten us to it... */
989
990         if (local) {
991                 req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
992
993                 /* State may have changed since we grabbed our reference on the
994                  * mdev->ldev member. Double check, and short-circuit to endio.
995                  * In case the last activity log transaction failed to get on
996                  * stable storage, and this is a WRITE, we may not even submit
997                  * this bio. */
998                 if (get_ldev(mdev)) {
999                         if (drbd_insert_fault(mdev,   rw == WRITE ? DRBD_FAULT_DT_WR
1000                                                     : rw == READ  ? DRBD_FAULT_DT_RD
1001                                                     :               DRBD_FAULT_DT_RA))
1002                                 bio_endio(req->private_bio, -EIO);
1003                         else
1004                                 generic_make_request(req->private_bio);
1005                         put_ldev(mdev);
1006                 } else
1007                         bio_endio(req->private_bio, -EIO);
1008         }
1009
1010         return 0;
1011
1012 fail_free_complete:
1013         if (req->rq_state & RQ_IN_ACT_LOG)
1014                 drbd_al_complete_io(mdev, &req->i);
1015 fail_and_free_req:
1016         if (local) {
1017                 bio_put(req->private_bio);
1018                 req->private_bio = NULL;
1019                 put_ldev(mdev);
1020         }
1021         if (!ret)
1022                 bio_endio(bio, err);
1023
1024         drbd_req_free(req);
1025         dec_ap_bio(mdev);
1026         kfree(b);
1027
1028         return ret;
1029 }
1030
1031 int drbd_make_request(struct request_queue *q, struct bio *bio)
1032 {
1033         struct drbd_conf *mdev = (struct drbd_conf *) q->queuedata;
1034         unsigned long start_time;
1035
1036         start_time = jiffies;
1037
1038         /*
1039          * what we "blindly" assume:
1040          */
1041         D_ASSERT(bio->bi_size > 0);
1042         D_ASSERT(IS_ALIGNED(bio->bi_size, 512));
1043
1044         inc_ap_bio(mdev);
1045         return __drbd_make_request(mdev, bio, start_time);
1046 }
1047
1048 /* This is called by bio_add_page().
1049  *
1050  * q->max_hw_sectors and other global limits are already enforced there.
1051  *
1052  * We need to call down to our lower level device,
1053  * in case it has special restrictions.
1054  *
1055  * We also may need to enforce configured max-bio-bvecs limits.
1056  *
1057  * As long as the BIO is empty we have to allow at least one bvec,
1058  * regardless of size and offset, so no need to ask lower levels.
1059  */
1060 int drbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bvm, struct bio_vec *bvec)
1061 {
1062         struct drbd_conf *mdev = (struct drbd_conf *) q->queuedata;
1063         unsigned int bio_size = bvm->bi_size;
1064         int limit = DRBD_MAX_BIO_SIZE;
1065         int backing_limit;
1066
1067         if (bio_size && get_ldev(mdev)) {
1068                 struct request_queue * const b =
1069                         mdev->ldev->backing_bdev->bd_disk->queue;
1070                 if (b->merge_bvec_fn) {
1071                         backing_limit = b->merge_bvec_fn(b, bvm, bvec);
1072                         limit = min(limit, backing_limit);
1073                 }
1074                 put_ldev(mdev);
1075         }
1076         return limit;
1077 }
1078
1079 void request_timer_fn(unsigned long data)
1080 {
1081         struct drbd_conf *mdev = (struct drbd_conf *) data;
1082         struct drbd_tconn *tconn = mdev->tconn;
1083         struct drbd_request *req; /* oldest request */
1084         struct list_head *le;
1085         struct net_conf *nc;
1086         unsigned long ent = 0, dt = 0, et; /* effective timeout = ko_count * timeout */
1087
1088         rcu_read_lock();
1089         nc = rcu_dereference(tconn->net_conf);
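        /* nc->timeout and disk_timeout are configured in units of 0.1 seconds;
         * the HZ/10 factor converts them to jiffies. */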
1090         ent = nc ? nc->timeout * HZ/10 * nc->ko_count : 0;
1091
1092         if (get_ldev(mdev)) {
1093                 dt = rcu_dereference(mdev->ldev->disk_conf)->disk_timeout * HZ / 10;
1094                 put_ldev(mdev);
1095         }
1096         rcu_read_unlock();
1097
1098         et = min_not_zero(dt, ent);
1099
1100         if (!et || (mdev->state.conn < C_WF_REPORT_PARAMS && mdev->state.disk <= D_FAILED))
1101                 return; /* Recurring timer stopped */
1102
1103         spin_lock_irq(&tconn->req_lock);
1104         le = &tconn->oldest_tle->requests;
1105         if (list_empty(le)) {
1106                 spin_unlock_irq(&tconn->req_lock);
1107                 mod_timer(&mdev->request_timer, jiffies + et);
1108                 return;
1109         }
1110
1111         le = le->prev;
1112         req = list_entry(le, struct drbd_request, tl_requests);
1113         if (ent && req->rq_state & RQ_NET_PENDING) {
1114                 if (time_is_before_eq_jiffies(req->start_time + ent)) {
1115                         dev_warn(DEV, "Remote failed to finish a request within ko-count * timeout\n");
1116                         _drbd_set_state(_NS(mdev, conn, C_TIMEOUT), CS_VERBOSE | CS_HARD, NULL);
1117                 }
1118         }
1119         if (dt && req->rq_state & RQ_LOCAL_PENDING) {
1120                 if (time_is_before_eq_jiffies(req->start_time + dt)) {
1121                         dev_warn(DEV, "Local backing device failed to meet the disk-timeout\n");
1122                         __drbd_chk_io_error(mdev, 1);
1123                 }
1124         }
1125         spin_unlock_irq(&tconn->req_lock);
1126         mod_timer(&mdev->request_timer, req->start_time + et);
1127 }