1 /*
2    drbd.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26
27  */
28
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/mutex.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56
57 #include "drbd_vli.h"
58
59 static DEFINE_MUTEX(drbd_main_mutex);
60 int drbdd_init(struct drbd_thread *);
61 int drbd_worker(struct drbd_thread *);
62 int drbd_asender(struct drbd_thread *);
63
64 int drbd_init(void);
65 static int drbd_open(struct block_device *bdev, fmode_t mode);
66 static int drbd_release(struct gendisk *gd, fmode_t mode);
67 static int w_md_sync(struct drbd_work *w, int unused);
68 static void md_sync_timer_fn(unsigned long data);
69 static int w_bitmap_io(struct drbd_work *w, int unused);
70 static int w_go_diskless(struct drbd_work *w, int unused);
71
72 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
73               "Lars Ellenberg <lars@linbit.com>");
74 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
75 MODULE_VERSION(REL_VERSION);
76 MODULE_LICENSE("GPL");
77 MODULE_PARM_DESC(minor_count, "Maximum number of drbd devices ("
78                  __stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
79 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
80
81 #include <linux/moduleparam.h>
82 /* allow_open_on_secondary */
83 MODULE_PARM_DESC(allow_oos, "DONT USE!");
84 /* thanks to these macros, if compiled into the kernel (not-module),
85  * this becomes the boot parameter drbd.minor_count */
86 module_param(minor_count, uint, 0444);
87 module_param(disable_sendpage, bool, 0644);
88 module_param(allow_oos, bool, 0);
89 module_param(cn_idx, uint, 0444);
90 module_param(proc_details, int, 0644);
91
92 #ifdef CONFIG_DRBD_FAULT_INJECTION
93 int enable_faults;
94 int fault_rate;
95 static int fault_count;
96 int fault_devs;
97 /* bitmap of enabled faults */
98 module_param(enable_faults, int, 0664);
99 /* fault rate % value - applies to all enabled faults */
100 module_param(fault_rate, int, 0664);
101 /* count of faults inserted */
102 module_param(fault_count, int, 0664);
103 /* bitmap of devices to insert faults on */
104 module_param(fault_devs, int, 0644);
105 #endif
106
107 /* module parameter, defined */
108 unsigned int minor_count = DRBD_MINOR_COUNT_DEF;
109 int disable_sendpage;
110 int allow_oos;
111 unsigned int cn_idx = CN_IDX_DRBD;
112 int proc_details;       /* Detail level in proc drbd */
113
114 /* Module parameter for setting the user mode helper program
115  * to run. Default is /sbin/drbdadm */
116 char usermode_helper[80] = "/sbin/drbdadm";
117
118 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
119
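/* Illustrative usage only (the values and paths below are examples, not
 * defaults defined here): when drbd is built as a module, the parameters
 * above are set on the modprobe command line, e.g.
 *
 *	modprobe drbd minor_count=8 usermode_helper=/usr/sbin/drbdadm
 *
 * When built into the kernel, the same parameters are passed on the kernel
 * command line with the "drbd." prefix, e.g. drbd.minor_count=8
 * (see the comment above module_param(minor_count, ...)).
 */
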
120 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
121  * as member "struct gendisk *vdisk;"
122  */
123 struct drbd_conf **minor_table;
124 struct list_head drbd_tconns;  /* list of struct drbd_tconn */
125
126 struct kmem_cache *drbd_request_cache;
127 struct kmem_cache *drbd_ee_cache;       /* peer requests */
128 struct kmem_cache *drbd_bm_ext_cache;   /* bitmap extents */
129 struct kmem_cache *drbd_al_ext_cache;   /* activity log extents */
130 mempool_t *drbd_request_mempool;
131 mempool_t *drbd_ee_mempool;
132
133 /* I do not use a standard mempool, because:
134    1) I want to hand out the pre-allocated objects first.
135    2) I want to be able to interrupt sleeping allocation with a signal.
136    Note: This is a singly linked list; the next pointer is the private
137          member of struct page.
138  */
139 struct page *drbd_pp_pool;
140 spinlock_t   drbd_pp_lock;
141 int          drbd_pp_vacant;
142 wait_queue_head_t drbd_pp_wait;
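
/* A minimal sketch (not the driver's actual helpers, which live elsewhere
 * in drbd and additionally handle drbd_pp_wait and the mempool fallback)
 * of how a page is popped from such a page->private chained pool:
 *
 *	spin_lock(&drbd_pp_lock);
 *	page = drbd_pp_pool;
 *	if (page) {
 *		drbd_pp_pool = (struct page *)page_private(page);
 *		set_page_private(page, 0);
 *		drbd_pp_vacant--;
 *	}
 *	spin_unlock(&drbd_pp_lock);
 */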
143
144 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
145
146 static const struct block_device_operations drbd_ops = {
147         .owner =   THIS_MODULE,
148         .open =    drbd_open,
149         .release = drbd_release,
150 };
151
153
154 #ifdef __CHECKER__
155 /* When checking with sparse, and this is an inline function, sparse will
156    give tons of false positives. When this is a real function, sparse works.
157  */
158 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
159 {
160         int io_allowed;
161
162         atomic_inc(&mdev->local_cnt);
163         io_allowed = (mdev->state.disk >= mins);
164         if (!io_allowed) {
165                 if (atomic_dec_and_test(&mdev->local_cnt))
166                         wake_up(&mdev->misc_wait);
167         }
168         return io_allowed;
169 }
170
171 #endif
172
173 /**
174  * DOC: The transfer log
175  *
176  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
177  * mdev->tconn->newest_tle points to the head, mdev->tconn->oldest_tle points to the tail
178  * of the list. There is always at least one &struct drbd_tl_epoch object.
179  *
180  * Each &struct drbd_tl_epoch has a circular, doubly linked list of requests
181  * attached.
182  */
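
/* Illustrative walk (not part of the driver) showing how the two list
 * directions fit together; _tl_restart() below does essentially this:
 *
 *	struct drbd_tl_epoch *b;
 *	struct drbd_request *req;
 *
 *	for (b = mdev->tconn->oldest_tle; b; b = b->next)
 *		list_for_each_entry(req, &b->requests, tl_requests)
 *			;	(visits every pending request, oldest epoch first)
 */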
183 static int tl_init(struct drbd_conf *mdev)
184 {
185         struct drbd_tl_epoch *b;
186
187         /* during device minor initialization, we may well use GFP_KERNEL */
188         b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
189         if (!b)
190                 return 0;
191         INIT_LIST_HEAD(&b->requests);
192         INIT_LIST_HEAD(&b->w.list);
193         b->next = NULL;
194         b->br_number = 4711;
195         b->n_writes = 0;
196         b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
197
198         mdev->tconn->oldest_tle = b;
199         mdev->tconn->newest_tle = b;
200         INIT_LIST_HEAD(&mdev->tconn->out_of_sequence_requests);
201
202         return 1;
203 }
204
205 static void tl_cleanup(struct drbd_conf *mdev)
206 {
207         D_ASSERT(mdev->tconn->oldest_tle == mdev->tconn->newest_tle);
208         D_ASSERT(list_empty(&mdev->tconn->out_of_sequence_requests));
209         kfree(mdev->tconn->oldest_tle);
210         mdev->tconn->oldest_tle = NULL;
211         kfree(mdev->tconn->unused_spare_tle);
212         mdev->tconn->unused_spare_tle = NULL;
213 }
214
215 /**
216  * _tl_add_barrier() - Adds a barrier to the transfer log
217  * @mdev:       DRBD device.
218  * @new:        Barrier to be added before the current head of the TL.
219  *
220  * The caller must hold the req_lock.
221  */
222 void _tl_add_barrier(struct drbd_conf *mdev, struct drbd_tl_epoch *new)
223 {
224         struct drbd_tl_epoch *newest_before;
225
226         INIT_LIST_HEAD(&new->requests);
227         INIT_LIST_HEAD(&new->w.list);
228         new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
229         new->next = NULL;
230         new->n_writes = 0;
231
232         newest_before = mdev->tconn->newest_tle;
233         /* never send a barrier number == 0, because that is special-cased
234          * when using TCQ for our write ordering code */
235         new->br_number = (newest_before->br_number+1) ?: 1;
236         if (mdev->tconn->newest_tle != new) {
237                 mdev->tconn->newest_tle->next = new;
238                 mdev->tconn->newest_tle = new;
239         }
240 }
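
/* Worked example for the "?: 1" above: barrier numbers are 32 bit on the
 * wire, so incrementing 0xffffffff wraps to 0; since 0 is reserved (see the
 * comment above), the next barrier number handed out is 1, i.e. the sequence
 * is ..., 0xfffffffe, 0xffffffff, 1, 2, ...
 */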
241
242 /**
243  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
244  * @mdev:       DRBD device.
245  * @barrier_nr: Expected identifier of the DRBD write barrier packet.
246  * @set_size:   Expected number of requests before that barrier.
247  *
248  * In case the passed barrier_nr or set_size does not match the oldest
249  * &struct drbd_tl_epoch objects this function will cause a termination
250  * of the connection.
251  */
252 void tl_release(struct drbd_conf *mdev, unsigned int barrier_nr,
253                        unsigned int set_size)
254 {
255         struct drbd_tl_epoch *b, *nob; /* next old barrier */
256         struct list_head *le, *tle;
257         struct drbd_request *r;
258
259         spin_lock_irq(&mdev->tconn->req_lock);
260
261         b = mdev->tconn->oldest_tle;
262
263         /* first some paranoia code */
264         if (b == NULL) {
265                 dev_err(DEV, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
266                         barrier_nr);
267                 goto bail;
268         }
269         if (b->br_number != barrier_nr) {
270                 dev_err(DEV, "BAD! BarrierAck #%u received, expected #%u!\n",
271                         barrier_nr, b->br_number);
272                 goto bail;
273         }
274         if (b->n_writes != set_size) {
275                 dev_err(DEV, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
276                         barrier_nr, set_size, b->n_writes);
277                 goto bail;
278         }
279
280         /* Clean up list of requests processed during current epoch */
281         list_for_each_safe(le, tle, &b->requests) {
282                 r = list_entry(le, struct drbd_request, tl_requests);
283                 _req_mod(r, BARRIER_ACKED);
284         }
285         /* There could be requests on the list waiting for completion
286            of the write to the local disk. To avoid corruption of the
287            slab's data structures we have to remove the list's head.
288
289            Also there could have been a barrier ack out of sequence, overtaking
290            the write acks - which would be a bug and would violate write ordering.
291            To not deadlock in case we lose the connection while such requests are
292            still pending, we need some way to find them for the
293            _req_mod(CONNECTION_LOST_WHILE_PENDING).
294
295            These have been list_move'd to the out_of_sequence_requests list in
296            _req_mod(, BARRIER_ACKED) above.
297            */
298         list_del_init(&b->requests);
299
300         nob = b->next;
301         if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
302                 _tl_add_barrier(mdev, b);
303                 if (nob)
304                         mdev->tconn->oldest_tle = nob;
305                 /* if nob == NULL b was the only barrier, and becomes the new
306                    barrier. Therefore mdev->tconn->oldest_tle already points to b */
307         } else {
308                 D_ASSERT(nob != NULL);
309                 mdev->tconn->oldest_tle = nob;
310                 kfree(b);
311         }
312
313         spin_unlock_irq(&mdev->tconn->req_lock);
314         dec_ap_pending(mdev);
315
316         return;
317
318 bail:
319         spin_unlock_irq(&mdev->tconn->req_lock);
320         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
321 }
322
323
324 /**
325  * _tl_restart() - Walks the transfer log, and applies an action to all requests
326  * @mdev:       DRBD device.
327  * @what:       The action/event to perform with all request objects
328  *
329  * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
330  * RESTART_FROZEN_DISK_IO.
331  */
332 void _tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
333 {
334         struct drbd_tl_epoch *b, *tmp, **pn;
335         struct list_head *le, *tle, carry_reads;
336         struct drbd_request *req;
337         int rv, n_writes, n_reads;
338
339         b = mdev->tconn->oldest_tle;
340         pn = &mdev->tconn->oldest_tle;
341         while (b) {
342                 n_writes = 0;
343                 n_reads = 0;
344                 INIT_LIST_HEAD(&carry_reads);
345                 list_for_each_safe(le, tle, &b->requests) {
346                         req = list_entry(le, struct drbd_request, tl_requests);
347                         rv = _req_mod(req, what);
348
349                         n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
350                         n_reads  += (rv & MR_READ) >> MR_READ_SHIFT;
351                 }
352                 tmp = b->next;
353
354                 if (n_writes) {
355                         if (what == RESEND) {
356                                 b->n_writes = n_writes;
357                                 if (b->w.cb == NULL) {
358                                         b->w.cb = w_send_barrier;
359                                         inc_ap_pending(mdev);
360                                         set_bit(CREATE_BARRIER, &mdev->flags);
361                                 }
362
363                                 drbd_queue_work(&mdev->tconn->data.work, &b->w);
364                         }
365                         pn = &b->next;
366                 } else {
367                         if (n_reads)
368                                 list_add(&carry_reads, &b->requests);
369                         /* there could still be requests on that ring list,
370                          * in case local io is still pending */
371                         list_del(&b->requests);
372
373                         /* dec_ap_pending corresponding to queue_barrier.
374                          * the newest barrier may not have been queued yet,
375                          * in which case w.cb is still NULL. */
376                         if (b->w.cb != NULL)
377                                 dec_ap_pending(mdev);
378
379                         if (b == mdev->tconn->newest_tle) {
380                                 /* recycle, but reinit! */
381                                 D_ASSERT(tmp == NULL);
382                                 INIT_LIST_HEAD(&b->requests);
383                                 list_splice(&carry_reads, &b->requests);
384                                 INIT_LIST_HEAD(&b->w.list);
385                                 b->w.cb = NULL;
386                                 b->br_number = net_random();
387                                 b->n_writes = 0;
388
389                                 *pn = b;
390                                 break;
391                         }
392                         *pn = tmp;
393                         kfree(b);
394                 }
395                 b = tmp;
396                 list_splice(&carry_reads, &b->requests);
397         }
398 }
399
400
401 /**
402  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
403  * @mdev:       DRBD device.
404  *
405  * This is called after the connection to the peer was lost. The storage covered
406  * by the requests on the transfer gets marked as our of sync. Called from the
407  * receiver thread and the worker thread.
408  */
409 void tl_clear(struct drbd_conf *mdev)
410 {
411         struct list_head *le, *tle;
412         struct drbd_request *r;
413
414         spin_lock_irq(&mdev->tconn->req_lock);
415
416         _tl_restart(mdev, CONNECTION_LOST_WHILE_PENDING);
417
418         /* we expect this list to be empty. */
419         D_ASSERT(list_empty(&mdev->tconn->out_of_sequence_requests));
420
421         /* but just in case, clean it up anyways! */
422         list_for_each_safe(le, tle, &mdev->tconn->out_of_sequence_requests) {
423                 r = list_entry(le, struct drbd_request, tl_requests);
424                 /* It would be nice to complete outside of spinlock.
425                  * But this is easier for now. */
426                 _req_mod(r, CONNECTION_LOST_WHILE_PENDING);
427         }
428
429         /* ensure bit indicating barrier is required is clear */
430         clear_bit(CREATE_BARRIER, &mdev->flags);
431
432         spin_unlock_irq(&mdev->tconn->req_lock);
433 }
434
435 void tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
436 {
437         spin_lock_irq(&mdev->tconn->req_lock);
438         _tl_restart(mdev, what);
439         spin_unlock_irq(&mdev->tconn->req_lock);
440 }
441
442 static int drbd_thread_setup(void *arg)
443 {
444         struct drbd_thread *thi = (struct drbd_thread *) arg;
445         struct drbd_tconn *tconn = thi->tconn;
446         unsigned long flags;
447         int retval;
448
449         snprintf(current->comm, sizeof(current->comm), "drbd_%c_%s",
450                  thi->name[0], thi->tconn->name);
451
452 restart:
453         retval = thi->function(thi);
454
455         spin_lock_irqsave(&thi->t_lock, flags);
456
457         /* if the receiver has been "EXITING", the last thing it did
458          * was set the conn state to "StandAlone",
459          * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
460          * and receiver thread will be "started".
461          * drbd_thread_start needs to set "RESTARTING" in that case.
462          * t_state check and assignment needs to be within the same spinlock,
463          * so either thread_start sees EXITING, and can remap to RESTARTING,
464          * or thread_start sees NONE, and can proceed as normal.
465          */
466
467         if (thi->t_state == RESTARTING) {
468                 conn_info(tconn, "Restarting %s thread\n", thi->name);
469                 thi->t_state = RUNNING;
470                 spin_unlock_irqrestore(&thi->t_lock, flags);
471                 goto restart;
472         }
473
474         thi->task = NULL;
475         thi->t_state = NONE;
476         smp_mb();
477         complete(&thi->stop);
478         spin_unlock_irqrestore(&thi->t_lock, flags);
479
480         conn_info(tconn, "Terminating %s\n", current->comm);
481
482         /* Release mod reference taken when thread was started */
483         module_put(THIS_MODULE);
484         return retval;
485 }
486
487 static void drbd_thread_init(struct drbd_tconn *tconn, struct drbd_thread *thi,
488                              int (*func) (struct drbd_thread *), char *name)
489 {
490         spin_lock_init(&thi->t_lock);
491         thi->task    = NULL;
492         thi->t_state = NONE;
493         thi->function = func;
494         thi->tconn = tconn;
495         strncpy(thi->name, name, ARRAY_SIZE(thi->name));
496 }
497
498 int drbd_thread_start(struct drbd_thread *thi)
499 {
500         struct drbd_tconn *tconn = thi->tconn;
501         struct task_struct *nt;
502         unsigned long flags;
503
504         /* is used from state engine doing drbd_thread_stop_nowait,
505          * while holding the req lock irqsave */
506         spin_lock_irqsave(&thi->t_lock, flags);
507
508         switch (thi->t_state) {
509         case NONE:
510                 conn_info(tconn, "Starting %s thread (from %s [%d])\n",
511                          thi->name, current->comm, current->pid);
512
513                 /* Get ref on module for thread - this is released when thread exits */
514                 if (!try_module_get(THIS_MODULE)) {
515                         conn_err(tconn, "Failed to get module reference in drbd_thread_start\n");
516                         spin_unlock_irqrestore(&thi->t_lock, flags);
517                         return false;
518                 }
519
520                 init_completion(&thi->stop);
521                 thi->reset_cpu_mask = 1;
522                 thi->t_state = RUNNING;
523                 spin_unlock_irqrestore(&thi->t_lock, flags);
524                 flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
525
526                 nt = kthread_create(drbd_thread_setup, (void *) thi,
527                                     "drbd_%c_%s", thi->name[0], thi->tconn->name);
528
529                 if (IS_ERR(nt)) {
530                         conn_err(tconn, "Couldn't start thread\n");
531
532                         module_put(THIS_MODULE);
533                         return false;
534                 }
535                 spin_lock_irqsave(&thi->t_lock, flags);
536                 thi->task = nt;
537                 thi->t_state = RUNNING;
538                 spin_unlock_irqrestore(&thi->t_lock, flags);
539                 wake_up_process(nt);
540                 break;
541         case EXITING:
542                 thi->t_state = RESTARTING;
543                 conn_info(tconn, "Restarting %s thread (from %s [%d])\n",
544                                 thi->name, current->comm, current->pid);
545                 /* fall through */
546         case RUNNING:
547         case RESTARTING:
548         default:
549                 spin_unlock_irqrestore(&thi->t_lock, flags);
550                 break;
551         }
552
553         return true;
554 }
555
556
557 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
558 {
559         unsigned long flags;
560
561         enum drbd_thread_state ns = restart ? RESTARTING : EXITING;
562
563         /* may be called from state engine, holding the req lock irqsave */
564         spin_lock_irqsave(&thi->t_lock, flags);
565
566         if (thi->t_state == NONE) {
567                 spin_unlock_irqrestore(&thi->t_lock, flags);
568                 if (restart)
569                         drbd_thread_start(thi);
570                 return;
571         }
572
573         if (thi->t_state != ns) {
574                 if (thi->task == NULL) {
575                         spin_unlock_irqrestore(&thi->t_lock, flags);
576                         return;
577                 }
578
579                 thi->t_state = ns;
580                 smp_mb();
581                 init_completion(&thi->stop);
582                 if (thi->task != current)
583                         force_sig(DRBD_SIGKILL, thi->task);
584         }
585
586         spin_unlock_irqrestore(&thi->t_lock, flags);
587
588         if (wait)
589                 wait_for_completion(&thi->stop);
590 }
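
/* Illustrative call patterns (the real callers live in other drbd files):
 *
 *	_drbd_thread_stop(&tconn->asender, false, true);    stop and wait
 *	_drbd_thread_stop(&tconn->receiver, true, false);   request restart, do not wait
 *
 * With restart, a thread that is currently NONE is simply started again via
 * drbd_thread_start(); a running thread is moved to RESTARTING and re-enters
 * its function from the restart: label in drbd_thread_setup().
 */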
591
592 static struct drbd_thread *drbd_task_to_thread(struct drbd_tconn *tconn, struct task_struct *task)
593 {
594         struct drbd_thread *thi =
595                 task == tconn->receiver.task ? &tconn->receiver :
596                 task == tconn->asender.task  ? &tconn->asender :
597                 task == tconn->worker.task   ? &tconn->worker : NULL;
598
599         return thi;
600 }
601
602 char *drbd_task_to_thread_name(struct drbd_tconn *tconn, struct task_struct *task)
603 {
604         struct drbd_thread *thi = drbd_task_to_thread(tconn, task);
605         return thi ? thi->name : task->comm;
606 }
607
608 #ifdef CONFIG_SMP
609 static int conn_lowest_minor(struct drbd_tconn *tconn)
610 {
611         int minor = 0;
612         idr_get_next(&tconn->volumes, &minor);
613         return minor;
614 }
615 /**
616  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
617  * @tconn:      DRBD connection.
618  *
619  * Forces all threads of a connection onto the same CPU. This is beneficial for
620  * DRBD's performance. May be overridden by the user's configuration.
621  */
622 void drbd_calc_cpu_mask(struct drbd_tconn *tconn)
623 {
624         int ord, cpu;
625
626         /* user override. */
627         if (cpumask_weight(tconn->cpu_mask))
628                 return;
629
630         ord = conn_lowest_minor(tconn) % cpumask_weight(cpu_online_mask);
631         for_each_online_cpu(cpu) {
632                 if (ord-- == 0) {
633                         cpumask_set_cpu(cpu, tconn->cpu_mask);
634                         return;
635                 }
636         }
637         /* should not be reached */
638         cpumask_setall(tconn->cpu_mask);
639 }
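
/* Example: with 8 CPUs online and conn_lowest_minor() == 10, ord starts at
 * 10 % 8 == 2 and the loop above selects the third online CPU, so all of
 * this connection's threads share that CPU while other minors tend to end
 * up on different CPUs.
 */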
640
641 /**
642  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
644  * @thi:        drbd_thread object
645  *
646  * Call this in the "main loop" of _all_ threads; no mutex is needed, current won't die
647  * prematurely.
648  */
649 void drbd_thread_current_set_cpu(struct drbd_thread *thi)
650 {
651         struct task_struct *p = current;
652
653         if (!thi->reset_cpu_mask)
654                 return;
655         thi->reset_cpu_mask = 0;
656         set_cpus_allowed_ptr(p, thi->tconn->cpu_mask);
657 }
658 #endif
659
660 static void prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
661 {
662         h->magic   = cpu_to_be32(DRBD_MAGIC);
663         h->command = cpu_to_be16(cmd);
664         h->length  = cpu_to_be16(size);
665 }
666
667 static void prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
668 {
669         h->magic   = cpu_to_be16(DRBD_MAGIC_BIG);
670         h->command = cpu_to_be16(cmd);
671         h->length  = cpu_to_be32(size);
672 }
673
674 static void _prepare_header(struct drbd_tconn *tconn, int vnr, struct p_header *h,
675                             enum drbd_packet cmd, int size)
676 {
677         if (tconn->agreed_pro_version >= 100 || size > DRBD_MAX_SIZE_H80_PACKET)
678                 prepare_header95(&h->h95, cmd, size);
679         else
680                 prepare_header80(&h->h80, cmd, size);
681 }
682
683 static void prepare_header(struct drbd_conf *mdev, struct p_header *h,
684                            enum drbd_packet cmd, int size)
685 {
686         _prepare_header(mdev->tconn, mdev->vnr, h, cmd, size);
687 }
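
/* In short: peers with agreed protocol version >= 100, as well as any
 * payload larger than DRBD_MAX_SIZE_H80_PACKET, get the "95" header with
 * its 32 bit length field; everything else keeps the compact "80" header,
 * whose length field is only 16 bit wide.
 */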
688
689 /* the appropriate socket mutex must be held already */
690 int _conn_send_cmd(struct drbd_tconn *tconn, int vnr, struct socket *sock,
691                    enum drbd_packet cmd, struct p_header *h, size_t size,
692                    unsigned msg_flags)
693 {
694         int sent, ok;
695
696         _prepare_header(tconn, vnr, h, cmd, size - sizeof(struct p_header));
697
698         sent = drbd_send(tconn, sock, h, size, msg_flags);
699
700         ok = (sent == size);
701         if (!ok && !signal_pending(current))
702                 conn_warn(tconn, "short sent %s size=%d sent=%d\n",
703                           cmdname(cmd), (int)size, sent);
704         return ok;
705 }
706
707 /* don't pass the socket. we may only look at it
708  * when we hold the appropriate socket mutex.
709  */
710 int conn_send_cmd(struct drbd_tconn *tconn, int vnr, int use_data_socket,
711                   enum drbd_packet cmd, struct p_header *h, size_t size)
712 {
713         int ok = 0;
714         struct socket *sock;
715
716         if (use_data_socket) {
717                 mutex_lock(&tconn->data.mutex);
718                 sock = tconn->data.socket;
719         } else {
720                 mutex_lock(&tconn->meta.mutex);
721                 sock = tconn->meta.socket;
722         }
723
724         /* drbd_disconnect() could have called drbd_free_sock()
725          * while we were waiting in down()... */
726         if (likely(sock != NULL))
727                 ok = _conn_send_cmd(tconn, vnr, sock, cmd, h, size, 0);
728
729         if (use_data_socket)
730                 mutex_unlock(&tconn->data.mutex);
731         else
732                 mutex_unlock(&tconn->meta.mutex);
733         return ok;
734 }
735
736 int conn_send_cmd2(struct drbd_tconn *tconn, enum drbd_packet cmd, char *data,
737                    size_t size)
738 {
739         struct p_header80 h;
740         int ok;
741
742         prepare_header80(&h, cmd, size);
743
744         if (!drbd_get_data_sock(tconn))
745                 return 0;
746
747         ok = (sizeof(h) ==
748                 drbd_send(tconn, tconn->data.socket, &h, sizeof(h), 0));
749         ok = ok && (size ==
750                 drbd_send(tconn, tconn->data.socket, data, size, 0));
751
752         drbd_put_data_sock(tconn);
753
754         return ok;
755 }
756
757 int drbd_send_sync_param(struct drbd_conf *mdev, struct syncer_conf *sc)
758 {
759         struct p_rs_param_95 *p;
760         struct socket *sock;
761         int size, rv;
762         const int apv = mdev->tconn->agreed_pro_version;
763
764         size = apv <= 87 ? sizeof(struct p_rs_param)
765                 : apv == 88 ? sizeof(struct p_rs_param)
766                         + strlen(mdev->sync_conf.verify_alg) + 1
767                 : apv <= 94 ? sizeof(struct p_rs_param_89)
768                 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
769
770         /* used from admin command context and receiver/worker context.
771          * to avoid kmalloc, grab the socket right here,
772          * then use the pre-allocated sbuf there */
773         mutex_lock(&mdev->tconn->data.mutex);
774         sock = mdev->tconn->data.socket;
775
776         if (likely(sock != NULL)) {
777                 enum drbd_packet cmd =
778                         apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
779
780                 p = &mdev->tconn->data.sbuf.rs_param_95;
781
782                 /* initialize verify_alg and csums_alg */
783                 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
784
785                 p->rate = cpu_to_be32(sc->rate);
786                 p->c_plan_ahead = cpu_to_be32(sc->c_plan_ahead);
787                 p->c_delay_target = cpu_to_be32(sc->c_delay_target);
788                 p->c_fill_target = cpu_to_be32(sc->c_fill_target);
789                 p->c_max_rate = cpu_to_be32(sc->c_max_rate);
790
791                 if (apv >= 88)
792                         strcpy(p->verify_alg, mdev->sync_conf.verify_alg);
793                 if (apv >= 89)
794                         strcpy(p->csums_alg, mdev->sync_conf.csums_alg);
795
796                 rv = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
797         } else
798                 rv = 0; /* not ok */
799
800         mutex_unlock(&mdev->tconn->data.mutex);
801
802         return rv;
803 }
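
/* Recap of the apv dependent sizing above (illustrative):
 *
 *	apv <= 87:	sizeof(struct p_rs_param)                  rate only
 *	apv == 88:	  + strlen(verify_alg) + 1                 string appended
 *	apv 89..94:	sizeof(struct p_rs_param_89)               fixed-size alg names
 *	apv >= 95:	sizeof(struct p_rs_param_95)               adds the c_* fields
 *
 * So a peer with agreed_pro_version 90, for example, always receives the
 * full p_rs_param_89 layout, even if no csums_alg is configured.
 */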
804
805 int drbd_send_protocol(struct drbd_tconn *tconn)
806 {
807         struct p_protocol *p;
808         int size, cf, rv;
809
810         size = sizeof(struct p_protocol);
811
812         if (tconn->agreed_pro_version >= 87)
813                 size += strlen(tconn->net_conf->integrity_alg) + 1;
814
815         /* we must not recurse into our own queue,
816          * as that is blocked during handshake */
817         p = kmalloc(size, GFP_NOIO);
818         if (p == NULL)
819                 return 0;
820
821         p->protocol      = cpu_to_be32(tconn->net_conf->wire_protocol);
822         p->after_sb_0p   = cpu_to_be32(tconn->net_conf->after_sb_0p);
823         p->after_sb_1p   = cpu_to_be32(tconn->net_conf->after_sb_1p);
824         p->after_sb_2p   = cpu_to_be32(tconn->net_conf->after_sb_2p);
825         p->two_primaries = cpu_to_be32(tconn->net_conf->two_primaries);
826
827         cf = 0;
828         if (tconn->net_conf->want_lose)
829                 cf |= CF_WANT_LOSE;
830         if (tconn->net_conf->dry_run) {
831                 if (tconn->agreed_pro_version >= 92)
832                         cf |= CF_DRY_RUN;
833                 else {
834                         conn_err(tconn, "--dry-run is not supported by peer");
835                         kfree(p);
836                         return -1;
837                 }
838         }
839         p->conn_flags    = cpu_to_be32(cf);
840
841         if (tconn->agreed_pro_version >= 87)
842                 strcpy(p->integrity_alg, tconn->net_conf->integrity_alg);
843
844         rv = conn_send_cmd2(tconn, P_PROTOCOL, p->head.payload, size - sizeof(struct p_header));
845         kfree(p);
846         return rv;
847 }
848
849 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
850 {
851         struct p_uuids p;
852         int i;
853
854         if (!get_ldev_if_state(mdev, D_NEGOTIATING))
855                 return 1;
856
857         for (i = UI_CURRENT; i < UI_SIZE; i++)
858                 p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
859
860         mdev->comm_bm_set = drbd_bm_total_weight(mdev);
861         p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
862         uuid_flags |= mdev->tconn->net_conf->want_lose ? 1 : 0;
863         uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
864         uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
865         p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
866
867         put_ldev(mdev);
868
869         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_UUIDS, &p.head, sizeof(p));
870 }
871
872 int drbd_send_uuids(struct drbd_conf *mdev)
873 {
874         return _drbd_send_uuids(mdev, 0);
875 }
876
877 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
878 {
879         return _drbd_send_uuids(mdev, 8);
880 }
881
882 void drbd_print_uuids(struct drbd_conf *mdev, const char *text)
883 {
884         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
885                 u64 *uuid = mdev->ldev->md.uuid;
886                 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX\n",
887                      text,
888                      (unsigned long long)uuid[UI_CURRENT],
889                      (unsigned long long)uuid[UI_BITMAP],
890                      (unsigned long long)uuid[UI_HISTORY_START],
891                      (unsigned long long)uuid[UI_HISTORY_END]);
892                 put_ldev(mdev);
893         } else {
894                 dev_info(DEV, "%s effective data uuid: %016llX\n",
895                                 text,
896                                 (unsigned long long)mdev->ed_uuid);
897         }
898 }
899
900 int drbd_gen_and_send_sync_uuid(struct drbd_conf *mdev)
901 {
902         struct p_rs_uuid p;
903         u64 uuid;
904
905         D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
906
907         uuid = mdev->ldev->md.uuid[UI_BITMAP] + UUID_NEW_BM_OFFSET;
908         drbd_uuid_set(mdev, UI_BITMAP, uuid);
909         drbd_print_uuids(mdev, "updated sync UUID");
910         drbd_md_sync(mdev);
911         p.uuid = cpu_to_be64(uuid);
912
913         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SYNC_UUID, &p.head, sizeof(p));
914 }
915
916 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
917 {
918         struct p_sizes p;
919         sector_t d_size, u_size;
920         int q_order_type, max_bio_size;
921         int ok;
922
923         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
924                 D_ASSERT(mdev->ldev->backing_bdev);
925                 d_size = drbd_get_max_capacity(mdev->ldev);
926                 u_size = mdev->ldev->dc.disk_size;
927                 q_order_type = drbd_queue_order_type(mdev);
928                 max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
929                 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE);
930                 put_ldev(mdev);
931         } else {
932                 d_size = 0;
933                 u_size = 0;
934                 q_order_type = QUEUE_ORDERED_NONE;
935                 max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
936         }
937
938         p.d_size = cpu_to_be64(d_size);
939         p.u_size = cpu_to_be64(u_size);
940         p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
941         p.max_bio_size = cpu_to_be32(max_bio_size);
942         p.queue_order_type = cpu_to_be16(q_order_type);
943         p.dds_flags = cpu_to_be16(flags);
944
945         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SIZES, &p.head, sizeof(p));
946         return ok;
947 }
948
949 /**
950  * drbd_send_state() - Sends the drbd state to the peer
951  * @mdev:       DRBD device.
952  */
953 int drbd_send_state(struct drbd_conf *mdev)
954 {
955         struct socket *sock;
956         struct p_state p;
957         int ok = 0;
958
959         mutex_lock(&mdev->tconn->data.mutex);
960
961         p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
962         sock = mdev->tconn->data.socket;
963
964         if (likely(sock != NULL)) {
965                 ok = _drbd_send_cmd(mdev, sock, P_STATE, &p.head, sizeof(p), 0);
966         }
967
968         mutex_unlock(&mdev->tconn->data.mutex);
969
970         return ok;
971 }
972
973 int _conn_send_state_req(struct drbd_tconn *tconn, int vnr, enum drbd_packet cmd,
974                          union drbd_state mask, union drbd_state val)
975 {
976         struct p_req_state p;
977
978         p.mask    = cpu_to_be32(mask.i);
979         p.val     = cpu_to_be32(val.i);
980
981         return conn_send_cmd(tconn, vnr, USE_DATA_SOCKET, cmd, &p.head, sizeof(p));
982 }
983
984 int drbd_send_sr_reply(struct drbd_conf *mdev, enum drbd_state_rv retcode)
985 {
986         struct p_req_state_reply p;
987
988         p.retcode    = cpu_to_be32(retcode);
989
990         return drbd_send_cmd(mdev, USE_META_SOCKET, P_STATE_CHG_REPLY, &p.head, sizeof(p));
991 }
992
993 int conn_send_sr_reply(struct drbd_tconn *tconn, enum drbd_state_rv retcode)
994 {
995         struct p_req_state_reply p;
996         enum drbd_packet cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REPLY : P_CONN_ST_CHG_REPLY;
997
998         p.retcode    = cpu_to_be32(retcode);
999
1000         return conn_send_cmd(tconn, 0, USE_META_SOCKET, cmd, &p.head, sizeof(p));
1001 }
1002
1003 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1004         struct p_compressed_bm *p,
1005         struct bm_xfer_ctx *c)
1006 {
1007         struct bitstream bs;
1008         unsigned long plain_bits;
1009         unsigned long tmp;
1010         unsigned long rl;
1011         unsigned len;
1012         unsigned toggle;
1013         int bits;
1014
1015         /* may we use this feature? */
1016         if ((mdev->sync_conf.use_rle == 0) ||
1017                 (mdev->tconn->agreed_pro_version < 90))
1018                         return 0;
1019
1020         if (c->bit_offset >= c->bm_bits)
1021                 return 0; /* nothing to do. */
1022
1023         /* use at most this many bytes */
1024         bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
1025         memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
1026         /* plain bits covered in this code string */
1027         plain_bits = 0;
1028
1029         /* p->encoding & 0x80 stores whether the first run length is set.
1030          * bit offset is implicit.
1031          * start with toggle == 2 to be able to tell the first iteration */
1032         toggle = 2;
1033
1034         /* see how many plain bits we can stuff into one packet
1035          * using RLE and VLI. */
1036         do {
1037                 tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1038                                     : _drbd_bm_find_next(mdev, c->bit_offset);
1039                 if (tmp == -1UL)
1040                         tmp = c->bm_bits;
1041                 rl = tmp - c->bit_offset;
1042
1043                 if (toggle == 2) { /* first iteration */
1044                         if (rl == 0) {
1045                                 /* the first checked bit was set,
1046                                  * store start value, */
1047                                 DCBP_set_start(p, 1);
1048                                 /* but skip encoding of zero run length */
1049                                 toggle = !toggle;
1050                                 continue;
1051                         }
1052                         DCBP_set_start(p, 0);
1053                 }
1054
1055                 /* paranoia: catch zero runlength.
1056                  * can only happen if bitmap is modified while we scan it. */
1057                 if (rl == 0) {
1058                         dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1059                             "t:%u bo:%lu\n", toggle, c->bit_offset);
1060                         return -1;
1061                 }
1062
1063                 bits = vli_encode_bits(&bs, rl);
1064                 if (bits == -ENOBUFS) /* buffer full */
1065                         break;
1066                 if (bits <= 0) {
1067                         dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1068                         return 0;
1069                 }
1070
1071                 toggle = !toggle;
1072                 plain_bits += rl;
1073                 c->bit_offset = tmp;
1074         } while (c->bit_offset < c->bm_bits);
1075
1076         len = bs.cur.b - p->code + !!bs.cur.bit;
1077
1078         if (plain_bits < (len << 3)) {
1079                 /* incompressible with this method.
1080                  * we need to rewind both word and bit position. */
1081                 c->bit_offset -= plain_bits;
1082                 bm_xfer_ctx_bit_to_word_offset(c);
1083                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1084                 return 0;
1085         }
1086
1087         /* RLE + VLI was able to compress it just fine.
1088          * update c->word_offset. */
1089         bm_xfer_ctx_bit_to_word_offset(c);
1090
1091         /* store pad_bits */
1092         DCBP_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1093
1094         return len;
1095 }
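
/* Worked example for the "plain_bits < (len << 3)" check above: an RLE+VLI
 * code stream of len == 100 bytes is only kept if it covers at least
 * 800 plain bitmap bits; otherwise we report 0 and the caller falls back to
 * sending the bitmap words verbatim (see send_bitmap_rle_or_plain() below).
 */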
1096
1097 /**
1098  * send_bitmap_rle_or_plain
1099  *
1100  * Return 0 when done, 1 when another iteration is needed, and a negative error
1101  * code upon failure.
1102  */
1103 static int
1104 send_bitmap_rle_or_plain(struct drbd_conf *mdev,
1105                          struct p_header *h, struct bm_xfer_ctx *c)
1106 {
1107         struct p_compressed_bm *p = (void*)h;
1108         unsigned long num_words;
1109         int len;
1110         int ok;
1111
1112         len = fill_bitmap_rle_bits(mdev, p, c);
1113
1114         if (len < 0)
1115                 return -EIO;
1116
1117         if (len) {
1118                 DCBP_set_code(p, RLE_VLI_Bits);
1119                 ok = _drbd_send_cmd(mdev, mdev->tconn->data.socket, P_COMPRESSED_BITMAP, h,
1120                         sizeof(*p) + len, 0);
1121
1122                 c->packets[0]++;
1123                 c->bytes[0] += sizeof(*p) + len;
1124
1125                 if (c->bit_offset >= c->bm_bits)
1126                         len = 0; /* DONE */
1127         } else {
1128                 /* was not compressible.
1129                  * send a buffer full of plain text bits instead. */
1130                 num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
1131                 len = num_words * sizeof(long);
1132                 if (len)
1133                         drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long*)h->payload);
1134                 ok = _drbd_send_cmd(mdev, mdev->tconn->data.socket, P_BITMAP,
1135                                    h, sizeof(struct p_header80) + len, 0);
1136                 c->word_offset += num_words;
1137                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1138
1139                 c->packets[1]++;
1140                 c->bytes[1] += sizeof(struct p_header80) + len;
1141
1142                 if (c->bit_offset > c->bm_bits)
1143                         c->bit_offset = c->bm_bits;
1144         }
1145         if (ok) {
1146                 if (len == 0) {
1147                         INFO_bm_xfer_stats(mdev, "send", c);
1148                         return 0;
1149                 } else
1150                         return 1;
1151         }
1152         return -EIO;
1153 }
1154
1155 /* See the comment at receive_bitmap() */
1156 int _drbd_send_bitmap(struct drbd_conf *mdev)
1157 {
1158         struct bm_xfer_ctx c;
1159         struct p_header *p;
1160         int err;
1161
1162         if (!expect(mdev->bitmap))
1163                 return false;
1164
1165         /* maybe we should use some per thread scratch page,
1166          * and allocate that during initial device creation? */
1167         p = (struct p_header *) __get_free_page(GFP_NOIO);
1168         if (!p) {
1169                 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
1170                 return false;
1171         }
1172
1173         if (get_ldev(mdev)) {
1174                 if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
1175                         dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
1176                         drbd_bm_set_all(mdev);
1177                         if (drbd_bm_write(mdev)) {
1178                                 /* write_bm did fail! Leave full sync flag set in Meta P_DATA
1179                                  * but otherwise process as per normal - need to tell other
1180                                  * side that a full resync is required! */
1181                                 dev_err(DEV, "Failed to write bitmap to disk!\n");
1182                         } else {
1183                                 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
1184                                 drbd_md_sync(mdev);
1185                         }
1186                 }
1187                 put_ldev(mdev);
1188         }
1189
1190         c = (struct bm_xfer_ctx) {
1191                 .bm_bits = drbd_bm_bits(mdev),
1192                 .bm_words = drbd_bm_words(mdev),
1193         };
1194
1195         do {
1196                 err = send_bitmap_rle_or_plain(mdev, p, &c);
1197         } while (err > 0);
1198
1199         free_page((unsigned long) p);
1200         return err == 0;
1201 }
1202
1203 int drbd_send_bitmap(struct drbd_conf *mdev)
1204 {
1205         int err;
1206
1207         if (!drbd_get_data_sock(mdev->tconn))
1208                 return -1;
1209         err = !_drbd_send_bitmap(mdev);
1210         drbd_put_data_sock(mdev->tconn);
1211         return err;
1212 }
1213
1214 int drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
1215 {
1216         int ok;
1217         struct p_barrier_ack p;
1218
1219         p.barrier  = barrier_nr;
1220         p.set_size = cpu_to_be32(set_size);
1221
1222         if (mdev->state.conn < C_CONNECTED)
1223                 return false;
1224         ok = drbd_send_cmd(mdev, USE_META_SOCKET, P_BARRIER_ACK, &p.head, sizeof(p));
1225         return ok;
1226 }
1227
1228 /**
1229  * _drbd_send_ack() - Sends an ack packet
1230  * @mdev:       DRBD device.
1231  * @cmd:        Packet command code.
1232  * @sector:     sector, needs to be in big endian byte order
1233  * @blksize:    size in byte, needs to be in big endian byte order
1234  * @block_id:   Id, big endian byte order
1235  */
1236 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1237                           u64 sector, u32 blksize, u64 block_id)
1238 {
1239         int ok;
1240         struct p_block_ack p;
1241
1242         p.sector   = sector;
1243         p.block_id = block_id;
1244         p.blksize  = blksize;
1245         p.seq_num  = cpu_to_be32(atomic_add_return(1, &mdev->packet_seq));
1246
1247         if (!mdev->tconn->meta.socket || mdev->state.conn < C_CONNECTED)
1248                 return false;
1249         ok = drbd_send_cmd(mdev, USE_META_SOCKET, cmd, &p.head, sizeof(p));
1250         return ok;
1251 }
1252
1253 /* dp->sector and dp->block_id already/still in network byte order,
1254  * data_size is payload size according to dp->head,
1255  * and may need to be corrected for digest size. */
1256 int drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packet cmd,
1257                      struct p_data *dp, int data_size)
1258 {
1259         data_size -= (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1260                 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1261         return _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
1262                               dp->block_id);
1263 }
1264
1265 int drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packet cmd,
1266                      struct p_block_req *rp)
1267 {
1268         return _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
1269 }
1270
1271 /**
1272  * drbd_send_ack() - Sends an ack packet
1273  * @mdev:       DRBD device
1274  * @cmd:        packet command code
1275  * @peer_req:   peer request
1276  */
1277 int drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1278                   struct drbd_peer_request *peer_req)
1279 {
1280         return _drbd_send_ack(mdev, cmd,
1281                               cpu_to_be64(peer_req->i.sector),
1282                               cpu_to_be32(peer_req->i.size),
1283                               peer_req->block_id);
1284 }
1285
1286 /* This function misuses the block_id field to signal if the blocks
1287  * are in sync or not. */
1288 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packet cmd,
1289                      sector_t sector, int blksize, u64 block_id)
1290 {
1291         return _drbd_send_ack(mdev, cmd,
1292                               cpu_to_be64(sector),
1293                               cpu_to_be32(blksize),
1294                               cpu_to_be64(block_id));
1295 }
1296
1297 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
1298                        sector_t sector, int size, u64 block_id)
1299 {
1300         int ok;
1301         struct p_block_req p;
1302
1303         p.sector   = cpu_to_be64(sector);
1304         p.block_id = block_id;
1305         p.blksize  = cpu_to_be32(size);
1306
1307         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, cmd, &p.head, sizeof(p));
1308         return ok;
1309 }
1310
1311 int drbd_send_drequest_csum(struct drbd_conf *mdev, sector_t sector, int size,
1312                             void *digest, int digest_size, enum drbd_packet cmd)
1313 {
1314         int ok;
1315         struct p_block_req p;
1316
1317         prepare_header(mdev, &p.head, cmd, sizeof(p) - sizeof(struct p_header) + digest_size);
1318         p.sector   = cpu_to_be64(sector);
1319         p.block_id = ID_SYNCER /* unused */;
1320         p.blksize  = cpu_to_be32(size);
1321
1322         mutex_lock(&mdev->tconn->data.mutex);
1323
1324         ok = (sizeof(p) == drbd_send(mdev->tconn, mdev->tconn->data.socket, &p, sizeof(p), 0));
1325         ok = ok && (digest_size == drbd_send(mdev->tconn, mdev->tconn->data.socket, digest, digest_size, 0));
1326
1327         mutex_unlock(&mdev->tconn->data.mutex);
1328
1329         return ok;
1330 }
1331
1332 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
1333 {
1334         int ok;
1335         struct p_block_req p;
1336
1337         p.sector   = cpu_to_be64(sector);
1338         p.block_id = ID_SYNCER /* unused */;
1339         p.blksize  = cpu_to_be32(size);
1340
1341         ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OV_REQUEST, &p.head, sizeof(p));
1342         return ok;
1343 }
1344
1345 /* called on sndtimeo
1346  * returns false if we should retry,
1347  * true if we think the connection is dead
1348  */
1349 static int we_should_drop_the_connection(struct drbd_tconn *tconn, struct socket *sock)
1350 {
1351         int drop_it;
1352         /* long elapsed = (long)(jiffies - mdev->last_received); */
1353
1354         drop_it =   tconn->meta.socket == sock
1355                 || !tconn->asender.task
1356                 || get_t_state(&tconn->asender) != RUNNING
1357                 || tconn->cstate < C_WF_REPORT_PARAMS;
1358
1359         if (drop_it)
1360                 return true;
1361
1362         drop_it = !--tconn->ko_count;
1363         if (!drop_it) {
1364                 conn_err(tconn, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
1365                          current->comm, current->pid, tconn->ko_count);
1366                 request_ping(tconn);
1367         }
1368
1369         return drop_it; /* && (mdev->state == R_PRIMARY) */;
1370 }
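
/* Rough effect of the ko_count handling above (the actual numbers come from
 * the net configuration, this is only an illustration): with a send timeout
 * of 6 seconds and a ko-count of 7, a completely stuck peer causes about
 * 6 s * 7 = 42 s worth of send timeouts, each one requesting a ping, before
 * the connection is finally considered dead.
 */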
1371
1372 static void drbd_update_congested(struct drbd_tconn *tconn)
1373 {
1374         struct sock *sk = tconn->data.socket->sk;
1375         if (sk->sk_wmem_queued > sk->sk_sndbuf * 4 / 5)
1376                 set_bit(NET_CONGESTED, &tconn->flags);
1377 }
1378
1379 /* The idea of sendpage seems to be to put some kind of reference
1380  * to the page into the skb, and to hand it over to the NIC. In
1381  * this process get_page() gets called.
1382  *
1383  * As soon as the page was really sent over the network put_page()
1384  * gets called by some part of the network layer. [ NIC driver? ]
1385  *
1386  * [ get_page() / put_page() increment/decrement the count. If count
1387  *   reaches 0 the page will be freed. ]
1388  *
1389  * This works nicely with pages from FSs.
1390  * But this means that in protocol A we might signal IO completion too early!
1391  *
1392  * In order not to corrupt data during a resync we must make sure
1393  * that we do not reuse our own buffer pages (EEs) too early, therefore
1394  * we have the net_ee list.
1395  *
1396  * XFS seems to have problems, still, it submits pages with page_count == 0!
1397  * As a workaround, we disable sendpage on pages
1398  * with page_count == 0 or PageSlab.
1399  */
1400 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
1401                    int offset, size_t size, unsigned msg_flags)
1402 {
1403         int sent = drbd_send(mdev->tconn, mdev->tconn->data.socket, kmap(page) + offset, size, msg_flags);
1404         kunmap(page);
1405         if (sent == size)
1406                 mdev->send_cnt += size>>9;
1407         return sent == size;
1408 }
1409
1410 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
1411                     int offset, size_t size, unsigned msg_flags)
1412 {
1413         mm_segment_t oldfs = get_fs();
1414         int sent, ok;
1415         int len = size;
1416
1417         /* e.g. XFS meta- & log-data is in slab pages, which have a
1418          * page_count of 0 and/or have PageSlab() set.
1419          * we cannot use send_page for those, as that does get_page();
1420          * put_page(); and would cause either a VM_BUG directly, or
1421          * __page_cache_release a page that would actually still be referenced
1422          * by someone, leading to some obscure delayed Oops somewhere else. */
1423         if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
1424                 return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
1425
1426         msg_flags |= MSG_NOSIGNAL;
1427         drbd_update_congested(mdev->tconn);
1428         set_fs(KERNEL_DS);
1429         do {
1430                 sent = mdev->tconn->data.socket->ops->sendpage(mdev->tconn->data.socket, page,
1431                                                         offset, len,
1432                                                         msg_flags);
1433                 if (sent == -EAGAIN) {
1434                         if (we_should_drop_the_connection(mdev->tconn,
1435                                                           mdev->tconn->data.socket))
1436                                 break;
1437                         else
1438                                 continue;
1439                 }
1440                 if (sent <= 0) {
1441                         dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
1442                              __func__, (int)size, len, sent);
1443                         break;
1444                 }
1445                 len    -= sent;
1446                 offset += sent;
1447         } while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
1448         set_fs(oldfs);
1449         clear_bit(NET_CONGESTED, &mdev->tconn->flags);
1450
1451         ok = (len == 0);
1452         if (likely(ok))
1453                 mdev->send_cnt += size>>9;
1454         return ok;
1455 }
1456
1457 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
1458 {
1459         struct bio_vec *bvec;
1460         int i;
1461         /* hint all but last page with MSG_MORE */
1462         __bio_for_each_segment(bvec, bio, i, 0) {
1463                 if (!_drbd_no_send_page(mdev, bvec->bv_page,
1464                                      bvec->bv_offset, bvec->bv_len,
1465                                      i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
1466                         return 0;
1467         }
1468         return 1;
1469 }
1470
1471 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
1472 {
1473         struct bio_vec *bvec;
1474         int i;
1475         /* hint all but last page with MSG_MORE */
1476         __bio_for_each_segment(bvec, bio, i, 0) {
1477                 if (!_drbd_send_page(mdev, bvec->bv_page,
1478                                      bvec->bv_offset, bvec->bv_len,
1479                                      i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
1480                         return 0;
1481         }
1482         return 1;
1483 }
1484
1485 static int _drbd_send_zc_ee(struct drbd_conf *mdev,
1486                             struct drbd_peer_request *peer_req)
1487 {
1488         struct page *page = peer_req->pages;
1489         unsigned len = peer_req->i.size;
1490
1491         /* hint all but last page with MSG_MORE */
1492         page_chain_for_each(page) {
1493                 unsigned l = min_t(unsigned, len, PAGE_SIZE);
1494                 if (!_drbd_send_page(mdev, page, 0, l,
1495                                 page_chain_next(page) ? MSG_MORE : 0))
1496                         return 0;
1497                 len -= l;
1498         }
1499         return 1;
1500 }
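/* Summary of the three send helpers above, as used by drbd_send_dblock()
 * and drbd_send_block() below:
 *   _drbd_send_bio()    - copies each bio_vec through kmap() via
 *                         _drbd_no_send_page(); chosen for protocol A and
 *                         whenever a data integrity digest is in use.
 *   _drbd_send_zc_bio() - zero-copy via _drbd_send_page()/->sendpage().
 *   _drbd_send_zc_ee()  - same, but walks the page chain of a peer request.
 * All of them hint MSG_MORE on every page except the last one.
 */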
1501
1502 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
1503 {
1504         if (mdev->tconn->agreed_pro_version >= 95)
1505                 return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
1506                         (bi_rw & REQ_FUA ? DP_FUA : 0) |
1507                         (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
1508                         (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
1509         else
1510                 return bi_rw & REQ_SYNC ? DP_RW_SYNC : 0;
1511 }
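/* Illustrative note (not a call site in this file): a bio submitted with
 * REQ_SYNC | REQ_FUA maps to DP_RW_SYNC | DP_FUA for peers speaking
 * protocol 95 or newer; peers older than that only ever get DP_RW_SYNC.
 * drbd_send_dblock() below places the result into p.dp_flags.
 */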
1512
1513 /* Used to send write requests
1514  * R_PRIMARY -> Peer    (P_DATA)
1515  */
1516 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
1517 {
1518         int ok = 1;
1519         struct p_data p;
1520         unsigned int dp_flags = 0;
1521         void *dgb;
1522         int dgs;
1523
1524         if (!drbd_get_data_sock(mdev->tconn))
1525                 return 0;
1526
1527         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_w_tfm) ?
1528                 crypto_hash_digestsize(mdev->tconn->integrity_w_tfm) : 0;
1529
1530         prepare_header(mdev, &p.head, P_DATA, sizeof(p) - sizeof(struct p_header) + dgs + req->i.size);
1531         p.sector   = cpu_to_be64(req->i.sector);
1532         p.block_id = (unsigned long)req;
1533         p.seq_num  = cpu_to_be32(req->seq_num = atomic_add_return(1, &mdev->packet_seq));
1534
1535         dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
1536
1537         if (mdev->state.conn >= C_SYNC_SOURCE &&
1538             mdev->state.conn <= C_PAUSED_SYNC_T)
1539                 dp_flags |= DP_MAY_SET_IN_SYNC;
1540
1541         p.dp_flags = cpu_to_be32(dp_flags);
1542         set_bit(UNPLUG_REMOTE, &mdev->flags);
1543         ok = (sizeof(p) ==
1544                 drbd_send(mdev->tconn, mdev->tconn->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0));
1545         if (ok && dgs) {
1546                 dgb = mdev->tconn->int_dig_out;
1547                 drbd_csum_bio(mdev, mdev->tconn->integrity_w_tfm, req->master_bio, dgb);
1548                 ok = dgs == drbd_send(mdev->tconn, mdev->tconn->data.socket, dgb, dgs, 0);
1549         }
1550         if (ok) {
1551                 /* For protocol A, we have to memcpy the payload into
1552                  * socket buffers, as we may complete right away
1553                  * as soon as we have handed it over to tcp, at which point
1554                  * the data pages may become invalid.
1555                  *
1556                  * With data integrity enabled, we copy it as well, so we can
1557                  * be sure that even if the bio pages are still being modified,
1558                  * the data on the wire won't change; thus, if the digest checks
1559                  * out ok after sending on this side but does not match on the
1560                  * receiving side, we have certainly detected corruption elsewhere.
1561                  */
1562                 if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A || dgs)
1563                         ok = _drbd_send_bio(mdev, req->master_bio);
1564                 else
1565                         ok = _drbd_send_zc_bio(mdev, req->master_bio);
1566
1567                 /* double check digest, sometimes buffers have been modified in flight. */
1568                 if (dgs > 0 && dgs <= 64) {
1569                         /* 64 bytes (512 bits) is the largest digest size
1570                          * currently supported in kernel crypto. */
1571                         unsigned char digest[64];
1572                         drbd_csum_bio(mdev, mdev->tconn->integrity_w_tfm, req->master_bio, digest);
1573                         if (memcmp(mdev->tconn->int_dig_out, digest, dgs)) {
1574                                 dev_warn(DEV,
1575                                         "Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
1576                                         (unsigned long long)req->i.sector, req->i.size);
1577                         }
1578                 } /* else if (dgs > 64) {
1579                      ... Be noisy about digest too large ...
1580                 } */
1581         }
1582
1583         drbd_put_data_sock(mdev->tconn);
1584
1585         return ok;
1586 }
1587
1588 /* answer packet, used to send data back for read requests:
1589  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
1590  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
1591  */
1592 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packet cmd,
1593                     struct drbd_peer_request *peer_req)
1594 {
1595         int ok;
1596         struct p_data p;
1597         void *dgb;
1598         int dgs;
1599
1600         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_w_tfm) ?
1601                 crypto_hash_digestsize(mdev->tconn->integrity_w_tfm) : 0;
1602
1603         prepare_header(mdev, &p.head, cmd, sizeof(p) -
1604                                            sizeof(struct p_header80) +
1605                                            dgs + peer_req->i.size);
1606         p.sector   = cpu_to_be64(peer_req->i.sector);
1607         p.block_id = peer_req->block_id;
1608         p.seq_num = 0;  /* unused */
1609
1610         /* Only called by our kernel thread.
1611          * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
1612          * in response to an admin command or module unload.
1613          */
1614         if (!drbd_get_data_sock(mdev->tconn))
1615                 return 0;
1616
1617         ok = sizeof(p) == drbd_send(mdev->tconn, mdev->tconn->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0);
1618         if (ok && dgs) {
1619                 dgb = mdev->tconn->int_dig_out;
1620                 drbd_csum_ee(mdev, mdev->tconn->integrity_w_tfm, peer_req, dgb);
1621                 ok = dgs == drbd_send(mdev->tconn, mdev->tconn->data.socket, dgb, dgs, 0);
1622         }
1623         if (ok)
1624                 ok = _drbd_send_zc_ee(mdev, peer_req);
1625
1626         drbd_put_data_sock(mdev->tconn);
1627
1628         return ok;
1629 }
1630
1631 int drbd_send_oos(struct drbd_conf *mdev, struct drbd_request *req)
1632 {
1633         struct p_block_desc p;
1634
1635         p.sector  = cpu_to_be64(req->i.sector);
1636         p.blksize = cpu_to_be32(req->i.size);
1637
1638         return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OUT_OF_SYNC, &p.head, sizeof(p));
1639 }
1640
1641 /*
1642   drbd_send distinguishes two cases:
1643
1644   Packets sent via the data socket "sock"
1645   and packets sent via the meta data socket "msock"
1646
1647                     sock                      msock
1648   -----------------+-------------------------+------------------------------
1649   timeout           conf.timeout / 2          conf.timeout / 2
1650   timeout action    send a ping via msock     abort communication
1651                                               and close all sockets
1652 */
1653
1654 /*
1655  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
1656  */
1657 int drbd_send(struct drbd_tconn *tconn, struct socket *sock,
1658               void *buf, size_t size, unsigned msg_flags)
1659 {
1660         struct kvec iov;
1661         struct msghdr msg;
1662         int rv, sent = 0;
1663
1664         if (!sock)
1665                 return -1000;
1666
1667         /* THINK  if (signal_pending) return ... ? */
1668
1669         iov.iov_base = buf;
1670         iov.iov_len  = size;
1671
1672         msg.msg_name       = NULL;
1673         msg.msg_namelen    = 0;
1674         msg.msg_control    = NULL;
1675         msg.msg_controllen = 0;
1676         msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
1677
1678         if (sock == tconn->data.socket) {
1679                 tconn->ko_count = tconn->net_conf->ko_count;
1680                 drbd_update_congested(tconn);
1681         }
1682         do {
1683                 /* STRANGE
1684                  * tcp_sendmsg does _not_ use its size parameter at all ?
1685                  *
1686                  * -EAGAIN on timeout, -EINTR on signal.
1687                  */
1688 /* THINK
1689  * do we need to block DRBD_SIG if sock == &meta.socket ??
1690  * otherwise wake_asender() might interrupt some send_*Ack !
1691  */
1692                 rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
1693                 if (rv == -EAGAIN) {
1694                         if (we_should_drop_the_connection(tconn, sock))
1695                                 break;
1696                         else
1697                                 continue;
1698                 }
1699                 if (rv == -EINTR) {
1700                         flush_signals(current);
1701                         rv = 0;
1702                 }
1703                 if (rv < 0)
1704                         break;
1705                 sent += rv;
1706                 iov.iov_base += rv;
1707                 iov.iov_len  -= rv;
1708         } while (sent < size);
1709
1710         if (sock == tconn->data.socket)
1711                 clear_bit(NET_CONGESTED, &tconn->flags);
1712
1713         if (rv <= 0) {
1714                 if (rv != -EAGAIN) {
1715                         conn_err(tconn, "%s_sendmsg returned %d\n",
1716                                  sock == tconn->meta.socket ? "msock" : "sock",
1717                                  rv);
1718                         conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
1719                 } else
1720                         conn_request_state(tconn, NS(conn, C_TIMEOUT), CS_HARD);
1721         }
1722
1723         return sent;
1724 }
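/* Callers in this file treat anything but a complete send as failure, e.g.
 * drbd_send_dblock() above checks
 *     ok = (sizeof(p) == drbd_send(mdev->tconn, mdev->tconn->data.socket,
 *                                  &p, sizeof(p), dgs ? MSG_MORE : 0));
 * A send error additionally maps to a connection state change at the end of
 * drbd_send() itself: C_TIMEOUT for -EAGAIN, C_BROKEN_PIPE otherwise.
 */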
1725
1726 static int drbd_open(struct block_device *bdev, fmode_t mode)
1727 {
1728         struct drbd_conf *mdev = bdev->bd_disk->private_data;
1729         unsigned long flags;
1730         int rv = 0;
1731
1732         mutex_lock(&drbd_main_mutex);
1733         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1734         /* to have a stable mdev->state.role
1735          * and no race with updating open_cnt */
1736
1737         if (mdev->state.role != R_PRIMARY) {
1738                 if (mode & FMODE_WRITE)
1739                         rv = -EROFS;
1740                 else if (!allow_oos)
1741                         rv = -EMEDIUMTYPE;
1742         }
1743
1744         if (!rv)
1745                 mdev->open_cnt++;
1746         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1747         mutex_unlock(&drbd_main_mutex);
1748
1749         return rv;
1750 }
1751
1752 static int drbd_release(struct gendisk *gd, fmode_t mode)
1753 {
1754         struct drbd_conf *mdev = gd->private_data;
1755         mutex_lock(&drbd_main_mutex);
1756         mdev->open_cnt--;
1757         mutex_unlock(&drbd_main_mutex);
1758         return 0;
1759 }
1760
1761 static void drbd_set_defaults(struct drbd_conf *mdev)
1762 {
1763         /* This way we get a compile error when sync_conf grows
1764            and we forget to initialize the new member here */
1765         mdev->sync_conf = (struct syncer_conf) {
1766                 /* .rate = */           DRBD_RATE_DEF,
1767                 /* .after = */          DRBD_AFTER_DEF,
1768                 /* .al_extents = */     DRBD_AL_EXTENTS_DEF,
1769                 /* .verify_alg = */     {}, 0,
1770                 /* .cpu_mask = */       {}, 0,
1771                 /* .csums_alg = */      {}, 0,
1772                 /* .use_rle = */        0,
1773                 /* .on_no_data = */     DRBD_ON_NO_DATA_DEF,
1774                 /* .c_plan_ahead = */   DRBD_C_PLAN_AHEAD_DEF,
1775                 /* .c_delay_target = */ DRBD_C_DELAY_TARGET_DEF,
1776                 /* .c_fill_target = */  DRBD_C_FILL_TARGET_DEF,
1777                 /* .c_max_rate = */     DRBD_C_MAX_RATE_DEF,
1778                 /* .c_min_rate = */     DRBD_C_MIN_RATE_DEF
1779         };
1780
1781         /* Have to assign it this way, because the bitfield layout
1782            differs between big endian and little endian */
1783         mdev->state = (union drbd_state) {
1784                 { .role = R_SECONDARY,
1785                   .peer = R_UNKNOWN,
1786                   .conn = C_STANDALONE,
1787                   .disk = D_DISKLESS,
1788                   .pdsk = D_UNKNOWN,
1789                   .susp = 0,
1790                   .susp_nod = 0,
1791                   .susp_fen = 0
1792                 } };
1793 }
1794
1795 void drbd_init_set_defaults(struct drbd_conf *mdev)
1796 {
1797         /* the memset(,0,) did most of this.
1798          * note: only assignments, no allocations in here */
1799
1800         drbd_set_defaults(mdev);
1801
1802         atomic_set(&mdev->ap_bio_cnt, 0);
1803         atomic_set(&mdev->ap_pending_cnt, 0);
1804         atomic_set(&mdev->rs_pending_cnt, 0);
1805         atomic_set(&mdev->unacked_cnt, 0);
1806         atomic_set(&mdev->local_cnt, 0);
1807         atomic_set(&mdev->pp_in_use, 0);
1808         atomic_set(&mdev->pp_in_use_by_net, 0);
1809         atomic_set(&mdev->rs_sect_in, 0);
1810         atomic_set(&mdev->rs_sect_ev, 0);
1811         atomic_set(&mdev->ap_in_flight, 0);
1812
1813         mutex_init(&mdev->md_io_mutex);
1814         mutex_init(&mdev->own_state_mutex);
1815         mdev->state_mutex = &mdev->own_state_mutex;
1816
1817         spin_lock_init(&mdev->al_lock);
1818         spin_lock_init(&mdev->peer_seq_lock);
1819         spin_lock_init(&mdev->epoch_lock);
1820
1821         INIT_LIST_HEAD(&mdev->active_ee);
1822         INIT_LIST_HEAD(&mdev->sync_ee);
1823         INIT_LIST_HEAD(&mdev->done_ee);
1824         INIT_LIST_HEAD(&mdev->read_ee);
1825         INIT_LIST_HEAD(&mdev->net_ee);
1826         INIT_LIST_HEAD(&mdev->resync_reads);
1827         INIT_LIST_HEAD(&mdev->resync_work.list);
1828         INIT_LIST_HEAD(&mdev->unplug_work.list);
1829         INIT_LIST_HEAD(&mdev->go_diskless.list);
1830         INIT_LIST_HEAD(&mdev->md_sync_work.list);
1831         INIT_LIST_HEAD(&mdev->start_resync_work.list);
1832         INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
1833
1834         mdev->resync_work.cb  = w_resync_timer;
1835         mdev->unplug_work.cb  = w_send_write_hint;
1836         mdev->go_diskless.cb  = w_go_diskless;
1837         mdev->md_sync_work.cb = w_md_sync;
1838         mdev->bm_io_work.w.cb = w_bitmap_io;
1839         mdev->start_resync_work.cb = w_start_resync;
1840
1841         mdev->resync_work.mdev  = mdev;
1842         mdev->unplug_work.mdev  = mdev;
1843         mdev->go_diskless.mdev  = mdev;
1844         mdev->md_sync_work.mdev = mdev;
1845         mdev->bm_io_work.w.mdev = mdev;
1846         mdev->start_resync_work.mdev = mdev;
1847
1848         init_timer(&mdev->resync_timer);
1849         init_timer(&mdev->md_sync_timer);
1850         init_timer(&mdev->start_resync_timer);
1851         init_timer(&mdev->request_timer);
1852         mdev->resync_timer.function = resync_timer_fn;
1853         mdev->resync_timer.data = (unsigned long) mdev;
1854         mdev->md_sync_timer.function = md_sync_timer_fn;
1855         mdev->md_sync_timer.data = (unsigned long) mdev;
1856         mdev->start_resync_timer.function = start_resync_timer_fn;
1857         mdev->start_resync_timer.data = (unsigned long) mdev;
1858         mdev->request_timer.function = request_timer_fn;
1859         mdev->request_timer.data = (unsigned long) mdev;
1860
1861         init_waitqueue_head(&mdev->misc_wait);
1862         init_waitqueue_head(&mdev->state_wait);
1863         init_waitqueue_head(&mdev->ee_wait);
1864         init_waitqueue_head(&mdev->al_wait);
1865         init_waitqueue_head(&mdev->seq_wait);
1866
1867         /* mdev->tconn->agreed_pro_version gets initialized in drbd_connect() */
1868         mdev->write_ordering = WO_bdev_flush;
1869         mdev->resync_wenr = LC_FREE;
1870         mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
1871         mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
1872 }
1873
1874 void drbd_mdev_cleanup(struct drbd_conf *mdev)
1875 {
1876         int i;
1877         if (mdev->tconn->receiver.t_state != NONE)
1878                 dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
1879                                 mdev->tconn->receiver.t_state);
1880
1881         /* no need to lock it, I'm the only thread alive */
1882         if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
1883                 dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
1884         mdev->al_writ_cnt  =
1885         mdev->bm_writ_cnt  =
1886         mdev->read_cnt     =
1887         mdev->recv_cnt     =
1888         mdev->send_cnt     =
1889         mdev->writ_cnt     =
1890         mdev->p_size       =
1891         mdev->rs_start     =
1892         mdev->rs_total     =
1893         mdev->rs_failed    = 0;
1894         mdev->rs_last_events = 0;
1895         mdev->rs_last_sect_ev = 0;
1896         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1897                 mdev->rs_mark_left[i] = 0;
1898                 mdev->rs_mark_time[i] = 0;
1899         }
1900         D_ASSERT(mdev->tconn->net_conf == NULL);
1901
1902         drbd_set_my_capacity(mdev, 0);
1903         if (mdev->bitmap) {
1904                 /* maybe never allocated. */
1905                 drbd_bm_resize(mdev, 0, 1);
1906                 drbd_bm_cleanup(mdev);
1907         }
1908
1909         drbd_free_resources(mdev);
1910         clear_bit(AL_SUSPENDED, &mdev->flags);
1911
1912         /*
1913          * currently we call drbd_init_ee only on module load, so
1914          * we may call drbd_release_ee only on module unload!
1915          */
1916         D_ASSERT(list_empty(&mdev->active_ee));
1917         D_ASSERT(list_empty(&mdev->sync_ee));
1918         D_ASSERT(list_empty(&mdev->done_ee));
1919         D_ASSERT(list_empty(&mdev->read_ee));
1920         D_ASSERT(list_empty(&mdev->net_ee));
1921         D_ASSERT(list_empty(&mdev->resync_reads));
1922         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
1923         D_ASSERT(list_empty(&mdev->tconn->meta.work.q));
1924         D_ASSERT(list_empty(&mdev->resync_work.list));
1925         D_ASSERT(list_empty(&mdev->unplug_work.list));
1926         D_ASSERT(list_empty(&mdev->go_diskless.list));
1927
1928         drbd_set_defaults(mdev);
1929 }
1930
1931
1932 static void drbd_destroy_mempools(void)
1933 {
1934         struct page *page;
1935
1936         while (drbd_pp_pool) {
1937                 page = drbd_pp_pool;
1938                 drbd_pp_pool = (struct page *)page_private(page);
1939                 __free_page(page);
1940                 drbd_pp_vacant--;
1941         }
1942
1943         /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
1944
1945         if (drbd_ee_mempool)
1946                 mempool_destroy(drbd_ee_mempool);
1947         if (drbd_request_mempool)
1948                 mempool_destroy(drbd_request_mempool);
1949         if (drbd_ee_cache)
1950                 kmem_cache_destroy(drbd_ee_cache);
1951         if (drbd_request_cache)
1952                 kmem_cache_destroy(drbd_request_cache);
1953         if (drbd_bm_ext_cache)
1954                 kmem_cache_destroy(drbd_bm_ext_cache);
1955         if (drbd_al_ext_cache)
1956                 kmem_cache_destroy(drbd_al_ext_cache);
1957
1958         drbd_ee_mempool      = NULL;
1959         drbd_request_mempool = NULL;
1960         drbd_ee_cache        = NULL;
1961         drbd_request_cache   = NULL;
1962         drbd_bm_ext_cache    = NULL;
1963         drbd_al_ext_cache    = NULL;
1964
1965         return;
1966 }
1967
1968 static int drbd_create_mempools(void)
1969 {
1970         struct page *page;
1971         const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count;
1972         int i;
1973
1974         /* prepare our caches and mempools */
1975         drbd_request_mempool = NULL;
1976         drbd_ee_cache        = NULL;
1977         drbd_request_cache   = NULL;
1978         drbd_bm_ext_cache    = NULL;
1979         drbd_al_ext_cache    = NULL;
1980         drbd_pp_pool         = NULL;
1981
1982         /* caches */
1983         drbd_request_cache = kmem_cache_create(
1984                 "drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
1985         if (drbd_request_cache == NULL)
1986                 goto Enomem;
1987
1988         drbd_ee_cache = kmem_cache_create(
1989                 "drbd_ee", sizeof(struct drbd_peer_request), 0, 0, NULL);
1990         if (drbd_ee_cache == NULL)
1991                 goto Enomem;
1992
1993         drbd_bm_ext_cache = kmem_cache_create(
1994                 "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
1995         if (drbd_bm_ext_cache == NULL)
1996                 goto Enomem;
1997
1998         drbd_al_ext_cache = kmem_cache_create(
1999                 "drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2000         if (drbd_al_ext_cache == NULL)
2001                 goto Enomem;
2002
2003         /* mempools */
2004         drbd_request_mempool = mempool_create(number,
2005                 mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2006         if (drbd_request_mempool == NULL)
2007                 goto Enomem;
2008
2009         drbd_ee_mempool = mempool_create(number,
2010                 mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2011         if (drbd_ee_mempool == NULL)
2012                 goto Enomem;
2013
2014         /* drbd's page pool */
2015         spin_lock_init(&drbd_pp_lock);
2016
2017         for (i = 0; i < number; i++) {
2018                 page = alloc_page(GFP_HIGHUSER);
2019                 if (!page)
2020                         goto Enomem;
2021                 set_page_private(page, (unsigned long)drbd_pp_pool);
2022                 drbd_pp_pool = page;
2023         }
2024         drbd_pp_vacant = number;
2025
2026         return 0;
2027
2028 Enomem:
2029         drbd_destroy_mempools(); /* in case we allocated some */
2030         return -ENOMEM;
2031 }
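/* The drbd page pool built above is a simple LIFO list: each page stores the
 * pointer to the next one in its page_private field, drbd_pp_pool points at
 * the head, and drbd_pp_vacant counts the free pages. A minimal sketch of how
 * a consumer would pop one page (the real allocation helpers live elsewhere,
 * e.g. in drbd_receiver.c):
 *
 *     spin_lock(&drbd_pp_lock);
 *     page = drbd_pp_pool;
 *     if (page) {
 *             drbd_pp_pool = (struct page *)page_private(page);
 *             set_page_private(page, 0);
 *             drbd_pp_vacant--;
 *     }
 *     spin_unlock(&drbd_pp_lock);
 */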
2032
2033 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2034         void *unused)
2035 {
2036         /* just so we have it.  you never know what interesting things we
2037          * might want to do here some day...
2038          */
2039
2040         return NOTIFY_DONE;
2041 }
2042
2043 static struct notifier_block drbd_notifier = {
2044         .notifier_call = drbd_notify_sys,
2045 };
2046
2047 static void drbd_release_ee_lists(struct drbd_conf *mdev)
2048 {
2049         int rr;
2050
2051         rr = drbd_release_ee(mdev, &mdev->active_ee);
2052         if (rr)
2053                 dev_err(DEV, "%d EEs in active list found!\n", rr);
2054
2055         rr = drbd_release_ee(mdev, &mdev->sync_ee);
2056         if (rr)
2057                 dev_err(DEV, "%d EEs in sync list found!\n", rr);
2058
2059         rr = drbd_release_ee(mdev, &mdev->read_ee);
2060         if (rr)
2061                 dev_err(DEV, "%d EEs in read list found!\n", rr);
2062
2063         rr = drbd_release_ee(mdev, &mdev->done_ee);
2064         if (rr)
2065                 dev_err(DEV, "%d EEs in done list found!\n", rr);
2066
2067         rr = drbd_release_ee(mdev, &mdev->net_ee);
2068         if (rr)
2069                 dev_err(DEV, "%d EEs in net list found!\n", rr);
2070 }
2071
2072 /* caution. no locking.
2073  * currently only used from module cleanup code. */
2074 static void drbd_delete_device(unsigned int minor)
2075 {
2076         struct drbd_conf *mdev = minor_to_mdev(minor);
2077
2078         if (!mdev)
2079                 return;
2080
2081         /* paranoia asserts */
2082         D_ASSERT(mdev->open_cnt == 0);
2083         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
2084         /* end paranoia asserts */
2085
2086         del_gendisk(mdev->vdisk);
2087
2088         /* cleanup stuff that may have been allocated during
2089          * device (re-)configuration or state changes */
2090
2091         if (mdev->this_bdev)
2092                 bdput(mdev->this_bdev);
2093
2094         drbd_free_resources(mdev);
2095         drbd_free_tconn(mdev->tconn);
2096
2097         drbd_release_ee_lists(mdev);
2098
2099         lc_destroy(mdev->act_log);
2100         lc_destroy(mdev->resync);
2101
2102         kfree(mdev->p_uuid);
2103         /* mdev->p_uuid = NULL; */
2104
2105         /* cleanup the rest that has been
2106          * allocated from drbd_new_device
2107          * and actually free the mdev itself */
2108         drbd_free_mdev(mdev);
2109 }
2110
2111 static void drbd_cleanup(void)
2112 {
2113         unsigned int i;
2114
2115         unregister_reboot_notifier(&drbd_notifier);
2116
2117         /* first remove proc,
2118          * drbdsetup uses its presence to detect
2119          * whether DRBD is loaded.
2120          * If we got stuck in proc removal,
2121          * but had netlink already deregistered,
2122          * some drbdsetup commands might wait forever
2123          * for an answer.
2124          */
2125         if (drbd_proc)
2126                 remove_proc_entry("drbd", NULL);
2127
2128         drbd_nl_cleanup();
2129
2130         if (minor_table) {
2131                 i = minor_count;
2132                 while (i--)
2133                         drbd_delete_device(i);
2134                 drbd_destroy_mempools();
2135         }
2136
2137         kfree(minor_table);
2138
2139         unregister_blkdev(DRBD_MAJOR, "drbd");
2140
2141         printk(KERN_INFO "drbd: module cleanup done.\n");
2142 }
2143
2144 /**
2145  * drbd_congested() - Callback for pdflush
2146  * @congested_data:     User data
2147  * @bdi_bits:           Bits pdflush is currently interested in
2148  *
2149  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
2150  */
2151 static int drbd_congested(void *congested_data, int bdi_bits)
2152 {
2153         struct drbd_conf *mdev = congested_data;
2154         struct request_queue *q;
2155         char reason = '-';
2156         int r = 0;
2157
2158         if (!may_inc_ap_bio(mdev)) {
2159                 /* DRBD has frozen IO */
2160                 r = bdi_bits;
2161                 reason = 'd';
2162                 goto out;
2163         }
2164
2165         if (get_ldev(mdev)) {
2166                 q = bdev_get_queue(mdev->ldev->backing_bdev);
2167                 r = bdi_congested(&q->backing_dev_info, bdi_bits);
2168                 put_ldev(mdev);
2169                 if (r)
2170                         reason = 'b';
2171         }
2172
2173         if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->tconn->flags)) {
2174                 r |= (1 << BDI_async_congested);
2175                 reason = reason == 'b' ? 'a' : 'n';
2176         }
2177
2178 out:
2179         mdev->congestion_reason = reason;
2180         return r;
2181 }
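/* Legend for the congestion_reason character set above:
 *   '-'  not congested
 *   'd'  DRBD has frozen IO
 *   'b'  local backing device congested
 *   'n'  network congested (NET_CONGESTED set)
 *   'a'  both backing device and network congested
 * The character is stored in mdev->congestion_reason, presumably for status
 * output only. */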
2182
2183 static void drbd_init_workqueue(struct drbd_work_queue* wq)
2184 {
2185         sema_init(&wq->s, 0);
2186         spin_lock_init(&wq->q_lock);
2187         INIT_LIST_HEAD(&wq->q);
2188 }
2189
2190 struct drbd_tconn *drbd_new_tconn(char *name)
2191 {
2192         struct drbd_tconn *tconn;
2193
2194         tconn = kzalloc(sizeof(struct drbd_tconn), GFP_KERNEL);
2195         if (!tconn)
2196                 return NULL;
2197
2198         tconn->name = kstrdup(name, GFP_KERNEL);
2199         if (!tconn->name)
2200                 goto fail;
2201
2202         tconn->cstate = C_STANDALONE;
2203         mutex_init(&tconn->cstate_mutex);
2204         spin_lock_init(&tconn->req_lock);
2205         atomic_set(&tconn->net_cnt, 0);
2206         init_waitqueue_head(&tconn->net_cnt_wait);
2207         init_waitqueue_head(&tconn->ping_wait);
2208         idr_init(&tconn->volumes);
2209
2210         drbd_init_workqueue(&tconn->data.work);
2211         mutex_init(&tconn->data.mutex);
2212
2213         drbd_init_workqueue(&tconn->meta.work);
2214         mutex_init(&tconn->meta.mutex);
2215
2216         drbd_thread_init(tconn, &tconn->receiver, drbdd_init, "receiver");
2217         drbd_thread_init(tconn, &tconn->worker, drbd_worker, "worker");
2218         drbd_thread_init(tconn, &tconn->asender, drbd_asender, "asender");
2219
2220         write_lock_irq(&global_state_lock);
2221         list_add(&tconn->all_tconn, &drbd_tconns);
2222         write_unlock_irq(&global_state_lock);
2223
2224         return tconn;
2225
2226 fail:
2227         kfree(tconn->name);
2228         kfree(tconn);
2229
2230         return NULL;
2231 }
2232
2233 void drbd_free_tconn(struct drbd_tconn *tconn)
2234 {
2235         write_lock_irq(&global_state_lock);
2236         list_del(&tconn->all_tconn);
2237         write_unlock_irq(&global_state_lock);
2238         idr_destroy(&tconn->volumes);
2239
2240         kfree(tconn->name);
2241         kfree(tconn->int_dig_out);
2242         kfree(tconn->int_dig_in);
2243         kfree(tconn->int_dig_vv);
2244         kfree(tconn);
2245 }
2246
2247 struct drbd_conf *drbd_new_device(unsigned int minor)
2248 {
2249         struct drbd_conf *mdev;
2250         struct gendisk *disk;
2251         struct request_queue *q;
2252         char conn_name[9]; /* drbd1234N */
2253         int vnr;
2254
2255         /* GFP_KERNEL, we are outside of all write-out paths */
2256         mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
2257         if (!mdev)
2258                 return NULL;
2259         sprintf(conn_name, "drbd%d", minor);
2260         mdev->tconn = drbd_new_tconn(conn_name);
2261         if (!mdev->tconn)
2262                 goto out_no_tconn;
2263         if (!idr_pre_get(&mdev->tconn->volumes, GFP_KERNEL))
2264                 goto out_no_cpumask;
2265         if (idr_get_new(&mdev->tconn->volumes, mdev, &vnr))
2266                 goto out_no_cpumask;
2267         if (vnr != 0) {
2268                 dev_err(DEV, "vnr = %d\n", vnr);
2269                 goto out_no_cpumask;
2270         }
2271         if (!zalloc_cpumask_var(&mdev->tconn->cpu_mask, GFP_KERNEL))
2272                 goto out_no_cpumask;
2273
2274         mdev->minor = minor;
2275
2276         drbd_init_set_defaults(mdev);
2277
2278         q = blk_alloc_queue(GFP_KERNEL);
2279         if (!q)
2280                 goto out_no_q;
2281         mdev->rq_queue = q;
2282         q->queuedata   = mdev;
2283
2284         disk = alloc_disk(1);
2285         if (!disk)
2286                 goto out_no_disk;
2287         mdev->vdisk = disk;
2288
2289         set_disk_ro(disk, true);
2290
2291         disk->queue = q;
2292         disk->major = DRBD_MAJOR;
2293         disk->first_minor = minor;
2294         disk->fops = &drbd_ops;
2295         sprintf(disk->disk_name, "drbd%d", minor);
2296         disk->private_data = mdev;
2297
2298         mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
2299         /* we have no partitions. we contain only ourselves. */
2300         mdev->this_bdev->bd_contains = mdev->this_bdev;
2301
2302         q->backing_dev_info.congested_fn = drbd_congested;
2303         q->backing_dev_info.congested_data = mdev;
2304
2305         blk_queue_make_request(q, drbd_make_request);
2306         /* Setting max_hw_sectors to an odd value of 8 KiB here
2307            triggers a max_bio_size message upon first attach or connect */
2308         blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
2309         blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
2310         blk_queue_merge_bvec(q, drbd_merge_bvec);
2311         q->queue_lock = &mdev->tconn->req_lock; /* needed since we use */
2312
2313         mdev->md_io_page = alloc_page(GFP_KERNEL);
2314         if (!mdev->md_io_page)
2315                 goto out_no_io_page;
2316
2317         if (drbd_bm_init(mdev))
2318                 goto out_no_bitmap;
2319         /* no need to lock access, we are still initializing this minor device. */
2320         if (!tl_init(mdev))
2321                 goto out_no_tl;
2322         mdev->read_requests = RB_ROOT;
2323         mdev->write_requests = RB_ROOT;
2324
2325         mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
2326         if (!mdev->current_epoch)
2327                 goto out_no_epoch;
2328
2329         INIT_LIST_HEAD(&mdev->current_epoch->list);
2330         mdev->epochs = 1;
2331
2332         return mdev;
2333
2334 /* out_whatever_else:
2335         kfree(mdev->current_epoch); */
2336 out_no_epoch:
2337         tl_cleanup(mdev);
2338 out_no_tl:
2339         drbd_bm_cleanup(mdev);
2340 out_no_bitmap:
2341         __free_page(mdev->md_io_page);
2342 out_no_io_page:
2343         put_disk(disk);
2344 out_no_disk:
2345         blk_cleanup_queue(q);
2346 out_no_q:
2347         free_cpumask_var(mdev->tconn->cpu_mask);
2348 out_no_cpumask:
2349         drbd_free_tconn(mdev->tconn);
2350 out_no_tconn:
2351         kfree(mdev);
2352         return NULL;
2353 }
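/* Error unwinding in drbd_new_device() above follows the usual goto ladder:
 * each out_no_* label releases only what was acquired before the failed
 * allocation, falling through in reverse order of setup and ending with
 * drbd_free_tconn() and kfree(mdev). */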
2354
2355 /* counterpart of drbd_new_device.
2356  * last part of drbd_delete_device. */
2357 void drbd_free_mdev(struct drbd_conf *mdev)
2358 {
2359         kfree(mdev->current_epoch);
2360         tl_cleanup(mdev);
2361         if (mdev->bitmap) /* should no longer be there. */
2362                 drbd_bm_cleanup(mdev);
2363         __free_page(mdev->md_io_page);
2364         put_disk(mdev->vdisk);
2365         blk_cleanup_queue(mdev->rq_queue);
2366         kfree(mdev);
2367 }
2368
2369
2370 int __init drbd_init(void)
2371 {
2372         int err;
2373
2374         BUILD_BUG_ON(sizeof(struct p_header80) != sizeof(struct p_header95));
2375         BUILD_BUG_ON(sizeof(struct p_handshake) != 80);
2376
2377         if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
2378                 printk(KERN_ERR
2379                         "drbd: invalid minor_count (%d)\n", minor_count);
2380 #ifdef MODULE
2381                 return -EINVAL;
2382 #else
2383                 minor_count = 8;
2384 #endif
2385         }
2386
2387         err = drbd_nl_init();
2388         if (err)
2389                 return err;
2390
2391         err = register_blkdev(DRBD_MAJOR, "drbd");
2392         if (err) {
2393                 printk(KERN_ERR
2394                        "drbd: unable to register block device major %d\n",
2395                        DRBD_MAJOR);
2396                 return err;
2397         }
2398
2399         register_reboot_notifier(&drbd_notifier);
2400
2401         /*
2402          * allocate all necessary structs
2403          */
2404         err = -ENOMEM;
2405
2406         init_waitqueue_head(&drbd_pp_wait);
2407
2408         drbd_proc = NULL; /* play safe for drbd_cleanup */
2409         minor_table = kzalloc(sizeof(struct drbd_conf *)*minor_count,
2410                                 GFP_KERNEL);
2411         if (!minor_table)
2412                 goto Enomem;
2413
2414         err = drbd_create_mempools();
2415         if (err)
2416                 goto Enomem;
2417
2418         drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
2419         if (!drbd_proc) {
2420                 printk(KERN_ERR "drbd: unable to register proc file\n");
2421                 goto Enomem;
2422         }
2423
2424         rwlock_init(&global_state_lock);
2425         INIT_LIST_HEAD(&drbd_tconns);
2426
2427         printk(KERN_INFO "drbd: initialized. "
2428                "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
2429                API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
2430         printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
2431         printk(KERN_INFO "drbd: registered as block device major %d\n",
2432                 DRBD_MAJOR);
2433         printk(KERN_INFO "drbd: minor_table @ 0x%p\n", minor_table);
2434
2435         return 0; /* Success! */
2436
2437 Enomem:
2438         drbd_cleanup();
2439         if (err == -ENOMEM)
2440                 /* currently always the case */
2441                 printk(KERN_ERR "drbd: ran out of memory\n");
2442         else
2443                 printk(KERN_ERR "drbd: initialization failure\n");
2444         return err;
2445 }
2446
2447 void drbd_free_bc(struct drbd_backing_dev *ldev)
2448 {
2449         if (ldev == NULL)
2450                 return;
2451
2452         blkdev_put(ldev->backing_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2453         blkdev_put(ldev->md_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2454
2455         kfree(ldev);
2456 }
2457
2458 void drbd_free_sock(struct drbd_tconn *tconn)
2459 {
2460         if (tconn->data.socket) {
2461                 mutex_lock(&tconn->data.mutex);
2462                 kernel_sock_shutdown(tconn->data.socket, SHUT_RDWR);
2463                 sock_release(tconn->data.socket);
2464                 tconn->data.socket = NULL;
2465                 mutex_unlock(&tconn->data.mutex);
2466         }
2467         if (tconn->meta.socket) {
2468                 mutex_lock(&tconn->meta.mutex);
2469                 kernel_sock_shutdown(tconn->meta.socket, SHUT_RDWR);
2470                 sock_release(tconn->meta.socket);
2471                 tconn->meta.socket = NULL;
2472                 mutex_unlock(&tconn->meta.mutex);
2473         }
2474 }
2475
2476
2477 void drbd_free_resources(struct drbd_conf *mdev)
2478 {
2479         crypto_free_hash(mdev->csums_tfm);
2480         mdev->csums_tfm = NULL;
2481         crypto_free_hash(mdev->verify_tfm);
2482         mdev->verify_tfm = NULL;
2483         crypto_free_hash(mdev->tconn->cram_hmac_tfm);
2484         mdev->tconn->cram_hmac_tfm = NULL;
2485         crypto_free_hash(mdev->tconn->integrity_w_tfm);
2486         mdev->tconn->integrity_w_tfm = NULL;
2487         crypto_free_hash(mdev->tconn->integrity_r_tfm);
2488         mdev->tconn->integrity_r_tfm = NULL;
2489
2490         drbd_free_sock(mdev->tconn);
2491
2492         __no_warn(local,
2493                   drbd_free_bc(mdev->ldev);
2494                   mdev->ldev = NULL;);
2495 }
2496
2497 /* meta data management */
2498
2499 struct meta_data_on_disk {
2500         u64 la_size;           /* last agreed size. */
2501         u64 uuid[UI_SIZE];   /* UUIDs. */
2502         u64 device_uuid;
2503         u64 reserved_u64_1;
2504         u32 flags;             /* MDF */
2505         u32 magic;
2506         u32 md_size_sect;
2507         u32 al_offset;         /* offset to this block */
2508         u32 al_nr_extents;     /* important for restoring the AL */
2509               /* `-- act_log->nr_elements <-- sync_conf.al_extents */
2510         u32 bm_offset;         /* offset to the bitmap, from here */
2511         u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
2512         u32 la_peer_max_bio_size;   /* last peer max_bio_size */
2513         u32 reserved_u32[3];
2514
2515 } __packed;
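/* Note: drbd_md_sync() below writes this structure as a single 512 byte meta
 * data block. The buffer is memset to 0 first, so the reserved fields and any
 * tail padding hit the disk as zeroes, and all multi-byte fields are stored
 * big endian (cpu_to_be*). */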
2516
2517 /**
2518  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
2519  * @mdev:       DRBD device.
2520  */
2521 void drbd_md_sync(struct drbd_conf *mdev)
2522 {
2523         struct meta_data_on_disk *buffer;
2524         sector_t sector;
2525         int i;
2526
2527         del_timer(&mdev->md_sync_timer);
2528         /* timer may be rearmed by drbd_md_mark_dirty() now. */
2529         if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
2530                 return;
2531
2532         /* We use D_FAILED here, and not D_ATTACHING, because we try to write
2533          * metadata even if we detach due to a disk failure! */
2534         if (!get_ldev_if_state(mdev, D_FAILED))
2535                 return;
2536
2537         mutex_lock(&mdev->md_io_mutex);
2538         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
2539         memset(buffer, 0, 512);
2540
2541         buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
2542         for (i = UI_CURRENT; i < UI_SIZE; i++)
2543                 buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
2544         buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
2545         buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
2546
2547         buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
2548         buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
2549         buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
2550         buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
2551         buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
2552
2553         buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
2554         buffer->la_peer_max_bio_size = cpu_to_be32(mdev->peer_max_bio_size);
2555
2556         D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
2557         sector = mdev->ldev->md.md_offset;
2558
2559         if (!drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
2560                 /* this was a try anyways ... */
2561                 dev_err(DEV, "meta data update failed!\n");
2562                 drbd_chk_io_error(mdev, 1, true);
2563         }
2564
2565         /* Update mdev->ldev->md.la_size_sect,
2566          * since we updated it on metadata. */
2567         mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
2568
2569         mutex_unlock(&mdev->md_io_mutex);
2570         put_ldev(mdev);
2571 }
2572
2573 /**
2574  * drbd_md_read() - Reads in the meta data super block
2575  * @mdev:       DRBD device.
2576  * @bdev:       Device from which the meta data should be read in.
2577  *
2578  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_code in case
2579  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
2580  */
2581 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
2582 {
2583         struct meta_data_on_disk *buffer;
2584         int i, rv = NO_ERROR;
2585
2586         if (!get_ldev_if_state(mdev, D_ATTACHING))
2587                 return ERR_IO_MD_DISK;
2588
2589         mutex_lock(&mdev->md_io_mutex);
2590         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
2591
2592         if (!drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
2593                 /* NOTE: can't do normal error processing here as this is
2594                    called BEFORE disk is attached */
2595                 dev_err(DEV, "Error while reading metadata.\n");
2596                 rv = ERR_IO_MD_DISK;
2597                 goto err;
2598         }
2599
2600         if (buffer->magic != cpu_to_be32(DRBD_MD_MAGIC)) {
2601                 dev_err(DEV, "Error while reading metadata, magic not found.\n");
2602                 rv = ERR_MD_INVALID;
2603                 goto err;
2604         }
2605         if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
2606                 dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
2607                     be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
2608                 rv = ERR_MD_INVALID;
2609                 goto err;
2610         }
2611         if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
2612                 dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
2613                     be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
2614                 rv = ERR_MD_INVALID;
2615                 goto err;
2616         }
2617         if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
2618                 dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
2619                     be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
2620                 rv = ERR_MD_INVALID;
2621                 goto err;
2622         }
2623
2624         if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
2625                 dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
2626                     be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
2627                 rv = ERR_MD_INVALID;
2628                 goto err;
2629         }
2630
2631         bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
2632         for (i = UI_CURRENT; i < UI_SIZE; i++)
2633                 bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
2634         bdev->md.flags = be32_to_cpu(buffer->flags);
2635         mdev->sync_conf.al_extents = be32_to_cpu(buffer->al_nr_extents);
2636         bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
2637
2638         spin_lock_irq(&mdev->tconn->req_lock);
2639         if (mdev->state.conn < C_CONNECTED) {
2640                 int peer;
2641                 peer = be32_to_cpu(buffer->la_peer_max_bio_size);
2642                 peer = max_t(int, peer, DRBD_MAX_BIO_SIZE_SAFE);
2643                 mdev->peer_max_bio_size = peer;
2644         }
2645         spin_unlock_irq(&mdev->tconn->req_lock);
2646
2647         if (mdev->sync_conf.al_extents < 7)
2648                 mdev->sync_conf.al_extents = 127;
2649
2650  err:
2651         mutex_unlock(&mdev->md_io_mutex);
2652         put_ldev(mdev);
2653
2654         return rv;
2655 }
2656
2657 /**
2658  * drbd_md_mark_dirty() - Mark meta data super block as dirty
2659  * @mdev:       DRBD device.
2660  *
2661  * Call this function if you change anything that should be written to
2662  * the meta-data super block. This function sets MD_DIRTY and starts a
2663  * timer that ensures drbd_md_sync() gets called within five seconds.
2664  */
2665 #ifdef DEBUG
2666 void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
2667 {
2668         if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
2669                 mod_timer(&mdev->md_sync_timer, jiffies + HZ);
2670                 mdev->last_md_mark_dirty.line = line;
2671                 mdev->last_md_mark_dirty.func = func;
2672         }
2673 }
2674 #else
2675 void drbd_md_mark_dirty(struct drbd_conf *mdev)
2676 {
2677         if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
2678                 mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
2679 }
2680 #endif
2681
2682 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
2683 {
2684         int i;
2685
2686         for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
2687                 mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
2688 }
2689
2690 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
2691 {
2692         if (idx == UI_CURRENT) {
2693                 if (mdev->state.role == R_PRIMARY)
2694                         val |= 1;
2695                 else
2696                         val &= ~((u64)1);
2697
2698                 drbd_set_ed_uuid(mdev, val);
2699         }
2700
2701         mdev->ldev->md.uuid[idx] = val;
2702         drbd_md_mark_dirty(mdev);
2703 }
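/* As seen above, the lowest bit of the current UUID doubles as a role flag:
 * it is set while we are R_PRIMARY and cleared otherwise, so the actual UUID
 * value effectively lives in the upper 63 bits. */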
2704
2705
2706 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
2707 {
2708         if (mdev->ldev->md.uuid[idx]) {
2709                 drbd_uuid_move_history(mdev);
2710                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
2711         }
2712         _drbd_uuid_set(mdev, idx, val);
2713 }
2714
2715 /**
2716  * drbd_uuid_new_current() - Creates a new current UUID
2717  * @mdev:       DRBD device.
2718  *
2719  * Creates a new current UUID, and rotates the old current UUID into
2720  * the bitmap slot. Causes an incremental resync upon next connect.
2721  */
2722 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
2723 {
2724         u64 val;
2725         unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
2726
2727         if (bm_uuid)
2728                 dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
2729
2730         mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
2731
2732         get_random_bytes(&val, sizeof(u64));
2733         _drbd_uuid_set(mdev, UI_CURRENT, val);
2734         drbd_print_uuids(mdev, "new current UUID");
2735         /* get it to stable storage _now_ */
2736         drbd_md_sync(mdev);
2737 }
2738
2739 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
2740 {
2741         if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
2742                 return;
2743
2744         if (val == 0) {
2745                 drbd_uuid_move_history(mdev);
2746                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
2747                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
2748         } else {
2749                 unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
2750                 if (bm_uuid)
2751                         dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
2752
2753                 mdev->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
2754         }
2755         drbd_md_mark_dirty(mdev);
2756 }
2757
2758 /**
2759  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
2760  * @mdev:       DRBD device.
2761  *
2762  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
2763  */
2764 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
2765 {
2766         int rv = -EIO;
2767
2768         if (get_ldev_if_state(mdev, D_ATTACHING)) {
2769                 drbd_md_set_flag(mdev, MDF_FULL_SYNC);
2770                 drbd_md_sync(mdev);
2771                 drbd_bm_set_all(mdev);
2772
2773                 rv = drbd_bm_write(mdev);
2774
2775                 if (!rv) {
2776                         drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
2777                         drbd_md_sync(mdev);
2778                 }
2779
2780                 put_ldev(mdev);
2781         }
2782
2783         return rv;
2784 }
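/* Hypothetical usage sketch (the real call sites are outside this file, so
 * the flag name below is an assumption): the function is meant to be handed
 * to drbd_bitmap_io() or drbd_queue_bitmap_io() as io_fn, e.g.
 *
 *     rv = drbd_bitmap_io(mdev, &drbd_bmio_set_n_write,
 *                         "set_n_write from attaching", BM_LOCKED_MASK);
 */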
2785
2786 /**
2787  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
2788  * @mdev:       DRBD device.
2789  *
2790  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
2791  */
2792 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
2793 {
2794         int rv = -EIO;
2795
2796         drbd_resume_al(mdev);
2797         if (get_ldev_if_state(mdev, D_ATTACHING)) {
2798                 drbd_bm_clear_all(mdev);
2799                 rv = drbd_bm_write(mdev);
2800                 put_ldev(mdev);
2801         }
2802
2803         return rv;
2804 }
2805
2806 static int w_bitmap_io(struct drbd_work *w, int unused)
2807 {
2808         struct bm_io_work *work = container_of(w, struct bm_io_work, w);
2809         struct drbd_conf *mdev = w->mdev;
2810         int rv = -EIO;
2811
2812         D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
2813
2814         if (get_ldev(mdev)) {
2815                 drbd_bm_lock(mdev, work->why, work->flags);
2816                 rv = work->io_fn(mdev);
2817                 drbd_bm_unlock(mdev);
2818                 put_ldev(mdev);
2819         }
2820
2821         clear_bit(BITMAP_IO, &mdev->flags);
2822         smp_mb__after_clear_bit();
2823         wake_up(&mdev->misc_wait);
2824
2825         if (work->done)
2826                 work->done(mdev, rv);
2827
2828         clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
2829         work->why = NULL;
2830         work->flags = 0;
2831
2832         return 1;
2833 }
2834
2835 void drbd_ldev_destroy(struct drbd_conf *mdev)
2836 {
2837         lc_destroy(mdev->resync);
2838         mdev->resync = NULL;
2839         lc_destroy(mdev->act_log);
2840         mdev->act_log = NULL;
2841         __no_warn(local,
2842                 drbd_free_bc(mdev->ldev);
2843                 mdev->ldev = NULL;);
2844
2845         if (mdev->md_io_tmpp) {
2846                 __free_page(mdev->md_io_tmpp);
2847                 mdev->md_io_tmpp = NULL;
2848         }
2849         clear_bit(GO_DISKLESS, &mdev->flags);
2850 }
2851
2852 static int w_go_diskless(struct drbd_work *w, int unused)
2853 {
2854         struct drbd_conf *mdev = w->mdev;
2855
2856         D_ASSERT(mdev->state.disk == D_FAILED);
2857         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
2858          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
2859          * the protected members anymore, though, so once put_ldev reaches zero
2860          * again, it will be safe to free them. */
2861         drbd_force_state(mdev, NS(disk, D_DISKLESS));
2862         return 1;
2863 }
2864
2865 void drbd_go_diskless(struct drbd_conf *mdev)
2866 {
2867         D_ASSERT(mdev->state.disk == D_FAILED);
2868         if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
2869                 drbd_queue_work(&mdev->tconn->data.work, &mdev->go_diskless);
2870 }
2871
2872 /**
2873  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
2874  * @mdev:       DRBD device.
2875  * @io_fn:      IO callback to be called when bitmap IO is possible
2876  * @done:       callback to be called after the bitmap IO was performed
2877  * @why:        Descriptive text of the reason for doing the IO
2878  *
2879  * While IO on the bitmap happens we freeze application IO, thus ensuring
2880  * that drbd_set_out_of_sync() cannot be called. This function MAY ONLY be
2881  * called from worker context. It MUST NOT be used while a previous such
2882  * work is still pending!
2883  */
2884 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
2885                           int (*io_fn)(struct drbd_conf *),
2886                           void (*done)(struct drbd_conf *, int),
2887                           char *why, enum bm_flag flags)
2888 {
2889         D_ASSERT(current == mdev->tconn->worker.task);
2890
2891         D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
2892         D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
2893         D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
2894         if (mdev->bm_io_work.why)
2895                 dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
2896                         why, mdev->bm_io_work.why);
2897
2898         mdev->bm_io_work.io_fn = io_fn;
2899         mdev->bm_io_work.done = done;
2900         mdev->bm_io_work.why = why;
2901         mdev->bm_io_work.flags = flags;
2902
2903         spin_lock_irq(&mdev->tconn->req_lock);
2904         set_bit(BITMAP_IO, &mdev->flags);
2905         if (atomic_read(&mdev->ap_bio_cnt) == 0) {
2906                 if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
2907                         drbd_queue_work(&mdev->tconn->data.work, &mdev->bm_io_work.w);
2908         }
2909         spin_unlock_irq(&mdev->tconn->req_lock);
2910 }
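/* Note: the bitmap work is only queued immediately if no application bio is
 * in flight (ap_bio_cnt == 0). Otherwise just BITMAP_IO is set here; the
 * actual queueing then presumably happens elsewhere in drbd, once the last
 * application bio completes and ap_bio_cnt drops to zero. */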
2911
2912 /**
2913  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
2914  * @mdev:       DRBD device.
2915  * @io_fn:      IO callback to be called when bitmap IO is possible
2916  * @why:        Descriptive text of the reason for doing the IO
2917  *
2918  * Freezes application IO while the actual IO operation runs. This
2919  * function MAY NOT be called from worker context.
2920  */
2921 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *),
2922                 char *why, enum bm_flag flags)
2923 {
2924         int rv;
2925
2926         D_ASSERT(current != mdev->tconn->worker.task);
2927
2928         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
2929                 drbd_suspend_io(mdev);
2930
2931         drbd_bm_lock(mdev, why, flags);
2932         rv = io_fn(mdev);
2933         drbd_bm_unlock(mdev);
2934
2935         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
2936                 drbd_resume_io(mdev);
2937
2938         return rv;
2939 }
2940
2941 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
2942 {
2943         if ((mdev->ldev->md.flags & flag) != flag) {
2944                 drbd_md_mark_dirty(mdev);
2945                 mdev->ldev->md.flags |= flag;
2946         }
2947 }
2948
2949 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
2950 {
2951         if ((mdev->ldev->md.flags & flag) != 0) {
2952                 drbd_md_mark_dirty(mdev);
2953                 mdev->ldev->md.flags &= ~flag;
2954         }
2955 }
2956 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
2957 {
2958         return (bdev->md.flags & flag) != 0;
2959 }
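
/*
 * Illustrative sketch (assumption): these helpers only modify the in-core
 * meta data and mark it dirty; a later drbd_md_sync() persists the change.
 * Assuming the caller holds a local-disk reference (get_ldev()):
 *
 *	drbd_md_set_flag(mdev, MDF_FULL_SYNC);
 *	drbd_md_sync(mdev);
 *	...
 *	if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC))
 *		drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
 */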
2960
2961 static void md_sync_timer_fn(unsigned long data)
2962 {
2963         struct drbd_conf *mdev = (struct drbd_conf *) data;
2964
2965         drbd_queue_work_front(&mdev->tconn->data.work, &mdev->md_sync_work);
2966 }
2967
2968 static int w_md_sync(struct drbd_work *w, int unused)
2969 {
2970         struct drbd_conf *mdev = w->mdev;
2971
2972         dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
2973 #ifdef DEBUG
2974         dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
2975                 mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
2976 #endif
2977         drbd_md_sync(mdev);
2978         return 1;
2979 }
2980
2981 const char *cmdname(enum drbd_packet cmd)
2982 {
2983         /* THINK may need to become several global tables
2984          * when we want to support more than
2985          * one PRO_VERSION */
2986         static const char *cmdnames[] = {
2987                 [P_DATA]                = "Data",
2988                 [P_DATA_REPLY]          = "DataReply",
2989                 [P_RS_DATA_REPLY]       = "RSDataReply",
2990                 [P_BARRIER]             = "Barrier",
2991                 [P_BITMAP]              = "ReportBitMap",
2992                 [P_BECOME_SYNC_TARGET]  = "BecomeSyncTarget",
2993                 [P_BECOME_SYNC_SOURCE]  = "BecomeSyncSource",
2994                 [P_UNPLUG_REMOTE]       = "UnplugRemote",
2995                 [P_DATA_REQUEST]        = "DataRequest",
2996                 [P_RS_DATA_REQUEST]     = "RSDataRequest",
2997                 [P_SYNC_PARAM]          = "SyncParam",
2998                 [P_SYNC_PARAM89]        = "SyncParam89",
2999                 [P_PROTOCOL]            = "ReportProtocol",
3000                 [P_UUIDS]               = "ReportUUIDs",
3001                 [P_SIZES]               = "ReportSizes",
3002                 [P_STATE]               = "ReportState",
3003                 [P_SYNC_UUID]           = "ReportSyncUUID",
3004                 [P_AUTH_CHALLENGE]      = "AuthChallenge",
3005                 [P_AUTH_RESPONSE]       = "AuthResponse",
3006                 [P_PING]                = "Ping",
3007                 [P_PING_ACK]            = "PingAck",
3008                 [P_RECV_ACK]            = "RecvAck",
3009                 [P_WRITE_ACK]           = "WriteAck",
3010                 [P_RS_WRITE_ACK]        = "RSWriteAck",
3011                 [P_DISCARD_ACK]         = "DiscardAck",
3012                 [P_NEG_ACK]             = "NegAck",
3013                 [P_NEG_DREPLY]          = "NegDReply",
3014                 [P_NEG_RS_DREPLY]       = "NegRSDReply",
3015                 [P_BARRIER_ACK]         = "BarrierAck",
3016                 [P_STATE_CHG_REQ]       = "StateChgRequest",
3017                 [P_STATE_CHG_REPLY]     = "StateChgReply",
3018                 [P_OV_REQUEST]          = "OVRequest",
3019                 [P_OV_REPLY]            = "OVReply",
3020                 [P_OV_RESULT]           = "OVResult",
3021                 [P_CSUM_RS_REQUEST]     = "CsumRSRequest",
3022                 [P_RS_IS_IN_SYNC]       = "CsumRSIsInSync",
3023                 [P_COMPRESSED_BITMAP]   = "CBitmap",
3024                 [P_DELAY_PROBE]         = "DelayProbe",
3025                 [P_OUT_OF_SYNC]         = "OutOfSync",
3026                 [P_MAX_CMD]             = NULL,
3027         };
3028
3029         if (cmd == P_HAND_SHAKE_M)
3030                 return "HandShakeM";
3031         if (cmd == P_HAND_SHAKE_S)
3032                 return "HandShakeS";
3033         if (cmd == P_HAND_SHAKE)
3034                 return "HandShake";
3035         if (cmd >= P_MAX_CMD)
3036                 return "Unknown";
3037         return cmdnames[cmd];
3038 }
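
/*
 * Illustrative sketch (assumption): cmdname() is mainly useful for logging,
 * e.g. when the receiver sees a packet it did not expect:
 *
 *	dev_err(DEV, "Unexpected packet %s (%d)\n", cmdname(cmd), cmd);
 */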
3039
3040 #ifdef CONFIG_DRBD_FAULT_INJECTION
3041 /* Fault insertion support including random number generator shamelessly
3042  * stolen from kernel/rcutorture.c */
3043 struct fault_random_state {
3044         unsigned long state;
3045         unsigned long count;
3046 };
3047
3048 #define FAULT_RANDOM_MULT 39916801  /* prime */
3049 #define FAULT_RANDOM_ADD        479001701 /* prime */
3050 #define FAULT_RANDOM_REFRESH 10000
3051
3052 /*
3053  * Crude but fast random-number generator.  Uses a linear congruential
3054  * generator, with occasional help from get_random_bytes().
3055  */
3056 static unsigned long
3057 _drbd_fault_random(struct fault_random_state *rsp)
3058 {
3059         long refresh;
3060
3061         if (!rsp->count--) {
3062                 get_random_bytes(&refresh, sizeof(refresh));
3063                 rsp->state += refresh;
3064                 rsp->count = FAULT_RANDOM_REFRESH;
3065         }
3066         rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3067         return swahw32(rsp->state);
3068 }
3069
3070 static char *
3071 _drbd_fault_str(unsigned int type) {
3072         static char *_faults[] = {
3073                 [DRBD_FAULT_MD_WR] = "Meta-data write",
3074                 [DRBD_FAULT_MD_RD] = "Meta-data read",
3075                 [DRBD_FAULT_RS_WR] = "Resync write",
3076                 [DRBD_FAULT_RS_RD] = "Resync read",
3077                 [DRBD_FAULT_DT_WR] = "Data write",
3078                 [DRBD_FAULT_DT_RD] = "Data read",
3079                 [DRBD_FAULT_DT_RA] = "Data read ahead",
3080                 [DRBD_FAULT_BM_ALLOC] = "BM allocation",
3081                 [DRBD_FAULT_AL_EE] = "EE allocation",
3082                 [DRBD_FAULT_RECEIVE] = "receive data corruption",
3083         };
3084
3085         return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3086 }
3087
3088 unsigned int
3089 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3090 {
3091         static struct fault_random_state rrs = {0, 0};
3092
3093         unsigned int ret = (
3094                 (fault_devs == 0 ||
3095                         ((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3096                 (((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3097
3098         if (ret) {
3099                 fault_count++;
3100
3101                 if (__ratelimit(&drbd_ratelimit_state))
3102                         dev_warn(DEV, "***Simulating %s failure\n",
3103                                 _drbd_fault_str(type));
3104         }
3105
3106         return ret;
3107 }
3108 #endif
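
/*
 * Illustrative sketch (assumption): callers are expected to go through a
 * wrapper (drbd_insert_fault() in drbd_int.h) rather than calling
 * _drbd_insert_fault() directly, so that the check compiles away when
 * CONFIG_DRBD_FAULT_INJECTION is not set.  A submission path can then fail
 * a bio before it ever reaches the lower device, roughly like:
 *
 *	if (drbd_insert_fault(mdev, DRBD_FAULT_DT_RD))
 *		bio_endio(bio, -EIO);
 *	else
 *		generic_make_request(bio);
 */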
3109
3110 const char *drbd_buildtag(void)
3111 {
3112         /* When DRBD is built from external sources, this holds a reference
3113            to the git hash of the source code. */
3114
3115         static char buildtag[38] = "\0uilt-in";
3116
3117         if (buildtag[0] == 0) {
3118 #ifdef CONFIG_MODULES
3119                 if (THIS_MODULE != NULL)
3120                         sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3121                 else
3122 #endif
3123                         buildtag[0] = 'b';
3124         }
3125
3126         return buildtag;
3127 }
3128
3129 module_init(drbd_init)
3130 module_exit(drbd_cleanup)
3131
3132 EXPORT_SYMBOL(drbd_conn_str);
3133 EXPORT_SYMBOL(drbd_role_str);
3134 EXPORT_SYMBOL(drbd_disk_str);
3135 EXPORT_SYMBOL(drbd_set_st_err_str);