859afdfe5a085fc0d02ec60a15c340a50b10883a
[platform/adaptation/renesas_rcar/renesas_kernel.git] / drivers/block/drbd/drbd_worker.c
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40
41 static int w_make_ov_request(struct drbd_work *w, int cancel);
42
43
44 /* endio handlers:
45  *   drbd_md_io_complete (defined here)
46  *   drbd_request_endio (defined here)
47  *   drbd_peer_request_endio (defined here)
48  *   bm_async_io_complete (defined in drbd_bitmap.c)
49  *
50  * For all these callbacks, note the following:
51  * The callbacks will be called in irq context by the IDE drivers,
52  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
53  * Try to get the locking right :)
54  *
55  */
56
57
58 /* About the global_state_lock
59    Each state transition on a device holds a read lock. In case we have
60    to evaluate the resync-after dependencies, we grab a write lock, because
61    we need stable states on all devices for that.  */
62 rwlock_t global_state_lock;
63
64 /* used for synchronous meta data and bitmap IO
65  * submitted by drbd_md_sync_page_io()
66  */
67 void drbd_md_io_complete(struct bio *bio, int error)
68 {
69         struct drbd_md_io *md_io;
70         struct drbd_conf *mdev;
71
72         md_io = (struct drbd_md_io *)bio->bi_private;
73         mdev = container_of(md_io, struct drbd_conf, md_io);
74
75         md_io->error = error;
76
77         /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
78          * to timeout on the lower level device, and eventually detach from it.
79          * If this io completion runs after that timeout expired, this
80          * drbd_md_put_buffer() may allow us to finally try and re-attach.
81          * During normal operation, this only puts that extra reference
82          * down to 1 again.
83          * Make sure we first drop the reference, and only then signal
84          * completion, or we may (in drbd_al_read_log()) cycle so fast into the
85          * next drbd_md_sync_page_io(), that we trigger the
86          * ASSERT(atomic_read(&mdev->md_io_in_use) == 1) there.
87          */
88         drbd_md_put_buffer(mdev);
89         md_io->done = 1;
90         wake_up(&mdev->misc_wait);
91         bio_put(bio);
92         put_ldev(mdev);
93 }
94
95 /* reads on behalf of the partner,
96  * "submitted" by the receiver
97  */
98 void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
99 {
100         unsigned long flags = 0;
101         struct drbd_conf *mdev = peer_req->w.mdev;
102
103         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
104         mdev->read_cnt += peer_req->i.size >> 9;
105         list_del(&peer_req->w.list);
106         if (list_empty(&mdev->read_ee))
107                 wake_up(&mdev->ee_wait);
108         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
109                 __drbd_chk_io_error(mdev, DRBD_IO_ERROR);
110         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
111
112         drbd_queue_work(&mdev->tconn->sender_work, &peer_req->w);
113         put_ldev(mdev);
114 }
115
116 /* writes on behalf of the partner, or resync writes,
117  * "submitted" by the receiver, final stage.  */
118 static void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
119 {
120         unsigned long flags = 0;
121         struct drbd_conf *mdev = peer_req->w.mdev;
122         struct drbd_interval i;
123         int do_wake;
124         u64 block_id;
125         int do_al_complete_io;
126
127         /* after we moved peer_req to done_ee,
128          * we may no longer access it,
129          * it may be freed/reused already!
130          * (as soon as we release the req_lock) */
131         i = peer_req->i;
132         do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
133         block_id = peer_req->block_id;
134
135         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
136         mdev->writ_cnt += peer_req->i.size >> 9;
137         list_del(&peer_req->w.list); /* has been on active_ee or sync_ee */
138         list_add_tail(&peer_req->w.list, &mdev->done_ee);
139
140         /*
141          * Do not remove from the write_requests tree here: we did not send the
142          * Ack yet and did not wake possibly waiting conflicting requests.
143          * Removal from the tree happens in "drbd_process_done_ee", within the
144          * appropriate w.cb (e_end_block/e_end_resync_block), or in
145          * _drbd_clear_done_ee.
146          */
147
148         do_wake = list_empty(block_id == ID_SYNCER ? &mdev->sync_ee : &mdev->active_ee);
149
150         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
151                 __drbd_chk_io_error(mdev, DRBD_IO_ERROR);
152         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
153
154         if (block_id == ID_SYNCER)
155                 drbd_rs_complete_io(mdev, i.sector);
156
157         if (do_wake)
158                 wake_up(&mdev->ee_wait);
159
160         if (do_al_complete_io)
161                 drbd_al_complete_io(mdev, &i);
162
163         wake_asender(mdev->tconn);
164         put_ldev(mdev);
165 }
166
167 /* writes on behalf of the partner, or resync writes,
168  * "submitted" by the receiver.
169  */
170 void drbd_peer_request_endio(struct bio *bio, int error)
171 {
172         struct drbd_peer_request *peer_req = bio->bi_private;
173         struct drbd_conf *mdev = peer_req->w.mdev;
174         int uptodate = bio_flagged(bio, BIO_UPTODATE);
175         int is_write = bio_data_dir(bio) == WRITE;
176
177         if (error && __ratelimit(&drbd_ratelimit_state))
178                 dev_warn(DEV, "%s: error=%d s=%llus\n",
179                                 is_write ? "write" : "read", error,
180                                 (unsigned long long)peer_req->i.sector);
181         if (!error && !uptodate) {
182                 if (__ratelimit(&drbd_ratelimit_state))
183                         dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
184                                         is_write ? "write" : "read",
185                                         (unsigned long long)peer_req->i.sector);
186                 /* strange behavior of some lower level drivers...
187                  * fail the request by clearing the uptodate flag,
188                  * but do not return any error?! */
189                 error = -EIO;
190         }
191
192         if (error)
193                 set_bit(__EE_WAS_ERROR, &peer_req->flags);
194
195         bio_put(bio); /* no need for the bio anymore */
196         if (atomic_dec_and_test(&peer_req->pending_bios)) {
197                 if (is_write)
198                         drbd_endio_write_sec_final(peer_req);
199                 else
200                         drbd_endio_read_sec_final(peer_req);
201         }
202 }
203
204 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
205  */
206 void drbd_request_endio(struct bio *bio, int error)
207 {
208         unsigned long flags;
209         struct drbd_request *req = bio->bi_private;
210         struct drbd_conf *mdev = req->w.mdev;
211         struct bio_and_error m;
212         enum drbd_req_event what;
213         int uptodate = bio_flagged(bio, BIO_UPTODATE);
214
215         if (!error && !uptodate) {
216                 dev_warn(DEV, "p %s: setting error to -EIO\n",
217                          bio_data_dir(bio) == WRITE ? "write" : "read");
218                 /* strange behavior of some lower level drivers...
219                  * fail the request by clearing the uptodate flag,
220                  * but do not return any error?! */
221                 error = -EIO;
222         }
223
224         /* to avoid recursion in __req_mod */
225         if (unlikely(error)) {
226                 what = (bio_data_dir(bio) == WRITE)
227                         ? WRITE_COMPLETED_WITH_ERROR
228                         : (bio_rw(bio) == READ)
229                           ? READ_COMPLETED_WITH_ERROR
230                           : READ_AHEAD_COMPLETED_WITH_ERROR;
231         } else
232                 what = COMPLETED_OK;
233
234         bio_put(req->private_bio);
235         req->private_bio = ERR_PTR(error);
236
237         /* not req_mod(), we need irqsave here! */
238         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
239         __req_mod(req, what, &m);
240         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
241         put_ldev(mdev);
242
243         if (m.bio)
244                 complete_master_bio(mdev, &m);
245 }
246
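/* Compute a digest over the page chain of a peer request.
 * All pages except the last one are fully used; the last page holds
 * i.size % PAGE_SIZE bytes, or a full page if the size happens to be
 * page aligned (hence the "len ?: PAGE_SIZE" below). */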
247 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm,
248                   struct drbd_peer_request *peer_req, void *digest)
249 {
250         struct hash_desc desc;
251         struct scatterlist sg;
252         struct page *page = peer_req->pages;
253         struct page *tmp;
254         unsigned len;
255
256         desc.tfm = tfm;
257         desc.flags = 0;
258
259         sg_init_table(&sg, 1);
260         crypto_hash_init(&desc);
261
262         while ((tmp = page_chain_next(page))) {
263                 /* all but the last page will be fully used */
264                 sg_set_page(&sg, page, PAGE_SIZE, 0);
265                 crypto_hash_update(&desc, &sg, sg.length);
266                 page = tmp;
267         }
268         /* and now the last, possibly only partially used page */
269         len = peer_req->i.size & (PAGE_SIZE - 1);
270         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
271         crypto_hash_update(&desc, &sg, sg.length);
272         crypto_hash_final(&desc, digest);
273 }
274
275 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
276 {
277         struct hash_desc desc;
278         struct scatterlist sg;
279         struct bio_vec *bvec;
280         int i;
281
282         desc.tfm = tfm;
283         desc.flags = 0;
284
285         sg_init_table(&sg, 1);
286         crypto_hash_init(&desc);
287
288         bio_for_each_segment(bvec, bio, i) {
289                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
290                 crypto_hash_update(&desc, &sg, sg.length);
291         }
292         crypto_hash_final(&desc, digest);
293 }
294
295 /* MAYBE merge common code with w_e_end_ov_req */
296 static int w_e_send_csum(struct drbd_work *w, int cancel)
297 {
298         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
299         struct drbd_conf *mdev = w->mdev;
300         int digest_size;
301         void *digest;
302         int err = 0;
303
304         if (unlikely(cancel))
305                 goto out;
306
307         if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
308                 goto out;
309
310         digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
311         digest = kmalloc(digest_size, GFP_NOIO);
312         if (digest) {
313                 sector_t sector = peer_req->i.sector;
314                 unsigned int size = peer_req->i.size;
315                 drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
316                 /* Free peer_req and pages before send.
317                  * In case we block on congestion, we could otherwise run into
318                  * some distributed deadlock, if the other side blocks on
319                  * congestion as well, because our receiver blocks in
320                  * drbd_alloc_pages due to pp_in_use > max_buffers. */
321                 drbd_free_peer_req(mdev, peer_req);
322                 peer_req = NULL;
323                 inc_rs_pending(mdev);
324                 err = drbd_send_drequest_csum(mdev, sector, size,
325                                               digest, digest_size,
326                                               P_CSUM_RS_REQUEST);
327                 kfree(digest);
328         } else {
329                 dev_err(DEV, "kmalloc() of digest failed.\n");
330                 err = -ENOMEM;
331         }
332
333 out:
334         if (peer_req)
335                 drbd_free_peer_req(mdev, peer_req);
336
337         if (unlikely(err))
338                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
339         return err;
340 }
341
342 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
343
344 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
345 {
346         struct drbd_peer_request *peer_req;
347
348         if (!get_ldev(mdev))
349                 return -EIO;
350
351         if (drbd_rs_should_slow_down(mdev, sector))
352                 goto defer;
353
354         /* GFP_TRY, because if there is no memory available right now, this may
355          * be rescheduled for later. It is "only" background resync, after all. */
356         peer_req = drbd_alloc_peer_req(mdev, ID_SYNCER /* unused */, sector,
357                                        size, GFP_TRY);
358         if (!peer_req)
359                 goto defer;
360
361         peer_req->w.cb = w_e_send_csum;
362         spin_lock_irq(&mdev->tconn->req_lock);
363         list_add(&peer_req->w.list, &mdev->read_ee);
364         spin_unlock_irq(&mdev->tconn->req_lock);
365
366         atomic_add(size >> 9, &mdev->rs_sect_ev);
367         if (drbd_submit_peer_request(mdev, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
368                 return 0;
369
370         /* If it failed because of ENOMEM, retry should help.  If it failed
371          * because bio_add_page failed (probably broken lower level driver),
372          * retry may or may not help.
373          * If it does not, you may need to force disconnect. */
374         spin_lock_irq(&mdev->tconn->req_lock);
375         list_del(&peer_req->w.list);
376         spin_unlock_irq(&mdev->tconn->req_lock);
377
378         drbd_free_peer_req(mdev, peer_req);
379 defer:
380         put_ldev(mdev);
381         return -EAGAIN;
382 }
383
384 int w_resync_timer(struct drbd_work *w, int cancel)
385 {
386         struct drbd_conf *mdev = w->mdev;
387         switch (mdev->state.conn) {
388         case C_VERIFY_S:
389                 w_make_ov_request(w, cancel);
390                 break;
391         case C_SYNC_TARGET:
392                 w_make_resync_request(w, cancel);
393                 break;
394         }
395
396         return 0;
397 }
398
399 void resync_timer_fn(unsigned long data)
400 {
401         struct drbd_conf *mdev = (struct drbd_conf *) data;
402
403         if (list_empty(&mdev->resync_work.list))
404                 drbd_queue_work(&mdev->tconn->sender_work, &mdev->resync_work);
405 }
406
407 static void fifo_set(struct fifo_buffer *fb, int value)
408 {
409         int i;
410
411         for (i = 0; i < fb->size; i++)
412                 fb->values[i] = value;
413 }
414
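/* The plan fifo holds one "correction" value per controller time step.
 * fifo_push() returns the value planned for the current step and stores
 * the new value in the slot that comes around again after fb->size
 * further pushes; fifo_add_val() spreads an additional correction
 * evenly over all pending steps. */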
415 static int fifo_push(struct fifo_buffer *fb, int value)
416 {
417         int ov;
418
419         ov = fb->values[fb->head_index];
420         fb->values[fb->head_index++] = value;
421
422         if (fb->head_index >= fb->size)
423                 fb->head_index = 0;
424
425         return ov;
426 }
427
428 static void fifo_add_val(struct fifo_buffer *fb, int value)
429 {
430         int i;
431
432         for (i = 0; i < fb->size; i++)
433                 fb->values[i] += value;
434 }
435
436 struct fifo_buffer *fifo_alloc(int fifo_size)
437 {
438         struct fifo_buffer *fb;
439
440         fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_KERNEL);
441         if (!fb)
442                 return NULL;
443
444         fb->head_index = 0;
445         fb->size = fifo_size;
446         fb->total = 0;
447
448         return fb;
449 }
450
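/* Dynamic resync speed controller, invoked once per SLEEP_TIME interval.
 * It tries to keep a target amount of resync data "in flight":
 *   want       = c-fill-target, or an amount derived from c-delay-target
 *                and the measured incoming rate (sect_in)
 *   correction = want - rs_in_flight - corrections already planned
 * The correction is spread over "steps" future invocations via the plan
 * fifo; the return value is the number of sectors to request in this
 * step, clamped to at most the per-step equivalent of c-max-rate.
 * For example, with nothing in flight or planned and want == 1000
 * sectors, each of the next "steps" invocations gets an extra
 * 1000/steps sectors scheduled. */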
451 static int drbd_rs_controller(struct drbd_conf *mdev)
452 {
453         struct disk_conf *dc;
454         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
455         unsigned int want;     /* The number of sectors we want in the proxy */
456         int req_sect; /* Number of sectors to request in this turn */
457         int correction; /* Number of sectors more we need in the proxy*/
458         int cps; /* correction per invocation of drbd_rs_controller() */
459         int steps; /* Number of time steps to plan ahead */
460         int curr_corr;
461         int max_sect;
462         struct fifo_buffer *plan;
463
464         sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
465         mdev->rs_in_flight -= sect_in;
466
467         dc = rcu_dereference(mdev->ldev->disk_conf);
468         plan = rcu_dereference(mdev->rs_plan_s);
469
470         steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
471
472         if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
473                 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
474         } else { /* normal path */
475                 want = dc->c_fill_target ? dc->c_fill_target :
476                         sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
477         }
478
479         correction = want - mdev->rs_in_flight - plan->total;
480
481         /* Plan ahead */
482         cps = correction / steps;
483         fifo_add_val(plan, cps);
484         plan->total += cps * steps;
485
486         /* What we do in this step */
487         curr_corr = fifo_push(plan, 0);
488         plan->total -= curr_corr;
489
490         req_sect = sect_in + curr_corr;
491         if (req_sect < 0)
492                 req_sect = 0;
493
494         max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
495         if (req_sect > max_sect)
496                 req_sect = max_sect;
497
498         /*
499         dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
500                  sect_in, mdev->rs_in_flight, want, correction,
501                  steps, cps, mdev->rs_planed, curr_corr, req_sect);
502         */
503
504         return req_sect;
505 }
506
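/* Translate the controller output (sectors per SLEEP_TIME) into the
 * number of BM_BLOCK_SIZE sized requests to generate in this step;
 * the >> (BM_BLOCK_SHIFT - 9) converts 512 byte sectors to bitmap
 * blocks.  If no plan fifo is configured, the static resync_rate
 * (KiB/s) is used instead; c_sync_rate always ends up in KiB/s. */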
507 static int drbd_rs_number_requests(struct drbd_conf *mdev)
508 {
509         int number;
510
511         rcu_read_lock();
512         if (rcu_dereference(mdev->rs_plan_s)->size) {
513                 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
514                 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
515         } else {
516                 mdev->c_sync_rate = rcu_dereference(mdev->ldev->disk_conf)->resync_rate;
517                 number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
518         }
519         rcu_read_unlock();
520
521         /* ignore the number of pending requests; the resync controller should
522          * throttle down to the incoming reply rate soon enough anyway. */
523         return number;
524 }
525
526 int w_make_resync_request(struct drbd_work *w, int cancel)
527 {
528         struct drbd_conf *mdev = w->mdev;
529         unsigned long bit;
530         sector_t sector;
531         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
532         int max_bio_size;
533         int number, rollback_i, size;
534         int align, queued, sndbuf;
535         int i = 0;
536
537         if (unlikely(cancel))
538                 return 0;
539
540         if (mdev->rs_total == 0) {
541                 /* empty resync? */
542                 drbd_resync_finished(mdev);
543                 return 0;
544         }
545
546         if (!get_ldev(mdev)) {
547                 /* Since we only need to access mdev->rsync, a
548                    get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
549                    continuing a resync with a broken disk makes no sense at
550                    all */
551                 dev_err(DEV, "Disk broke down during resync!\n");
552                 return 0;
553         }
554
555         max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
556         number = drbd_rs_number_requests(mdev);
557         if (number == 0)
558                 goto requeue;
559
560         for (i = 0; i < number; i++) {
561                 /* Stop generating RS requests, when half of the send buffer is filled */
562                 mutex_lock(&mdev->tconn->data.mutex);
563                 if (mdev->tconn->data.socket) {
564                         queued = mdev->tconn->data.socket->sk->sk_wmem_queued;
565                         sndbuf = mdev->tconn->data.socket->sk->sk_sndbuf;
566                 } else {
567                         queued = 1;
568                         sndbuf = 0;
569                 }
570                 mutex_unlock(&mdev->tconn->data.mutex);
571                 if (queued > sndbuf / 2)
572                         goto requeue;
573
574 next_sector:
575                 size = BM_BLOCK_SIZE;
576                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
577
578                 if (bit == DRBD_END_OF_BITMAP) {
579                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
580                         put_ldev(mdev);
581                         return 0;
582                 }
583
584                 sector = BM_BIT_TO_SECT(bit);
585
586                 if (drbd_rs_should_slow_down(mdev, sector) ||
587                     drbd_try_rs_begin_io(mdev, sector)) {
588                         mdev->bm_resync_fo = bit;
589                         goto requeue;
590                 }
591                 mdev->bm_resync_fo = bit + 1;
592
593                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
594                         drbd_rs_complete_io(mdev, sector);
595                         goto next_sector;
596                 }
597
598 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
599                 /* try to find some adjacent bits.
600                  * we stop if we already have the maximum req size.
601                  *
602                  * Additionally always align bigger requests, in order to
603                  * be prepared for all stripe sizes of software RAIDs.
604                  */
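                /* One bitmap bit covers BM_BLOCK_SIZE bytes (8 sectors for the
                 * usual 4 KiB block), which is what the "+3" in the alignment
                 * check below accounts for: the request is only allowed to keep
                 * growing while its start sector stays aligned to the next
                 * power-of-two size, e.g. an 8 KiB request must start on an
                 * 8 KiB (16 sector) boundary. */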
605                 align = 1;
606                 rollback_i = i;
607                 for (;;) {
608                         if (size + BM_BLOCK_SIZE > max_bio_size)
609                                 break;
610
611                         /* always stay aligned */
612                         if (sector & ((1<<(align+3))-1))
613                                 break;
614
615                         /* do not cross extent boundaries */
616                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
617                                 break;
618                         /* now, is it actually dirty, after all?
619                          * caution, drbd_bm_test_bit is tri-state for some
620                          * obscure reason; ( b == 0 ) would get the out-of-band
621                          * only accidentally right because of the "oddly sized"
622                          * adjustment below */
623                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
624                                 break;
625                         bit++;
626                         size += BM_BLOCK_SIZE;
627                         if ((BM_BLOCK_SIZE << align) <= size)
628                                 align++;
629                         i++;
630                 }
631                 /* if we merged some,
632                  * reset the offset to start the next drbd_bm_find_next from */
633                 if (size > BM_BLOCK_SIZE)
634                         mdev->bm_resync_fo = bit + 1;
635 #endif
636
637                 /* adjust very last sectors, in case we are oddly sized */
638                 if (sector + (size>>9) > capacity)
639                         size = (capacity-sector)<<9;
640                 if (mdev->tconn->agreed_pro_version >= 89 && mdev->tconn->csums_tfm) {
641                         switch (read_for_csum(mdev, sector, size)) {
642                         case -EIO: /* Disk failure */
643                                 put_ldev(mdev);
644                                 return -EIO;
645                         case -EAGAIN: /* allocation failed, or ldev busy */
646                                 drbd_rs_complete_io(mdev, sector);
647                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
648                                 i = rollback_i;
649                                 goto requeue;
650                         case 0:
651                                 /* everything ok */
652                                 break;
653                         default:
654                                 BUG();
655                         }
656                 } else {
657                         int err;
658
659                         inc_rs_pending(mdev);
660                         err = drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
661                                                  sector, size, ID_SYNCER);
662                         if (err) {
663                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
664                                 dec_rs_pending(mdev);
665                                 put_ldev(mdev);
666                                 return err;
667                         }
668                 }
669         }
670
671         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
672                 /* last syncer _request_ was sent,
673                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
674                  * next sync group will resume), as soon as we receive the last
675                  * resync data block, and the last bit is cleared.
676                  * until then resync "work" is "inactive" ...
677                  */
678                 put_ldev(mdev);
679                 return 0;
680         }
681
682  requeue:
683         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
684         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
685         put_ldev(mdev);
686         return 0;
687 }
688
689 static int w_make_ov_request(struct drbd_work *w, int cancel)
690 {
691         struct drbd_conf *mdev = w->mdev;
692         int number, i, size;
693         sector_t sector;
694         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
695         bool stop_sector_reached = false;
696
697         if (unlikely(cancel))
698                 return 1;
699
700         number = drbd_rs_number_requests(mdev);
701
702         sector = mdev->ov_position;
703         for (i = 0; i < number; i++) {
704                 if (sector >= capacity)
705                         return 1;
706
707                 /* We check for "finished" only in the reply path:
708                  * w_e_end_ov_reply().
709                  * We need to send at least one request out. */
710                 stop_sector_reached = i > 0
711                         && verify_can_do_stop_sector(mdev)
712                         && sector >= mdev->ov_stop_sector;
713                 if (stop_sector_reached)
714                         break;
715
716                 size = BM_BLOCK_SIZE;
717
718                 if (drbd_rs_should_slow_down(mdev, sector) ||
719                     drbd_try_rs_begin_io(mdev, sector)) {
720                         mdev->ov_position = sector;
721                         goto requeue;
722                 }
723
724                 if (sector + (size>>9) > capacity)
725                         size = (capacity-sector)<<9;
726
727                 inc_rs_pending(mdev);
728                 if (drbd_send_ov_request(mdev, sector, size)) {
729                         dec_rs_pending(mdev);
730                         return 0;
731                 }
732                 sector += BM_SECT_PER_BIT;
733         }
734         mdev->ov_position = sector;
735
736  requeue:
737         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
738         if (i == 0 || !stop_sector_reached)
739                 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
740         return 1;
741 }
742
743 int w_ov_finished(struct drbd_work *w, int cancel)
744 {
745         struct drbd_conf *mdev = w->mdev;
746         kfree(w);
747         ov_out_of_sync_print(mdev);
748         drbd_resync_finished(mdev);
749
750         return 0;
751 }
752
753 static int w_resync_finished(struct drbd_work *w, int cancel)
754 {
755         struct drbd_conf *mdev = w->mdev;
756         kfree(w);
757
758         drbd_resync_finished(mdev);
759
760         return 0;
761 }
762
763 static void ping_peer(struct drbd_conf *mdev)
764 {
765         struct drbd_tconn *tconn = mdev->tconn;
766
767         clear_bit(GOT_PING_ACK, &tconn->flags);
768         request_ping(tconn);
769         wait_event(tconn->ping_wait,
770                    test_bit(GOT_PING_ACK, &tconn->flags) || mdev->state.conn < C_CONNECTED);
771 }
772
773 int drbd_resync_finished(struct drbd_conf *mdev)
774 {
775         unsigned long db, dt, dbdt;
776         unsigned long n_oos;
777         union drbd_state os, ns;
778         struct drbd_work *w;
779         char *khelper_cmd = NULL;
780         int verify_done = 0;
781
782         /* Remove all elements from the resync LRU. Future actions
783          * might set bits in the (main) bitmap, and then the entries in the
784          * resync LRU would be wrong. */
785         if (drbd_rs_del_all(mdev)) {
786                 /* In case this is not possible now, most probably because
787                  * there are P_RS_DATA_REPLY packets lingering on the worker's
788                  * queue (or the read operations for those packets
789                  * have not finished yet), retry in 100ms. */
790
791                 schedule_timeout_interruptible(HZ / 10);
792                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
793                 if (w) {
794                         w->cb = w_resync_finished;
795                         w->mdev = mdev;
796                         drbd_queue_work(&mdev->tconn->sender_work, w);
797                         return 1;
798                 }
799                 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
800         }
801
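        /* Resync statistics: dt is the active resync time in seconds (paused
         * time excluded), db the number of bitmap bits covered, and dbdt the
         * resulting average throughput in KiB/s (Bit2KB converts bitmap bits
         * to KiB). */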
802         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
803         if (dt <= 0)
804                 dt = 1;
805
806         db = mdev->rs_total;
807         /* adjust for verify start and stop sectors, i.e. the position actually reached */
808         if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
809                 db -= mdev->ov_left;
810
811         dbdt = Bit2KB(db/dt);
812         mdev->rs_paused /= HZ;
813
814         if (!get_ldev(mdev))
815                 goto out;
816
817         ping_peer(mdev);
818
819         spin_lock_irq(&mdev->tconn->req_lock);
820         os = drbd_read_state(mdev);
821
822         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
823
824         /* This protects us against multiple calls (that can happen in the presence
825            of application IO), and against connectivity loss just before we arrive here. */
826         if (os.conn <= C_CONNECTED)
827                 goto out_unlock;
828
829         ns = os;
830         ns.conn = C_CONNECTED;
831
832         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
833              verify_done ? "Online verify" : "Resync",
834              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
835
836         n_oos = drbd_bm_total_weight(mdev);
837
838         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
839                 if (n_oos) {
840                         dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
841                               n_oos, Bit2KB(1));
842                         khelper_cmd = "out-of-sync";
843                 }
844         } else {
845                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
846
847                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
848                         khelper_cmd = "after-resync-target";
849
850                 if (mdev->tconn->csums_tfm && mdev->rs_total) {
851                         const unsigned long s = mdev->rs_same_csum;
852                         const unsigned long t = mdev->rs_total;
853                         const int ratio =
854                                 (t == 0)     ? 0 :
855                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
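                        /* integer percentage of blocks with equal checksums;
                         * for large totals t is divided first, presumably to
                         * avoid overflowing s * 100. */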
856                         dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
857                              "transferred %luK total %luK\n",
858                              ratio,
859                              Bit2KB(mdev->rs_same_csum),
860                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
861                              Bit2KB(mdev->rs_total));
862                 }
863         }
864
865         if (mdev->rs_failed) {
866                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
867
868                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
869                         ns.disk = D_INCONSISTENT;
870                         ns.pdsk = D_UP_TO_DATE;
871                 } else {
872                         ns.disk = D_UP_TO_DATE;
873                         ns.pdsk = D_INCONSISTENT;
874                 }
875         } else {
876                 ns.disk = D_UP_TO_DATE;
877                 ns.pdsk = D_UP_TO_DATE;
878
879                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
880                         if (mdev->p_uuid) {
881                                 int i;
882                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
883                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
884                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
885                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
886                         } else {
887                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
888                         }
889                 }
890
891                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
892                         /* for verify runs, we don't update uuids here,
893                          * so there would be nothing to report. */
894                         drbd_uuid_set_bm(mdev, 0UL);
895                         drbd_print_uuids(mdev, "updated UUIDs");
896                         if (mdev->p_uuid) {
897                                 /* Now the two UUID sets are equal, update what we
898                                  * know of the peer. */
899                                 int i;
900                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
901                                         mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
902                         }
903                 }
904         }
905
906         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
907 out_unlock:
908         spin_unlock_irq(&mdev->tconn->req_lock);
909         put_ldev(mdev);
910 out:
911         mdev->rs_total  = 0;
912         mdev->rs_failed = 0;
913         mdev->rs_paused = 0;
914
915         /* reset start sector, if we reached end of device */
916         if (verify_done && mdev->ov_left == 0)
917                 mdev->ov_start_sector = 0;
918
919         drbd_md_sync(mdev);
920
921         if (khelper_cmd)
922                 drbd_khelper(mdev, khelper_cmd);
923
924         return 1;
925 }
926
927 /* helper: park peer requests whose pages are still in flight (sendpage) on net_ee, otherwise free them right away */
928 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
929 {
930         if (drbd_peer_req_has_active_page(peer_req)) {
931                 /* This might happen if sendpage() has not finished */
932                 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
933                 atomic_add(i, &mdev->pp_in_use_by_net);
934                 atomic_sub(i, &mdev->pp_in_use);
935                 spin_lock_irq(&mdev->tconn->req_lock);
936                 list_add_tail(&peer_req->w.list, &mdev->net_ee);
937                 spin_unlock_irq(&mdev->tconn->req_lock);
938                 wake_up(&drbd_pp_wait);
939         } else
940                 drbd_free_peer_req(mdev, peer_req);
941 }
942
943 /**
944  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
945  * @mdev:       DRBD device.
946  * @w:          work object.
947  * @cancel:     The connection will be closed anyway
948  */
949 int w_e_end_data_req(struct drbd_work *w, int cancel)
950 {
951         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
952         struct drbd_conf *mdev = w->mdev;
953         int err;
954
955         if (unlikely(cancel)) {
956                 drbd_free_peer_req(mdev, peer_req);
957                 dec_unacked(mdev);
958                 return 0;
959         }
960
961         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
962                 err = drbd_send_block(mdev, P_DATA_REPLY, peer_req);
963         } else {
964                 if (__ratelimit(&drbd_ratelimit_state))
965                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
966                             (unsigned long long)peer_req->i.sector);
967
968                 err = drbd_send_ack(mdev, P_NEG_DREPLY, peer_req);
969         }
970
971         dec_unacked(mdev);
972
973         move_to_net_ee_or_free(mdev, peer_req);
974
975         if (unlikely(err))
976                 dev_err(DEV, "drbd_send_block() failed\n");
977         return err;
978 }
979
980 /**
981  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
982  * @mdev:       DRBD device.
983  * @w:          work object.
984  * @cancel:     The connection will be closed anyway
985  */
986 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
987 {
988         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
989         struct drbd_conf *mdev = w->mdev;
990         int err;
991
992         if (unlikely(cancel)) {
993                 drbd_free_peer_req(mdev, peer_req);
994                 dec_unacked(mdev);
995                 return 0;
996         }
997
998         if (get_ldev_if_state(mdev, D_FAILED)) {
999                 drbd_rs_complete_io(mdev, peer_req->i.sector);
1000                 put_ldev(mdev);
1001         }
1002
1003         if (mdev->state.conn == C_AHEAD) {
1004                 err = drbd_send_ack(mdev, P_RS_CANCEL, peer_req);
1005         } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1006                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
1007                         inc_rs_pending(mdev);
1008                         err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
1009                 } else {
1010                         if (__ratelimit(&drbd_ratelimit_state))
1011                                 dev_err(DEV, "Not sending RSDataReply, "
1012                                     "partner DISKLESS!\n");
1013                         err = 0;
1014                 }
1015         } else {
1016                 if (__ratelimit(&drbd_ratelimit_state))
1017                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1018                             (unsigned long long)peer_req->i.sector);
1019
1020                 err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1021
1022                 /* update resync data with failure */
1023                 drbd_rs_failed_io(mdev, peer_req->i.sector, peer_req->i.size);
1024         }
1025
1026         dec_unacked(mdev);
1027
1028         move_to_net_ee_or_free(mdev, peer_req);
1029
1030         if (unlikely(err))
1031                 dev_err(DEV, "drbd_send_block() failed\n");
1032         return err;
1033 }
1034
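/* Worker callback for checksum based resync (P_CSUM_RS_REQUEST):
 * recompute the digest over the locally read data and compare it with
 * the digest received from the peer.  If they match, the block is
 * marked in sync and only P_RS_IS_IN_SYNC is acked; otherwise the full
 * block is sent back as P_RS_DATA_REPLY. */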
1035 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1036 {
1037         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1038         struct drbd_conf *mdev = w->mdev;
1039         struct digest_info *di;
1040         int digest_size;
1041         void *digest = NULL;
1042         int err, eq = 0;
1043
1044         if (unlikely(cancel)) {
1045                 drbd_free_peer_req(mdev, peer_req);
1046                 dec_unacked(mdev);
1047                 return 0;
1048         }
1049
1050         if (get_ldev(mdev)) {
1051                 drbd_rs_complete_io(mdev, peer_req->i.sector);
1052                 put_ldev(mdev);
1053         }
1054
1055         di = peer_req->digest;
1056
1057         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1058                 /* quick hack to try to avoid a race against reconfiguration.
1059                  * a real fix would be much more involved,
1060                  * introducing more locking mechanisms */
1061                 if (mdev->tconn->csums_tfm) {
1062                         digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
1063                         D_ASSERT(digest_size == di->digest_size);
1064                         digest = kmalloc(digest_size, GFP_NOIO);
1065                 }
1066                 if (digest) {
1067                         drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
1068                         eq = !memcmp(digest, di->digest, digest_size);
1069                         kfree(digest);
1070                 }
1071
1072                 if (eq) {
1073                         drbd_set_in_sync(mdev, peer_req->i.sector, peer_req->i.size);
1074                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1075                         mdev->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1076                         err = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, peer_req);
1077                 } else {
1078                         inc_rs_pending(mdev);
1079                         peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1080                         peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1081                         kfree(di);
1082                         err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
1083                 }
1084         } else {
1085                 err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1086                 if (__ratelimit(&drbd_ratelimit_state))
1087                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1088         }
1089
1090         dec_unacked(mdev);
1091         move_to_net_ee_or_free(mdev, peer_req);
1092
1093         if (unlikely(err))
1094                 dev_err(DEV, "drbd_send_block/ack() failed\n");
1095         return err;
1096 }
1097
1098 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1099 {
1100         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1101         struct drbd_conf *mdev = w->mdev;
1102         sector_t sector = peer_req->i.sector;
1103         unsigned int size = peer_req->i.size;
1104         int digest_size;
1105         void *digest;
1106         int err = 0;
1107
1108         if (unlikely(cancel))
1109                 goto out;
1110
1111         digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1112         digest = kmalloc(digest_size, GFP_NOIO);
1113         if (!digest) {
1114                 err = 1;        /* terminate the connection in case the allocation failed */
1115                 goto out;
1116         }
1117
1118         if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1119                 drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1120         else
1121                 memset(digest, 0, digest_size);
1122
1123         /* Free peer_req and pages before send.
1124          * In case we block on congestion, we could otherwise run into
1125          * some distributed deadlock, if the other side blocks on
1126          * congestion as well, because our receiver blocks in
1127          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1128         drbd_free_peer_req(mdev, peer_req);
1129         peer_req = NULL;
1130         inc_rs_pending(mdev);
1131         err = drbd_send_drequest_csum(mdev, sector, size, digest, digest_size, P_OV_REPLY);
1132         if (err)
1133                 dec_rs_pending(mdev);
1134         kfree(digest);
1135
1136 out:
1137         if (peer_req)
1138                 drbd_free_peer_req(mdev, peer_req);
1139         dec_unacked(mdev);
1140         return err;
1141 }
1142
1143 void drbd_ov_out_of_sync_found(struct drbd_conf *mdev, sector_t sector, int size)
1144 {
1145         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1146                 mdev->ov_last_oos_size += size>>9;
1147         } else {
1148                 mdev->ov_last_oos_start = sector;
1149                 mdev->ov_last_oos_size = size>>9;
1150         }
1151         drbd_set_out_of_sync(mdev, sector, size);
1152 }
1153
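/* Worker callback for an online verify reply (P_OV_REPLY): recompute
 * the local digest, compare it with the peer's, record mismatches via
 * drbd_ov_out_of_sync_found() and ack the result with P_OV_RESULT.
 * The verify run is finished once ov_left reaches zero or the
 * configured stop sector has been passed. */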
1154 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1155 {
1156         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1157         struct drbd_conf *mdev = w->mdev;
1158         struct digest_info *di;
1159         void *digest;
1160         sector_t sector = peer_req->i.sector;
1161         unsigned int size = peer_req->i.size;
1162         int digest_size;
1163         int err, eq = 0;
1164         bool stop_sector_reached = false;
1165
1166         if (unlikely(cancel)) {
1167                 drbd_free_peer_req(mdev, peer_req);
1168                 dec_unacked(mdev);
1169                 return 0;
1170         }
1171
1172         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1173          * the resync lru has been cleaned up already */
1174         if (get_ldev(mdev)) {
1175                 drbd_rs_complete_io(mdev, peer_req->i.sector);
1176                 put_ldev(mdev);
1177         }
1178
1179         di = peer_req->digest;
1180
1181         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1182                 digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1183                 digest = kmalloc(digest_size, GFP_NOIO);
1184                 if (digest) {
1185                         drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1186
1187                         D_ASSERT(digest_size == di->digest_size);
1188                         eq = !memcmp(digest, di->digest, digest_size);
1189                         kfree(digest);
1190                 }
1191         }
1192
1193         /* Free peer_req and pages before send.
1194          * In case we block on congestion, we could otherwise run into
1195          * some distributed deadlock, if the other side blocks on
1196          * congestion as well, because our receiver blocks in
1197          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1198         drbd_free_peer_req(mdev, peer_req);
1199         if (!eq)
1200                 drbd_ov_out_of_sync_found(mdev, sector, size);
1201         else
1202                 ov_out_of_sync_print(mdev);
1203
1204         err = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1205                                eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1206
1207         dec_unacked(mdev);
1208
1209         --mdev->ov_left;
1210
1211         /* let's advance progress step marks only for every other megabyte */
1212         if ((mdev->ov_left & 0x200) == 0x200)
1213                 drbd_advance_rs_marks(mdev, mdev->ov_left);
1214
1215         stop_sector_reached = verify_can_do_stop_sector(mdev) &&
1216                 (sector + (size>>9)) >= mdev->ov_stop_sector;
1217
1218         if (mdev->ov_left == 0 || stop_sector_reached) {
1219                 ov_out_of_sync_print(mdev);
1220                 drbd_resync_finished(mdev);
1221         }
1222
1223         return err;
1224 }
1225
1226 int w_prev_work_done(struct drbd_work *w, int cancel)
1227 {
1228         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1229
1230         complete(&b->done);
1231         return 0;
1232 }
1233
1234 /* FIXME
1235  * We need to track the number of pending barrier acks,
1236  * and to be able to wait for them.
1237  * See also comment in drbd_adm_attach before drbd_suspend_io.
1238  */
1239 int drbd_send_barrier(struct drbd_tconn *tconn)
1240 {
1241         struct p_barrier *p;
1242         struct drbd_socket *sock;
1243
1244         sock = &tconn->data;
1245         p = conn_prepare_command(tconn, sock);
1246         if (!p)
1247                 return -EIO;
1248         p->barrier = tconn->send.current_epoch_nr;
1249         p->pad = 0;
1250         tconn->send.current_epoch_writes = 0;
1251
1252         return conn_send_command(tconn, sock, P_BARRIER, sizeof(*p), NULL, 0);
1253 }
1254
1255 int w_send_write_hint(struct drbd_work *w, int cancel)
1256 {
1257         struct drbd_conf *mdev = w->mdev;
1258         struct drbd_socket *sock;
1259
1260         if (cancel)
1261                 return 0;
1262         sock = &mdev->tconn->data;
1263         if (!drbd_prepare_command(mdev, sock))
1264                 return -EIO;
1265         return drbd_send_command(mdev, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1266 }
1267
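/* Epoch bookkeeping for the sender: current_epoch_nr tracks the epoch
 * of the last write handed to the network.  When a request from a newer
 * epoch is about to be sent and the previous epoch actually contained
 * writes, maybe_send_barrier() emits a P_BARRIER first to close the old
 * epoch on the peer. */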
1268 static void re_init_if_first_write(struct drbd_tconn *tconn, unsigned int epoch)
1269 {
1270         if (!tconn->send.seen_any_write_yet) {
1271                 tconn->send.seen_any_write_yet = true;
1272                 tconn->send.current_epoch_nr = epoch;
1273                 tconn->send.current_epoch_writes = 0;
1274         }
1275 }
1276
1277 static void maybe_send_barrier(struct drbd_tconn *tconn, unsigned int epoch)
1278 {
1279         /* nothing to do until we have seen the first write on this connection */
1280         if (!tconn->send.seen_any_write_yet)
1281                 return;
1282         if (tconn->send.current_epoch_nr != epoch) {
1283                 if (tconn->send.current_epoch_writes)
1284                         drbd_send_barrier(tconn);
1285                 tconn->send.current_epoch_nr = epoch;
1286         }
1287 }
1288
1289 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1290 {
1291         struct drbd_request *req = container_of(w, struct drbd_request, w);
1292         struct drbd_conf *mdev = w->mdev;
1293         struct drbd_tconn *tconn = mdev->tconn;
1294         int err;
1295
1296         if (unlikely(cancel)) {
1297                 req_mod(req, SEND_CANCELED);
1298                 return 0;
1299         }
1300
1301         /* this time, no tconn->send.current_epoch_writes++;
1302          * if a barrier was sent here, it was the closing barrier for the last
1303          * replicated epoch, before we went into AHEAD mode.
1304          * No more barriers will be sent until we leave AHEAD mode again. */
1305         maybe_send_barrier(tconn, req->epoch);
1306
1307         err = drbd_send_out_of_sync(mdev, req);
1308         req_mod(req, OOS_HANDED_TO_NETWORK);
1309
1310         return err;
1311 }
1312
1313 /**
1314  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1315  * @mdev:       DRBD device.
1316  * @w:          work object.
1317  * @cancel:     The connection will be closed anyway
1318  */
1319 int w_send_dblock(struct drbd_work *w, int cancel)
1320 {
1321         struct drbd_request *req = container_of(w, struct drbd_request, w);
1322         struct drbd_conf *mdev = w->mdev;
1323         struct drbd_tconn *tconn = mdev->tconn;
1324         int err;
1325
1326         if (unlikely(cancel)) {
1327                 req_mod(req, SEND_CANCELED);
1328                 return 0;
1329         }
1330
1331         re_init_if_first_write(tconn, req->epoch);
1332         maybe_send_barrier(tconn, req->epoch);
1333         tconn->send.current_epoch_writes++;
1334
1335         err = drbd_send_dblock(mdev, req);
1336         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1337
1338         return err;
1339 }
1340
1341 /**
1342  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1343  * @mdev:       DRBD device.
1344  * @w:          work object.
1345  * @cancel:     The connection will be closed anyway
1346  */
1347 int w_send_read_req(struct drbd_work *w, int cancel)
1348 {
1349         struct drbd_request *req = container_of(w, struct drbd_request, w);
1350         struct drbd_conf *mdev = w->mdev;
1351         struct drbd_tconn *tconn = mdev->tconn;
1352         int err;
1353
1354         if (unlikely(cancel)) {
1355                 req_mod(req, SEND_CANCELED);
1356                 return 0;
1357         }
1358
1359         /* Even read requests may close a write epoch,
1360          * if there was any yet. */
1361         maybe_send_barrier(tconn, req->epoch);
1362
1363         err = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
1364                                  (unsigned long)req);
1365
1366         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1367
1368         return err;
1369 }
1370
1371 int w_restart_disk_io(struct drbd_work *w, int cancel)
1372 {
1373         struct drbd_request *req = container_of(w, struct drbd_request, w);
1374         struct drbd_conf *mdev = w->mdev;
1375
1376         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1377                 drbd_al_begin_io(mdev, &req->i);
1378
1379         drbd_req_make_private_bio(req, req->master_bio);
1380         req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1381         generic_make_request(req->private_bio);
1382
1383         return 0;
1384 }
1385
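/* Walk the resync-after dependency chain (disk_conf->resync_after names
 * the minor this device has to wait for).  Returns 0 if any device
 * further up the chain is currently resyncing or paused (aftr_isp,
 * peer_isp or user_isp set), 1 if we may resync now. */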
1386 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1387 {
1388         struct drbd_conf *odev = mdev;
1389         int resync_after;
1390
1391         while (1) {
1392                 if (!odev->ldev)
1393                         return 1;
1394                 rcu_read_lock();
1395                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1396                 rcu_read_unlock();
1397                 if (resync_after == -1)
1398                         return 1;
1399                 odev = minor_to_mdev(resync_after);
1400                 if (!expect(odev))
1401                         return 1;
1402                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1403                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1404                     odev->state.aftr_isp || odev->state.peer_isp ||
1405                     odev->state.user_isp)
1406                         return 0;
1407         }
1408 }
1409
1410 /**
1411  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1412  * @mdev:       DRBD device.
1413  *
1414  * Called from process context only (admin command and after_state_ch).
1415  */
1416 static int _drbd_pause_after(struct drbd_conf *mdev)
1417 {
1418         struct drbd_conf *odev;
1419         int i, rv = 0;
1420
1421         rcu_read_lock();
1422         idr_for_each_entry(&minors, odev, i) {
1423                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1424                         continue;
1425                 if (!_drbd_may_sync_now(odev))
1426                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1427                                != SS_NOTHING_TO_DO);
1428         }
1429         rcu_read_unlock();
1430
1431         return rv;
1432 }
1433
1434 /**
1435  * _drbd_resume_next() - Resume resync on all devices that may resync now
1436  * @mdev:       DRBD device.
1437  *
1438  * Called from process context only (admin command and worker).
1439  */
1440 static int _drbd_resume_next(struct drbd_conf *mdev)
1441 {
1442         struct drbd_conf *odev;
1443         int i, rv = 0;
1444
1445         rcu_read_lock();
1446         idr_for_each_entry(&minors, odev, i) {
1447                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1448                         continue;
1449                 if (odev->state.aftr_isp) {
1450                         if (_drbd_may_sync_now(odev))
1451                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1452                                                         CS_HARD, NULL)
1453                                        != SS_NOTHING_TO_DO);
1454                 }
1455         }
1456         rcu_read_unlock();
1457         return rv;
1458 }
1459
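/* resume_next_sg()/suspend_other_sg() - Same as _drbd_resume_next() and
 * _drbd_pause_after(), but taking the global_state_lock themselves. */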
1460 void resume_next_sg(struct drbd_conf *mdev)
1461 {
1462         write_lock_irq(&global_state_lock);
1463         _drbd_resume_next(mdev);
1464         write_unlock_irq(&global_state_lock);
1465 }
1466
1467 void suspend_other_sg(struct drbd_conf *mdev)
1468 {
1469         write_lock_irq(&global_state_lock);
1470         _drbd_pause_after(mdev);
1471         write_unlock_irq(&global_state_lock);
1472 }
1473
1474 /* caller must hold global_state_lock */
1475 enum drbd_ret_code drbd_resync_after_valid(struct drbd_conf *mdev, int o_minor)
1476 {
1477         struct drbd_conf *odev;
1478         int resync_after;
1479
1480         if (o_minor == -1)
1481                 return NO_ERROR;
1482         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1483                 return ERR_RESYNC_AFTER;
1484
1485         /* check for loops */
1486         odev = minor_to_mdev(o_minor);
1487         while (1) {
1488                 if (odev == mdev)
1489                         return ERR_RESYNC_AFTER_CYCLE;
1490
1491                 rcu_read_lock();
1492                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1493                 rcu_read_unlock();
1494                 /* dependency chain ends here, no cycles. */
1495                 if (resync_after == -1)
1496                         return NO_ERROR;
1497
1498                 /* follow the dependency chain */
1499                 odev = minor_to_mdev(resync_after);
1500         }
1501 }
1502
1503 /* caller must hold global_state_lock */
1504 void drbd_resync_after_changed(struct drbd_conf *mdev)
1505 {
1506         int changes;
1507
1508         do {
1509                 changes  = _drbd_pause_after(mdev);
1510                 changes |= _drbd_resume_next(mdev);
1511         } while (changes);
1512 }
1513
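/* drbd_rs_controller_reset() - Reset the dynamic resync controller state.
 * Clears the sector counters used by the resync speed controller and
 * empties the rs_plan_s fifo in place. */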
1514 void drbd_rs_controller_reset(struct drbd_conf *mdev)
1515 {
1516         struct fifo_buffer *plan;
1517
1518         atomic_set(&mdev->rs_sect_in, 0);
1519         atomic_set(&mdev->rs_sect_ev, 0);
1520         mdev->rs_in_flight = 0;
1521
1522         /* Updating the RCU protected object in place is necessary since
1523            this function gets called from atomic context.
1524            It is valid since all other updates also lead to a completely
1525            empty fifo. */
1526         rcu_read_lock();
1527         plan = rcu_dereference(mdev->rs_plan_s);
1528         plan->total = 0;
1529         fifo_set(plan, 0);
1530         rcu_read_unlock();
1531 }
1532
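/* start_resync_timer_fn() - Timer callback; defers the actual work to the
 * worker by queueing start_resync_work on the connection's sender_work. */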
1533 void start_resync_timer_fn(unsigned long data)
1534 {
1535         struct drbd_conf *mdev = (struct drbd_conf *) data;
1536
1537         drbd_queue_work(&mdev->tconn->sender_work, &mdev->start_resync_work);
1538 }
1539
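/* w_start_resync() - Worker callback that starts a resync as sync source.
 * While acks or resync replies are still outstanding, it re-arms the
 * start_resync_timer and retries a little later. */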
1540 int w_start_resync(struct drbd_work *w, int cancel)
1541 {
1542         struct drbd_conf *mdev = w->mdev;
1543
1544         if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
1545                 dev_warn(DEV, "w_start_resync later...\n");
1546                 mdev->start_resync_timer.expires = jiffies + HZ/10;
1547                 add_timer(&mdev->start_resync_timer);
1548                 return 0;
1549         }
1550
1551         drbd_start_resync(mdev, C_SYNC_SOURCE);
1552         clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags);
1553         return 0;
1554 }
1555
1556 /**
1557  * drbd_start_resync() - Start the resync process
1558  * @mdev:       DRBD device.
1559  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1560  *
1561  * This function might bring you directly into one of the
1562  * C_PAUSED_SYNC_* states.
1563  */
1564 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1565 {
1566         union drbd_state ns;
1567         int r;
1568
1569         if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1570                 dev_err(DEV, "Resync already running!\n");
1571                 return;
1572         }
1573
1574         if (!test_bit(B_RS_H_DONE, &mdev->flags)) {
1575                 if (side == C_SYNC_TARGET) {
1576                         /* Since application IO was locked out during C_WF_BITMAP_T and
1577                            C_WF_SYNC_UUID, our data is still unmodified. Ask the handler
1578                            whether we may make it inconsistent before going to C_SYNC_TARGET. */
1579                         r = drbd_khelper(mdev, "before-resync-target");
1580                         r = (r >> 8) & 0xff;
1581                         if (r > 0) {
1582                                 dev_info(DEV, "before-resync-target handler returned %d, "
1583                                          "dropping connection.\n", r);
1584                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1585                                 return;
1586                         }
1587                 } else /* C_SYNC_SOURCE */ {
1588                         r = drbd_khelper(mdev, "before-resync-source");
1589                         r = (r >> 8) & 0xff;
1590                         if (r > 0) {
1591                                 if (r == 3) {
1592                                         dev_info(DEV, "before-resync-source handler returned %d, "
1593                                                  "ignoring. Old userland tools?\n", r);
1594                                 } else {
1595                                         dev_info(DEV, "before-resync-source handler returned %d, "
1596                                                  "dropping connection.\n", r);
1597                                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1598                                         return;
1599                                 }
1600                         }
1601                 }
1602         }
1603
1604         if (current == mdev->tconn->worker.task) {
1605                 /* The worker should not sleep waiting for state_mutex,
1606                    since that can take a long time. */
1607                 if (!mutex_trylock(mdev->state_mutex)) {
1608                         set_bit(B_RS_H_DONE, &mdev->flags);
1609                         mdev->start_resync_timer.expires = jiffies + HZ/5;
1610                         add_timer(&mdev->start_resync_timer);
1611                         return;
1612                 }
1613         } else {
1614                 mutex_lock(mdev->state_mutex);
1615         }
1616         clear_bit(B_RS_H_DONE, &mdev->flags);
1617
1618         write_lock_irq(&global_state_lock);
1619         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1620                 write_unlock_irq(&global_state_lock);
1621                 mutex_unlock(mdev->state_mutex);
1622                 return;
1623         }
1624
1625         ns = drbd_read_state(mdev);
1626
1627         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1628
1629         ns.conn = side;
1630
1631         if (side == C_SYNC_TARGET)
1632                 ns.disk = D_INCONSISTENT;
1633         else /* side == C_SYNC_SOURCE */
1634                 ns.pdsk = D_INCONSISTENT;
1635
1636         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1637         ns = drbd_read_state(mdev);
1638
1639         if (ns.conn < C_CONNECTED)
1640                 r = SS_UNKNOWN_ERROR;
1641
1642         if (r == SS_SUCCESS) {
1643                 unsigned long tw = drbd_bm_total_weight(mdev);
1644                 unsigned long now = jiffies;
1645                 int i;
1646
1647                 mdev->rs_failed    = 0;
1648                 mdev->rs_paused    = 0;
1649                 mdev->rs_same_csum = 0;
1650                 mdev->rs_last_events = 0;
1651                 mdev->rs_last_sect_ev = 0;
1652                 mdev->rs_total     = tw;
1653                 mdev->rs_start     = now;
1654                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1655                         mdev->rs_mark_left[i] = tw;
1656                         mdev->rs_mark_time[i] = now;
1657                 }
1658                 _drbd_pause_after(mdev);
1659         }
1660         write_unlock_irq(&global_state_lock);
1661
1662         if (r == SS_SUCCESS) {
1663                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1664                      drbd_conn_str(ns.conn),
1665                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1666                      (unsigned long) mdev->rs_total);
1667                 if (side == C_SYNC_TARGET)
1668                         mdev->bm_resync_fo = 0;
1669
1670                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1671                  * with w_send_oos, or the sync target will get confused as to
1672                  * how many bits to resync.  We cannot always do that, because for an
1673                  * empty resync and protocol < 95, we need to do it here, as we call
1674                  * drbd_resync_finished from here in that case.
1675                  * We call drbd_gen_and_send_sync_uuid() here for protocol < 96,
1676                  * and from after_state_ch otherwise. */
1677                 if (side == C_SYNC_SOURCE && mdev->tconn->agreed_pro_version < 96)
1678                         drbd_gen_and_send_sync_uuid(mdev);
1679
1680                 if (mdev->tconn->agreed_pro_version < 95 && mdev->rs_total == 0) {
1681                         /* This still has a race (about when exactly the peers
1682                          * detect connection loss) that can lead to a full sync
1683                          * on next handshake. In 8.3.9 we fixed this with explicit
1684                          * resync-finished notifications, but the fix
1685                          * introduces a protocol change.  Sleeping for some
1686                          * time longer than the ping interval + timeout on the
1687                          * SyncSource, to give the SyncTarget the chance to
1688                          * detect connection loss, then waiting for a ping
1689                          * response (implicit in drbd_resync_finished) reduces
1690                          * the race considerably, but does not solve it. */
1691                         if (side == C_SYNC_SOURCE) {
1692                                 struct net_conf *nc;
1693                                 int timeo;
1694
1695                                 rcu_read_lock();
1696                                 nc = rcu_dereference(mdev->tconn->net_conf);
1697                                 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1698                                 rcu_read_unlock();
1699                                 schedule_timeout_interruptible(timeo);
1700                         }
1701                         drbd_resync_finished(mdev);
1702                 }
1703
1704                 drbd_rs_controller_reset(mdev);
1705                 /* ns.conn may already be != mdev->state.conn,
1706                  * we may have been paused in between, or become paused until
1707                  * the timer triggers.
1708                  * No matter, that is handled in resync_timer_fn() */
1709                 if (ns.conn == C_SYNC_TARGET)
1710                         mod_timer(&mdev->resync_timer, jiffies);
1711
1712                 drbd_md_sync(mdev);
1713         }
1714         put_ldev(mdev);
1715         mutex_unlock(mdev->state_mutex);
1716 }
1717
1718 /* If the resource already closed the current epoch, but we did not
1719  * (because we have not yet seen new requests), we should send the
1720  * corresponding barrier now.  Must be checked within the same spinlock
1721  * that is used to check for new requests. */
1722 bool need_to_send_barrier(struct drbd_tconn *connection)
1723 {
1724         if (!connection->send.seen_any_write_yet)
1725                 return false;
1726
1727         /* Skip barriers that do not contain any writes.
1728          * This may happen during AHEAD mode. */
1729         if (!connection->send.current_epoch_writes)
1730                 return false;
1731
1732         /* ->req_lock is held when requests are queued on
1733          * connection->sender_work, and put into ->transfer_log.
1734          * It is also held when ->current_tle_nr is increased.
1735          * So either there are already new requests queued,
1736          * and corresponding barriers will be sent there.
1737          * Or nothing new is queued yet, so the difference will be 1.
1738          */
1739         if (atomic_read(&connection->current_tle_nr) !=
1740             connection->send.current_epoch_nr + 1)
1741                 return false;
1742
1743         return true;
1744 }
1745
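/* Move all currently queued work items to work_list.
 * Returns true if work_list is non-empty afterwards. */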
1746 bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
1747 {
1748         spin_lock_irq(&queue->q_lock);
1749         list_splice_init(&queue->q, work_list);
1750         spin_unlock_irq(&queue->q_lock);
1751         return !list_empty(work_list);
1752 }
1753
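/* Move at most one work item from the queue to work_list.
 * Returns true if work_list is non-empty afterwards. */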
1754 bool dequeue_work_item(struct drbd_work_queue *queue, struct list_head *work_list)
1755 {
1756         spin_lock_irq(&queue->q_lock);
1757         if (!list_empty(&queue->q))
1758                 list_move(queue->q.next, work_list);
1759         spin_unlock_irq(&queue->q_lock);
1760         return !list_empty(work_list);
1761 }
1762
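/* wait_for_work() - Wait until there is something for the sender to do.
 * Dequeues a single work item if one is already queued; otherwise it
 * uncorks the data socket (if tcp_cork is in use), waits for new work or
 * a pending signal, sends a barrier whenever the current epoch gets closed
 * while waiting, and finally restores the cork setting from net_conf. */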
1763 void wait_for_work(struct drbd_tconn *connection, struct list_head *work_list)
1764 {
1765         DEFINE_WAIT(wait);
1766         struct net_conf *nc;
1767         int uncork, cork;
1768
1769         dequeue_work_item(&connection->sender_work, work_list);
1770         if (!list_empty(work_list))
1771                 return;
1772
1773         /* Still nothing to do?
1774          * Maybe we still need to close the current epoch,
1775          * even if no new requests are queued yet.
1776          *
1777          * Also, poke TCP, just in case.
1778          * Then wait for new work (or signal). */
1779         rcu_read_lock();
1780         nc = rcu_dereference(connection->net_conf);
1781         uncork = nc ? nc->tcp_cork : 0;
1782         rcu_read_unlock();
1783         if (uncork) {
1784                 mutex_lock(&connection->data.mutex);
1785                 if (connection->data.socket)
1786                         drbd_tcp_uncork(connection->data.socket);
1787                 mutex_unlock(&connection->data.mutex);
1788         }
1789
1790         for (;;) {
1791                 int send_barrier;
1792                 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
1793                 spin_lock_irq(&connection->req_lock);
1794                 spin_lock(&connection->sender_work.q_lock);     /* FIXME get rid of this one? */
1795                 /* dequeue single item only,
1796                  * we still use drbd_queue_work_front() in some places */
1797                 if (!list_empty(&connection->sender_work.q))
1798                         list_move(connection->sender_work.q.next, work_list);
1799                 spin_unlock(&connection->sender_work.q_lock);   /* FIXME get rid of this one? */
1800                 if (!list_empty(work_list) || signal_pending(current)) {
1801                         spin_unlock_irq(&connection->req_lock);
1802                         break;
1803                 }
1804                 send_barrier = need_to_send_barrier(connection);
1805                 spin_unlock_irq(&connection->req_lock);
1806                 if (send_barrier) {
1807                         drbd_send_barrier(connection);
1808                         connection->send.current_epoch_nr++;
1809                 }
1810                 schedule();
1811                 /* We may be woken up for things other than new work, too,
1812                  * e.g. if the current epoch got closed;
1813                  * in that case we send the barrier above. */
1814         }
1815         finish_wait(&connection->sender_work.q_wait, &wait);
1816
1817         /* someone may have changed the config while we have been waiting above. */
1818         rcu_read_lock();
1819         nc = rcu_dereference(connection->net_conf);
1820         cork = nc ? nc->tcp_cork : 0;
1821         rcu_read_unlock();
1822         mutex_lock(&connection->data.mutex);
1823         if (connection->data.socket) {
1824                 if (cork)
1825                         drbd_tcp_cork(connection->data.socket);
1826                 else if (!uncork)
1827                         drbd_tcp_uncork(connection->data.socket);
1828         }
1829         mutex_unlock(&connection->data.mutex);
1830 }
1831
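/* drbd_worker() - Main loop of the per-connection worker thread.
 * Processes work items from the connection's sender_work queue until the
 * thread is asked to stop; a failing work callback forces the connection
 * into C_NETWORK_FAILURE unless we are already below C_WF_REPORT_PARAMS.
 * On exit, remaining work is run with cancel set, and all volumes of the
 * connection are cleaned up. */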
1832 int drbd_worker(struct drbd_thread *thi)
1833 {
1834         struct drbd_tconn *tconn = thi->tconn;
1835         struct drbd_work *w = NULL;
1836         struct drbd_conf *mdev;
1837         LIST_HEAD(work_list);
1838         int vnr;
1839
1840         while (get_t_state(thi) == RUNNING) {
1841                 drbd_thread_current_set_cpu(thi);
1842
1843                 /* as long as we use drbd_queue_work_front(),
1844                  * we may only dequeue single work items here, not batches. */
1845                 if (list_empty(&work_list))
1846                         wait_for_work(tconn, &work_list);
1847
1848                 if (signal_pending(current)) {
1849                         flush_signals(current);
1850                         if (get_t_state(thi) == RUNNING) {
1851                                 conn_warn(tconn, "Worker got an unexpected signal\n");
1852                                 continue;
1853                         }
1854                         break;
1855                 }
1856
1857                 if (get_t_state(thi) != RUNNING)
1858                         break;
1859
1860                 while (!list_empty(&work_list)) {
1861                         w = list_first_entry(&work_list, struct drbd_work, list);
1862                         list_del_init(&w->list);
1863                         if (w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS) == 0)
1864                                 continue;
1865                         if (tconn->cstate >= C_WF_REPORT_PARAMS)
1866                                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
1867                 }
1868         }
1869
1870         do {
1871                 while (!list_empty(&work_list)) {
1872                         w = list_first_entry(&work_list, struct drbd_work, list);
1873                         list_del_init(&w->list);
1874                         w->cb(w, 1);
1875                 }
1876                 dequeue_work_batch(&tconn->sender_work, &work_list);
1877         } while (!list_empty(&work_list));
1878
1879         rcu_read_lock();
1880         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1881                 D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1882                 kref_get(&mdev->kref);
1883                 rcu_read_unlock();
1884                 drbd_mdev_cleanup(mdev);
1885                 kref_put(&mdev->kref, &drbd_minor_destroy);
1886                 rcu_read_lock();
1887         }
1888         rcu_read_unlock();
1889
1890         return 0;
1891 }