drivers/block/drbd/drbd_worker.c
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40
41 static int w_make_ov_request(struct drbd_work *w, int cancel);
42
43
44 /* endio handlers:
45  *   drbd_md_io_complete (defined here)
46  *   drbd_request_endio (defined here)
47  *   drbd_peer_request_endio (defined here)
48  *   bm_async_io_complete (defined in drbd_bitmap.c)
49  *
50  * For all these callbacks, note the following:
51  * The callbacks will be called in irq context by the IDE drivers,
52  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
53  * Try to get the locking right :)
54  *
55  */
56
57
58 /* About the global_state_lock
59    Each state transition on a device holds a read lock. In case we have
60    to evaluate the resync-after dependencies, we grab a write lock, because
61    we need stable states on all devices for that.  */
62 rwlock_t global_state_lock;
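/* Illustrative sketch of the intended usage pattern, as far as this file
 * shows it (not part of the driver logic): per-device state transitions take
 * the lock for reading, presumably in the state handling code outside this
 * file, while evaluating the resync-after dependency chain takes it for
 * writing because it needs stable states on all devices, e.g.:
 *
 *	write_lock_irq(&global_state_lock);
 *	_drbd_resume_next(mdev);	// walk all minors, needs stable states
 *	write_unlock_irq(&global_state_lock);
 *
 * See resume_next_sg()/suspend_other_sg() further down. */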
63
64 /* used for synchronous meta data and bitmap IO
65  * submitted by drbd_md_sync_page_io()
66  */
67 void drbd_md_io_complete(struct bio *bio, int error)
68 {
69         struct drbd_md_io *md_io;
70         struct drbd_conf *mdev;
71
72         md_io = (struct drbd_md_io *)bio->bi_private;
73         mdev = container_of(md_io, struct drbd_conf, md_io);
74
75         md_io->error = error;
76
77         md_io->done = 1;
78         wake_up(&mdev->misc_wait);
79         bio_put(bio);
80         drbd_md_put_buffer(mdev);
81         put_ldev(mdev);
82 }
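/* A rough sketch of the submit side this completion pairs with (the actual
 * submission lives in drbd_md_sync_page_io(), not shown in this file): the
 * caller submits the bio with &mdev->md_io as bi_private and then sleeps
 * until the completion above sets ->done, roughly:
 *
 *	mdev->md_io.done = 0;
 *	submit_bio(rw, bio);
 *	wait_event(mdev->misc_wait, mdev->md_io.done);
 *	err = mdev->md_io.error;
 *
 * Illustration only; the exact reference counting (drbd_md_get_buffer,
 * get_ldev) is done by the submitter and released here. */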
83
84 /* reads on behalf of the partner,
85  * "submitted" by the receiver
86  */
87 void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
88 {
89         unsigned long flags = 0;
90         struct drbd_conf *mdev = peer_req->w.mdev;
91
92         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
93         mdev->read_cnt += peer_req->i.size >> 9;
94         list_del(&peer_req->w.list);
95         if (list_empty(&mdev->read_ee))
96                 wake_up(&mdev->ee_wait);
97         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
98                 __drbd_chk_io_error(mdev, false);
99         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
100
101         drbd_queue_work(&mdev->tconn->data.work, &peer_req->w);
102         put_ldev(mdev);
103 }
104
105 /* writes on behalf of the partner, or resync writes,
106  * "submitted" by the receiver, final stage.  */
107 static void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
108 {
109         unsigned long flags = 0;
110         struct drbd_conf *mdev = peer_req->w.mdev;
111         struct drbd_interval i;
112         int do_wake;
113         u64 block_id;
114         int do_al_complete_io;
115
116         /* after we moved peer_req to done_ee,
117          * we may no longer access it,
118          * it may be freed/reused already!
119          * (as soon as we release the req_lock) */
120         i = peer_req->i;
121         do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
122         block_id = peer_req->block_id;
123
124         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
125         mdev->writ_cnt += peer_req->i.size >> 9;
126         list_del(&peer_req->w.list); /* has been on active_ee or sync_ee */
127         list_add_tail(&peer_req->w.list, &mdev->done_ee);
128
129         /*
130          * Do not remove from the write_requests tree here: we did not send the
131          * Ack yet and did not wake possibly waiting conflicting requests.
132          * Removal from the tree happens in "drbd_process_done_ee", within the
133          * appropriate w.cb (e_end_block/e_end_resync_block), or in
134          * _drbd_clear_done_ee.
135          */
136
137         do_wake = list_empty(block_id == ID_SYNCER ? &mdev->sync_ee : &mdev->active_ee);
138
139         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
140                 __drbd_chk_io_error(mdev, false);
141         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
142
143         if (block_id == ID_SYNCER)
144                 drbd_rs_complete_io(mdev, i.sector);
145
146         if (do_wake)
147                 wake_up(&mdev->ee_wait);
148
149         if (do_al_complete_io)
150                 drbd_al_complete_io(mdev, &i);
151
152         wake_asender(mdev->tconn);
153         put_ldev(mdev);
154 }
155
156 /* writes on behalf of the partner, or resync writes,
157  * "submitted" by the receiver.
158  */
159 void drbd_peer_request_endio(struct bio *bio, int error)
160 {
161         struct drbd_peer_request *peer_req = bio->bi_private;
162         struct drbd_conf *mdev = peer_req->w.mdev;
163         int uptodate = bio_flagged(bio, BIO_UPTODATE);
164         int is_write = bio_data_dir(bio) == WRITE;
165
166         if (error && __ratelimit(&drbd_ratelimit_state))
167                 dev_warn(DEV, "%s: error=%d s=%llus\n",
168                                 is_write ? "write" : "read", error,
169                                 (unsigned long long)peer_req->i.sector);
170         if (!error && !uptodate) {
171                 if (__ratelimit(&drbd_ratelimit_state))
172                         dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
173                                         is_write ? "write" : "read",
174                                         (unsigned long long)peer_req->i.sector);
175                 /* strange behavior of some lower level drivers...
176                  * fail the request by clearing the uptodate flag,
177                  * but do not return any error?! */
178                 error = -EIO;
179         }
180
181         if (error)
182                 set_bit(__EE_WAS_ERROR, &peer_req->flags);
183
184         bio_put(bio); /* no need for the bio anymore */
185         if (atomic_dec_and_test(&peer_req->pending_bios)) {
186                 if (is_write)
187                         drbd_endio_write_sec_final(peer_req);
188                 else
189                         drbd_endio_read_sec_final(peer_req);
190         }
191 }
192
193 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
194  */
195 void drbd_request_endio(struct bio *bio, int error)
196 {
197         unsigned long flags;
198         struct drbd_request *req = bio->bi_private;
199         struct drbd_conf *mdev = req->w.mdev;
200         struct bio_and_error m;
201         enum drbd_req_event what;
202         int uptodate = bio_flagged(bio, BIO_UPTODATE);
203
204         if (!error && !uptodate) {
205                 dev_warn(DEV, "p %s: setting error to -EIO\n",
206                          bio_data_dir(bio) == WRITE ? "write" : "read");
207                 /* strange behavior of some lower level drivers...
208                  * fail the request by clearing the uptodate flag,
209                  * but do not return any error?! */
210                 error = -EIO;
211         }
212
213         /* to avoid recursion in __req_mod */
214         if (unlikely(error)) {
215                 what = (bio_data_dir(bio) == WRITE)
216                         ? WRITE_COMPLETED_WITH_ERROR
217                         : (bio_rw(bio) == READ)
218                           ? READ_COMPLETED_WITH_ERROR
219                           : READ_AHEAD_COMPLETED_WITH_ERROR;
220         } else
221                 what = COMPLETED_OK;
222
223         bio_put(req->private_bio);
224         req->private_bio = ERR_PTR(error);
225
226         /* not req_mod(), we need irqsave here! */
227         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
228         __req_mod(req, what, &m);
229         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
230
231         if (m.bio)
232                 complete_master_bio(mdev, &m);
233 }
234
235 int w_read_retry_remote(struct drbd_work *w, int cancel)
236 {
237         struct drbd_request *req = container_of(w, struct drbd_request, w);
238         struct drbd_conf *mdev = w->mdev;
239
240         /* We should not detach for read io-error,
241          * but try to WRITE the P_DATA_REPLY to the failed location,
242          * to give the disk the chance to relocate that block */
243
244         spin_lock_irq(&mdev->tconn->req_lock);
245         if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
246                 _req_mod(req, READ_RETRY_REMOTE_CANCELED);
247                 spin_unlock_irq(&mdev->tconn->req_lock);
248                 return 0;
249         }
250         spin_unlock_irq(&mdev->tconn->req_lock);
251
252         return w_send_read_req(w, 0);
253 }
254
255 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm,
256                   struct drbd_peer_request *peer_req, void *digest)
257 {
258         struct hash_desc desc;
259         struct scatterlist sg;
260         struct page *page = peer_req->pages;
261         struct page *tmp;
262         unsigned len;
263
264         desc.tfm = tfm;
265         desc.flags = 0;
266
267         sg_init_table(&sg, 1);
268         crypto_hash_init(&desc);
269
270         while ((tmp = page_chain_next(page))) {
271                 /* all but the last page will be fully used */
272                 sg_set_page(&sg, page, PAGE_SIZE, 0);
273                 crypto_hash_update(&desc, &sg, sg.length);
274                 page = tmp;
275         }
276         /* and now the last, possibly only partially used page */
277         len = peer_req->i.size & (PAGE_SIZE - 1);
278         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
279         crypto_hash_update(&desc, &sg, sg.length);
280         crypto_hash_final(&desc, digest);
281 }
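/* Worked example for the loop above (illustration only): for a 9216 byte
 * peer request spread over three 4 KiB pages, the first two pages are fed
 * to the hash with PAGE_SIZE bytes each, and the final page with
 * 9216 & (PAGE_SIZE - 1) = 1024 bytes.  If i.size is an exact multiple of
 * PAGE_SIZE, "len" evaluates to 0 and the ?: falls back to PAGE_SIZE, so
 * the last page is still hashed in full. */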
282
283 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
284 {
285         struct hash_desc desc;
286         struct scatterlist sg;
287         struct bio_vec *bvec;
288         int i;
289
290         desc.tfm = tfm;
291         desc.flags = 0;
292
293         sg_init_table(&sg, 1);
294         crypto_hash_init(&desc);
295
296         __bio_for_each_segment(bvec, bio, i, 0) {
297                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
298                 crypto_hash_update(&desc, &sg, sg.length);
299         }
300         crypto_hash_final(&desc, digest);
301 }
302
303 /* MAYBE merge common code with w_e_end_ov_req */
304 static int w_e_send_csum(struct drbd_work *w, int cancel)
305 {
306         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
307         struct drbd_conf *mdev = w->mdev;
308         int digest_size;
309         void *digest;
310         int err = 0;
311
312         if (unlikely(cancel))
313                 goto out;
314
315         if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
316                 goto out;
317
318         digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
319         digest = kmalloc(digest_size, GFP_NOIO);
320         if (digest) {
321                 sector_t sector = peer_req->i.sector;
322                 unsigned int size = peer_req->i.size;
323                 drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
324                 /* Free peer_req and pages before send.
325                  * In case we block on congestion, we could otherwise run into
326                  * some distributed deadlock, if the other side blocks on
327                  * congestion as well, because our receiver blocks in
328                  * drbd_alloc_pages due to pp_in_use > max_buffers. */
329                 drbd_free_peer_req(mdev, peer_req);
330                 peer_req = NULL;
331                 inc_rs_pending(mdev);
332                 err = drbd_send_drequest_csum(mdev, sector, size,
333                                               digest, digest_size,
334                                               P_CSUM_RS_REQUEST);
335                 kfree(digest);
336         } else {
337                 dev_err(DEV, "kmalloc() of digest failed.\n");
338                 err = -ENOMEM;
339         }
340
341 out:
342         if (peer_req)
343                 drbd_free_peer_req(mdev, peer_req);
344
345         if (unlikely(err))
346                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
347         return err;
348 }
349
350 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
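/* Note on GFP_TRY: the intent appears to be that, without __GFP_WAIT, these
 * allocations fail fast instead of sleeping in reclaim; read_for_csum()
 * below simply defers and retries later, and __GFP_NOWARN keeps the
 * expected failures out of the kernel log. */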
351
352 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
353 {
354         struct drbd_peer_request *peer_req;
355
356         if (!get_ldev(mdev))
357                 return -EIO;
358
359         if (drbd_rs_should_slow_down(mdev, sector))
360                 goto defer;
361
362         /* GFP_TRY, because if there is no memory available right now, this may
363          * be rescheduled for later. It is "only" background resync, after all. */
364         peer_req = drbd_alloc_peer_req(mdev, ID_SYNCER /* unused */, sector,
365                                        size, GFP_TRY);
366         if (!peer_req)
367                 goto defer;
368
369         peer_req->w.cb = w_e_send_csum;
370         spin_lock_irq(&mdev->tconn->req_lock);
371         list_add(&peer_req->w.list, &mdev->read_ee);
372         spin_unlock_irq(&mdev->tconn->req_lock);
373
374         atomic_add(size >> 9, &mdev->rs_sect_ev);
375         if (drbd_submit_peer_request(mdev, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
376                 return 0;
377
378         /* If it failed because of ENOMEM, retry should help.  If it failed
379          * because bio_add_page failed (probably broken lower level driver),
380          * retry may or may not help.
381          * If it does not, you may need to force disconnect. */
382         spin_lock_irq(&mdev->tconn->req_lock);
383         list_del(&peer_req->w.list);
384         spin_unlock_irq(&mdev->tconn->req_lock);
385
386         drbd_free_peer_req(mdev, peer_req);
387 defer:
388         put_ldev(mdev);
389         return -EAGAIN;
390 }
391
392 int w_resync_timer(struct drbd_work *w, int cancel)
393 {
394         struct drbd_conf *mdev = w->mdev;
395         switch (mdev->state.conn) {
396         case C_VERIFY_S:
397                 w_make_ov_request(w, cancel);
398                 break;
399         case C_SYNC_TARGET:
400                 w_make_resync_request(w, cancel);
401                 break;
402         }
403
404         return 0;
405 }
406
407 void resync_timer_fn(unsigned long data)
408 {
409         struct drbd_conf *mdev = (struct drbd_conf *) data;
410
411         if (list_empty(&mdev->resync_work.list))
412                 drbd_queue_work(&mdev->tconn->data.work, &mdev->resync_work);
413 }
414
415 static void fifo_set(struct fifo_buffer *fb, int value)
416 {
417         int i;
418
419         for (i = 0; i < fb->size; i++)
420                 fb->values[i] = value;
421 }
422
423 static int fifo_push(struct fifo_buffer *fb, int value)
424 {
425         int ov;
426
427         ov = fb->values[fb->head_index];
428         fb->values[fb->head_index++] = value;
429
430         if (fb->head_index >= fb->size)
431                 fb->head_index = 0;
432
433         return ov;
434 }
435
436 static void fifo_add_val(struct fifo_buffer *fb, int value)
437 {
438         int i;
439
440         for (i = 0; i < fb->size; i++)
441                 fb->values[i] += value;
442 }
443
444 struct fifo_buffer *fifo_alloc(int fifo_size)
445 {
446         struct fifo_buffer *fb;
447
448         fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_KERNEL);
449         if (!fb)
450                 return NULL;
451
452         fb->head_index = 0;
453         fb->size = fifo_size;
454         fb->total = 0;
455
456         return fb;
457 }
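/* Quick illustration of the fifo helpers above (not used by the code): with
 * size == 4 and all slots 0, fifo_push(fb, 5) returns the old head value 0,
 * stores 5 in its place and advances head_index, wrapping at fb->size;
 * fifo_add_val(fb, 2) then bumps every slot by 2.  drbd_rs_controller()
 * below relies on exactly this: it smears a correction over all planned
 * steps with fifo_add_val() and pops the amount for the current step with
 * fifo_push(plan, 0). */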
458
459 static int drbd_rs_controller(struct drbd_conf *mdev)
460 {
461         struct disk_conf *dc;
462         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
463         unsigned int want;     /* The number of sectors we want in the proxy */
464         int req_sect; /* Number of sectors to request in this turn */
465         int correction; /* Number of sectors more we need in the proxy*/
466         int cps; /* correction per invocation of drbd_rs_controller() */
467         int steps; /* Number of time steps to plan ahead */
468         int curr_corr;
469         int max_sect;
470         struct fifo_buffer *plan;
471
472         sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
473         mdev->rs_in_flight -= sect_in;
474
475         dc = rcu_dereference(mdev->ldev->disk_conf);
476         plan = rcu_dereference(mdev->rs_plan_s);
477
478         steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
479
480         if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
481                 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
482         } else { /* normal path */
483                 want = dc->c_fill_target ? dc->c_fill_target :
484                         sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
485         }
486
487         correction = want - mdev->rs_in_flight - plan->total;
488
489         /* Plan ahead */
490         cps = correction / steps;
491         fifo_add_val(plan, cps);
492         plan->total += cps * steps;
493
494         /* What we do in this step */
495         curr_corr = fifo_push(plan, 0);
496         plan->total -= curr_corr;
497
498         req_sect = sect_in + curr_corr;
499         if (req_sect < 0)
500                 req_sect = 0;
501
502         max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
503         if (req_sect > max_sect)
504                 req_sect = max_sect;
505
506         /*
507         dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
508                  sect_in, mdev->rs_in_flight, want, correction,
509                  steps, cps, mdev->rs_planed, curr_corr, req_sect);
510         */
511
512         return req_sect;
513 }
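/* Rough worked example of one controller step, with made-up numbers just to
 * show the flow: assume steps == 10, c_fill_target == 1000 sectors,
 * rs_in_flight == 600, plan->total == 300.  Then want = 1000,
 * correction = 1000 - 600 - 300 = 100, and cps = 100 / 10 = 10 is smeared
 * over all plan slots.  The slot popped for this step (curr_corr) is added
 * to sect_in to form req_sect, which is finally clamped to the per-step
 * maximum derived from c_max_rate.  The real values depend entirely on the
 * configured disk_conf settings. */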
514
515 static int drbd_rs_number_requests(struct drbd_conf *mdev)
516 {
517         int number;
518
519         rcu_read_lock();
520         if (rcu_dereference(mdev->rs_plan_s)->size) {
521                 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
522                 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
523         } else {
524                 mdev->c_sync_rate = rcu_dereference(mdev->ldev->disk_conf)->resync_rate;
525                 number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
526         }
527         rcu_read_unlock();
528
529         /* ignore the amount of pending requests, the resync controller should
530          * throttle down to incoming reply rate soon enough anyways. */
531         return number;
532 }
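/* Unit sketch for the conversion above (assuming the usual 4 KiB
 * BM_BLOCK_SIZE and a SLEEP_TIME of 100 ms): the controller path converts a
 * sector count into 4 KiB request units via ">> (BM_BLOCK_SHIFT - 9)" and
 * derives c_sync_rate in KiB/s from the requests issued per 100 ms slice;
 * the fixed-rate path goes the other way, turning the configured
 * resync_rate (KiB/s) into the number of 4 KiB requests for this slice. */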
533
534 int w_make_resync_request(struct drbd_work *w, int cancel)
535 {
536         struct drbd_conf *mdev = w->mdev;
537         unsigned long bit;
538         sector_t sector;
539         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
540         int max_bio_size;
541         int number, rollback_i, size;
542         int align, queued, sndbuf;
543         int i = 0;
544
545         if (unlikely(cancel))
546                 return 0;
547
548         if (mdev->rs_total == 0) {
549                 /* empty resync? */
550                 drbd_resync_finished(mdev);
551                 return 0;
552         }
553
554         if (!get_ldev(mdev)) {
555                 /* Since we only need to access mdev->rsync, a
556                    get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
557                    continuing resync with a broken disk makes no sense at
558                    all. */
559                 dev_err(DEV, "Disk broke down during resync!\n");
560                 return 0;
561         }
562
563         max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
564         number = drbd_rs_number_requests(mdev);
565         if (number == 0)
566                 goto requeue;
567
568         for (i = 0; i < number; i++) {
569                 /* Stop generating RS requests when half of the send buffer is filled */
570                 mutex_lock(&mdev->tconn->data.mutex);
571                 if (mdev->tconn->data.socket) {
572                         queued = mdev->tconn->data.socket->sk->sk_wmem_queued;
573                         sndbuf = mdev->tconn->data.socket->sk->sk_sndbuf;
574                 } else {
575                         queued = 1;
576                         sndbuf = 0;
577                 }
578                 mutex_unlock(&mdev->tconn->data.mutex);
579                 if (queued > sndbuf / 2)
580                         goto requeue;
581
582 next_sector:
583                 size = BM_BLOCK_SIZE;
584                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
585
586                 if (bit == DRBD_END_OF_BITMAP) {
587                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
588                         put_ldev(mdev);
589                         return 0;
590                 }
591
592                 sector = BM_BIT_TO_SECT(bit);
593
594                 if (drbd_rs_should_slow_down(mdev, sector) ||
595                     drbd_try_rs_begin_io(mdev, sector)) {
596                         mdev->bm_resync_fo = bit;
597                         goto requeue;
598                 }
599                 mdev->bm_resync_fo = bit + 1;
600
601                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
602                         drbd_rs_complete_io(mdev, sector);
603                         goto next_sector;
604                 }
605
606 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
607                 /* Try to find some adjacent bits.
608                  * We stop once we already have the maximum request size.
609                  *
610                  * Additionally, always align bigger requests, in order to
611                  * be prepared for all stripe sizes of software RAIDs.
612                  */
613                 align = 1;
614                 rollback_i = i;
615                 for (;;) {
616                         if (size + BM_BLOCK_SIZE > max_bio_size)
617                                 break;
618
619                         /* Always be aligned */
620                         if (sector & ((1<<(align+3))-1))
621                                 break;
622
623                         /* do not cross extent boundaries */
624                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
625                                 break;
626                         /* now, is it actually dirty, after all?
627                          * caution: drbd_bm_test_bit is tri-state for some
628                          * obscure reason; a plain ( b == 0 ) check would get the
629                          * out-of-band case right only by accident, because of the
630                          * "oddly sized" adjustment below */
631                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
632                                 break;
633                         bit++;
634                         size += BM_BLOCK_SIZE;
635                         if ((BM_BLOCK_SIZE << align) <= size)
636                                 align++;
637                         i++;
638                 }
639                 /* if we merged some,
640                  * reset the offset to start the next drbd_bm_find_next from */
641                 if (size > BM_BLOCK_SIZE)
642                         mdev->bm_resync_fo = bit + 1;
643 #endif
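                /* Merge example (illustration only): with 4 KiB bitmap blocks
                 * and a large enough max_bio_size, consecutive dirty bits are
                 * folded into one request as long as the start sector stays
                 * aligned to the current request size (the
                 * "sector & ((1<<(align+3))-1)" check) and we do not cross a
                 * bitmap extent boundary, so a run of 32 dirty bits starting
                 * on a 128 KiB boundary becomes a single 128 KiB resync
                 * request. */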
644
645                 /* adjust very last sectors, in case we are oddly sized */
646                 if (sector + (size>>9) > capacity)
647                         size = (capacity-sector)<<9;
648                 if (mdev->tconn->agreed_pro_version >= 89 && mdev->tconn->csums_tfm) {
649                         switch (read_for_csum(mdev, sector, size)) {
650                         case -EIO: /* Disk failure */
651                                 put_ldev(mdev);
652                                 return -EIO;
653                         case -EAGAIN: /* allocation failed, or ldev busy */
654                                 drbd_rs_complete_io(mdev, sector);
655                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
656                                 i = rollback_i;
657                                 goto requeue;
658                         case 0:
659                                 /* everything ok */
660                                 break;
661                         default:
662                                 BUG();
663                         }
664                 } else {
665                         int err;
666
667                         inc_rs_pending(mdev);
668                         err = drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
669                                                  sector, size, ID_SYNCER);
670                         if (err) {
671                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
672                                 dec_rs_pending(mdev);
673                                 put_ldev(mdev);
674                                 return err;
675                         }
676                 }
677         }
678
679         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
680                 /* The last syncer _request_ was sent,
681                  * but the P_RS_DATA_REPLY has not yet been received.  Sync will end
682                  * (and the next sync group will resume) as soon as we receive the
683                  * last resync data block, and the last bit is cleared.
684                  * Until then, resync "work" is "inactive" ...
685                  */
686                 put_ldev(mdev);
687                 return 0;
688         }
689
690  requeue:
691         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
692         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
693         put_ldev(mdev);
694         return 0;
695 }
696
697 static int w_make_ov_request(struct drbd_work *w, int cancel)
698 {
699         struct drbd_conf *mdev = w->mdev;
700         int number, i, size;
701         sector_t sector;
702         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
703
704         if (unlikely(cancel))
705                 return 1;
706
707         number = drbd_rs_number_requests(mdev);
708
709         sector = mdev->ov_position;
710         for (i = 0; i < number; i++) {
711                 if (sector >= capacity) {
712                         return 1;
713                 }
714
715                 size = BM_BLOCK_SIZE;
716
717                 if (drbd_rs_should_slow_down(mdev, sector) ||
718                     drbd_try_rs_begin_io(mdev, sector)) {
719                         mdev->ov_position = sector;
720                         goto requeue;
721                 }
722
723                 if (sector + (size>>9) > capacity)
724                         size = (capacity-sector)<<9;
725
726                 inc_rs_pending(mdev);
727                 if (drbd_send_ov_request(mdev, sector, size)) {
728                         dec_rs_pending(mdev);
729                         return 0;
730                 }
731                 sector += BM_SECT_PER_BIT;
732         }
733         mdev->ov_position = sector;
734
735  requeue:
736         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
737         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
738         return 1;
739 }
740
741 int w_ov_finished(struct drbd_work *w, int cancel)
742 {
743         struct drbd_conf *mdev = w->mdev;
744         kfree(w);
745         ov_out_of_sync_print(mdev);
746         drbd_resync_finished(mdev);
747
748         return 0;
749 }
750
751 static int w_resync_finished(struct drbd_work *w, int cancel)
752 {
753         struct drbd_conf *mdev = w->mdev;
754         kfree(w);
755
756         drbd_resync_finished(mdev);
757
758         return 0;
759 }
760
761 static void ping_peer(struct drbd_conf *mdev)
762 {
763         struct drbd_tconn *tconn = mdev->tconn;
764
765         clear_bit(GOT_PING_ACK, &tconn->flags);
766         request_ping(tconn);
767         wait_event(tconn->ping_wait,
768                    test_bit(GOT_PING_ACK, &tconn->flags) || mdev->state.conn < C_CONNECTED);
769 }
770
771 int drbd_resync_finished(struct drbd_conf *mdev)
772 {
773         unsigned long db, dt, dbdt;
774         unsigned long n_oos;
775         union drbd_state os, ns;
776         struct drbd_work *w;
777         char *khelper_cmd = NULL;
778         int verify_done = 0;
779
780         /* Remove all elements from the resync LRU. Since future actions
781          * might set bits in the (main) bitmap, the entries in the
782          * resync LRU would otherwise be wrong. */
783         if (drbd_rs_del_all(mdev)) {
784                 /* In case this is not possible now, most probably because
785                  * there are P_RS_DATA_REPLY packets lingering on the worker's
786                  * queue (or even the read operations for those packets
787                  * are not finished yet), retry in 100ms. */
788
789                 schedule_timeout_interruptible(HZ / 10);
790                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
791                 if (w) {
792                         w->cb = w_resync_finished;
793                         w->mdev = mdev;
794                         drbd_queue_work(&mdev->tconn->data.work, w);
795                         return 1;
796                 }
797                 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
798         }
799
800         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
801         if (dt <= 0)
802                 dt = 1;
803         db = mdev->rs_total;
804         dbdt = Bit2KB(db/dt);
805         mdev->rs_paused /= HZ;
806
807         if (!get_ldev(mdev))
808                 goto out;
809
810         ping_peer(mdev);
811
812         spin_lock_irq(&mdev->tconn->req_lock);
813         os = drbd_read_state(mdev);
814
815         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
816
817         /* This protects us against multiple calls (that can happen in the presence
818            of application IO), and against connectivity loss just before we arrive here. */
819         if (os.conn <= C_CONNECTED)
820                 goto out_unlock;
821
822         ns = os;
823         ns.conn = C_CONNECTED;
824
825         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
826              verify_done ? "Online verify " : "Resync",
827              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
828
829         n_oos = drbd_bm_total_weight(mdev);
830
831         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
832                 if (n_oos) {
833                         dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
834                               n_oos, Bit2KB(1));
835                         khelper_cmd = "out-of-sync";
836                 }
837         } else {
838                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
839
840                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
841                         khelper_cmd = "after-resync-target";
842
843                 if (mdev->tconn->csums_tfm && mdev->rs_total) {
844                         const unsigned long s = mdev->rs_same_csum;
845                         const unsigned long t = mdev->rs_total;
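                        /* Two-branch percentage below: for small totals,
                         * (s*100)/t keeps precision; for t >= 100000 bits,
                         * s/(t/100) is used instead, presumably so that s*100
                         * cannot overflow an unsigned long on 32 bit. */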
846                         const int ratio =
847                                 (t == 0)     ? 0 :
848                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
849                         dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
850                              "transferred %luK total %luK\n",
851                              ratio,
852                              Bit2KB(mdev->rs_same_csum),
853                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
854                              Bit2KB(mdev->rs_total));
855                 }
856         }
857
858         if (mdev->rs_failed) {
859                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
860
861                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
862                         ns.disk = D_INCONSISTENT;
863                         ns.pdsk = D_UP_TO_DATE;
864                 } else {
865                         ns.disk = D_UP_TO_DATE;
866                         ns.pdsk = D_INCONSISTENT;
867                 }
868         } else {
869                 ns.disk = D_UP_TO_DATE;
870                 ns.pdsk = D_UP_TO_DATE;
871
872                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
873                         if (mdev->p_uuid) {
874                                 int i;
875                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
876                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
877                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
878                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
879                         } else {
880                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
881                         }
882                 }
883
884                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
885                         /* for verify runs, we don't update uuids here,
886                          * so there would be nothing to report. */
887                         drbd_uuid_set_bm(mdev, 0UL);
888                         drbd_print_uuids(mdev, "updated UUIDs");
889                         if (mdev->p_uuid) {
890                                 /* Now the two UUID sets are equal, update what we
891                                  * know of the peer. */
892                                 int i;
893                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
894                                         mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
895                         }
896                 }
897         }
898
899         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
900 out_unlock:
901         spin_unlock_irq(&mdev->tconn->req_lock);
902         put_ldev(mdev);
903 out:
904         mdev->rs_total  = 0;
905         mdev->rs_failed = 0;
906         mdev->rs_paused = 0;
907         if (verify_done)
908                 mdev->ov_start_sector = 0;
909
910         drbd_md_sync(mdev);
911
912         if (khelper_cmd)
913                 drbd_khelper(mdev, khelper_cmd);
914
915         return 1;
916 }
917
918 /* helper */
919 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
920 {
921         if (drbd_peer_req_has_active_page(peer_req)) {
922                 /* This might happen if sendpage() has not finished */
923                 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
924                 atomic_add(i, &mdev->pp_in_use_by_net);
925                 atomic_sub(i, &mdev->pp_in_use);
926                 spin_lock_irq(&mdev->tconn->req_lock);
927                 list_add_tail(&peer_req->w.list, &mdev->net_ee);
928                 spin_unlock_irq(&mdev->tconn->req_lock);
929                 wake_up(&drbd_pp_wait);
930         } else
931                 drbd_free_peer_req(mdev, peer_req);
932 }
933
934 /**
935  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
936  * @mdev:       DRBD device.
937  * @w:          work object.
938  * @cancel:     The connection will be closed anyways
939  */
940 int w_e_end_data_req(struct drbd_work *w, int cancel)
941 {
942         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
943         struct drbd_conf *mdev = w->mdev;
944         int err;
945
946         if (unlikely(cancel)) {
947                 drbd_free_peer_req(mdev, peer_req);
948                 dec_unacked(mdev);
949                 return 0;
950         }
951
952         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
953                 err = drbd_send_block(mdev, P_DATA_REPLY, peer_req);
954         } else {
955                 if (__ratelimit(&drbd_ratelimit_state))
956                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
957                             (unsigned long long)peer_req->i.sector);
958
959                 err = drbd_send_ack(mdev, P_NEG_DREPLY, peer_req);
960         }
961
962         dec_unacked(mdev);
963
964         move_to_net_ee_or_free(mdev, peer_req);
965
966         if (unlikely(err))
967                 dev_err(DEV, "drbd_send_block() failed\n");
968         return err;
969 }
970
971 /**
972  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
973  * @mdev:       DRBD device.
974  * @w:          work object.
975  * @cancel:     The connection will be closed anyways
976  */
977 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
978 {
979         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
980         struct drbd_conf *mdev = w->mdev;
981         int err;
982
983         if (unlikely(cancel)) {
984                 drbd_free_peer_req(mdev, peer_req);
985                 dec_unacked(mdev);
986                 return 0;
987         }
988
989         if (get_ldev_if_state(mdev, D_FAILED)) {
990                 drbd_rs_complete_io(mdev, peer_req->i.sector);
991                 put_ldev(mdev);
992         }
993
994         if (mdev->state.conn == C_AHEAD) {
995                 err = drbd_send_ack(mdev, P_RS_CANCEL, peer_req);
996         } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
997                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
998                         inc_rs_pending(mdev);
999                         err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
1000                 } else {
1001                         if (__ratelimit(&drbd_ratelimit_state))
1002                                 dev_err(DEV, "Not sending RSDataReply, "
1003                                     "partner DISKLESS!\n");
1004                         err = 0;
1005                 }
1006         } else {
1007                 if (__ratelimit(&drbd_ratelimit_state))
1008                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1009                             (unsigned long long)peer_req->i.sector);
1010
1011                 err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1012
1013                 /* update resync data with failure */
1014                 drbd_rs_failed_io(mdev, peer_req->i.sector, peer_req->i.size);
1015         }
1016
1017         dec_unacked(mdev);
1018
1019         move_to_net_ee_or_free(mdev, peer_req);
1020
1021         if (unlikely(err))
1022                 dev_err(DEV, "drbd_send_block() failed\n");
1023         return err;
1024 }
1025
1026 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1027 {
1028         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1029         struct drbd_conf *mdev = w->mdev;
1030         struct digest_info *di;
1031         int digest_size;
1032         void *digest = NULL;
1033         int err, eq = 0;
1034
1035         if (unlikely(cancel)) {
1036                 drbd_free_peer_req(mdev, peer_req);
1037                 dec_unacked(mdev);
1038                 return 0;
1039         }
1040
1041         if (get_ldev(mdev)) {
1042                 drbd_rs_complete_io(mdev, peer_req->i.sector);
1043                 put_ldev(mdev);
1044         }
1045
1046         di = peer_req->digest;
1047
1048         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1049                 /* quick hack to try to avoid a race against reconfiguration.
1050                  * a real fix would be much more involved,
1051                  * introducing more locking mechanisms */
1052                 if (mdev->tconn->csums_tfm) {
1053                         digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
1054                         D_ASSERT(digest_size == di->digest_size);
1055                         digest = kmalloc(digest_size, GFP_NOIO);
1056                 }
1057                 if (digest) {
1058                         drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
1059                         eq = !memcmp(digest, di->digest, digest_size);
1060                         kfree(digest);
1061                 }
1062
1063                 if (eq) {
1064                         drbd_set_in_sync(mdev, peer_req->i.sector, peer_req->i.size);
1065                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1066                         mdev->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1067                         err = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, peer_req);
1068                 } else {
1069                         inc_rs_pending(mdev);
1070                         peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1071                         peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1072                         kfree(di);
1073                         err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
1074                 }
1075         } else {
1076                 err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1077                 if (__ratelimit(&drbd_ratelimit_state))
1078                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1079         }
1080
1081         dec_unacked(mdev);
1082         move_to_net_ee_or_free(mdev, peer_req);
1083
1084         if (unlikely(err))
1085                 dev_err(DEV, "drbd_send_block/ack() failed\n");
1086         return err;
1087 }
1088
1089 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1090 {
1091         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1092         struct drbd_conf *mdev = w->mdev;
1093         sector_t sector = peer_req->i.sector;
1094         unsigned int size = peer_req->i.size;
1095         int digest_size;
1096         void *digest;
1097         int err = 0;
1098
1099         if (unlikely(cancel))
1100                 goto out;
1101
1102         digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1103         digest = kmalloc(digest_size, GFP_NOIO);
1104         if (!digest) {
1105                 err = 1;        /* terminate the connection in case the allocation failed */
1106                 goto out;
1107         }
1108
1109         if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1110                 drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1111         else
1112                 memset(digest, 0, digest_size);
1113
1114         /* Free e and pages before send.
1115          * In case we block on congestion, we could otherwise run into
1116          * some distributed deadlock, if the other side blocks on
1117          * congestion as well, because our receiver blocks in
1118          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1119         drbd_free_peer_req(mdev, peer_req);
1120         peer_req = NULL;
1121         inc_rs_pending(mdev);
1122         err = drbd_send_drequest_csum(mdev, sector, size, digest, digest_size, P_OV_REPLY);
1123         if (err)
1124                 dec_rs_pending(mdev);
1125         kfree(digest);
1126
1127 out:
1128         if (peer_req)
1129                 drbd_free_peer_req(mdev, peer_req);
1130         dec_unacked(mdev);
1131         return err;
1132 }
1133
1134 void drbd_ov_out_of_sync_found(struct drbd_conf *mdev, sector_t sector, int size)
1135 {
1136         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1137                 mdev->ov_last_oos_size += size>>9;
1138         } else {
1139                 mdev->ov_last_oos_start = sector;
1140                 mdev->ov_last_oos_size = size>>9;
1141         }
1142         drbd_set_out_of_sync(mdev, sector, size);
1143 }
1144
1145 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1146 {
1147         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1148         struct drbd_conf *mdev = w->mdev;
1149         struct digest_info *di;
1150         void *digest;
1151         sector_t sector = peer_req->i.sector;
1152         unsigned int size = peer_req->i.size;
1153         int digest_size;
1154         int err, eq = 0;
1155
1156         if (unlikely(cancel)) {
1157                 drbd_free_peer_req(mdev, peer_req);
1158                 dec_unacked(mdev);
1159                 return 0;
1160         }
1161
1162         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1163          * the resync lru has been cleaned up already */
1164         if (get_ldev(mdev)) {
1165                 drbd_rs_complete_io(mdev, peer_req->i.sector);
1166                 put_ldev(mdev);
1167         }
1168
1169         di = peer_req->digest;
1170
1171         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1172                 digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1173                 digest = kmalloc(digest_size, GFP_NOIO);
1174                 if (digest) {
1175                         drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1176
1177                         D_ASSERT(digest_size == di->digest_size);
1178                         eq = !memcmp(digest, di->digest, digest_size);
1179                         kfree(digest);
1180                 }
1181         }
1182
1183         /* Free peer_req and pages before send.
1184          * In case we block on congestion, we could otherwise run into
1185          * some distributed deadlock, if the other side blocks on
1186          * congestion as well, because our receiver blocks in
1187          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1188         drbd_free_peer_req(mdev, peer_req);
1189         if (!eq)
1190                 drbd_ov_out_of_sync_found(mdev, sector, size);
1191         else
1192                 ov_out_of_sync_print(mdev);
1193
1194         err = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1195                                eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1196
1197         dec_unacked(mdev);
1198
1199         --mdev->ov_left;
1200
1201         /* let's advance progress step marks only for every other megabyte */
1202         if ((mdev->ov_left & 0x200) == 0x200)
1203                 drbd_advance_rs_marks(mdev, mdev->ov_left);
1204
1205         if (mdev->ov_left == 0) {
1206                 ov_out_of_sync_print(mdev);
1207                 drbd_resync_finished(mdev);
1208         }
1209
1210         return err;
1211 }
1212
1213 int w_prev_work_done(struct drbd_work *w, int cancel)
1214 {
1215         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1216
1217         complete(&b->done);
1218         return 0;
1219 }
1220
1221 int w_send_barrier(struct drbd_work *w, int cancel)
1222 {
1223         struct drbd_socket *sock;
1224         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1225         struct drbd_conf *mdev = w->mdev;
1226         struct p_barrier *p;
1227
1228         /* really avoid racing with tl_clear.  w.cb may have been referenced
1229          * just before it was reassigned and re-queued, so double check that.
1230          * actually, this race was harmless, since we only try to send the
1231          * barrier packet here, and otherwise do nothing with the object.
1232          * but compare with the head of w_clear_epoch */
1233         spin_lock_irq(&mdev->tconn->req_lock);
1234         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1235                 cancel = 1;
1236         spin_unlock_irq(&mdev->tconn->req_lock);
1237         if (cancel)
1238                 return 0;
1239
1240         sock = &mdev->tconn->data;
1241         p = drbd_prepare_command(mdev, sock);
1242         if (!p)
1243                 return -EIO;
1244         p->barrier = b->br_number;
1245         /* inc_ap_pending was done where this was queued.
1246          * dec_ap_pending will be done in got_BarrierAck
1247          * or (on connection loss) in w_clear_epoch.  */
1248         return drbd_send_command(mdev, sock, P_BARRIER, sizeof(*p), NULL, 0);
1249 }
1250
1251 int w_send_write_hint(struct drbd_work *w, int cancel)
1252 {
1253         struct drbd_conf *mdev = w->mdev;
1254         struct drbd_socket *sock;
1255
1256         if (cancel)
1257                 return 0;
1258         sock = &mdev->tconn->data;
1259         if (!drbd_prepare_command(mdev, sock))
1260                 return -EIO;
1261         return drbd_send_command(mdev, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1262 }
1263
1264 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1265 {
1266         struct drbd_request *req = container_of(w, struct drbd_request, w);
1267         struct drbd_conf *mdev = w->mdev;
1268         int err;
1269
1270         if (unlikely(cancel)) {
1271                 req_mod(req, SEND_CANCELED);
1272                 return 0;
1273         }
1274
1275         err = drbd_send_out_of_sync(mdev, req);
1276         req_mod(req, OOS_HANDED_TO_NETWORK);
1277
1278         return err;
1279 }
1280
1281 /**
1282  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1283  * @mdev:       DRBD device.
1284  * @w:          work object.
1285  * @cancel:     The connection will be closed anyways
1286  */
1287 int w_send_dblock(struct drbd_work *w, int cancel)
1288 {
1289         struct drbd_request *req = container_of(w, struct drbd_request, w);
1290         struct drbd_conf *mdev = w->mdev;
1291         int err;
1292
1293         if (unlikely(cancel)) {
1294                 req_mod(req, SEND_CANCELED);
1295                 return 0;
1296         }
1297
1298         err = drbd_send_dblock(mdev, req);
1299         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1300
1301         return err;
1302 }
1303
1304 /**
1305  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1306  * @mdev:       DRBD device.
1307  * @w:          work object.
1308  * @cancel:     The connection will be closed anyways
1309  */
1310 int w_send_read_req(struct drbd_work *w, int cancel)
1311 {
1312         struct drbd_request *req = container_of(w, struct drbd_request, w);
1313         struct drbd_conf *mdev = w->mdev;
1314         int err;
1315
1316         if (unlikely(cancel)) {
1317                 req_mod(req, SEND_CANCELED);
1318                 return 0;
1319         }
1320
1321         err = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
1322                                  (unsigned long)req);
1323
1324         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1325
1326         return err;
1327 }
1328
1329 int w_restart_disk_io(struct drbd_work *w, int cancel)
1330 {
1331         struct drbd_request *req = container_of(w, struct drbd_request, w);
1332         struct drbd_conf *mdev = w->mdev;
1333
1334         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1335                 drbd_al_begin_io(mdev, &req->i);
1336
1337         drbd_req_make_private_bio(req, req->master_bio);
1338         req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1339         generic_make_request(req->private_bio);
1340
1341         return 0;
1342 }
1343
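/* Returns 1 if resync may run on this device right now, 0 if it has to wait:
 * we walk the resync-after dependency chain (disk_conf->resync_after names
 * the minor that must be allowed to finish first) and report "wait" as soon
 * as any device in that chain is itself resyncing or has one of its
 * sync-pause flags (aftr_isp/peer_isp/user_isp) set. */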
1344 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1345 {
1346         struct drbd_conf *odev = mdev;
1347         int resync_after;
1348
1349         while (1) {
1350                 if (!odev->ldev)
1351                         return 1;
1352                 rcu_read_lock();
1353                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1354                 rcu_read_unlock();
1355                 if (resync_after == -1)
1356                         return 1;
1357                 odev = minor_to_mdev(resync_after);
1358                 if (!expect(odev))
1359                         return 1;
1360                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1361                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1362                     odev->state.aftr_isp || odev->state.peer_isp ||
1363                     odev->state.user_isp)
1364                         return 0;
1365         }
1366 }
1367
1368 /**
1369  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1370  * @mdev:       DRBD device.
1371  *
1372  * Called from process context only (admin command and after_state_ch).
1373  */
1374 static int _drbd_pause_after(struct drbd_conf *mdev)
1375 {
1376         struct drbd_conf *odev;
1377         int i, rv = 0;
1378
1379         rcu_read_lock();
1380         idr_for_each_entry(&minors, odev, i) {
1381                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1382                         continue;
1383                 if (!_drbd_may_sync_now(odev))
1384                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1385                                != SS_NOTHING_TO_DO);
1386         }
1387         rcu_read_unlock();
1388
1389         return rv;
1390 }
1391
1392 /**
1393  * _drbd_resume_next() - Resume resync on all devices that may resync now
1394  * @mdev:       DRBD device.
1395  *
1396  * Called from process context only (admin command and worker).
1397  */
1398 static int _drbd_resume_next(struct drbd_conf *mdev)
1399 {
1400         struct drbd_conf *odev;
1401         int i, rv = 0;
1402
1403         rcu_read_lock();
1404         idr_for_each_entry(&minors, odev, i) {
1405                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1406                         continue;
1407                 if (odev->state.aftr_isp) {
1408                         if (_drbd_may_sync_now(odev))
1409                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1410                                                         CS_HARD, NULL)
1411                                        != SS_NOTHING_TO_DO) ;
1412                 }
1413         }
1414         rcu_read_unlock();
1415         return rv;
1416 }
1417
1418 void resume_next_sg(struct drbd_conf *mdev)
1419 {
1420         write_lock_irq(&global_state_lock);
1421         _drbd_resume_next(mdev);
1422         write_unlock_irq(&global_state_lock);
1423 }
1424
1425 void suspend_other_sg(struct drbd_conf *mdev)
1426 {
1427         write_lock_irq(&global_state_lock);
1428         _drbd_pause_after(mdev);
1429         write_unlock_irq(&global_state_lock);
1430 }
1431
1432 /* caller must hold global_state_lock */
1433 enum drbd_ret_code drbd_resync_after_valid(struct drbd_conf *mdev, int o_minor)
1434 {
1435         struct drbd_conf *odev;
1436         int resync_after;
1437
1438         if (o_minor == -1)
1439                 return NO_ERROR;
1440         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1441                 return ERR_RESYNC_AFTER;
1442
1443         /* check for loops */
1444         odev = minor_to_mdev(o_minor);
1445         while (1) {
1446                 if (odev == mdev)
1447                         return ERR_RESYNC_AFTER_CYCLE;
1448
1449                 rcu_read_lock();
1450                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1451                 rcu_read_unlock();
1452                 /* dependency chain ends here, no cycles. */
1453                 if (resync_after == -1)
1454                         return NO_ERROR;
1455
1456                 /* follow the dependency chain */
1457                 odev = minor_to_mdev(resync_after);
1458         }
1459 }
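
/* Illustration (not part of the driver): the cycle check above walks the
 * resync-after chain starting at the proposed minor and fails if it ever
 * arrives back at the device being configured.  The standalone sketch below
 * shows the same walk over a toy dependency table; every name in it
 * (toy_dev, creates_cycle, ...) is hypothetical and not DRBD API. */
#if 0
#include <stdio.h>

struct toy_dev {
	int minor;
	int resync_after;	/* -1 terminates the chain */
};

/* minors 0..3; minor 1 syncs after 2, minor 2 after 3, 0 and 3 after nobody */
static struct toy_dev devs[] = {
	{ 0, -1 }, { 1, 2 }, { 2, 3 }, { 3, -1 },
};

/* Would setting "self syncs after o_minor" create a dependency cycle? */
static int creates_cycle(int self, int o_minor)
{
	while (o_minor != -1) {
		if (o_minor == self)
			return 1;	/* walked back to ourselves: cycle */
		o_minor = devs[o_minor].resync_after;
	}
	return 0;			/* chain ended: no cycle */
}

int main(void)
{
	printf("1 after 3: %s\n", creates_cycle(1, 3) ? "cycle" : "ok");
	printf("3 after 1: %s\n", creates_cycle(3, 1) ? "cycle" : "ok");
	return 0;
}
#endif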
1460
1461 /* caller must hold global_state_lock */
1462 void drbd_resync_after_changed(struct drbd_conf *mdev)
1463 {
1464         int changes;
1465
1466         do {
1467                 changes  = _drbd_pause_after(mdev);
1468                 changes |= _drbd_resume_next(mdev);
1469         } while (changes);
1470 }
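
/* The pause/resume passes above are iterated to a fixed point because a
 * single pass may visit a device before the device it depends on has been
 * unpaused, so its own unpause only becomes possible in a later pass.  The
 * standalone sketch below demonstrates that with a toy chain in which device
 * i may resume only once device i + 1 has resumed; all names are
 * hypothetical, not DRBD API. */
#if 0
#include <stdio.h>

#define NDEV 4

static int paused[NDEV];	/* 1 = "aftr_isp" set in the toy model */

/* device i may resume once the device it syncs after (i + 1) is unpaused */
static int may_resume(int i)
{
	return i == NDEV - 1 || !paused[i + 1];
}

static int resume_pass(void)
{
	int i, changes = 0;

	for (i = 0; i < NDEV; i++) {
		if (paused[i] && may_resume(i)) {
			paused[i] = 0;
			changes = 1;
		}
	}
	return changes;
}

int main(void)
{
	int i, passes = 0;

	for (i = 0; i < NDEV; i++)
		paused[i] = 1;
	while (resume_pass())
		passes++;
	/* prints 4: each pass can only unblock one more link of the chain */
	printf("settled after %d changing passes\n", passes);
	return 0;
}
#endif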
1471
1472 void drbd_rs_controller_reset(struct drbd_conf *mdev)
1473 {
1474         struct fifo_buffer *plan;
1475
1476         atomic_set(&mdev->rs_sect_in, 0);
1477         atomic_set(&mdev->rs_sect_ev, 0);
1478         mdev->rs_in_flight = 0;
1479
1480         /* Updating the RCU-protected object in place is necessary since
1481            this function gets called from atomic context.
1482            It is valid since all other updates also lead to a completely
1483            empty fifo. */
1484         rcu_read_lock();
1485         plan = rcu_dereference(mdev->rs_plan_s);
1486         plan->total = 0;
1487         fifo_set(plan, 0);
1488         rcu_read_unlock();
1489 }
1490
1491 void start_resync_timer_fn(unsigned long data)
1492 {
1493         struct drbd_conf *mdev = (struct drbd_conf *) data;
1494
1495         drbd_queue_work(&mdev->tconn->data.work, &mdev->start_resync_work);
1496 }
1497
1498 int w_start_resync(struct drbd_work *w, int cancel)
1499 {
1500         struct drbd_conf *mdev = w->mdev;
1501
1502         if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
1503                 dev_warn(DEV, "w_start_resync later...\n");
1504                 mdev->start_resync_timer.expires = jiffies + HZ/10;
1505                 add_timer(&mdev->start_resync_timer);
1506                 return 0;
1507         }
1508
1509         drbd_start_resync(mdev, C_SYNC_SOURCE);
1510         clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
1511         return 0;
1512 }
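
/* w_start_resync() above uses a defer-and-retry pattern: while unacked_cnt or
 * rs_pending_cnt is still non-zero, it re-arms start_resync_timer and bails
 * out, and the timer callback simply queues the same work item again.  The
 * sketch below shows that pattern in isolation; the toy_* names are
 * hypothetical, only the generic timer calls are real kernel API. */
#if 0
#include <linux/timer.h>
#include <linux/jiffies.h>

static struct timer_list toy_timer;

static void toy_timer_fn(unsigned long data)
{
	/* would re-queue the deferred work item here,
	 * as start_resync_timer_fn() does above */
}

static void toy_work(int still_busy)
{
	if (still_busy) {
		/* not ready yet: have the timer retry us in ~100 ms */
		mod_timer(&toy_timer, jiffies + HZ / 10);
		return;
	}
	/* ready: do the real work (drbd_start_resync() above) */
}

static void toy_init(void)
{
	setup_timer(&toy_timer, toy_timer_fn, 0);
}
#endif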
1513
1514 /**
1515  * drbd_start_resync() - Start the resync process
1516  * @mdev:       DRBD device.
1517  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1518  *
1519  * This function might bring you directly into one of the
1520  * C_PAUSED_SYNC_* states.
1521  */
1522 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1523 {
1524         union drbd_state ns;
1525         int r;
1526
1527         if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1528                 dev_err(DEV, "Resync already running!\n");
1529                 return;
1530         }
1531
1532         if (mdev->state.conn < C_AHEAD) {
1533                 /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1534                 drbd_rs_cancel_all(mdev);
1535                 /* This should rather be done when we abort the resync. We definitely
1536                    do not want to do it for connections going back and forth between
1537                    Ahead/Behind and SyncSource/SyncTarget. */
1538         }
1539
1540         if (!test_bit(B_RS_H_DONE, &mdev->flags)) {
1541                 if (side == C_SYNC_TARGET) {
1542                         /* Since application IO was locked out during C_WF_BITMAP_T and
1543                            C_WF_SYNC_UUID, our data is still unmodified. Before entering
1544                            C_SYNC_TARGET, give the before-resync-target handler a chance to veto. */
1545                         r = drbd_khelper(mdev, "before-resync-target");
1546                         r = (r >> 8) & 0xff;
1547                         if (r > 0) {
1548                                 dev_info(DEV, "before-resync-target handler returned %d, "
1549                                          "dropping connection.\n", r);
1550                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1551                                 return;
1552                         }
1553                 } else /* C_SYNC_SOURCE */ {
1554                         r = drbd_khelper(mdev, "before-resync-source");
1555                         r = (r >> 8) & 0xff;
1556                         if (r > 0) {
1557                                 if (r == 3) {
1558                                         dev_info(DEV, "before-resync-source handler returned %d, "
1559                                                  "ignoring. Old userland tools?\n", r);
1560                                 } else {
1561                                         dev_info(DEV, "before-resync-source handler returned %d, "
1562                                                  "dropping connection.\n", r);
1563                                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1564                                         return;
1565                                 }
1566                         }
1567                 }
1568         }
1569
1570         if (current == mdev->tconn->worker.task) {
1571                 /* The worker should not sleep waiting for state_mutex,
1572                    because that can take a long time. */
1573                 if (!mutex_trylock(mdev->state_mutex)) {
1574                         set_bit(B_RS_H_DONE, &mdev->flags);
1575                         mdev->start_resync_timer.expires = jiffies + HZ/5;
1576                         add_timer(&mdev->start_resync_timer);
1577                         return;
1578                 }
1579         } else {
1580                 mutex_lock(mdev->state_mutex);
1581         }
1582         clear_bit(B_RS_H_DONE, &mdev->flags);
1583
1584         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1585                 mutex_unlock(mdev->state_mutex);
1586                 return;
1587         }
1588
1589         write_lock_irq(&global_state_lock);
1590         ns = drbd_read_state(mdev);
1591
1592         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1593
1594         ns.conn = side;
1595
1596         if (side == C_SYNC_TARGET)
1597                 ns.disk = D_INCONSISTENT;
1598         else /* side == C_SYNC_SOURCE */
1599                 ns.pdsk = D_INCONSISTENT;
1600
1601         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1602         ns = drbd_read_state(mdev);
1603
1604         if (ns.conn < C_CONNECTED)
1605                 r = SS_UNKNOWN_ERROR;
1606
1607         if (r == SS_SUCCESS) {
1608                 unsigned long tw = drbd_bm_total_weight(mdev);
1609                 unsigned long now = jiffies;
1610                 int i;
1611
1612                 mdev->rs_failed    = 0;
1613                 mdev->rs_paused    = 0;
1614                 mdev->rs_same_csum = 0;
1615                 mdev->rs_last_events = 0;
1616                 mdev->rs_last_sect_ev = 0;
1617                 mdev->rs_total     = tw;
1618                 mdev->rs_start     = now;
1619                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1620                         mdev->rs_mark_left[i] = tw;
1621                         mdev->rs_mark_time[i] = now;
1622                 }
1623                 _drbd_pause_after(mdev);
1624         }
1625         write_unlock_irq(&global_state_lock);
1626
1627         if (r == SS_SUCCESS) {
1628                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1629                      drbd_conn_str(ns.conn),
1630                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1631                      (unsigned long) mdev->rs_total);
1632                 if (side == C_SYNC_TARGET)
1633                         mdev->bm_resync_fo = 0;
1634
1635                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1636                  * with w_send_oos, or the sync target will get confused as to
1637                  * how many bits to resync.  We cannot always do that, because for an
1638                  * empty resync and protocol < 95, we need to do it here, as we call
1639                  * drbd_resync_finished from here in that case.
1640                  * We call drbd_gen_and_send_sync_uuid() here for protocol < 96,
1641                  * and from after_state_ch otherwise. */
1642                 if (side == C_SYNC_SOURCE && mdev->tconn->agreed_pro_version < 96)
1643                         drbd_gen_and_send_sync_uuid(mdev);
1644
1645                 if (mdev->tconn->agreed_pro_version < 95 && mdev->rs_total == 0) {
1646                         /* This still has a race (about when exactly the peers
1647                          * detect connection loss) that can lead to a full sync
1648                          * on the next handshake. In 8.3.9 we fixed this with explicit
1649                          * resync-finished notifications, but the fix
1650                          * introduces a protocol change.  Sleeping for some
1651                          * time longer than the ping interval + timeout on the
1652                          * SyncSource, to give the SyncTarget the chance to
1653                          * detect connection loss, then waiting for a ping
1654                          * response (implicit in drbd_resync_finished) reduces
1655                          * the race considerably, but does not solve it. */
1656                         if (side == C_SYNC_SOURCE) {
1657                                 struct net_conf *nc;
1658                                 int timeo;
1659
1660                                 rcu_read_lock();
1661                                 nc = rcu_dereference(mdev->tconn->net_conf);
1662                                 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1663                                 rcu_read_unlock();
1664                                 schedule_timeout_interruptible(timeo);
1665                         }
1666                         drbd_resync_finished(mdev);
1667                 }
1668
1669                 drbd_rs_controller_reset(mdev);
1670                 /* ns.conn may already be != mdev->state.conn:
1671                  * we may have been paused in the meantime, or we may become
1672                  * paused before the timer triggers.
1673                  * Either way, that is handled in resync_timer_fn(). */
1674                 if (ns.conn == C_SYNC_TARGET)
1675                         mod_timer(&mdev->resync_timer, jiffies);
1676
1677                 drbd_md_sync(mdev);
1678         }
1679         put_ldev(mdev);
1680         mutex_unlock(mdev->state_mutex);
1681 }
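
/* Note on the before-resync handler calls above: the (r >> 8) & 0xff treats
 * the value returned by drbd_khelper() as a wait()-style status and recovers
 * the helper's exit code (what WEXITSTATUS() computes).  A non-zero exit code
 * aborts the resync by dropping the connection, except for the
 * before-resync-source compatibility case 3, which is ignored.  The
 * standalone user-space sketch below only illustrates that status decoding;
 * it is not DRBD code. */
#if 0
#include <stdio.h>
#include <stdlib.h>
#include <sys/wait.h>

int main(void)
{
	/* run a stand-in "handler" that exits with status 3 */
	int r = system("exit 3");

	if (r != -1) {
		printf("raw status      : 0x%x\n", r);
		printf("(r >> 8) & 0xff : %d\n", (r >> 8) & 0xff);
		printf("WEXITSTATUS(r)  : %d\n", WEXITSTATUS(r));
	}
	return 0;
}
#endif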
1682
1683 int drbd_worker(struct drbd_thread *thi)
1684 {
1685         struct drbd_tconn *tconn = thi->tconn;
1686         struct drbd_work *w = NULL;
1687         struct drbd_conf *mdev;
1688         struct net_conf *nc;
1689         LIST_HEAD(work_list);
1690         int vnr, intr = 0;
1691         int cork;
1692
1693         while (get_t_state(thi) == RUNNING) {
1694                 drbd_thread_current_set_cpu(thi);
1695
1696                 if (down_trylock(&tconn->data.work.s)) {
1697                         mutex_lock(&tconn->data.mutex);
1698
1699                         rcu_read_lock();
1700                         nc = rcu_dereference(tconn->net_conf);
1701                         cork = nc ? nc->tcp_cork : 0;
1702                         rcu_read_unlock();
1703
1704                         if (tconn->data.socket && cork)
1705                                 drbd_tcp_uncork(tconn->data.socket);
1706                         mutex_unlock(&tconn->data.mutex);
1707
1708                         intr = down_interruptible(&tconn->data.work.s);
1709
1710                         mutex_lock(&tconn->data.mutex);
1711                         if (tconn->data.socket && cork)
1712                                 drbd_tcp_cork(tconn->data.socket);
1713                         mutex_unlock(&tconn->data.mutex);
1714                 }
1715
1716                 if (intr) {
1717                         flush_signals(current);
1718                         if (get_t_state(thi) == RUNNING) {
1719                                 conn_warn(tconn, "Worker got an unexpected signal\n");
1720                                 continue;
1721                         }
1722                         break;
1723                 }
1724
1725                 if (get_t_state(thi) != RUNNING)
1726                         break;
1727                 /* With this break, we have done a down() but not consumed
1728                    the entry from the list. The cleanup code takes care of
1729                    this...   */
1730
1731                 w = NULL;
1732                 spin_lock_irq(&tconn->data.work.q_lock);
1733                 if (list_empty(&tconn->data.work.q)) {
1734                         /* Something is terribly wrong in our logic:
1735                          * we were able to down() the semaphore,
1736                          * but the list is empty... doh.
1737                          *
1738                          * What is the best thing to do now?
1739                          * Try again from scratch, restarting the receiver,
1740                          * asender, whatnot? That could break things even more,
1741                          * e.g. when we are primary but have no good local data.
1742                          *
1743                          * I'll try to get away with just starting over this loop.
1744                          */
1745                         conn_warn(tconn, "Work list unexpectedly empty\n");
1746                         spin_unlock_irq(&tconn->data.work.q_lock);
1747                         continue;
1748                 }
1749                 w = list_entry(tconn->data.work.q.next, struct drbd_work, list);
1750                 list_del_init(&w->list);
1751                 spin_unlock_irq(&tconn->data.work.q_lock);
1752
1753                 if (w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS)) {
1754                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1755                         if (tconn->cstate >= C_WF_REPORT_PARAMS)
1756                                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
1757                 }
1758         }
1759
1760         spin_lock_irq(&tconn->data.work.q_lock);
1761         while (!list_empty(&tconn->data.work.q)) {
1762                 list_splice_init(&tconn->data.work.q, &work_list);
1763                 spin_unlock_irq(&tconn->data.work.q_lock);
1764
1765                 while (!list_empty(&work_list)) {
1766                         w = list_entry(work_list.next, struct drbd_work, list);
1767                         list_del_init(&w->list);
1768                         w->cb(w, 1);
1769                 }
1770
1771                 spin_lock_irq(&tconn->data.work.q_lock);
1772         }
1773         sema_init(&tconn->data.work.s, 0);
1774         /* DANGEROUS race: if someone queued their work while holding the spinlock
1775          * but called up() outside of it, we could get an up() on the
1776          * semaphore without a corresponding list entry.
1777          * So don't do that.
1778          */
1779         spin_unlock_irq(&tconn->data.work.q_lock);
1780
1781         rcu_read_lock();
1782         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1783                 D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1784                 kref_get(&mdev->kref);
1785                 rcu_read_unlock();
1786                 drbd_mdev_cleanup(mdev);
1787                 kref_put(&mdev->kref, &drbd_minor_destroy);
1788                 rcu_read_lock();
1789         }
1790         rcu_read_unlock();
1791
1792         return 0;
1793 }
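
/* Illustration of the work-queue pattern drbd_worker() consumes: queueing a
 * work item appends it to a spinlock-protected list and does one up() on a
 * counting semaphore, so the worker can sleep in down() and then take exactly
 * one entry off the list per wakeup.  The standalone POSIX sketch below mirrors
 * that with a mutex and a counting semaphore; all toy_* names are hypothetical
 * and it is not DRBD code. */
#if 0
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>

struct toy_work {
	struct toy_work *next;
	void (*cb)(struct toy_work *w);
};

static struct toy_work *queue_head;
static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
static sem_t queue_sem;

static void toy_queue_work(struct toy_work *w)
{
	pthread_mutex_lock(&queue_lock);
	w->next = queue_head;		/* LIFO for brevity; DRBD keeps FIFO order */
	queue_head = w;
	pthread_mutex_unlock(&queue_lock);
	sem_post(&queue_sem);		/* one up() per queued item */
}

static struct toy_work *toy_dequeue_work(void)
{
	struct toy_work *w;

	sem_wait(&queue_sem);		/* one down() per consumed item */
	pthread_mutex_lock(&queue_lock);
	w = queue_head;
	queue_head = w ? w->next : NULL;
	pthread_mutex_unlock(&queue_lock);
	return w;
}

static void say_hello(struct toy_work *w)
{
	printf("work item %p ran\n", (void *)w);
}

int main(void)
{
	struct toy_work w = { NULL, say_hello }, *got;

	sem_init(&queue_sem, 0, 0);
	toy_queue_work(&w);
	got = toy_dequeue_work();
	got->cb(got);
	return 0;
}
#endif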