drbd: Bugfix for the connection behavior
[platform/adaptation/renesas_rcar/renesas_kernel.git] / drivers/block/drbd/drbd_receiver.c
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 struct packet_info {
52         enum drbd_packet cmd;
53         unsigned int size;
54         unsigned int vnr;
55         void *data;
56 };
57
58 enum finish_epoch {
59         FE_STILL_LIVE,
60         FE_DESTROYED,
61         FE_RECYCLED,
62 };
63
64 static int drbd_do_features(struct drbd_tconn *tconn);
65 static int drbd_do_auth(struct drbd_tconn *tconn);
66 static int drbd_disconnected(struct drbd_conf *mdev);
67
68 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
69 static int e_end_block(struct drbd_work *, int);
70
71
72 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
73
74 /*
75  * some helper functions to deal with single linked page lists,
76  * page->private being our "next" pointer.
77  */
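/* A chain is terminated by a page_private() of 0; the page_chain_next(),
 * page_chain_for_each() and page_chain_for_each_safe() helpers used below
 * hide the pointer chasing. */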
78
79 /* If at least n pages are linked at head, get n pages off.
80  * Otherwise, don't modify head, and return NULL.
81  * Locking is the responsibility of the caller.
82  */
83 static struct page *page_chain_del(struct page **head, int n)
84 {
85         struct page *page;
86         struct page *tmp;
87
88         BUG_ON(!n);
89         BUG_ON(!head);
90
91         page = *head;
92
93         if (!page)
94                 return NULL;
95
96         while (page) {
97                 tmp = page_chain_next(page);
98                 if (--n == 0)
99                         break; /* found sufficient pages */
100                 if (tmp == NULL)
101                         /* insufficient pages, don't use any of them. */
102                         return NULL;
103                 page = tmp;
104         }
105
106         /* add end of list marker for the returned list */
107         set_page_private(page, 0);
108         /* actual return value, and adjustment of head */
109         page = *head;
110         *head = tmp;
111         return page;
112 }
113
114 /* may be used outside of locks to find the tail of a (usually short)
115  * "private" page chain, before adding it back to a global chain head
116  * with page_chain_add() under a spinlock. */
117 static struct page *page_chain_tail(struct page *page, int *len)
118 {
119         struct page *tmp;
120         int i = 1;
121         while ((tmp = page_chain_next(page)))
122                 ++i, page = tmp;
123         if (len)
124                 *len = i;
125         return page;
126 }
127
128 static int page_chain_free(struct page *page)
129 {
130         struct page *tmp;
131         int i = 0;
132         page_chain_for_each_safe(page, tmp) {
133                 put_page(page);
134                 ++i;
135         }
136         return i;
137 }
138
139 static void page_chain_add(struct page **head,
140                 struct page *chain_first, struct page *chain_last)
141 {
142 #if 1
143         struct page *tmp;
144         tmp = page_chain_tail(chain_first, NULL);
145         BUG_ON(tmp != chain_last);
146 #endif
147
148         /* add chain to head */
149         set_page_private(chain_last, (unsigned long)*head);
150         *head = chain_first;
151 }
152
153 static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
154                                        unsigned int number)
155 {
156         struct page *page = NULL;
157         struct page *tmp = NULL;
158         unsigned int i = 0;
159
160         /* Yes, testing drbd_pp_vacant outside the lock is racy.
161          * So what. It saves a spin_lock. */
162         if (drbd_pp_vacant >= number) {
163                 spin_lock(&drbd_pp_lock);
164                 page = page_chain_del(&drbd_pp_pool, number);
165                 if (page)
166                         drbd_pp_vacant -= number;
167                 spin_unlock(&drbd_pp_lock);
168                 if (page)
169                         return page;
170         }
171
172         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
173          * "criss-cross" setup, that might cause write-out on some other DRBD,
174          * which in turn might block on the other node at this very place.  */
175         for (i = 0; i < number; i++) {
176                 tmp = alloc_page(GFP_TRY);
177                 if (!tmp)
178                         break;
179                 set_page_private(tmp, (unsigned long)page);
180                 page = tmp;
181         }
182
183         if (i == number)
184                 return page;
185
186         /* Not enough pages immediately available this time.
187          * No need to jump around here, drbd_alloc_pages will retry this
188          * function "soon". */
189         if (page) {
190                 tmp = page_chain_tail(page, NULL);
191                 spin_lock(&drbd_pp_lock);
192                 page_chain_add(&drbd_pp_pool, page, tmp);
193                 drbd_pp_vacant += i;
194                 spin_unlock(&drbd_pp_lock);
195         }
196         return NULL;
197 }
198
199 static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
200                                            struct list_head *to_be_freed)
201 {
202         struct drbd_peer_request *peer_req;
203         struct list_head *le, *tle;
204
205         /* The EEs are always appended to the end of the list. Since
206            they are sent in order over the wire, they have to finish
207            in order. As soon as we see the first one that has not finished, we can
208            stop examining the list... */
209
210         list_for_each_safe(le, tle, &mdev->net_ee) {
211                 peer_req = list_entry(le, struct drbd_peer_request, w.list);
212                 if (drbd_peer_req_has_active_page(peer_req))
213                         break;
214                 list_move(le, to_be_freed);
215         }
216 }
217
218 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
219 {
220         LIST_HEAD(reclaimed);
221         struct drbd_peer_request *peer_req, *t;
222
223         spin_lock_irq(&mdev->tconn->req_lock);
224         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
225         spin_unlock_irq(&mdev->tconn->req_lock);
226
227         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228                 drbd_free_net_peer_req(mdev, peer_req);
229 }
230
231 /**
232  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
233  * @mdev:       DRBD device.
234  * @number:     number of pages requested
235  * @retry:      whether to retry, if not enough pages are available right now
236  *
237  * Tries to allocate number pages, first from our own page pool, then from
238  * the kernel, unless this allocation would exceed the max_buffers setting.
239  * Possibly retry until DRBD frees sufficient pages somewhere else.
240  *
241  * Returns a page chain linked via page->private.
242  */
243 struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
244                               bool retry)
245 {
246         struct page *page = NULL;
247         struct net_conf *nc;
248         DEFINE_WAIT(wait);
249         int mxb;
250
251         /* Yes, we may run up to @number over max_buffers. If we
252          * follow it strictly, the admin will get it wrong anyways. */
253         rcu_read_lock();
254         nc = rcu_dereference(mdev->tconn->net_conf);
255         mxb = nc ? nc->max_buffers : 1000000;
256         rcu_read_unlock();
257
258         if (atomic_read(&mdev->pp_in_use) < mxb)
259                 page = __drbd_alloc_pages(mdev, number);
260
261         while (page == NULL) {
262                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
263
264                 drbd_kick_lo_and_reclaim_net(mdev);
265
266                 if (atomic_read(&mdev->pp_in_use) < mxb) {
267                         page = __drbd_alloc_pages(mdev, number);
268                         if (page)
269                                 break;
270                 }
271
272                 if (!retry)
273                         break;
274
275                 if (signal_pending(current)) {
276                         dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
277                         break;
278                 }
279
280                 schedule();
281         }
282         finish_wait(&drbd_pp_wait, &wait);
283
284         if (page)
285                 atomic_add(number, &mdev->pp_in_use);
286         return page;
287 }
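/* Pages handed out by drbd_alloc_pages() are accounted in mdev->pp_in_use and
 * must come back through drbd_free_pages() below, which also wakes
 * drbd_pp_wait so that waiters in drbd_alloc_pages() can retry. */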
288
289 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
290  * Is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
291  * Either links the page chain back to the global pool,
292  * or returns all pages to the system. */
293 static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
294 {
295         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
296         int i;
297
298         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
299                 i = page_chain_free(page);
300         else {
301                 struct page *tmp;
302                 tmp = page_chain_tail(page, &i);
303                 spin_lock(&drbd_pp_lock);
304                 page_chain_add(&drbd_pp_pool, page, tmp);
305                 drbd_pp_vacant += i;
306                 spin_unlock(&drbd_pp_lock);
307         }
308         i = atomic_sub_return(i, a);
309         if (i < 0)
310                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
311                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
312         wake_up(&drbd_pp_wait);
313 }
314
315 /*
316 You need to hold the req_lock:
317  _drbd_wait_ee_list_empty()
318
319 You must not have the req_lock:
320  drbd_free_peer_req()
321  drbd_alloc_peer_req()
322  drbd_free_peer_reqs()
323  drbd_ee_fix_bhs()
324  drbd_finish_peer_reqs()
325  drbd_clear_done_ee()
326  drbd_wait_ee_list_empty()
327 */
328
329 struct drbd_peer_request *
330 drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
331                     unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
332 {
333         struct drbd_peer_request *peer_req;
334         struct page *page;
335         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
336
337         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
338                 return NULL;
339
340         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
341         if (!peer_req) {
342                 if (!(gfp_mask & __GFP_NOWARN))
343                         dev_err(DEV, "%s: allocation failed\n", __func__);
344                 return NULL;
345         }
346
347         page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
348         if (!page)
349                 goto fail;
350
351         drbd_clear_interval(&peer_req->i);
352         peer_req->i.size = data_size;
353         peer_req->i.sector = sector;
354         peer_req->i.local = false;
355         peer_req->i.waiting = false;
356
357         peer_req->epoch = NULL;
358         peer_req->w.mdev = mdev;
359         peer_req->pages = page;
360         atomic_set(&peer_req->pending_bios, 0);
361         peer_req->flags = 0;
362         /*
363          * The block_id is opaque to the receiver.  It is not endianness
364          * converted, and sent back to the sender unchanged.
365          */
366         peer_req->block_id = id;
367
368         return peer_req;
369
370  fail:
371         mempool_free(peer_req, drbd_ee_mempool);
372         return NULL;
373 }
374
375 void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
376                        int is_net)
377 {
378         if (peer_req->flags & EE_HAS_DIGEST)
379                 kfree(peer_req->digest);
380         drbd_free_pages(mdev, peer_req->pages, is_net);
381         D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
382         D_ASSERT(drbd_interval_empty(&peer_req->i));
383         mempool_free(peer_req, drbd_ee_mempool);
384 }
385
386 int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
387 {
388         LIST_HEAD(work_list);
389         struct drbd_peer_request *peer_req, *t;
390         int count = 0;
391         int is_net = list == &mdev->net_ee;
392
393         spin_lock_irq(&mdev->tconn->req_lock);
394         list_splice_init(list, &work_list);
395         spin_unlock_irq(&mdev->tconn->req_lock);
396
397         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
398                 __drbd_free_peer_req(mdev, peer_req, is_net);
399                 count++;
400         }
401         return count;
402 }
403
404 /*
405  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
406  */
407 static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
408 {
409         LIST_HEAD(work_list);
410         LIST_HEAD(reclaimed);
411         struct drbd_peer_request *peer_req, *t;
412         int err = 0;
413
414         spin_lock_irq(&mdev->tconn->req_lock);
415         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
416         list_splice_init(&mdev->done_ee, &work_list);
417         spin_unlock_irq(&mdev->tconn->req_lock);
418
419         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
420                 drbd_free_net_peer_req(mdev, peer_req);
421
422         /* possible callbacks here:
423          * e_end_block, and e_end_resync_block, e_send_discard_write.
424          * all ignore the last argument.
425          */
426         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
427                 int err2;
428
429                 /* list_del not necessary, next/prev members not touched */
430                 err2 = peer_req->w.cb(&peer_req->w, !!err);
431                 if (!err)
432                         err = err2;
433                 drbd_free_peer_req(mdev, peer_req);
434         }
435         wake_up(&mdev->ee_wait);
436
437         return err;
438 }
439
440 static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
441                                      struct list_head *head)
442 {
443         DEFINE_WAIT(wait);
444
445         /* avoids spin_lock/unlock
446          * and calling prepare_to_wait in the fast path */
447         while (!list_empty(head)) {
448                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
449                 spin_unlock_irq(&mdev->tconn->req_lock);
450                 io_schedule();
451                 finish_wait(&mdev->ee_wait, &wait);
452                 spin_lock_irq(&mdev->tconn->req_lock);
453         }
454 }
455
456 static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
457                                     struct list_head *head)
458 {
459         spin_lock_irq(&mdev->tconn->req_lock);
460         _drbd_wait_ee_list_empty(mdev, head);
461         spin_unlock_irq(&mdev->tconn->req_lock);
462 }
463
464 /* see also kernel_accept(), which is only present since 2.6.18;
465  * we also want to log exactly which part of it failed */
466 static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
467 {
468         struct sock *sk = sock->sk;
469         int err = 0;
470
471         *what = "listen";
472         err = sock->ops->listen(sock, 5);
473         if (err < 0)
474                 goto out;
475
476         *what = "sock_create_lite";
477         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
478                                newsock);
479         if (err < 0)
480                 goto out;
481
482         *what = "accept";
483         err = sock->ops->accept(sock, *newsock, 0);
484         if (err < 0) {
485                 sock_release(*newsock);
486                 *newsock = NULL;
487                 goto out;
488         }
489         (*newsock)->ops  = sock->ops;
490
491 out:
492         return err;
493 }
494
495 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
496 {
497         mm_segment_t oldfs;
498         struct kvec iov = {
499                 .iov_base = buf,
500                 .iov_len = size,
501         };
502         struct msghdr msg = {
503                 .msg_iovlen = 1,
504                 .msg_iov = (struct iovec *)&iov,
505                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
506         };
507         int rv;
508
509         oldfs = get_fs();
510         set_fs(KERNEL_DS);
511         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
512         set_fs(oldfs);
513
514         return rv;
515 }
516
517 static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
518 {
519         mm_segment_t oldfs;
520         struct kvec iov = {
521                 .iov_base = buf,
522                 .iov_len = size,
523         };
524         struct msghdr msg = {
525                 .msg_iovlen = 1,
526                 .msg_iov = (struct iovec *)&iov,
527                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
528         };
529         int rv;
530
531         oldfs = get_fs();
532         set_fs(KERNEL_DS);
533
534         for (;;) {
535                 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
536                 if (rv == size)
537                         break;
538
539                 /* Note:
540                  * ECONNRESET   other side closed the connection
541                  * ERESTARTSYS  (on  sock) we got a signal
542                  */
543
544                 if (rv < 0) {
545                         if (rv == -ECONNRESET)
546                                 conn_info(tconn, "sock was reset by peer\n");
547                         else if (rv != -ERESTARTSYS)
548                                 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
549                         break;
550                 } else if (rv == 0) {
551                         conn_info(tconn, "sock was shut down by peer\n");
552                         break;
553                 } else  {
554                         /* signal came in, or peer/link went down,
555                          * after we read a partial message
556                          */
557                         /* D_ASSERT(signal_pending(current)); */
558                         break;
559                 }
560         }
561
562         set_fs(oldfs);
563
564         if (rv != size)
565                 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
566
567         return rv;
568 }
569
570 static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
571 {
572         int err;
573
574         err = drbd_recv(tconn, buf, size);
575         if (err != size) {
576                 if (err >= 0)
577                         err = -EIO;
578         } else
579                 err = 0;
580         return err;
581 }
582
583 static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
584 {
585         int err;
586
587         err = drbd_recv_all(tconn, buf, size);
588         if (err && !signal_pending(current))
589                 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
590         return err;
591 }
592
593 /* quoting tcp(7):
594  *   On individual connections, the socket buffer size must be set prior to the
595  *   listen(2) or connect(2) calls in order to have it take effect.
596  * This is our wrapper to do so.
597  */
598 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
599                 unsigned int rcv)
600 {
601         /* open coded SO_SNDBUF, SO_RCVBUF */
602         if (snd) {
603                 sock->sk->sk_sndbuf = snd;
604                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
605         }
606         if (rcv) {
607                 sock->sk->sk_rcvbuf = rcv;
608                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
609         }
610 }
611
612 static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
613 {
614         const char *what;
615         struct socket *sock;
616         struct sockaddr_in6 src_in6;
617         struct sockaddr_in6 peer_in6;
618         struct net_conf *nc;
619         int err, peer_addr_len, my_addr_len;
620         int sndbuf_size, rcvbuf_size, connect_int;
621         int disconnect_on_error = 1;
622
623         rcu_read_lock();
624         nc = rcu_dereference(tconn->net_conf);
625         if (!nc) {
626                 rcu_read_unlock();
627                 return NULL;
628         }
629         sndbuf_size = nc->sndbuf_size;
630         rcvbuf_size = nc->rcvbuf_size;
631         connect_int = nc->connect_int;
632         rcu_read_unlock();
633
634         my_addr_len = min_t(int, tconn->my_addr_len, sizeof(src_in6));
635         memcpy(&src_in6, &tconn->my_addr, my_addr_len);
636
637         if (((struct sockaddr *)&tconn->my_addr)->sa_family == AF_INET6)
638                 src_in6.sin6_port = 0;
639         else
640                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
641
642         peer_addr_len = min_t(int, tconn->peer_addr_len, sizeof(src_in6));
643         memcpy(&peer_in6, &tconn->peer_addr, peer_addr_len);
644
645         what = "sock_create_kern";
646         err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
647                                SOCK_STREAM, IPPROTO_TCP, &sock);
648         if (err < 0) {
649                 sock = NULL;
650                 goto out;
651         }
652
653         sock->sk->sk_rcvtimeo =
654         sock->sk->sk_sndtimeo = connect_int * HZ;
655         drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
656
657        /* explicitly bind to the configured IP as source IP
658         *  for the outgoing connections.
659         *  This is needed for multihomed hosts and to be
660         *  able to use lo: interfaces for drbd.
661         * Make sure to use 0 as port number, so linux selects
662         *  a free one dynamically.
663         */
664         what = "bind before connect";
665         err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
666         if (err < 0)
667                 goto out;
668
669         /* connect may fail, peer not yet available.
670          * stay C_WF_CONNECTION, don't go Disconnecting! */
671         disconnect_on_error = 0;
672         what = "connect";
673         err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
674
675 out:
676         if (err < 0) {
677                 if (sock) {
678                         sock_release(sock);
679                         sock = NULL;
680                 }
681                 switch (-err) {
682                         /* timeout, busy, signal pending */
683                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
684                 case EINTR: case ERESTARTSYS:
685                         /* peer not (yet) available, network problem */
686                 case ECONNREFUSED: case ENETUNREACH:
687                 case EHOSTDOWN:    case EHOSTUNREACH:
688                         disconnect_on_error = 0;
689                         break;
690                 default:
691                         conn_err(tconn, "%s failed, err = %d\n", what, err);
692                 }
693                 if (disconnect_on_error)
694                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
695         }
696
697         return sock;
698 }
699
700 static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
701 {
702         int timeo, err, my_addr_len;
703         int sndbuf_size, rcvbuf_size, connect_int;
704         struct socket *s_estab = NULL, *s_listen;
705         struct sockaddr_in6 my_addr;
706         struct net_conf *nc;
707         const char *what;
708
709         rcu_read_lock();
710         nc = rcu_dereference(tconn->net_conf);
711         if (!nc) {
712                 rcu_read_unlock();
713                 return NULL;
714         }
715         sndbuf_size = nc->sndbuf_size;
716         rcvbuf_size = nc->rcvbuf_size;
717         connect_int = nc->connect_int;
718         rcu_read_unlock();
719
720         my_addr_len = min_t(int, tconn->my_addr_len, sizeof(struct sockaddr_in6));
721         memcpy(&my_addr, &tconn->my_addr, my_addr_len);
722
723         what = "sock_create_kern";
724         err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
725                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
726         if (err) {
727                 s_listen = NULL;
728                 goto out;
729         }
730
731         timeo = connect_int * HZ;
732         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
733
734         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
735         s_listen->sk->sk_rcvtimeo = timeo;
736         s_listen->sk->sk_sndtimeo = timeo;
737         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
738
739         what = "bind before listen";
740         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
741         if (err < 0)
742                 goto out;
743
744         err = drbd_accept(&what, s_listen, &s_estab);
745
746 out:
747         if (s_listen)
748                 sock_release(s_listen);
749         if (err < 0) {
750                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
751                         conn_err(tconn, "%s failed, err = %d\n", what, err);
752                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
753                 }
754         }
755
756         return s_estab;
757 }
758
759 static int decode_header(struct drbd_tconn *, void *, struct packet_info *);
760
761 static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
762                              enum drbd_packet cmd)
763 {
764         if (!conn_prepare_command(tconn, sock))
765                 return -EIO;
766         return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
767 }
768
769 static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
770 {
771         unsigned int header_size = drbd_header_size(tconn);
772         struct packet_info pi;
773         int err;
774
775         err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
776         if (err != header_size) {
777                 if (err >= 0)
778                         err = -EIO;
779                 return err;
780         }
781         err = decode_header(tconn, tconn->data.rbuf, &pi);
782         if (err)
783                 return err;
784         return pi.cmd;
785 }
786
787 /**
788  * drbd_socket_okay() - Free the socket if its connection is not okay
789  * @sock:       pointer to the pointer to the socket.
790  */
791 static int drbd_socket_okay(struct socket **sock)
792 {
793         int rr;
794         char tb[4];
795
796         if (!*sock)
797                 return false;
798
799         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
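        /* A non-blocking peek that returns data, or that would block (-EAGAIN),
         * means the connection is still alive; 0 (orderly shutdown by the peer)
         * or any other error means the socket is dead and gets released. */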
800
801         if (rr > 0 || rr == -EAGAIN) {
802                 return true;
803         } else {
804                 sock_release(*sock);
805                 *sock = NULL;
806                 return false;
807         }
808 }
809 /* Gets called if a connection is established, or if a new minor gets created
810    in a connection */
811 int drbd_connected(struct drbd_conf *mdev)
812 {
813         int err;
814
815         atomic_set(&mdev->packet_seq, 0);
816         mdev->peer_seq = 0;
817
818         mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
819                 &mdev->tconn->cstate_mutex :
820                 &mdev->own_state_mutex;
821
822         err = drbd_send_sync_param(mdev);
823         if (!err)
824                 err = drbd_send_sizes(mdev, 0, 0);
825         if (!err)
826                 err = drbd_send_uuids(mdev);
827         if (!err)
828                 err = drbd_send_state(mdev);
829         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
830         clear_bit(RESIZE_PENDING, &mdev->flags);
831         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
832         return err;
833 }
834
835 /*
836  * return values:
837  *   1 yes, we have a valid connection
838  *   0 oops, did not work out, please try again
839  *  -1 peer talks different language,
840  *     no point in trying again, please go standalone.
841  *  -2 We do not have a network config...
842  */
843 static int conn_connect(struct drbd_tconn *tconn)
844 {
845         struct socket *sock, *msock;
846         struct drbd_conf *mdev;
847         struct net_conf *nc;
848         int vnr, timeout, try, h, ok;
849
850         if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
851                 return -2;
852
853         clear_bit(DISCARD_CONCURRENT, &tconn->flags);
854
855         /* Assume that the peer only understands protocol 80 until we know better.  */
856         tconn->agreed_pro_version = 80;
857
858         do {
859                 struct socket *s;
860
861                 for (try = 0;;) {
862                         /* 3 tries, this should take less than a second! */
863                         s = drbd_try_connect(tconn);
864                         if (s || ++try >= 3)
865                                 break;
866                         /* give the other side time to call bind() & listen() */
867                         schedule_timeout_interruptible(HZ / 10);
868                 }
869
870                 if (s) {
871                         if (!tconn->data.socket) {
872                                 tconn->data.socket = s;
873                                 send_first_packet(tconn, &tconn->data, P_INITIAL_DATA);
874                         } else if (!tconn->meta.socket) {
875                                 tconn->meta.socket = s;
876                                 send_first_packet(tconn, &tconn->meta, P_INITIAL_META);
877                         } else {
878                                 conn_err(tconn, "Logic error in conn_connect()\n");
879                                 goto out_release_sockets;
880                         }
881                 }
882
883                 if (tconn->data.socket && tconn->meta.socket) {
884                         schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
885                         ok = drbd_socket_okay(&tconn->data.socket);
886                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
887                         if (ok)
888                                 break;
889                 }
890
891 retry:
892                 s = drbd_wait_for_connect(tconn);
893                 if (s) {
894                         try = receive_first_packet(tconn, s);
895                         drbd_socket_okay(&tconn->data.socket);
896                         drbd_socket_okay(&tconn->meta.socket);
897                         switch (try) {
898                         case P_INITIAL_DATA:
899                                 if (tconn->data.socket) {
900                                         conn_warn(tconn, "initial packet S crossed\n");
901                                         sock_release(tconn->data.socket);
902                                 }
903                                 tconn->data.socket = s;
904                                 break;
905                         case P_INITIAL_META:
906                                 if (tconn->meta.socket) {
907                                         conn_warn(tconn, "initial packet M crossed\n");
908                                         sock_release(tconn->meta.socket);
909                                 }
910                                 tconn->meta.socket = s;
911                                 set_bit(DISCARD_CONCURRENT, &tconn->flags);
912                                 break;
913                         default:
914                                 conn_warn(tconn, "Error receiving initial packet\n");
915                                 sock_release(s);
916                                 if (random32() & 1)
917                                         goto retry;
918                         }
919                 }
920
921                 if (tconn->cstate <= C_DISCONNECTING)
922                         goto out_release_sockets;
923                 if (signal_pending(current)) {
924                         flush_signals(current);
925                         smp_rmb();
926                         if (get_t_state(&tconn->receiver) == EXITING)
927                                 goto out_release_sockets;
928                 }
929
930                 if (tconn->data.socket && tconn->meta.socket) {
931                         ok = drbd_socket_okay(&tconn->data.socket);
932                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
933                         if (ok)
934                                 break;
935                 }
936         } while (1);
937
938         sock  = tconn->data.socket;
939         msock = tconn->meta.socket;
940
941         msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
942         sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
943
944         sock->sk->sk_allocation = GFP_NOIO;
945         msock->sk->sk_allocation = GFP_NOIO;
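        /* GFP_NOIO: allocations on behalf of these sockets must not recurse
         * into block I/O, since we are ourselves part of the I/O path. */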
946
947         sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
948         msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
949
950         /* NOT YET ...
951          * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
952          * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
953          * first set it to the P_CONNECTION_FEATURES timeout,
954          * which we set to 4x the configured ping_timeout. */
955         rcu_read_lock();
956         nc = rcu_dereference(tconn->net_conf);
957
958         sock->sk->sk_sndtimeo =
959         sock->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
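        /* ping_timeo (like nc->timeout below) is configured in tenths of a
         * second, hence the HZ/10 scaling. */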
960
961         msock->sk->sk_rcvtimeo = nc->ping_int*HZ;
962         timeout = nc->timeout * HZ / 10;
963         rcu_read_unlock();
964
965         msock->sk->sk_sndtimeo = timeout;
966
967         /* we don't want delays.
968          * we use TCP_CORK where appropriate, though */
969         drbd_tcp_nodelay(sock);
970         drbd_tcp_nodelay(msock);
971
972         tconn->last_received = jiffies;
973
974         h = drbd_do_features(tconn);
975         if (h <= 0)
976                 return h;
977
978         if (tconn->cram_hmac_tfm) {
979                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
980                 switch (drbd_do_auth(tconn)) {
981                 case -1:
982                         conn_err(tconn, "Authentication of peer failed\n");
983                         return -1;
984                 case 0:
985                         conn_err(tconn, "Authentication of peer failed, trying again.\n");
986                         return 0;
987                 }
988         }
989
990         sock->sk->sk_sndtimeo = timeout;
991         sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
992
993         if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
994                 return -1;
995
996         rcu_read_lock();
997         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
998                 kref_get(&mdev->kref);
999                 rcu_read_unlock();
1000                 drbd_connected(mdev);
1001                 kref_put(&mdev->kref, &drbd_minor_destroy);
1002                 rcu_read_lock();
1003         }
1004         rcu_read_unlock();
1005
1006         if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
1007                 return 0;
1008
1009         drbd_thread_start(&tconn->asender);
1010
1011         return h;
1012
1013 out_release_sockets:
1014         if (tconn->data.socket) {
1015                 sock_release(tconn->data.socket);
1016                 tconn->data.socket = NULL;
1017         }
1018         if (tconn->meta.socket) {
1019                 sock_release(tconn->meta.socket);
1020                 tconn->meta.socket = NULL;
1021         }
1022         return -1;
1023 }
1024
1025 static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
1026 {
1027         unsigned int header_size = drbd_header_size(tconn);
1028
1029         if (header_size == sizeof(struct p_header100) &&
1030             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1031                 struct p_header100 *h = header;
1032                 if (h->pad != 0) {
1033                         conn_err(tconn, "Header padding is not zero\n");
1034                         return -EINVAL;
1035                 }
1036                 pi->vnr = be16_to_cpu(h->volume);
1037                 pi->cmd = be16_to_cpu(h->command);
1038                 pi->size = be32_to_cpu(h->length);
1039         } else if (header_size == sizeof(struct p_header95) &&
1040                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1041                 struct p_header95 *h = header;
1042                 pi->cmd = be16_to_cpu(h->command);
1043                 pi->size = be32_to_cpu(h->length);
1044                 pi->vnr = 0;
1045         } else if (header_size == sizeof(struct p_header80) &&
1046                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1047                 struct p_header80 *h = header;
1048                 pi->cmd = be16_to_cpu(h->command);
1049                 pi->size = be16_to_cpu(h->length);
1050                 pi->vnr = 0;
1051         } else {
1052                 conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
1053                          be32_to_cpu(*(__be32 *)header),
1054                          tconn->agreed_pro_version);
1055                 return -EINVAL;
1056         }
1057         pi->data = header + header_size;
1058         return 0;
1059 }
1060
1061 static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
1062 {
1063         void *buffer = tconn->data.rbuf;
1064         int err;
1065
1066         err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
1067         if (err)
1068                 return err;
1069
1070         err = decode_header(tconn, buffer, pi);
1071         tconn->last_received = jiffies;
1072
1073         return err;
1074 }
1075
1076 static void drbd_flush(struct drbd_conf *mdev)
1077 {
1078         int rv;
1079
1080         if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
1081                 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
1082                                         NULL);
1083                 if (rv) {
1084                         dev_info(DEV, "local disk flush failed with status %d\n", rv);
1085                         /* would rather check on EOPNOTSUPP, but that is not reliable.
1086                          * don't try again for ANY return value != 0
1087                          * if (rv == -EOPNOTSUPP) */
1088                         drbd_bump_write_ordering(mdev, WO_drain_io);
1089                 }
1090                 put_ldev(mdev);
1091         }
1092 }
1093
1094 /**
1095  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishing it.
1096  * @mdev:       DRBD device.
1097  * @epoch:      Epoch object.
1098  * @ev:         Epoch event.
1099  */
1100 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1101                                                struct drbd_epoch *epoch,
1102                                                enum epoch_event ev)
1103 {
1104         int epoch_size;
1105         struct drbd_epoch *next_epoch;
1106         enum finish_epoch rv = FE_STILL_LIVE;
1107
1108         spin_lock(&mdev->epoch_lock);
1109         do {
1110                 next_epoch = NULL;
1111
1112                 epoch_size = atomic_read(&epoch->epoch_size);
1113
1114                 switch (ev & ~EV_CLEANUP) {
1115                 case EV_PUT:
1116                         atomic_dec(&epoch->active);
1117                         break;
1118                 case EV_GOT_BARRIER_NR:
1119                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1120                         /* nothing to do */
1121                 case EV_BECAME_LAST:
1122                         /* nothing to do*/
1123                         break;
1124                 }
1125
1126                 if (epoch_size != 0 &&
1127                     atomic_read(&epoch->active) == 0 &&
1128                     test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
1129                         if (!(ev & EV_CLEANUP)) {
1130                                 spin_unlock(&mdev->epoch_lock);
1131                                 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1132                                 spin_lock(&mdev->epoch_lock);
1133                         }
1134                         dec_unacked(mdev);
1135
1136                         if (mdev->current_epoch != epoch) {
1137                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1138                                 list_del(&epoch->list);
1139                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1140                                 mdev->epochs--;
1141                                 kfree(epoch);
1142
1143                                 if (rv == FE_STILL_LIVE)
1144                                         rv = FE_DESTROYED;
1145                         } else {
1146                                 epoch->flags = 0;
1147                                 atomic_set(&epoch->epoch_size, 0);
1148                                 /* atomic_set(&epoch->active, 0); is already zero */
1149                                 if (rv == FE_STILL_LIVE)
1150                                         rv = FE_RECYCLED;
1151                                 wake_up(&mdev->ee_wait);
1152                         }
1153                 }
1154
1155                 if (!next_epoch)
1156                         break;
1157
1158                 epoch = next_epoch;
1159         } while (1);
1160
1161         spin_unlock(&mdev->epoch_lock);
1162
1163         return rv;
1164 }
1165
1166 /**
1167  * drbd_bump_write_ordering() - Fall back to another write ordering method
1168  * @mdev:       DRBD device.
1169  * @wo:         Write ordering method to try.
1170  */
1171 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1172 {
1173         struct disk_conf *dc;
1174         enum write_ordering_e pwo;
1175         static char *write_ordering_str[] = {
1176                 [WO_none] = "none",
1177                 [WO_drain_io] = "drain",
1178                 [WO_bdev_flush] = "flush",
1179         };
1180
1181         pwo = mdev->write_ordering;
1182         wo = min(pwo, wo);
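        /* min() works because the methods are ordered from weakest to strictest
         * (WO_none < WO_drain_io < WO_bdev_flush); we only ever fall back,
         * never silently upgrade. */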
1183         rcu_read_lock();
1184         dc = rcu_dereference(mdev->ldev->disk_conf);
1185
1186         if (wo == WO_bdev_flush && !dc->disk_flushes)
1187                 wo = WO_drain_io;
1188         if (wo == WO_drain_io && !dc->disk_drain)
1189                 wo = WO_none;
1190         rcu_read_unlock();
1191         mdev->write_ordering = wo;
1192         if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1193                 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1194 }
1195
1196 /**
1197  * drbd_submit_peer_request()
1198  * @mdev:       DRBD device.
1199  * @peer_req:   peer request
1200  * @rw:         flag field, see bio->bi_rw
1201  *
1202  * May spread the pages to multiple bios,
1203  * depending on bio_add_page restrictions.
1204  *
1205  * Returns 0 if all bios have been submitted,
1206  * -ENOMEM if we could not allocate enough bios,
1207  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1208  *  single page to an empty bio (which should never happen and likely indicates
1209  *  that the lower level IO stack is in some way broken). This has been observed
1210  *  on certain Xen deployments.
1211  */
1212 /* TODO allocate from our own bio_set. */
1213 int drbd_submit_peer_request(struct drbd_conf *mdev,
1214                              struct drbd_peer_request *peer_req,
1215                              const unsigned rw, const int fault_type)
1216 {
1217         struct bio *bios = NULL;
1218         struct bio *bio;
1219         struct page *page = peer_req->pages;
1220         sector_t sector = peer_req->i.sector;
1221         unsigned ds = peer_req->i.size;
1222         unsigned n_bios = 0;
1223         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1224         int err = -ENOMEM;
1225
1226         /* In most cases, we will only need one bio.  But in case the lower
1227          * level restrictions happen to be different at this offset on this
1228          * side than those of the sending peer, we may need to submit the
1229          * request in more than one bio.
1230          *
1231          * Plain bio_alloc is good enough here, this is no DRBD internally
1232          * generated bio, but a bio allocated on behalf of the peer.
1233          */
1234 next_bio:
1235         bio = bio_alloc(GFP_NOIO, nr_pages);
1236         if (!bio) {
1237                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1238                 goto fail;
1239         }
1240         /* > peer_req->i.sector, unless this is the first bio */
1241         bio->bi_sector = sector;
1242         bio->bi_bdev = mdev->ldev->backing_bdev;
1243         bio->bi_rw = rw;
1244         bio->bi_private = peer_req;
1245         bio->bi_end_io = drbd_peer_request_endio;
1246
1247         bio->bi_next = bios;
1248         bios = bio;
1249         ++n_bios;
1250
1251         page_chain_for_each(page) {
1252                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1253                 if (!bio_add_page(bio, page, len, 0)) {
1254                         /* A single page must always be possible!
1255                          * But in case it fails anyways,
1256                          * we deal with it, and complain (below). */
1257                         if (bio->bi_vcnt == 0) {
1258                                 dev_err(DEV,
1259                                         "bio_add_page failed for len=%u, "
1260                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1261                                         len, (unsigned long long)bio->bi_sector);
1262                                 err = -ENOSPC;
1263                                 goto fail;
1264                         }
1265                         goto next_bio;
1266                 }
1267                 ds -= len;
1268                 sector += len >> 9;
1269                 --nr_pages;
1270         }
1271         D_ASSERT(page == NULL);
1272         D_ASSERT(ds == 0);
1273
1274         atomic_set(&peer_req->pending_bios, n_bios);
1275         do {
1276                 bio = bios;
1277                 bios = bios->bi_next;
1278                 bio->bi_next = NULL;
1279
1280                 drbd_generic_make_request(mdev, fault_type, bio);
1281         } while (bios);
1282         return 0;
1283
1284 fail:
1285         while (bios) {
1286                 bio = bios;
1287                 bios = bios->bi_next;
1288                 bio_put(bio);
1289         }
1290         return err;
1291 }
1292
1293 static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1294                                              struct drbd_peer_request *peer_req)
1295 {
1296         struct drbd_interval *i = &peer_req->i;
1297
1298         drbd_remove_interval(&mdev->write_requests, i);
1299         drbd_clear_interval(i);
1300
1301         /* Wake up any processes waiting for this peer request to complete.  */
1302         if (i->waiting)
1303                 wake_up(&mdev->misc_wait);
1304 }
1305
1306 static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1307 {
1308         struct drbd_conf *mdev;
1309         int rv;
1310         struct p_barrier *p = pi->data;
1311         struct drbd_epoch *epoch;
1312
1313         mdev = vnr_to_mdev(tconn, pi->vnr);
1314         if (!mdev)
1315                 return -EIO;
1316
1317         inc_unacked(mdev);
1318
1319         mdev->current_epoch->barrier_nr = p->barrier;
1320         rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1321
1322         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1323          * the activity log, which means it would not be resynced in case the
1324          * R_PRIMARY crashes now.
1325          * Therefore we must send the barrier_ack after the barrier request was
1326          * completed. */
1327         switch (mdev->write_ordering) {
1328         case WO_none:
1329                 if (rv == FE_RECYCLED)
1330                         return 0;
1331
1332                 /* receiver context, in the writeout path of the other node.
1333                  * avoid potential distributed deadlock */
1334                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1335                 if (epoch)
1336                         break;
1337                 else
1338                         dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1339                         /* Fall through */
1340
1341         case WO_bdev_flush:
1342         case WO_drain_io:
1343                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1344                 drbd_flush(mdev);
1345
1346                 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1347                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1348                         if (epoch)
1349                                 break;
1350                 }
1351
1352                 epoch = mdev->current_epoch;
1353                 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1354
1355                 D_ASSERT(atomic_read(&epoch->active) == 0);
1356                 D_ASSERT(epoch->flags == 0);
1357
1358                 return 0;
1359         default:
1360                 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1361                 return -EIO;
1362         }
1363
1364         epoch->flags = 0;
1365         atomic_set(&epoch->epoch_size, 0);
1366         atomic_set(&epoch->active, 0);
1367
1368         spin_lock(&mdev->epoch_lock);
1369         if (atomic_read(&mdev->current_epoch->epoch_size)) {
1370                 list_add(&epoch->list, &mdev->current_epoch->list);
1371                 mdev->current_epoch = epoch;
1372                 mdev->epochs++;
1373         } else {
1374                 /* The current_epoch got recycled while we allocated this one... */
1375                 kfree(epoch);
1376         }
1377         spin_unlock(&mdev->epoch_lock);
1378
1379         return 0;
1380 }
1381
1382 /* used from receive_RSDataReply (recv_resync_read)
1383  * and from receive_Data */
1384 static struct drbd_peer_request *
1385 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1386               int data_size) __must_hold(local)
1387 {
1388         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1389         struct drbd_peer_request *peer_req;
1390         struct page *page;
1391         int dgs, ds, err;
1392         void *dig_in = mdev->tconn->int_dig_in;
1393         void *dig_vv = mdev->tconn->int_dig_vv;
1394         unsigned long *data;
1395
1396         dgs = 0;
1397         if (mdev->tconn->peer_integrity_tfm) {
1398                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1399                 /*
1400                  * FIXME: Receive the incoming digest into the receive buffer
1401                  *        here, together with its struct p_data?
1402                  */
1403                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1404                 if (err)
1405                         return NULL;
1406                 data_size -= dgs;
1407         }
1408
1409         if (!expect(data_size != 0))
1410                 return NULL;
1411         if (!expect(IS_ALIGNED(data_size, 512)))
1412                 return NULL;
1413         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1414                 return NULL;
1415
1416         /* even though we trust our peer,
1417          * we sometimes have to double check. */
1418         if (sector + (data_size>>9) > capacity) {
1419                 dev_err(DEV, "request from peer beyond end of local disk: "
1420                         "capacity: %llus < sector: %llus + size: %u\n",
1421                         (unsigned long long)capacity,
1422                         (unsigned long long)sector, data_size);
1423                 return NULL;
1424         }
1425
1426         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1427          * "criss-cross" setup, that might cause write-out on some other DRBD,
1428          * which in turn might block on the other node at this very place.  */
1429         peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
1430         if (!peer_req)
1431                 return NULL;
1432
1433         ds = data_size;
1434         page = peer_req->pages;
1435         page_chain_for_each(page) {
1436                 unsigned len = min_t(int, ds, PAGE_SIZE);
1437                 data = kmap(page);
1438                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1439                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1440                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1441                         data[0] = data[0] ^ (unsigned long)-1;
1442                 }
1443                 kunmap(page);
1444                 if (err) {
1445                         drbd_free_peer_req(mdev, peer_req);
1446                         return NULL;
1447                 }
1448                 ds -= len;
1449         }
1450
1451         if (dgs) {
1452                 drbd_csum_ee(mdev, mdev->tconn->peer_integrity_tfm, peer_req, dig_vv);
1453                 if (memcmp(dig_in, dig_vv, dgs)) {
1454                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1455                                 (unsigned long long)sector, data_size);
1456                         drbd_free_peer_req(mdev, peer_req);
1457                         return NULL;
1458                 }
1459         }
1460         mdev->recv_cnt += data_size>>9;
1461         return peer_req;
1462 }
1463
1464 /* drbd_drain_block() just takes a data block
1465  * out of the socket input buffer, and discards it.
1466  */
1467 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1468 {
1469         struct page *page;
1470         int err = 0;
1471         void *data;
1472
1473         if (!data_size)
1474                 return 0;
1475
1476         page = drbd_alloc_pages(mdev, 1, 1);
             if (!page) /* e.g. interrupted by a signal while waiting for pages */
                     return -EIO;
1477
1478         data = kmap(page);
1479         while (data_size) {
1480                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1481
1482                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1483                 if (err)
1484                         break;
1485                 data_size -= len;
1486         }
1487         kunmap(page);
1488         drbd_free_pages(mdev, page, 0);
1489         return err;
1490 }
1491
1492 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1493                            sector_t sector, int data_size)
1494 {
1495         struct bio_vec *bvec;
1496         struct bio *bio;
1497         int dgs, err, i, expect;
1498         void *dig_in = mdev->tconn->int_dig_in;
1499         void *dig_vv = mdev->tconn->int_dig_vv;
1500
1501         dgs = 0;
1502         if (mdev->tconn->peer_integrity_tfm) {
1503                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1504                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1505                 if (err)
1506                         return err;
1507                 data_size -= dgs;
1508         }
1509
1510         /* optimistically update recv_cnt.  if receiving fails below,
1511          * we disconnect anyways, and counters will be reset. */
1512         mdev->recv_cnt += data_size>>9;
1513
1514         bio = req->master_bio;
1515         D_ASSERT(sector == bio->bi_sector);
1516
1517         bio_for_each_segment(bvec, bio, i) {
1518                 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1519                 expect = min_t(int, data_size, bvec->bv_len);
1520                 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1521                 kunmap(bvec->bv_page);
1522                 if (err)
1523                         return err;
1524                 data_size -= expect;
1525         }
1526
1527         if (dgs) {
1528                 drbd_csum_bio(mdev, mdev->tconn->peer_integrity_tfm, bio, dig_vv);
1529                 if (memcmp(dig_in, dig_vv, dgs)) {
1530                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1531                         return -EINVAL;
1532                 }
1533         }
1534
1535         D_ASSERT(data_size == 0);
1536         return 0;
1537 }
1538
1539 /*
1540  * e_end_resync_block() is called in asender context via
1541  * drbd_finish_peer_reqs().
1542  */
1543 static int e_end_resync_block(struct drbd_work *w, int unused)
1544 {
1545         struct drbd_peer_request *peer_req =
1546                 container_of(w, struct drbd_peer_request, w);
1547         struct drbd_conf *mdev = w->mdev;
1548         sector_t sector = peer_req->i.sector;
1549         int err;
1550
1551         D_ASSERT(drbd_interval_empty(&peer_req->i));
1552
1553         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1554                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1555                 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1556         } else {
1557                 /* Record failure to sync */
1558                 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1559
1560                 err  = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1561         }
1562         dec_unacked(mdev);
1563
1564         return err;
1565 }
1566
1567 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1568 {
1569         struct drbd_peer_request *peer_req;
1570
1571         peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1572         if (!peer_req)
1573                 goto fail;
1574
1575         dec_rs_pending(mdev);
1576
1577         inc_unacked(mdev);
1578         /* corresponding dec_unacked() in e_end_resync_block()
1579          * respective _drbd_clear_done_ee */
1580
1581         peer_req->w.cb = e_end_resync_block;
1582
1583         spin_lock_irq(&mdev->tconn->req_lock);
1584         list_add(&peer_req->w.list, &mdev->sync_ee);
1585         spin_unlock_irq(&mdev->tconn->req_lock);
1586
1587         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1588         if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1589                 return 0;
1590
1591         /* don't care for the reason here */
1592         dev_err(DEV, "submit failed, triggering re-connect\n");
1593         spin_lock_irq(&mdev->tconn->req_lock);
1594         list_del(&peer_req->w.list);
1595         spin_unlock_irq(&mdev->tconn->req_lock);
1596
1597         drbd_free_peer_req(mdev, peer_req);
1598 fail:
1599         put_ldev(mdev);
1600         return -EIO;
1601 }
1602
1603 static struct drbd_request *
1604 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1605              sector_t sector, bool missing_ok, const char *func)
1606 {
1607         struct drbd_request *req;
1608
1609         /* Request object according to our peer */
1610         req = (struct drbd_request *)(unsigned long)id;
1611         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1612                 return req;
1613         if (!missing_ok) {
1614                 dev_err(DEV, "%s: failed to find request 0x%lx, sector %llus\n", func,
1615                         (unsigned long)id, (unsigned long long)sector);
1616         }
1617         return NULL;
1618 }
1619
1620 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1621 {
1622         struct drbd_conf *mdev;
1623         struct drbd_request *req;
1624         sector_t sector;
1625         int err;
1626         struct p_data *p = pi->data;
1627
1628         mdev = vnr_to_mdev(tconn, pi->vnr);
1629         if (!mdev)
1630                 return -EIO;
1631
1632         sector = be64_to_cpu(p->sector);
1633
1634         spin_lock_irq(&mdev->tconn->req_lock);
1635         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1636         spin_unlock_irq(&mdev->tconn->req_lock);
1637         if (unlikely(!req))
1638                 return -EIO;
1639
1640         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1641          * special casing it there for the various failure cases.
1642          * still no race with drbd_fail_pending_reads */
1643         err = recv_dless_read(mdev, req, sector, pi->size);
1644         if (!err)
1645                 req_mod(req, DATA_RECEIVED);
1646         /* else: nothing. handled from drbd_disconnect...
1647          * I don't think we may complete this just yet
1648          * in case we are "on-disconnect: freeze" */
1649
1650         return err;
1651 }
1652
1653 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1654 {
1655         struct drbd_conf *mdev;
1656         sector_t sector;
1657         int err;
1658         struct p_data *p = pi->data;
1659
1660         mdev = vnr_to_mdev(tconn, pi->vnr);
1661         if (!mdev)
1662                 return -EIO;
1663
1664         sector = be64_to_cpu(p->sector);
1665         D_ASSERT(p->block_id == ID_SYNCER);
1666
1667         if (get_ldev(mdev)) {
1668                 /* data is submitted to disk within recv_resync_read.
1669                  * corresponding put_ldev done below on error,
1670                  * or in drbd_peer_request_endio. */
1671                 err = recv_resync_read(mdev, sector, pi->size);
1672         } else {
1673                 if (__ratelimit(&drbd_ratelimit_state))
1674                         dev_err(DEV, "Can not write resync data to local disk.\n");
1675
1676                 err = drbd_drain_block(mdev, pi->size);
1677
1678                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1679         }
1680
1681         atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1682
1683         return err;
1684 }
1685
1686 static int w_restart_write(struct drbd_work *w, int cancel)
1687 {
1688         struct drbd_request *req = container_of(w, struct drbd_request, w);
1689         struct drbd_conf *mdev = w->mdev;
1690         struct bio *bio;
1691         unsigned long start_time;
1692         unsigned long flags;
1693
1694         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1695         if (!expect(req->rq_state & RQ_POSTPONED)) {
1696                 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1697                 return -EIO;
1698         }
1699         bio = req->master_bio;
1700         start_time = req->start_time;
1701         /* Postponed requests will not have their master_bio completed!  */
1702         __req_mod(req, DISCARD_WRITE, NULL);
1703         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1704
1705         while (__drbd_make_request(mdev, bio, start_time))
1706                 /* retry */ ;
1707         return 0;
1708 }
1709
1710 static void restart_conflicting_writes(struct drbd_conf *mdev,
1711                                        sector_t sector, int size)
1712 {
1713         struct drbd_interval *i;
1714         struct drbd_request *req;
1715
1716         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1717                 if (!i->local)
1718                         continue;
1719                 req = container_of(i, struct drbd_request, i);
1720                 if (req->rq_state & RQ_LOCAL_PENDING ||
1721                     !(req->rq_state & RQ_POSTPONED))
1722                         continue;
1723                 if (expect(list_empty(&req->w.list))) {
1724                         req->w.mdev = mdev;
1725                         req->w.cb = w_restart_write;
1726                         drbd_queue_work(&mdev->tconn->data.work, &req->w);
1727                 }
1728         }
1729 }
1730
1731 /*
1732  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1733  */
1734 static int e_end_block(struct drbd_work *w, int cancel)
1735 {
1736         struct drbd_peer_request *peer_req =
1737                 container_of(w, struct drbd_peer_request, w);
1738         struct drbd_conf *mdev = w->mdev;
1739         sector_t sector = peer_req->i.sector;
1740         int err = 0, pcmd;
1741
1742         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1743                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1744                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1745                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1746                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1747                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1748                         err = drbd_send_ack(mdev, pcmd, peer_req);
1749                         if (pcmd == P_RS_WRITE_ACK)
1750                                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1751                 } else {
1752                         err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1753                         /* we expect it to be marked out of sync anyways...
1754                          * maybe assert this?  */
1755                 }
1756                 dec_unacked(mdev);
1757         }
1758         /* we delete from the conflict detection hash _after_ we sent out the
1759          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1760         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1761                 spin_lock_irq(&mdev->tconn->req_lock);
1762                 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1763                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1764                 if (peer_req->flags & EE_RESTART_REQUESTS)
1765                         restart_conflicting_writes(mdev, sector, peer_req->i.size);
1766                 spin_unlock_irq(&mdev->tconn->req_lock);
1767         } else
1768                 D_ASSERT(drbd_interval_empty(&peer_req->i));
1769
1770         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1771
1772         return err;
1773 }
1774
1775 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1776 {
1777         struct drbd_conf *mdev = w->mdev;
1778         struct drbd_peer_request *peer_req =
1779                 container_of(w, struct drbd_peer_request, w);
1780         int err;
1781
1782         err = drbd_send_ack(mdev, ack, peer_req);
1783         dec_unacked(mdev);
1784
1785         return err;
1786 }
1787
1788 static int e_send_discard_write(struct drbd_work *w, int unused)
1789 {
1790         return e_send_ack(w, P_DISCARD_WRITE);
1791 }
1792
1793 static int e_send_retry_write(struct drbd_work *w, int unused)
1794 {
1795         struct drbd_tconn *tconn = w->mdev->tconn;
1796
1797         return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1798                              P_RETRY_WRITE : P_DISCARD_WRITE);
1799 }
1800
1801 static bool seq_greater(u32 a, u32 b)
1802 {
1803         /*
1804          * We assume 32-bit wrap-around here.
1805          * For 24-bit wrap-around, we would have to shift:
1806          *  a <<= 8; b <<= 8;
1807          */
1808         return (s32)a - (s32)b > 0;
1809 }
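/*
 * Standalone userspace sketch (not part of this file): demonstrates the
 * 32-bit wrap-around comparison used by seq_greater() above, with <stdint.h>
 * types standing in for the kernel's u32/s32.
 */
#include <assert.h>
#include <stdint.h>

static int example_seq_greater(uint32_t a, uint32_t b)
{
	/* same idea as above: compare via the signed difference */
	return (int32_t)(a - b) > 0;
}

int main(void)
{
	assert(example_seq_greater(7, 3));              /* plain case */
	assert(example_seq_greater(5, 0xFFFFFFFAu));    /* 5 is "newer" across the wrap */
	assert(!example_seq_greater(0xFFFFFFFAu, 5));
	return 0;
}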
1810
1811 static u32 seq_max(u32 a, u32 b)
1812 {
1813         return seq_greater(a, b) ? a : b;
1814 }
1815
1816 static bool need_peer_seq(struct drbd_conf *mdev)
1817 {
1818         struct drbd_tconn *tconn = mdev->tconn;
1819         int tp;
1820
1821         /*
1822          * We only need to keep track of the last packet_seq number of our peer
1823          * if we are in dual-primary mode and we have the discard flag set; see
1824          * handle_write_conflicts().
1825          */
1826
1827         rcu_read_lock();
1828         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
1829         rcu_read_unlock();
1830
1831         return tp && test_bit(DISCARD_CONCURRENT, &tconn->flags);
1832 }
1833
1834 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1835 {
1836         unsigned int newest_peer_seq;
1837
1838         if (need_peer_seq(mdev)) {
1839                 spin_lock(&mdev->peer_seq_lock);
1840                 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1841                 mdev->peer_seq = newest_peer_seq;
1842                 spin_unlock(&mdev->peer_seq_lock);
1843                 /* wake up only if we actually changed mdev->peer_seq */
1844                 if (peer_seq == newest_peer_seq)
1845                         wake_up(&mdev->seq_wait);
1846         }
1847 }
1848
1849 /* Called from receive_Data.
1850  * Synchronize packets on sock with packets on msock.
1851  *
1852  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1853  * packet traveling on msock, they are still processed in the order they have
1854  * been sent.
1855  *
1856  * Note: we don't care for Ack packets overtaking P_DATA packets.
1857  *
1858  * In case packet_seq is larger than mdev->peer_seq number, there are
1859  * outstanding packets on the msock. We wait for them to arrive.
1860  * In case this is logically the next packet, we update mdev->peer_seq
1861  * ourselves. Correctly handles 32bit wrap around.
1862  *
1863  * Assume we have a 10 GBit connection, that is about 1<<30 bytes per second,
1864  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1865  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1866  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1867  *
1868  * returns 0 if we may process the packet,
1869  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1870 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1871 {
1872         DEFINE_WAIT(wait);
1873         long timeout;
1874         int ret;
1875
1876         if (!need_peer_seq(mdev))
1877                 return 0;
1878
1879         spin_lock(&mdev->peer_seq_lock);
1880         for (;;) {
1881                 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1882                         mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1883                         ret = 0;
1884                         break;
1885                 }
1886                 if (signal_pending(current)) {
1887                         ret = -ERESTARTSYS;
1888                         break;
1889                 }
1890                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1891                 spin_unlock(&mdev->peer_seq_lock);
1892                 rcu_read_lock();
1893                 timeout = rcu_dereference(mdev->tconn->net_conf)->ping_timeo*HZ/10;
1894                 rcu_read_unlock();
1895                 timeout = schedule_timeout(timeout);
1896                 spin_lock(&mdev->peer_seq_lock);
1897                 if (!timeout) {
1898                         ret = -ETIMEDOUT;
1899                         dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1900                         break;
1901                 }
1902         }
1903         spin_unlock(&mdev->peer_seq_lock);
1904         finish_wait(&mdev->seq_wait, &wait);
1905         return ret;
1906 }
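/*
 * Standalone sketch (not drbd code) of the worst-case arithmetic from the
 * comment above wait_for_and_update_peer_seq(): at roughly 10 GBit/s we see
 * about 1<<21 single-sector packets per second, so a 24-bit sequence space
 * can wrap within seconds while a 32-bit one takes over half an hour.
 */
#include <stdio.h>

int main(void)
{
	unsigned long long packets_per_sec = 1ULL << 21;	/* ~1<<30 bytes/s in 512-byte sectors */
	unsigned long long wrap24 = (1ULL << 24) / packets_per_sec;
	unsigned long long wrap32 = (1ULL << 32) / packets_per_sec;

	/* prints 8 seconds and 2048 seconds respectively */
	printf("24-bit wrap after %llus, 32-bit wrap after %llus\n", wrap24, wrap32);
	return 0;
}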
1907
1908 /* see also bio_flags_to_wire()
1909  * DRBD_REQ_*, because we need to semantically map the flags to data packet
1910  * flags and back. We may replicate to other kernel versions. */
1911 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1912 {
1913         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1914                 (dpf & DP_FUA ? REQ_FUA : 0) |
1915                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1916                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1917 }
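/*
 * Standalone sketch (not drbd code): the point of wire_flags_to_bio() and its
 * counterpart bio_flags_to_wire() is a lossless, semantic round trip between
 * data packet flags and bio request flags.  The XDP_ and XREQ_ bit values
 * below are invented for the example; only the mapping pattern mirrors the
 * function above.
 */
#include <assert.h>

enum { XDP_SYNC = 1, XDP_FUA = 2, XDP_FLUSH = 4, XDP_DISCARD = 8 };
enum { XREQ_SYNC = 0x10, XREQ_FUA = 0x20, XREQ_FLUSH = 0x40, XREQ_DISCARD = 0x80 };

static unsigned long example_wire_to_bio(unsigned dpf)
{
	return (dpf & XDP_SYNC ? XREQ_SYNC : 0) |
	       (dpf & XDP_FUA ? XREQ_FUA : 0) |
	       (dpf & XDP_FLUSH ? XREQ_FLUSH : 0) |
	       (dpf & XDP_DISCARD ? XREQ_DISCARD : 0);
}

static unsigned example_bio_to_wire(unsigned long rq)
{
	return (rq & XREQ_SYNC ? XDP_SYNC : 0) |
	       (rq & XREQ_FUA ? XDP_FUA : 0) |
	       (rq & XREQ_FLUSH ? XDP_FLUSH : 0) |
	       (rq & XREQ_DISCARD ? XDP_DISCARD : 0);
}

int main(void)
{
	unsigned dpf = XDP_FUA | XDP_FLUSH;

	/* mapping to bio flags and back must preserve the packet flags */
	assert(example_bio_to_wire(example_wire_to_bio(dpf)) == dpf);
	return 0;
}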
1918
1919 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1920                                     unsigned int size)
1921 {
1922         struct drbd_interval *i;
1923
1924     repeat:
1925         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1926                 struct drbd_request *req;
1927                 struct bio_and_error m;
1928
1929                 if (!i->local)
1930                         continue;
1931                 req = container_of(i, struct drbd_request, i);
1932                 if (!(req->rq_state & RQ_POSTPONED))
1933                         continue;
1934                 req->rq_state &= ~RQ_POSTPONED;
1935                 __req_mod(req, NEG_ACKED, &m);
1936                 spin_unlock_irq(&mdev->tconn->req_lock);
1937                 if (m.bio)
1938                         complete_master_bio(mdev, &m);
1939                 spin_lock_irq(&mdev->tconn->req_lock);
1940                 goto repeat;
1941         }
1942 }
1943
1944 static int handle_write_conflicts(struct drbd_conf *mdev,
1945                                   struct drbd_peer_request *peer_req)
1946 {
1947         struct drbd_tconn *tconn = mdev->tconn;
1948         bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
1949         sector_t sector = peer_req->i.sector;
1950         const unsigned int size = peer_req->i.size;
1951         struct drbd_interval *i;
1952         bool equal;
1953         int err;
1954
1955         /*
1956          * Inserting the peer request into the write_requests tree will prevent
1957          * new conflicting local requests from being added.
1958          */
1959         drbd_insert_interval(&mdev->write_requests, &peer_req->i);
1960
1961     repeat:
1962         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1963                 if (i == &peer_req->i)
1964                         continue;
1965
1966                 if (!i->local) {
1967                         /*
1968                          * Our peer has sent a conflicting remote request; this
1969                          * should not happen in a two-node setup.  Wait for the
1970                          * earlier peer request to complete.
1971                          */
1972                         err = drbd_wait_misc(mdev, i);
1973                         if (err)
1974                                 goto out;
1975                         goto repeat;
1976                 }
1977
1978                 equal = i->sector == sector && i->size == size;
1979                 if (resolve_conflicts) {
1980                         /*
1981                          * If the peer request is fully contained within the
1982                          * overlapping request, it can be discarded; otherwise,
1983                          * it will be retried once all overlapping requests
1984                          * have completed.
1985                          */
1986                         bool discard = i->sector <= sector && i->sector +
1987                                        (i->size >> 9) >= sector + (size >> 9);
1988
1989                         if (!equal)
1990                                 dev_alert(DEV, "Concurrent writes detected: "
1991                                                "local=%llus +%u, remote=%llus +%u, "
1992                                                "assuming %s came first\n",
1993                                           (unsigned long long)i->sector, i->size,
1994                                           (unsigned long long)sector, size,
1995                                           discard ? "local" : "remote");
1996
1997                         inc_unacked(mdev);
1998                         peer_req->w.cb = discard ? e_send_discard_write :
1999                                                    e_send_retry_write;
2000                         list_add_tail(&peer_req->w.list, &mdev->done_ee);
2001                         wake_asender(mdev->tconn);
2002
2003                         err = -ENOENT;
2004                         goto out;
2005                 } else {
2006                         struct drbd_request *req =
2007                                 container_of(i, struct drbd_request, i);
2008
2009                         if (!equal)
2010                                 dev_alert(DEV, "Concurrent writes detected: "
2011                                                "local=%llus +%u, remote=%llus +%u\n",
2012                                           (unsigned long long)i->sector, i->size,
2013                                           (unsigned long long)sector, size);
2014
2015                         if (req->rq_state & RQ_LOCAL_PENDING ||
2016                             !(req->rq_state & RQ_POSTPONED)) {
2017                                 /*
2018                                  * Wait for the node with the discard flag to
2019                                  * decide if this request will be discarded or
2020                                  * retried.  Requests that are discarded will
2021                                  * disappear from the write_requests tree.
2022                                  *
2023                                  * In addition, wait for the conflicting
2024                                  * request to finish locally before submitting
2025                                  * the conflicting peer request.
2026                                  */
2027                                 err = drbd_wait_misc(mdev, &req->i);
2028                                 if (err) {
2029                                         _conn_request_state(mdev->tconn,
2030                                                             NS(conn, C_TIMEOUT),
2031                                                             CS_HARD);
2032                                         fail_postponed_requests(mdev, sector, size);
2033                                         goto out;
2034                                 }
2035                                 goto repeat;
2036                         }
2037                         /*
2038                          * Remember to restart the conflicting requests after
2039                          * the new peer request has completed.
2040                          */
2041                         peer_req->flags |= EE_RESTART_REQUESTS;
2042                 }
2043         }
2044         err = 0;
2045
2046     out:
2047         if (err)
2048                 drbd_remove_epoch_entry_interval(mdev, peer_req);
2049         return err;
2050 }
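/*
 * Standalone sketch (not drbd code) of the discard test in
 * handle_write_conflicts() above: with the discard flag set, a conflicting
 * peer write is dropped only if it is fully contained in the overlapping
 * local request; otherwise it is answered with a retry.
 */
#include <assert.h>

static int example_peer_write_contained(unsigned long long l_sector, unsigned l_size,
					unsigned long long p_sector, unsigned p_size)
{
	/* sizes are in bytes, sectors are 512 bytes, as in the code above */
	return l_sector <= p_sector &&
	       l_sector + (l_size >> 9) >= p_sector + (p_size >> 9);
}

int main(void)
{
	/* a local 8-sector write at 100 fully covers a 4-sector peer write at 102 */
	assert(example_peer_write_contained(100, 8 << 9, 102, 4 << 9));
	/* a peer write reaching past the local one must be retried instead */
	assert(!example_peer_write_contained(100, 8 << 9, 106, 4 << 9));
	return 0;
}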
2051
2052 /* mirrored write */
2053 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2054 {
2055         struct drbd_conf *mdev;
2056         sector_t sector;
2057         struct drbd_peer_request *peer_req;
2058         struct p_data *p = pi->data;
2059         u32 peer_seq = be32_to_cpu(p->seq_num);
2060         int rw = WRITE;
2061         u32 dp_flags;
2062         int err, tp;
2063
2064         mdev = vnr_to_mdev(tconn, pi->vnr);
2065         if (!mdev)
2066                 return -EIO;
2067
2068         if (!get_ldev(mdev)) {
2069                 int err2;
2070
2071                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2072                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2073                 atomic_inc(&mdev->current_epoch->epoch_size);
2074                 err2 = drbd_drain_block(mdev, pi->size);
2075                 if (!err)
2076                         err = err2;
2077                 return err;
2078         }
2079
2080         /*
2081          * Corresponding put_ldev done either below (on various errors), or in
2082          * drbd_peer_request_endio, if we successfully submit the data at the
2083          * end of this function.
2084          */
2085
2086         sector = be64_to_cpu(p->sector);
2087         peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2088         if (!peer_req) {
2089                 put_ldev(mdev);
2090                 return -EIO;
2091         }
2092
2093         peer_req->w.cb = e_end_block;
2094
2095         dp_flags = be32_to_cpu(p->dp_flags);
2096         rw |= wire_flags_to_bio(mdev, dp_flags);
2097
2098         if (dp_flags & DP_MAY_SET_IN_SYNC)
2099                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2100
2101         spin_lock(&mdev->epoch_lock);
2102         peer_req->epoch = mdev->current_epoch;
2103         atomic_inc(&peer_req->epoch->epoch_size);
2104         atomic_inc(&peer_req->epoch->active);
2105         spin_unlock(&mdev->epoch_lock);
2106
2107         rcu_read_lock();
2108         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
2109         rcu_read_unlock();
2110         if (tp) {
2111                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2112                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2113                 if (err)
2114                         goto out_interrupted;
2115                 spin_lock_irq(&mdev->tconn->req_lock);
2116                 err = handle_write_conflicts(mdev, peer_req);
2117                 if (err) {
2118                         spin_unlock_irq(&mdev->tconn->req_lock);
2119                         if (err == -ENOENT) {
2120                                 put_ldev(mdev);
2121                                 return 0;
2122                         }
2123                         goto out_interrupted;
2124                 }
2125         } else
2126                 spin_lock_irq(&mdev->tconn->req_lock);
2127         list_add(&peer_req->w.list, &mdev->active_ee);
2128         spin_unlock_irq(&mdev->tconn->req_lock);
2129
2130         if (mdev->tconn->agreed_pro_version < 100) {
2131                 rcu_read_lock();
2132                 switch (rcu_dereference(mdev->tconn->net_conf)->wire_protocol) {
2133                 case DRBD_PROT_C:
2134                         dp_flags |= DP_SEND_WRITE_ACK;
2135                         break;
2136                 case DRBD_PROT_B:
2137                         dp_flags |= DP_SEND_RECEIVE_ACK;
2138                         break;
2139                 }
2140                 rcu_read_unlock();
2141         }
2142
2143         if (dp_flags & DP_SEND_WRITE_ACK) {
2144                 peer_req->flags |= EE_SEND_WRITE_ACK;
2145                 inc_unacked(mdev);
2146                 /* corresponding dec_unacked() in e_end_block()
2147                  * respective _drbd_clear_done_ee */
2148         }
2149
2150         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2151                 /* I really don't like it that the receiver thread
2152                  * sends on the msock, but anyways */
2153                 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2154         }
2155
2156         if (mdev->state.pdsk < D_INCONSISTENT) {
2157                 /* In case we have the only disk of the cluster, */
2158                 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2159                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2160                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2161                 drbd_al_begin_io(mdev, &peer_req->i);
2162         }
2163
2164         err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2165         if (!err)
2166                 return 0;
2167
2168         /* don't care for the reason here */
2169         dev_err(DEV, "submit failed, triggering re-connect\n");
2170         spin_lock_irq(&mdev->tconn->req_lock);
2171         list_del(&peer_req->w.list);
2172         drbd_remove_epoch_entry_interval(mdev, peer_req);
2173         spin_unlock_irq(&mdev->tconn->req_lock);
2174         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2175                 drbd_al_complete_io(mdev, &peer_req->i);
2176
2177 out_interrupted:
2178         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
2179         put_ldev(mdev);
2180         drbd_free_peer_req(mdev, peer_req);
2181         return err;
2182 }
2183
2184 /* We may throttle resync, if the lower device seems to be busy,
2185  * and current sync rate is above c_min_rate.
2186  *
2187  * To decide whether or not the lower device is busy, we use a scheme similar
2188  * to MD RAID's is_mddev_idle(): if the partition stats reveal "significant"
2189  * (more than 64 sectors) of activity we cannot account for with our own resync
2190  * activity, it obviously is "busy".
2191  *
2192  * The current sync rate used here considers only the most recent two step marks,
2193  * giving a short-time average so we can react faster.
2194  */
2195 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2196 {
2197         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2198         unsigned long db, dt, dbdt;
2199         struct lc_element *tmp;
2200         int curr_events;
2201         int throttle = 0;
2202         unsigned int c_min_rate;
2203
2204         rcu_read_lock();
2205         c_min_rate = rcu_dereference(mdev->ldev->disk_conf)->c_min_rate;
2206         rcu_read_unlock();
2207
2208         /* feature disabled? */
2209         if (c_min_rate == 0)
2210                 return 0;
2211
2212         spin_lock_irq(&mdev->al_lock);
2213         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2214         if (tmp) {
2215                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2216                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2217                         spin_unlock_irq(&mdev->al_lock);
2218                         return 0;
2219                 }
2220                 /* Do not slow down if app IO is already waiting for this extent */
2221         }
2222         spin_unlock_irq(&mdev->al_lock);
2223
2224         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2225                       (int)part_stat_read(&disk->part0, sectors[1]) -
2226                         atomic_read(&mdev->rs_sect_ev);
2227
2228         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2229                 unsigned long rs_left;
2230                 int i;
2231
2232                 mdev->rs_last_events = curr_events;
2233
2234                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2235                  * approx. */
2236                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2237
2238                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2239                         rs_left = mdev->ov_left;
2240                 else
2241                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2242
2243                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2244                 if (!dt)
2245                         dt++;
2246                 db = mdev->rs_mark_left[i] - rs_left;
2247                 dbdt = Bit2KB(db/dt);
2248
2249                 if (dbdt > c_min_rate)
2250                         throttle = 1;
2251         }
2252         return throttle;
2253 }
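/*
 * Standalone sketch (not drbd code) of the rate check above: average the
 * resync progress over the most recent sync mark interval and throttle once
 * that short-term rate exceeds the configured c-min-rate (KiB/s).  The factor
 * of 4 assumes one bitmap bit covers 4 KiB, which matches drbd's resync
 * granularity but is hard-coded here only for the example.
 */
#include <assert.h>

static int example_should_throttle(unsigned long rs_left_then, unsigned long rs_left_now,
				   unsigned long dt_seconds, unsigned int c_min_rate_kb)
{
	unsigned long db, dbdt;

	if (!dt_seconds)
		dt_seconds = 1;			/* avoid division by zero, as above */
	db = rs_left_then - rs_left_now;	/* bitmap bits cleared since the mark */
	dbdt = db * 4 / dt_seconds;		/* KiB per second */
	return dbdt > c_min_rate_kb;
}

int main(void)
{
	/* 30000 bits (~117 MiB) in 3s is ~40000 KiB/s: throttle against a 250 KiB/s floor */
	assert(example_should_throttle(100000, 70000, 3, 250));
	/* 150 bits in 3s is 200 KiB/s: below the floor, do not throttle */
	assert(!example_should_throttle(100000, 99850, 3, 250));
	return 0;
}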
2254
2255
2256 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2257 {
2258         struct drbd_conf *mdev;
2259         sector_t sector;
2260         sector_t capacity;
2261         struct drbd_peer_request *peer_req;
2262         struct digest_info *di = NULL;
2263         int size, verb;
2264         unsigned int fault_type;
2265         struct p_block_req *p = pi->data;
2266
2267         mdev = vnr_to_mdev(tconn, pi->vnr);
2268         if (!mdev)
2269                 return -EIO;
2270         capacity = drbd_get_capacity(mdev->this_bdev);
2271
2272         sector = be64_to_cpu(p->sector);
2273         size   = be32_to_cpu(p->blksize);
2274
2275         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2276                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2277                                 (unsigned long long)sector, size);
2278                 return -EINVAL;
2279         }
2280         if (sector + (size>>9) > capacity) {
2281                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2282                                 (unsigned long long)sector, size);
2283                 return -EINVAL;
2284         }
2285
2286         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2287                 verb = 1;
2288                 switch (pi->cmd) {
2289                 case P_DATA_REQUEST:
2290                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2291                         break;
2292                 case P_RS_DATA_REQUEST:
2293                 case P_CSUM_RS_REQUEST:
2294                 case P_OV_REQUEST:
2295                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2296                         break;
2297                 case P_OV_REPLY:
2298                         verb = 0;
2299                         dec_rs_pending(mdev);
2300                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2301                         break;
2302                 default:
2303                         BUG();
2304                 }
2305                 if (verb && __ratelimit(&drbd_ratelimit_state))
2306                         dev_err(DEV, "Can not satisfy peer's read request, "
2307                             "no local data.\n");
2308
2309                 /* drain possible payload */
2310                 return drbd_drain_block(mdev, pi->size);
2311         }
2312
2313         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2314          * "criss-cross" setup, that might cause write-out on some other DRBD,
2315          * which in turn might block on the other node at this very place.  */
2316         peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2317         if (!peer_req) {
2318                 put_ldev(mdev);
2319                 return -ENOMEM;
2320         }
2321
2322         switch (pi->cmd) {
2323         case P_DATA_REQUEST:
2324                 peer_req->w.cb = w_e_end_data_req;
2325                 fault_type = DRBD_FAULT_DT_RD;
2326                 /* application IO, don't drbd_rs_begin_io */
2327                 goto submit;
2328
2329         case P_RS_DATA_REQUEST:
2330                 peer_req->w.cb = w_e_end_rsdata_req;
2331                 fault_type = DRBD_FAULT_RS_RD;
2332                 /* used in the sector offset progress display */
2333                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2334                 break;
2335
2336         case P_OV_REPLY:
2337         case P_CSUM_RS_REQUEST:
2338                 fault_type = DRBD_FAULT_RS_RD;
2339                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2340                 if (!di)
2341                         goto out_free_e;
2342
2343                 di->digest_size = pi->size;
2344                 di->digest = (((char *)di)+sizeof(struct digest_info));
2345
2346                 peer_req->digest = di;
2347                 peer_req->flags |= EE_HAS_DIGEST;
2348
2349                 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2350                         goto out_free_e;
2351
2352                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2353                         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2354                         peer_req->w.cb = w_e_end_csum_rs_req;
2355                         /* used in the sector offset progress display */
2356                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2357                 } else if (pi->cmd == P_OV_REPLY) {
2358                         /* track progress, we may need to throttle */
2359                         atomic_add(size >> 9, &mdev->rs_sect_in);
2360                         peer_req->w.cb = w_e_end_ov_reply;
2361                         dec_rs_pending(mdev);
2362                         /* drbd_rs_begin_io done when we sent this request,
2363                          * but accounting still needs to be done. */
2364                         goto submit_for_resync;
2365                 }
2366                 break;
2367
2368         case P_OV_REQUEST:
2369                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2370                     mdev->tconn->agreed_pro_version >= 90) {
2371                         unsigned long now = jiffies;
2372                         int i;
2373                         mdev->ov_start_sector = sector;
2374                         mdev->ov_position = sector;
2375                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2376                         mdev->rs_total = mdev->ov_left;
2377                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2378                                 mdev->rs_mark_left[i] = mdev->ov_left;
2379                                 mdev->rs_mark_time[i] = now;
2380                         }
2381                         dev_info(DEV, "Online Verify start sector: %llu\n",
2382                                         (unsigned long long)sector);
2383                 }
2384                 peer_req->w.cb = w_e_end_ov_req;
2385                 fault_type = DRBD_FAULT_RS_RD;
2386                 break;
2387
2388         default:
2389                 BUG();
2390         }
2391
2392         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2393          * wrt the receiver, but it is not as straightforward as it may seem.
2394          * Various places in the resync start and stop logic assume resync
2395          * requests are processed in order, requeuing this on the worker thread
2396          * introduces a bunch of new code for synchronization between threads.
2397          *
2398          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2399          * "forever", throttling after drbd_rs_begin_io will lock that extent
2400          * for application writes for the same time.  For now, just throttle
2401          * here, where the rest of the code expects the receiver to sleep for
2402          * a while, anyways.
2403          */
2404
2405         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2406          * this defers syncer requests for some time, before letting at least
2407          * one request through.  The resync controller on the receiving side
2408          * will adapt to the incoming rate accordingly.
2409          *
2410          * We cannot throttle here if remote is Primary/SyncTarget:
2411          * we would also throttle its application reads.
2412          * In that case, throttling is done on the SyncTarget only.
2413          */
2414         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2415                 schedule_timeout_uninterruptible(HZ/10);
2416         if (drbd_rs_begin_io(mdev, sector))
2417                 goto out_free_e;
2418
2419 submit_for_resync:
2420         atomic_add(size >> 9, &mdev->rs_sect_ev);
2421
2422 submit:
2423         inc_unacked(mdev);
2424         spin_lock_irq(&mdev->tconn->req_lock);
2425         list_add_tail(&peer_req->w.list, &mdev->read_ee);
2426         spin_unlock_irq(&mdev->tconn->req_lock);
2427
2428         if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2429                 return 0;
2430
2431         /* don't care for the reason here */
2432         dev_err(DEV, "submit failed, triggering re-connect\n");
2433         spin_lock_irq(&mdev->tconn->req_lock);
2434         list_del(&peer_req->w.list);
2435         spin_unlock_irq(&mdev->tconn->req_lock);
2436         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2437
2438 out_free_e:
2439         put_ldev(mdev);
2440         drbd_free_peer_req(mdev, peer_req);
2441         return -EIO;
2442 }
2443
2444 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2445 {
2446         int self, peer, rv = -100;
2447         unsigned long ch_self, ch_peer;
2448         enum drbd_after_sb_p after_sb_0p;
2449
2450         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2451         peer = mdev->p_uuid[UI_BITMAP] & 1;
2452
2453         ch_peer = mdev->p_uuid[UI_SIZE];
2454         ch_self = mdev->comm_bm_set;
2455
2456         rcu_read_lock();
2457         after_sb_0p = rcu_dereference(mdev->tconn->net_conf)->after_sb_0p;
2458         rcu_read_unlock();
2459         switch (after_sb_0p) {
2460         case ASB_CONSENSUS:
2461         case ASB_DISCARD_SECONDARY:
2462         case ASB_CALL_HELPER:
2463         case ASB_VIOLENTLY:
2464                 dev_err(DEV, "Configuration error.\n");
2465                 break;
2466         case ASB_DISCONNECT:
2467                 break;
2468         case ASB_DISCARD_YOUNGER_PRI:
2469                 if (self == 0 && peer == 1) {
2470                         rv = -1;
2471                         break;
2472                 }
2473                 if (self == 1 && peer == 0) {
2474                         rv =  1;
2475                         break;
2476                 }
2477                 /* Else fall through to one of the other strategies... */
2478         case ASB_DISCARD_OLDER_PRI:
2479                 if (self == 0 && peer == 1) {
2480                         rv = 1;
2481                         break;
2482                 }
2483                 if (self == 1 && peer == 0) {
2484                         rv = -1;
2485                         break;
2486                 }
2487                 /* Else fall through to one of the other strategies... */
2488                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2489                      "Using discard-least-changes instead\n");
2490         case ASB_DISCARD_ZERO_CHG:
2491                 if (ch_peer == 0 && ch_self == 0) {
2492                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2493                                 ? -1 : 1;
2494                         break;
2495                 } else {
2496                         if (ch_peer == 0) { rv =  1; break; }
2497                         if (ch_self == 0) { rv = -1; break; }
2498                 }
2499                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2500                         break;
2501         case ASB_DISCARD_LEAST_CHG:
2502                 if      (ch_self < ch_peer)
2503                         rv = -1;
2504                 else if (ch_self > ch_peer)
2505                         rv =  1;
2506                 else /* ( ch_self == ch_peer ) */
2507                      /* Well, then use something else. */
2508                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2509                                 ? -1 : 1;
2510                 break;
2511         case ASB_DISCARD_LOCAL:
2512                 rv = -1;
2513                 break;
2514         case ASB_DISCARD_REMOTE:
2515                 rv =  1;
2516         }
2517
2518         return rv;
2519 }
2520
2521 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2522 {
2523         int hg, rv = -100;
2524         enum drbd_after_sb_p after_sb_1p;
2525
2526         rcu_read_lock();
2527         after_sb_1p = rcu_dereference(mdev->tconn->net_conf)->after_sb_1p;
2528         rcu_read_unlock();
2529         switch (after_sb_1p) {
2530         case ASB_DISCARD_YOUNGER_PRI:
2531         case ASB_DISCARD_OLDER_PRI:
2532         case ASB_DISCARD_LEAST_CHG:
2533         case ASB_DISCARD_LOCAL:
2534         case ASB_DISCARD_REMOTE:
2535         case ASB_DISCARD_ZERO_CHG:
2536                 dev_err(DEV, "Configuration error.\n");
2537                 break;
2538         case ASB_DISCONNECT:
2539                 break;
2540         case ASB_CONSENSUS:
2541                 hg = drbd_asb_recover_0p(mdev);
2542                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2543                         rv = hg;
2544                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2545                         rv = hg;
2546                 break;
2547         case ASB_VIOLENTLY:
2548                 rv = drbd_asb_recover_0p(mdev);
2549                 break;
2550         case ASB_DISCARD_SECONDARY:
2551                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2552         case ASB_CALL_HELPER:
2553                 hg = drbd_asb_recover_0p(mdev);
2554                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2555                         enum drbd_state_rv rv2;
2556
2557                         drbd_set_role(mdev, R_SECONDARY, 0);
2558                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2559                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2560                           * we do not need to wait for the after state change work either. */
2561                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2562                         if (rv2 != SS_SUCCESS) {
2563                                 drbd_khelper(mdev, "pri-lost-after-sb");
2564                         } else {
2565                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2566                                 rv = hg;
2567                         }
2568                 } else
2569                         rv = hg;
2570         }
2571
2572         return rv;
2573 }
2574
2575 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2576 {
2577         int hg, rv = -100;
2578         enum drbd_after_sb_p after_sb_2p;
2579
2580         rcu_read_lock();
2581         after_sb_2p = rcu_dereference(mdev->tconn->net_conf)->after_sb_2p;
2582         rcu_read_unlock();
2583         switch (after_sb_2p) {
2584         case ASB_DISCARD_YOUNGER_PRI:
2585         case ASB_DISCARD_OLDER_PRI:
2586         case ASB_DISCARD_LEAST_CHG:
2587         case ASB_DISCARD_LOCAL:
2588         case ASB_DISCARD_REMOTE:
2589         case ASB_CONSENSUS:
2590         case ASB_DISCARD_SECONDARY:
2591         case ASB_DISCARD_ZERO_CHG:
2592                 dev_err(DEV, "Configuration error.\n");
2593                 break;
2594         case ASB_VIOLENTLY:
2595                 rv = drbd_asb_recover_0p(mdev);
2596                 break;
2597         case ASB_DISCONNECT:
2598                 break;
2599         case ASB_CALL_HELPER:
2600                 hg = drbd_asb_recover_0p(mdev);
2601                 if (hg == -1) {
2602                         enum drbd_state_rv rv2;
2603
2604                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2605                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2606                           * we do not need to wait for the after state change work either. */
2607                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2608                         if (rv2 != SS_SUCCESS) {
2609                                 drbd_khelper(mdev, "pri-lost-after-sb");
2610                         } else {
2611                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2612                                 rv = hg;
2613                         }
2614                 } else
2615                         rv = hg;
2616         }
2617
2618         return rv;
2619 }
2620
2621 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2622                            u64 bits, u64 flags)
2623 {
2624         if (!uuid) {
2625                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2626                 return;
2627         }
2628         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2629              text,
2630              (unsigned long long)uuid[UI_CURRENT],
2631              (unsigned long long)uuid[UI_BITMAP],
2632              (unsigned long long)uuid[UI_HISTORY_START],
2633              (unsigned long long)uuid[UI_HISTORY_END],
2634              (unsigned long long)bits,
2635              (unsigned long long)flags);
2636 }
2637
2638 /*
2639   100   after split brain try auto recover
2640     2   C_SYNC_SOURCE set BitMap
2641     1   C_SYNC_SOURCE use BitMap
2642     0   no Sync
2643    -1   C_SYNC_TARGET use BitMap
2644    -2   C_SYNC_TARGET set BitMap
2645  -100   after split brain, disconnect
2646 -1000   unrelated data
2647 -1091   requires proto 91
2648 -1096   requires proto 96
2649  */
2650 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2651 {
2652         u64 self, peer;
2653         int i, j;
2654
2655         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2656         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2657
2658         *rule_nr = 10;
2659         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2660                 return 0;
2661
2662         *rule_nr = 20;
2663         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2664              peer != UUID_JUST_CREATED)
2665                 return -2;
2666
2667         *rule_nr = 30;
2668         if (self != UUID_JUST_CREATED &&
2669             (peer == UUID_JUST_CREATED || peer == (u64)0))
2670                 return 2;
2671
2672         if (self == peer) {
2673                 int rct, dc; /* roles at crash time */
2674
2675                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2676
2677                         if (mdev->tconn->agreed_pro_version < 91)
2678                                 return -1091;
2679
2680                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2681                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2682                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2683                                 drbd_uuid_set_bm(mdev, 0UL);
2684
2685                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2686                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2687                                 *rule_nr = 34;
2688                         } else {
2689                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2690                                 *rule_nr = 36;
2691                         }
2692
2693                         return 1;
2694                 }
2695
2696                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2697
2698                         if (mdev->tconn->agreed_pro_version < 91)
2699                                 return -1091;
2700
2701                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2702                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2703                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2704
2705                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2706                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2707                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2708
2709                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2710                                 *rule_nr = 35;
2711                         } else {
2712                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2713                                 *rule_nr = 37;
2714                         }
2715
2716                         return -1;
2717                 }
2718
2719                 /* Common power [off|failure] */
2720                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2721                         (mdev->p_uuid[UI_FLAGS] & 2);
2722                 /* lowest bit is set when we were primary,
2723                  * next bit (weight 2) is set when peer was primary */
2724                 *rule_nr = 40;
2725
2726                 switch (rct) {
2727                 case 0: /* !self_pri && !peer_pri */ return 0;
2728                 case 1: /*  self_pri && !peer_pri */ return 1;
2729                 case 2: /* !self_pri &&  peer_pri */ return -1;
2730                 case 3: /*  self_pri &&  peer_pri */
2731                         dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2732                         return dc ? -1 : 1;
2733                 }
2734         }
2735
2736         *rule_nr = 50;
2737         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2738         if (self == peer)
2739                 return -1;
2740
2741         *rule_nr = 51;
2742         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2743         if (self == peer) {
2744                 if (mdev->tconn->agreed_pro_version < 96 ?
2745                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2746                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2747                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2748                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2749                            the peer made to its UUIDs when it last started a resync as sync source. */
2750
2751                         if (mdev->tconn->agreed_pro_version < 91)
2752                                 return -1091;
2753
2754                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2755                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2756
2757                         dev_info(DEV, "Did not get last syncUUID packet, corrected:\n");
2758                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2759
2760                         return -1;
2761                 }
2762         }
2763
2764         *rule_nr = 60;
2765         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2766         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2767                 peer = mdev->p_uuid[i] & ~((u64)1);
2768                 if (self == peer)
2769                         return -2;
2770         }
2771
2772         *rule_nr = 70;
2773         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2774         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2775         if (self == peer)
2776                 return 1;
2777
2778         *rule_nr = 71;
2779         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2780         if (self == peer) {
2781                 if (mdev->tconn->agreed_pro_version < 96 ?
2782                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2783                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2784                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2785                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2786                            we made to our UUIDs when we last started a resync as sync source. */
2787
2788                         if (mdev->tconn->agreed_pro_version < 91)
2789                                 return -1091;
2790
2791                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2792                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2793
2794                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2795                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2796                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2797
2798                         return 1;
2799                 }
2800         }
2801
2802
2803         *rule_nr = 80;
2804         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2805         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2806                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2807                 if (self == peer)
2808                         return 2;
2809         }
2810
2811         *rule_nr = 90;
2812         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2813         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2814         if (self == peer && self != ((u64)0))
2815                 return 100;
2816
2817         *rule_nr = 100;
2818         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2819                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2820                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2821                         peer = mdev->p_uuid[j] & ~((u64)1);
2822                         if (self == peer)
2823                                 return -100;
2824                 }
2825         }
2826
2827         return -1000;
2828 }
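
/* Summary of the return-value convention above, as interpreted by
 * drbd_sync_handshake() below:
 *      0            no resync necessary
 *      1 / -1       become sync source / sync target, bitmap-based resync
 *      2 / -2       become sync source / sync target, full resync
 *    100 / -100     split brain detected
 *  -1000            unrelated data
 *  < -1000          both peers must support at least protocol (-retval - 1000),
 *                   e.g. -1091 requires at least protocol 91
 */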
2829
2830 /* drbd_sync_handshake() returns the new connection state on success, or
2831    C_MASK on failure.
2832  */
2833 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2834                                            enum drbd_disk_state peer_disk) __must_hold(local)
2835 {
2836         enum drbd_conns rv = C_MASK;
2837         enum drbd_disk_state mydisk;
2838         struct net_conf *nc;
2839         int hg, rule_nr, rr_conflict, tentative;
2840
2841         mydisk = mdev->state.disk;
2842         if (mydisk == D_NEGOTIATING)
2843                 mydisk = mdev->new_state_tmp.disk;
2844
2845         dev_info(DEV, "drbd_sync_handshake:\n");
2846         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2847         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2848                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2849
2850         hg = drbd_uuid_compare(mdev, &rule_nr);
2851
2852         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2853
2854         if (hg == -1000) {
2855                 dev_alert(DEV, "Unrelated data, aborting!\n");
2856                 return C_MASK;
2857         }
2858         if (hg < -1000) {
2859                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2860                 return C_MASK;
2861         }
2862
2863         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2864             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2865                 int f = (hg == -100) || abs(hg) == 2;
2866                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2867                 if (f)
2868                         hg = hg*2;
2869                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2870                      hg > 0 ? "source" : "target");
2871         }
2872
2873         if (abs(hg) == 100)
2874                 drbd_khelper(mdev, "initial-split-brain");
2875
2876         rcu_read_lock();
2877         nc = rcu_dereference(mdev->tconn->net_conf);
2878
2879         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
2880                 int pcount = (mdev->state.role == R_PRIMARY)
2881                            + (peer_role == R_PRIMARY);
2882                 int forced = (hg == -100);
2883
2884                 switch (pcount) {
2885                 case 0:
2886                         hg = drbd_asb_recover_0p(mdev);
2887                         break;
2888                 case 1:
2889                         hg = drbd_asb_recover_1p(mdev);
2890                         break;
2891                 case 2:
2892                         hg = drbd_asb_recover_2p(mdev);
2893                         break;
2894                 }
2895                 if (abs(hg) < 100) {
2896                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2897                              "automatically solved. Sync from %s node\n",
2898                              pcount, (hg < 0) ? "peer" : "this");
2899                         if (forced) {
2900                                 dev_warn(DEV, "Doing a full sync, since"
2901                                      " UUIDs were ambiguous.\n");
2902                                 hg = hg*2;
2903                         }
2904                 }
2905         }
2906
2907         if (hg == -100) {
2908                 if (nc->discard_my_data && !(mdev->p_uuid[UI_FLAGS]&1))
2909                         hg = -1;
2910                 if (!nc->discard_my_data && (mdev->p_uuid[UI_FLAGS]&1))
2911                         hg = 1;
2912
2913                 if (abs(hg) < 100)
2914                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2915                              "Sync from %s node\n",
2916                              (hg < 0) ? "peer" : "this");
2917         }
2918         rr_conflict = nc->rr_conflict;
2919         tentative = nc->tentative;
2920         rcu_read_unlock();
2921
2922         if (hg == -100) {
2923                 /* FIXME this log message is not correct if we end up here
2924                  * after an attempted attach on a diskless node.
2925                  * We just refuse to attach -- well, we drop the "connection"
2926                  * to that disk, in a way... */
2927                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2928                 drbd_khelper(mdev, "split-brain");
2929                 return C_MASK;
2930         }
2931
2932         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2933                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2934                 return C_MASK;
2935         }
2936
2937         if (hg < 0 && /* by intention we do not use mydisk here. */
2938             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2939                 switch (rr_conflict) {
2940                 case ASB_CALL_HELPER:
2941                         drbd_khelper(mdev, "pri-lost");
2942                         /* fall through */
2943                 case ASB_DISCONNECT:
2944                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2945                         return C_MASK;
2946                 case ASB_VIOLENTLY:
2947                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2948                              " assumption\n");
2949                 }
2950         }
2951
2952         if (tentative || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
2953                 if (hg == 0)
2954                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2955                 else
2956                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
2957                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2958                                  abs(hg) >= 2 ? "full" : "bit-map based");
2959                 return C_MASK;
2960         }
2961
2962         if (abs(hg) >= 2) {
2963                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2964                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2965                                         BM_LOCKED_SET_ALLOWED))
2966                         return C_MASK;
2967         }
2968
2969         if (hg > 0) { /* become sync source. */
2970                 rv = C_WF_BITMAP_S;
2971         } else if (hg < 0) { /* become sync target */
2972                 rv = C_WF_BITMAP_T;
2973         } else {
2974                 rv = C_CONNECTED;
2975                 if (drbd_bm_total_weight(mdev)) {
2976                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2977                              drbd_bm_total_weight(mdev));
2978                 }
2979         }
2980
2981         return rv;
2982 }
2983
2984 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
2985 {
2986         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2987         if (peer == ASB_DISCARD_REMOTE)
2988                 return ASB_DISCARD_LOCAL;
2989
2990         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2991         if (peer == ASB_DISCARD_LOCAL)
2992                 return ASB_DISCARD_REMOTE;
2993
2994         /* everything else is valid if they are equal on both sides. */
2995         return peer;
2996 }
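
/* Illustration only: if the peer is configured with after-sb-0pri
 * "discard-remote", the very same policy reads as "discard-local" from our
 * point of view.  convert_after_sb() performs that mirroring so that the
 * compatibility checks in receive_protocol() compare like with like. */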
2997
2998 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
2999 {
3000         struct p_protocol *p = pi->data;
3001         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3002         int p_proto, p_discard_my_data, p_two_primaries, cf;
3003         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3004         char integrity_alg[SHARED_SECRET_MAX] = "";
3005         struct crypto_hash *peer_integrity_tfm = NULL;
3006         void *int_dig_in = NULL, *int_dig_vv = NULL;
3007
3008         p_proto         = be32_to_cpu(p->protocol);
3009         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3010         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3011         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3012         p_two_primaries = be32_to_cpu(p->two_primaries);
3013         cf              = be32_to_cpu(p->conn_flags);
3014         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3015
3016         if (tconn->agreed_pro_version >= 87) {
3017                 int err;
3018
3019                 if (pi->size > sizeof(integrity_alg))
3020                         return -EIO;
3021                 err = drbd_recv_all(tconn, integrity_alg, pi->size);
3022                 if (err)
3023                         return err;
3024                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3025         }
3026
3027         if (pi->cmd != P_PROTOCOL_UPDATE) {
3028                 clear_bit(CONN_DRY_RUN, &tconn->flags);
3029
3030                 if (cf & CF_DRY_RUN)
3031                         set_bit(CONN_DRY_RUN, &tconn->flags);
3032
3033                 rcu_read_lock();
3034                 nc = rcu_dereference(tconn->net_conf);
3035
3036                 if (p_proto != nc->wire_protocol) {
3037                         conn_err(tconn, "incompatible %s settings\n", "protocol");
3038                         goto disconnect_rcu_unlock;
3039                 }
3040
3041                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3042                         conn_err(tconn, "incompatible %s settings\n", "after-sb-0pri");
3043                         goto disconnect_rcu_unlock;
3044                 }
3045
3046                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3047                         conn_err(tconn, "incompatible %s settings\n", "after-sb-1pri");
3048                         goto disconnect_rcu_unlock;
3049                 }
3050
3051                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3052                         conn_err(tconn, "incompatible %s settings\n", "after-sb-2pri");
3053                         goto disconnect_rcu_unlock;
3054                 }
3055
3056                 if (p_discard_my_data && nc->discard_my_data) {
3057                         conn_err(tconn, "incompatible %s settings\n", "discard-my-data");
3058                         goto disconnect_rcu_unlock;
3059                 }
3060
3061                 if (p_two_primaries != nc->two_primaries) {
3062                         conn_err(tconn, "incompatible %s settings\n", "allow-two-primaries");
3063                         goto disconnect_rcu_unlock;
3064                 }
3065
3066                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3067                         conn_err(tconn, "incompatible %s settings\n", "data-integrity-alg");
3068                         goto disconnect_rcu_unlock;
3069                 }
3070
3071                 rcu_read_unlock();
3072         }
3073
3074         if (integrity_alg[0]) {
3075                 int hash_size;
3076
3077                 /*
3078                  * We can only change the peer data integrity algorithm
3079                  * here.  Changing our own data integrity algorithm
3080                  * requires that we send a P_PROTOCOL_UPDATE packet at
3081                  * the same time; otherwise, the peer has no way to
3082                  * the same time; otherwise, the peer has no way of
3083                  * knowing at which packet boundary the algorithm is
3084                  * supposed to change.
3085
3086                 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3087                 if (!peer_integrity_tfm) {
3088                         conn_err(tconn, "peer data-integrity-alg %s not supported\n",
3089                                  integrity_alg);
3090                         goto disconnect;
3091                 }
3092
3093                 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3094                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3095                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3096                 if (!(int_dig_in && int_dig_vv)) {
3097                         conn_err(tconn, "Allocation of buffers for data integrity checking failed\n");
3098                         goto disconnect;
3099                 }
3100         }
3101
3102         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3103         if (!new_net_conf) {
3104                 conn_err(tconn, "Allocation of new net_conf failed\n");
3105                 goto disconnect;
3106         }
3107
3108         mutex_lock(&tconn->data.mutex);
3109         mutex_lock(&tconn->conf_update);
3110         old_net_conf = tconn->net_conf;
3111         *new_net_conf = *old_net_conf;
3112
3113         new_net_conf->wire_protocol = p_proto;
3114         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3115         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3116         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3117         new_net_conf->two_primaries = p_two_primaries;
3118
3119         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3120         mutex_unlock(&tconn->conf_update);
3121         mutex_unlock(&tconn->data.mutex);
3122
3123         crypto_free_hash(tconn->peer_integrity_tfm);
3124         kfree(tconn->int_dig_in);
3125         kfree(tconn->int_dig_vv);
3126         tconn->peer_integrity_tfm = peer_integrity_tfm;
3127         tconn->int_dig_in = int_dig_in;
3128         tconn->int_dig_vv = int_dig_vv;
3129
3130         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3131                 conn_info(tconn, "peer data-integrity-alg: %s\n",
3132                           integrity_alg[0] ? integrity_alg : "(none)");
3133
3134         synchronize_rcu();
3135         kfree(old_net_conf);
3136         return 0;
3137
3138 disconnect_rcu_unlock:
3139         rcu_read_unlock();
3140 disconnect:
3141         crypto_free_hash(peer_integrity_tfm);
3142         kfree(int_dig_in);
3143         kfree(int_dig_vv);
3144         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3145         return -EIO;
3146 }
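
/* The net_conf update above follows the usual RCU publish pattern: copy the
 * old configuration, modify the copy, publish it with rcu_assign_pointer(),
 * and free the old one only after synchronize_rcu(), so that readers still
 * inside rcu_read_lock() never see freed memory.  A minimal sketch of the
 * same pattern (names purely illustrative, not from this file):
 *
 *	new = kmalloc(sizeof(*new), GFP_KERNEL);
 *	*new = *old;
 *	new->some_field = new_value;
 *	rcu_assign_pointer(shared_conf, new);
 *	synchronize_rcu();
 *	kfree(old);
 */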
3147
3148 /* helper function
3149  * input: alg name, feature name
3150  * return: NULL (alg name was "")
3151  *         ERR_PTR(error) if something goes wrong
3152  *         or the crypto hash ptr, if it worked out ok. */
3153 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
3154                 const char *alg, const char *name)
3155 {
3156         struct crypto_hash *tfm;
3157
3158         if (!alg[0])
3159                 return NULL;
3160
3161         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3162         if (IS_ERR(tfm)) {
3163                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3164                         alg, name, PTR_ERR(tfm));
3165                 return tfm;
3166         }
3167         return tfm;
3168 }
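
/* Callers have to distinguish all three outcomes, as receive_SyncParam()
 * below does, roughly:
 *
 *	tfm = drbd_crypto_alloc_digest_safe(mdev, p->verify_alg, "verify-alg");
 *	if (IS_ERR(tfm))
 *		goto disconnect;
 *
 * An allocation failure has already been logged at that point, while a NULL
 * return simply means that no algorithm was configured. */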
3169
3170 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3171 {
3172         void *buffer = tconn->data.rbuf;
3173         int size = pi->size;
3174
3175         while (size) {
3176                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3177                 s = drbd_recv(tconn, buffer, s);
3178                 if (s <= 0) {
3179                         if (s < 0)
3180                                 return s;
3181                         break;
3182                 }
3183                 size -= s;
3184         }
3185         if (size)
3186                 return -EIO;
3187         return 0;
3188 }
3189
3190 /*
3191  * config_unknown_volume  -  device configuration command for unknown volume
3192  *
3193  * When a device is added to an existing connection, the node on which the
3194  * device is added first will send configuration commands to its peer but the
3195  * peer will not know about the device yet.  It will warn and ignore these
3196  * commands.  Once the device is added on the second node, the second node will
3197  * send the same device configuration commands, but in the other direction.
3198  *
3199  * (We can also end up here if drbd is misconfigured.)
3200  */
3201 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3202 {
3203         conn_warn(tconn, "%s packet received for volume %u, which is not configured locally\n",
3204                   cmdname(pi->cmd), pi->vnr);
3205         return ignore_remaining_packet(tconn, pi);
3206 }
3207
3208 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3209 {
3210         struct drbd_conf *mdev;
3211         struct p_rs_param_95 *p;
3212         unsigned int header_size, data_size, exp_max_sz;
3213         struct crypto_hash *verify_tfm = NULL;
3214         struct crypto_hash *csums_tfm = NULL;
3215         struct net_conf *old_net_conf, *new_net_conf = NULL;
3216         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3217         const int apv = tconn->agreed_pro_version;
3218         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3219         int fifo_size = 0;
3220         int err;
3221
3222         mdev = vnr_to_mdev(tconn, pi->vnr);
3223         if (!mdev)
3224                 return config_unknown_volume(tconn, pi);
3225
3226         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3227                     : apv == 88 ? sizeof(struct p_rs_param)
3228                                         + SHARED_SECRET_MAX
3229                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3230                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3231
3232         if (pi->size > exp_max_sz) {
3233                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3234                     pi->size, exp_max_sz);
3235                 return -EIO;
3236         }
3237
3238         if (apv <= 88) {
3239                 header_size = sizeof(struct p_rs_param);
3240                 data_size = pi->size - header_size;
3241         } else if (apv <= 94) {
3242                 header_size = sizeof(struct p_rs_param_89);
3243                 data_size = pi->size - header_size;
3244                 D_ASSERT(data_size == 0);
3245         } else {
3246                 header_size = sizeof(struct p_rs_param_95);
3247                 data_size = pi->size - header_size;
3248                 D_ASSERT(data_size == 0);
3249         }
3250
3251         /* initialize verify_alg and csums_alg */
3252         p = pi->data;
3253         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3254
3255         err = drbd_recv_all(mdev->tconn, p, header_size);
3256         if (err)
3257                 return err;
3258
3259         mutex_lock(&mdev->tconn->conf_update);
3260         old_net_conf = mdev->tconn->net_conf;
3261         if (get_ldev(mdev)) {
3262                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3263                 if (!new_disk_conf) {
3264                         put_ldev(mdev);
3265                         mutex_unlock(&mdev->tconn->conf_update);
3266                         dev_err(DEV, "Allocation of new disk_conf failed\n");
3267                         return -ENOMEM;
3268                 }
3269
3270                 old_disk_conf = mdev->ldev->disk_conf;
3271                 *new_disk_conf = *old_disk_conf;
3272
3273                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3274         }
3275
3276         if (apv >= 88) {
3277                 if (apv == 88) {
3278                         if (data_size > SHARED_SECRET_MAX) {
3279                                 dev_err(DEV, "verify-alg too long, "
3280                                     "peer wants %u, accepting only %u bytes\n",
3281                                                 data_size, SHARED_SECRET_MAX);
3282                                 err = -EIO;
3283                                 goto reconnect;
3284                         }
3285
3286                         err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3287                         if (err)
3288                                 goto reconnect;
3289                         /* we expect NUL terminated string */
3290                         /* but just in case someone tries to be evil */
3291                         D_ASSERT(p->verify_alg[data_size-1] == 0);
3292                         p->verify_alg[data_size-1] = 0;
3293
3294                 } else /* apv >= 89 */ {
3295                         /* we still expect NUL terminated strings */
3296                         /* but just in case someone tries to be evil */
3297                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3298                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3299                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3300                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3301                 }
3302
3303                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3304                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3305                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3306                                     old_net_conf->verify_alg, p->verify_alg);
3307                                 goto disconnect;
3308                         }
3309                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3310                                         p->verify_alg, "verify-alg");
3311                         if (IS_ERR(verify_tfm)) {
3312                                 verify_tfm = NULL;
3313                                 goto disconnect;
3314                         }
3315                 }
3316
3317                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3318                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3319                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3320                                     old_net_conf->csums_alg, p->csums_alg);
3321                                 goto disconnect;
3322                         }
3323                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3324                                         p->csums_alg, "csums-alg");
3325                         if (IS_ERR(csums_tfm)) {
3326                                 csums_tfm = NULL;
3327                                 goto disconnect;
3328                         }
3329                 }
3330
3331                 if (apv > 94 && new_disk_conf) {
3332                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3333                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3334                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3335                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3336
3337                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3338                         if (fifo_size != mdev->rs_plan_s->size) {
3339                                 new_plan = fifo_alloc(fifo_size);
3340                                 if (!new_plan) {
3341                                         dev_err(DEV, "allocation of fifo_buffer failed\n");
3342                                         put_ldev(mdev);
3343                                         goto disconnect;
3344                                 }
3345                         }
3346                 }
3347
3348                 if (verify_tfm || csums_tfm) {
3349                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3350                         if (!new_net_conf) {
3351                                 dev_err(DEV, "Allocation of new net_conf failed\n");
3352                                 goto disconnect;
3353                         }
3354
3355                         *new_net_conf = *old_net_conf;
3356
3357                         if (verify_tfm) {
3358                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3359                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3360                                 crypto_free_hash(mdev->tconn->verify_tfm);
3361                                 mdev->tconn->verify_tfm = verify_tfm;
3362                                 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3363                         }
3364                         if (csums_tfm) {
3365                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3366                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3367                                 crypto_free_hash(mdev->tconn->csums_tfm);
3368                                 mdev->tconn->csums_tfm = csums_tfm;
3369                                 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3370                         }
3371                         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3372                 }
3373         }
3374
3375         if (new_disk_conf) {
3376                 rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3377                 put_ldev(mdev);
3378         }
3379
3380         if (new_plan) {
3381                 old_plan = mdev->rs_plan_s;
3382                 rcu_assign_pointer(mdev->rs_plan_s, new_plan);
3383         }
3384
3385         mutex_unlock(&mdev->tconn->conf_update);
3386         synchronize_rcu();
3387         if (new_net_conf)
3388                 kfree(old_net_conf);
3389         kfree(old_disk_conf);
3390         kfree(old_plan);
3391
3392         return 0;
3393
3394 reconnect:
3395         if (new_disk_conf) {
3396                 put_ldev(mdev);
3397                 kfree(new_disk_conf);
3398         }
3399         mutex_unlock(&mdev->tconn->conf_update);
3400         return -EIO;
3401
3402 disconnect:
3403         kfree(new_plan);
3404         if (new_disk_conf) {
3405                 put_ldev(mdev);
3406                 kfree(new_disk_conf);
3407         }
3408         mutex_unlock(&mdev->tconn->conf_update);
3409         /* just for completeness: actually not needed,
3410          * as this is not reached if csums_tfm was ok. */
3411         crypto_free_hash(csums_tfm);
3412         /* but free the verify_tfm again, if csums_tfm did not work out */
3413         crypto_free_hash(verify_tfm);
3414         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3415         return -EIO;
3416 }
3417
3418 /* warn if the arguments differ by more than 12.5% */
3419 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3420         const char *s, sector_t a, sector_t b)
3421 {
3422         sector_t d;
3423         if (a == 0 || b == 0)
3424                 return;
3425         d = (a > b) ? (a - b) : (b - a);
3426         if (d > (a>>3) || d > (b>>3))
3427                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3428                      (unsigned long long)a, (unsigned long long)b);
3429 }
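
/* "More than 12.5%" is implemented as d > a/8 (a >> 3).  Purely illustrative
 * numbers: a = 1000 and b = 800 sectors give d = 200 > 1000/8 = 125, so a
 * warning is printed; a = 1000 and b = 900 give d = 100, which stays below
 * both 1000/8 and 900/8, so nothing is logged. */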
3430
3431 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3432 {
3433         struct drbd_conf *mdev;
3434         struct p_sizes *p = pi->data;
3435         enum determine_dev_size dd = unchanged;
3436         sector_t p_size, p_usize, my_usize;
3437         int ldsc = 0; /* local disk size changed */
3438         enum dds_flags ddsf;
3439
3440         mdev = vnr_to_mdev(tconn, pi->vnr);
3441         if (!mdev)
3442                 return config_unknown_volume(tconn, pi);
3443
3444         p_size = be64_to_cpu(p->d_size);
3445         p_usize = be64_to_cpu(p->u_size);
3446
3447         /* just store the peer's disk size for now.
3448          * we still need to figure out whether we accept that. */
3449         mdev->p_size = p_size;
3450
3451         if (get_ldev(mdev)) {
3452                 rcu_read_lock();
3453                 my_usize = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
3454                 rcu_read_unlock();
3455
3456                 warn_if_differ_considerably(mdev, "lower level device sizes",
3457                            p_size, drbd_get_max_capacity(mdev->ldev));
3458                 warn_if_differ_considerably(mdev, "user requested size",
3459                                             p_usize, my_usize);
3460
3461                 /* if this is the first connect, or an otherwise expected
3462                  * param exchange, choose the minimum */
3463                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3464                         p_usize = min_not_zero(my_usize, p_usize);
3465
3466                 /* Never shrink a device with usable data during connect.
3467                    But allow online shrinking if we are connected. */
3468                 if (drbd_new_dev_size(mdev, mdev->ldev, p_usize, 0) <
3469                     drbd_get_capacity(mdev->this_bdev) &&
3470                     mdev->state.disk >= D_OUTDATED &&
3471                     mdev->state.conn < C_CONNECTED) {
3472                         dev_err(DEV, "The peer's disk size is too small!\n");
3473                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3474                         put_ldev(mdev);
3475                         return -EIO;
3476                 }
3477
3478                 if (my_usize != p_usize) {
3479                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3480
3481                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3482                         if (!new_disk_conf) {
3483                                 dev_err(DEV, "Allocation of new disk_conf failed\n");
3484                                 put_ldev(mdev);
3485                                 return -ENOMEM;
3486                         }
3487
3488                         mutex_lock(&mdev->tconn->conf_update);
3489                         old_disk_conf = mdev->ldev->disk_conf;
3490                         *new_disk_conf = *old_disk_conf;
3491                         new_disk_conf->disk_size = p_usize;
3492
3493                         rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3494                         mutex_unlock(&mdev->tconn->conf_update);
3495                         synchronize_rcu();
3496                         kfree(old_disk_conf);
3497
3498                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3499                                  (unsigned long)p_usize);
3500                 }
3501
3502                 put_ldev(mdev);
3503         }
3504
3505         ddsf = be16_to_cpu(p->dds_flags);
3506         if (get_ldev(mdev)) {
3507                 dd = drbd_determine_dev_size(mdev, ddsf);
3508                 put_ldev(mdev);
3509                 if (dd == dev_size_error)
3510                         return -EIO;
3511                 drbd_md_sync(mdev);
3512         } else {
3513                 /* I am diskless, need to accept the peer's size. */
3514                 drbd_set_my_capacity(mdev, p_size);
3515         }
3516
3517         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3518         drbd_reconsider_max_bio_size(mdev);
3519
3520         if (get_ldev(mdev)) {
3521                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3522                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3523                         ldsc = 1;
3524                 }
3525
3526                 put_ldev(mdev);
3527         }
3528
3529         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3530                 if (be64_to_cpu(p->c_size) !=
3531                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3532                         /* we have different sizes, probably peer
3533                          * needs to know my new size... */
3534                         drbd_send_sizes(mdev, 0, ddsf);
3535                 }
3536                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3537                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3538                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3539                             mdev->state.disk >= D_INCONSISTENT) {
3540                                 if (ddsf & DDSF_NO_RESYNC)
3541                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3542                                 else
3543                                         resync_after_online_grow(mdev);
3544                         } else
3545                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3546                 }
3547         }
3548
3549         return 0;
3550 }
3551
3552 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3553 {
3554         struct drbd_conf *mdev;
3555         struct p_uuids *p = pi->data;
3556         u64 *p_uuid;
3557         int i, updated_uuids = 0;
3558
3559         mdev = vnr_to_mdev(tconn, pi->vnr);
3560         if (!mdev)
3561                 return config_unknown_volume(tconn, pi);
3562
3563         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
        if (!p_uuid) {
                dev_err(DEV, "kmalloc of p_uuid failed\n");
                return -ENOMEM;
        }
3564
3565         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3566                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3567
3568         kfree(mdev->p_uuid);
3569         mdev->p_uuid = p_uuid;
3570
3571         if (mdev->state.conn < C_CONNECTED &&
3572             mdev->state.disk < D_INCONSISTENT &&
3573             mdev->state.role == R_PRIMARY &&
3574             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3575                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3576                     (unsigned long long)mdev->ed_uuid);
3577                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3578                 return -EIO;
3579         }
3580
3581         if (get_ldev(mdev)) {
3582                 int skip_initial_sync =
3583                         mdev->state.conn == C_CONNECTED &&
3584                         mdev->tconn->agreed_pro_version >= 90 &&
3585                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3586                         (p_uuid[UI_FLAGS] & 8);
3587                 if (skip_initial_sync) {
3588                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3589                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3590                                         "clear_n_write from receive_uuids",
3591                                         BM_LOCKED_TEST_ALLOWED);
3592                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3593                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3594                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3595                                         CS_VERBOSE, NULL);
3596                         drbd_md_sync(mdev);
3597                         updated_uuids = 1;
3598                 }
3599                 put_ldev(mdev);
3600         } else if (mdev->state.disk < D_INCONSISTENT &&
3601                    mdev->state.role == R_PRIMARY) {
3602                 /* I am a diskless primary, the peer just created a new current UUID
3603                    for me. */
3604                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3605         }
3606
3607         /* Before we test for the disk state, we should wait until any possibly
3608            ongoing cluster-wide state change is finished. That is important if
3609            we are primary and are detaching from our disk. We need to see the
3610            new disk state... */
3611         mutex_lock(mdev->state_mutex);
3612         mutex_unlock(mdev->state_mutex);
3613         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3614                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3615
3616         if (updated_uuids)
3617                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3618
3619         return 0;
3620 }
3621
3622 /**
3623  * convert_state() - Converts the peer's view of the cluster state to our point of view
3624  * @ps:         The state as seen by the peer.
3625  */
3626 static union drbd_state convert_state(union drbd_state ps)
3627 {
3628         union drbd_state ms;
3629
3630         static enum drbd_conns c_tab[] = {
3631                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3632                 [C_CONNECTED] = C_CONNECTED,
3633
3634                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3635                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3636                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3637                 [C_VERIFY_S]       = C_VERIFY_T,
3638                 [C_MASK]   = C_MASK,
3639         };
3640
3641         ms.i = ps.i;
3642
3643         ms.conn = c_tab[ps.conn];
3644         ms.peer = ps.role;
3645         ms.role = ps.peer;
3646         ms.pdsk = ps.disk;
3647         ms.disk = ps.pdsk;
3648         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3649
3650         return ms;
3651 }
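
/* Example with illustrative values: if the peer reports
 *   { role = Primary, peer = Secondary, disk = UpToDate, pdsk = Inconsistent }
 * then convert_state() yields, from our point of view,
 *   { role = Secondary, peer = Primary, disk = Inconsistent, pdsk = UpToDate }
 * while inherently asymmetric connection states (e.g. StartingSyncS vs.
 * StartingSyncT) are mirrored via c_tab[]. */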
3652
3653 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3654 {
3655         struct drbd_conf *mdev;
3656         struct p_req_state *p = pi->data;
3657         union drbd_state mask, val;
3658         enum drbd_state_rv rv;
3659
3660         mdev = vnr_to_mdev(tconn, pi->vnr);
3661         if (!mdev)
3662                 return -EIO;
3663
3664         mask.i = be32_to_cpu(p->mask);
3665         val.i = be32_to_cpu(p->val);
3666
3667         if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3668             mutex_is_locked(mdev->state_mutex)) {
3669                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3670                 return 0;
3671         }
3672
3673         mask = convert_state(mask);
3674         val = convert_state(val);
3675
3676         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3677         drbd_send_sr_reply(mdev, rv);
3678
3679         drbd_md_sync(mdev);
3680
3681         return 0;
3682 }
3683
3684 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3685 {
3686         struct p_req_state *p = pi->data;
3687         union drbd_state mask, val;
3688         enum drbd_state_rv rv;
3689
3690         mask.i = be32_to_cpu(p->mask);
3691         val.i = be32_to_cpu(p->val);
3692
3693         if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3694             mutex_is_locked(&tconn->cstate_mutex)) {
3695                 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3696                 return 0;
3697         }
3698
3699         mask = convert_state(mask);
3700         val = convert_state(val);
3701
3702         rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3703         conn_send_sr_reply(tconn, rv);
3704
3705         return 0;
3706 }
3707
3708 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3709 {
3710         struct drbd_conf *mdev;
3711         struct p_state *p = pi->data;
3712         union drbd_state os, ns, peer_state;
3713         enum drbd_disk_state real_peer_disk;
3714         enum chg_state_flags cs_flags;
3715         int rv;
3716
3717         mdev = vnr_to_mdev(tconn, pi->vnr);
3718         if (!mdev)
3719                 return config_unknown_volume(tconn, pi);
3720
3721         peer_state.i = be32_to_cpu(p->state);
3722
3723         real_peer_disk = peer_state.disk;
3724         if (peer_state.disk == D_NEGOTIATING) {
3725                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3726                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3727         }
3728
3729         spin_lock_irq(&mdev->tconn->req_lock);
3730  retry:
3731         os = ns = drbd_read_state(mdev);
3732         spin_unlock_irq(&mdev->tconn->req_lock);
3733
3734         /* peer says his disk is uptodate, while we think it is inconsistent,
3735          * and this happens while we think we have a sync going on. */
3736         if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3737             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3738                 /* If we are (becoming) SyncSource, but peer is still in sync
3739                  * preparation, ignore its uptodate-ness to avoid flapping, it
3740                  * will change to inconsistent once the peer reaches active
3741                  * syncing states.
3742                  * It may have changed syncer-paused flags, however, so we
3743                  * cannot ignore this completely. */
3744                 if (peer_state.conn > C_CONNECTED &&
3745                     peer_state.conn < C_SYNC_SOURCE)
3746                         real_peer_disk = D_INCONSISTENT;
3747
3748                 /* if peer_state changes to connected at the same time,
3749                  * it explicitly notifies us that it finished resync.
3750                  * Maybe we should finish it up, too? */
3751                 else if (os.conn >= C_SYNC_SOURCE &&
3752                          peer_state.conn == C_CONNECTED) {
3753                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3754                                 drbd_resync_finished(mdev);
3755                         return 0;
3756                 }
3757         }
3758
3759         /* peer says his disk is inconsistent, while we think it is uptodate,
3760          * and this happens while the peer still thinks we have a sync going on,
3761          * but we think we are already done with the sync.
3762          * We ignore this to avoid flapping pdsk.
3763          * This should not happen, if the peer is a recent version of drbd. */
3764         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3765             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3766                 real_peer_disk = D_UP_TO_DATE;
3767
3768         if (ns.conn == C_WF_REPORT_PARAMS)
3769                 ns.conn = C_CONNECTED;
3770
3771         if (peer_state.conn == C_AHEAD)
3772                 ns.conn = C_BEHIND;
3773
3774         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3775             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3776                 int cr; /* consider resync */
3777
3778                 /* if we established a new connection */
3779                 cr  = (os.conn < C_CONNECTED);
3780                 /* if we had an established connection
3781                  * and one of the nodes newly attaches a disk */
3782                 cr |= (os.conn == C_CONNECTED &&
3783                        (peer_state.disk == D_NEGOTIATING ||
3784                         os.disk == D_NEGOTIATING));
3785                 /* if we have both been inconsistent, and the peer has been
3786                  * forced to be UpToDate with --overwrite-data */
3787                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3788                 /* if we had been plain connected, and the admin requested to
3789                  * start a sync by "invalidate" or "invalidate-remote" */
3790                 cr |= (os.conn == C_CONNECTED &&
3791                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3792                                  peer_state.conn <= C_WF_BITMAP_T));
3793
3794                 if (cr)
3795                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3796
3797                 put_ldev(mdev);
3798                 if (ns.conn == C_MASK) {
3799                         ns.conn = C_CONNECTED;
3800                         if (mdev->state.disk == D_NEGOTIATING) {
3801                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3802                         } else if (peer_state.disk == D_NEGOTIATING) {
3803                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3804                                 peer_state.disk = D_DISKLESS;
3805                                 real_peer_disk = D_DISKLESS;
3806                         } else {
3807                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3808                                         return -EIO;
3809                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3810                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3811                                 return -EIO;
3812                         }
3813                 }
3814         }
3815
3816         spin_lock_irq(&mdev->tconn->req_lock);
3817         if (os.i != drbd_read_state(mdev).i)
3818                 goto retry;
3819         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3820         ns.peer = peer_state.role;
3821         ns.pdsk = real_peer_disk;
3822         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3823         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3824                 ns.disk = mdev->new_state_tmp.disk;
3825         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3826         if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3827             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3828                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3829                    for temporary network outages! */
3830                 spin_unlock_irq(&mdev->tconn->req_lock);
3831                 dev_err(DEV, "Aborting Connect, cannot thaw IO with a peer that is only Consistent\n");
3832                 tl_clear(mdev->tconn);
3833                 drbd_uuid_new_current(mdev);
3834                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3835                 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3836                 return -EIO;
3837         }
3838         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3839         ns = drbd_read_state(mdev);
3840         spin_unlock_irq(&mdev->tconn->req_lock);
3841
3842         if (rv < SS_SUCCESS) {
3843                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3844                 return -EIO;
3845         }
3846
3847         if (os.conn > C_WF_REPORT_PARAMS) {
3848                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3849                     peer_state.disk != D_NEGOTIATING ) {
3850                         /* we want resync, peer has not yet decided to sync... */
3851                         /* Nowadays only used when forcing a node into primary role and
3852                            setting its disk to UpToDate with that */
3853                         drbd_send_uuids(mdev);
3854                         drbd_send_state(mdev);
3855                 }
3856         }
3857
3858         mutex_lock(&mdev->tconn->conf_update);
3859         mdev->tconn->net_conf->discard_my_data = 0; /* without copy; single bit op is atomic */
3860         mutex_unlock(&mdev->tconn->conf_update);
3861
3862         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3863
3864         return 0;
3865 }
3866
3867 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3868 {
3869         struct drbd_conf *mdev;
3870         struct p_rs_uuid *p = pi->data;
3871
3872         mdev = vnr_to_mdev(tconn, pi->vnr);
3873         if (!mdev)
3874                 return -EIO;
3875
3876         wait_event(mdev->misc_wait,
3877                    mdev->state.conn == C_WF_SYNC_UUID ||
3878                    mdev->state.conn == C_BEHIND ||
3879                    mdev->state.conn < C_CONNECTED ||
3880                    mdev->state.disk < D_NEGOTIATING);
3881
3882         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3883
3884         /* Here the _drbd_uuid_ functions are right, current should
3885            _not_ be rotated into the history */
3886         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3887                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3888                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3889
3890                 drbd_print_uuids(mdev, "updated sync uuid");
3891                 drbd_start_resync(mdev, C_SYNC_TARGET);
3892
3893                 put_ldev(mdev);
3894         } else
3895                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3896
3897         return 0;
3898 }
3899
3900 /**
3901  * receive_bitmap_plain
3902  *
3903  * Return 0 when done, 1 when another iteration is needed, and a negative error
3904  * code upon failure.
3905  */
3906 static int
3907 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
3908                      unsigned long *p, struct bm_xfer_ctx *c)
3909 {
3910         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
3911                                  drbd_header_size(mdev->tconn);
3912         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
3913                                        c->bm_words - c->word_offset);
3914         unsigned int want = num_words * sizeof(*p);
3915         int err;
3916
3917         if (want != size) {
3918                 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
3919                 return -EIO;
3920         }
3921         if (want == 0)
3922                 return 0;
3923         err = drbd_recv_all(mdev->tconn, p, want);
3924         if (err)
3925                 return err;
3926
3927         drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
3928
3929         c->word_offset += num_words;
3930         c->bit_offset = c->word_offset * BITS_PER_LONG;
3931         if (c->bit_offset > c->bm_bits)
3932                 c->bit_offset = c->bm_bits;
3933
3934         return 1;
3935 }
3936
3937 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
3938 {
3939         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
3940 }
3941
3942 static int dcbp_get_start(struct p_compressed_bm *p)
3943 {
3944         return (p->encoding & 0x80) != 0;
3945 }
3946
3947 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
3948 {
3949         return (p->encoding >> 4) & 0x7;
3950 }
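
/* The three helpers above decode the single "encoding" byte of a
 * P_COMPRESSED_BITMAP packet:
 *   bits 0-3   the bitmap encoding (enum drbd_bitmap_code, e.g. RLE_VLI_Bits)
 *   bits 4-6   number of padding bits at the end of the bit stream
 *   bit  7     whether the first RLE run describes set bits, i.e. the
 *              decoder's initial toggle value
 */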
3951
3952 /**
3953  * recv_bm_rle_bits
3954  *
3955  * Return 0 when done, 1 when another iteration is needed, and a negative error
3956  * code upon failure.
3957  */
3958 static int
3959 recv_bm_rle_bits(struct drbd_conf *mdev,
3960                 struct p_compressed_bm *p,
3961                  struct bm_xfer_ctx *c,
3962                  unsigned int len)
3963 {
3964         struct bitstream bs;
3965         u64 look_ahead;
3966         u64 rl;
3967         u64 tmp;
3968         unsigned long s = c->bit_offset;
3969         unsigned long e;
3970         int toggle = dcbp_get_start(p);
3971         int have;
3972         int bits;
3973
3974         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
3975
3976         bits = bitstream_get_bits(&bs, &look_ahead, 64);
3977         if (bits < 0)
3978                 return -EIO;
3979
3980         for (have = bits; have > 0; s += rl, toggle = !toggle) {
3981                 bits = vli_decode_bits(&rl, look_ahead);
3982                 if (bits <= 0)
3983                         return -EIO;
3984
3985                 if (toggle) {
3986                         e = s + rl -1;
3987                         if (e >= c->bm_bits) {
3988                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3989                                 return -EIO;
3990                         }
3991                         _drbd_bm_set_bits(mdev, s, e);
3992                 }
3993
3994                 if (have < bits) {
3995                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3996                                 have, bits, look_ahead,
3997                                 (unsigned int)(bs.cur.b - p->code),
3998                                 (unsigned int)bs.buf_len);
3999                         return -EIO;
4000                 }
4001                 look_ahead >>= bits;
4002                 have -= bits;
4003
4004                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4005                 if (bits < 0)
4006                         return -EIO;
4007                 look_ahead |= tmp << have;
4008                 have += bits;
4009         }
4010
4011         c->bit_offset = s;
4012         bm_xfer_ctx_bit_to_word_offset(c);
4013
4014         return (s != c->bm_bits);
4015 }
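
/* Decoding example with made-up run lengths: if dcbp_get_start() is 0 and the
 * decoded run lengths are 5, 3 and 4, then the first 5 bits starting at
 * c->bit_offset are skipped, the next 3 are set via _drbd_bm_set_bits(), and
 * the following 4 are skipped again; the toggle flips after every run, and
 * only the "set" runs modify the bitmap. */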
4016
4017 /**
4018  * decode_bitmap_c
4019  *
4020  * Return 0 when done, 1 when another iteration is needed, and a negative error
4021  * code upon failure.
4022  */
4023 static int
4024 decode_bitmap_c(struct drbd_conf *mdev,
4025                 struct p_compressed_bm *p,
4026                 struct bm_xfer_ctx *c,
4027                 unsigned int len)
4028 {
4029         if (dcbp_get_code(p) == RLE_VLI_Bits)
4030                 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
4031
4032         /* other variants had been implemented for evaluation,
4033          * but have been dropped as this one turned out to be "best"
4034          * during all our tests. */
4035
4036         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4037         conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4038         return -EIO;
4039 }
4040
4041 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
4042                 const char *direction, struct bm_xfer_ctx *c)
4043 {
4044         /* what would it take to transfer it "plaintext" */
4045         unsigned int header_size = drbd_header_size(mdev->tconn);
4046         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4047         unsigned int plain =
4048                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4049                 c->bm_words * sizeof(unsigned long);
4050         unsigned int total = c->bytes[0] + c->bytes[1];
4051         unsigned int r;
4052
4053         /* total cannot be zero, but just in case: */
4054         if (total == 0)
4055                 return;
4056
4057         /* don't report if not compressed */
4058         if (total >= plain)
4059                 return;
4060
4061         /* total < plain. check for overflow, still */
4062         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4063                                     : (1000 * total / plain);
4064
4065         if (r > 1000)
4066                 r = 1000;
4067
4068         r = 1000 - r;
4069         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4070              "total %u; compression: %u.%u%%\n",
4071                         direction,
4072                         c->bytes[1], c->packets[1],
4073                         c->bytes[0], c->packets[0],
4074                         total, r/10, r % 10);
4075 }
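
/* Compression figure, with illustrative numbers: for plain = 1,000,000 bytes
 * and total = 150,000 bytes actually transferred, r = 1000 * total / plain
 * = 150, which the code inverts to r = 850 and reports as
 * "compression: 85.0%". */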
4076
4077 /* Since we are processing the bitfield from lower addresses to higher,
4078    it does not matter whether we process it in 32 bit chunks or 64 bit
4079    chunks, as long as it is little endian. (Understand it as a byte stream,
4080    beginning with the lowest byte...) If we used big endian,
4081    we would need to process it from the highest address to the lowest
4082    in order to be agnostic to the 32 vs 64 bit issue.
4083
4084    returns 0 on failure, 1 if we successfully received it. */
4085 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
4086 {
4087         struct drbd_conf *mdev;
4088         struct bm_xfer_ctx c;
4089         int err;
4090
4091         mdev = vnr_to_mdev(tconn, pi->vnr);
4092         if (!mdev)
4093                 return -EIO;
4094
4095         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4096         /* you are supposed to send additional out-of-sync information
4097          * if you actually set bits during this phase */
4098
4099         c = (struct bm_xfer_ctx) {
4100                 .bm_bits = drbd_bm_bits(mdev),
4101                 .bm_words = drbd_bm_words(mdev),
4102         };
4103
4104         for (;;) {
4105                 if (pi->cmd == P_BITMAP)
4106                         err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
4107                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4108                         /* MAYBE: sanity check that we speak proto >= 90,
4109                          * and the feature is enabled! */
4110                         struct p_compressed_bm *p = pi->data;
4111
4112                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
4113                                 dev_err(DEV, "ReportCBitmap packet too large\n");
4114                                 err = -EIO;
4115                                 goto out;
4116                         }
4117                         if (pi->size <= sizeof(*p)) {
4118                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4119                                 err = -EIO;
4120                                 goto out;
4121                         }
4122                         err = drbd_recv_all(mdev->tconn, p, pi->size);
4123                         if (err)
4124                                goto out;
4125                         err = decode_bitmap_c(mdev, p, &c, pi->size);
4126                 } else {
4127                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4128                         err = -EIO;
4129                         goto out;
4130                 }
4131
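                     /* index 1 counts plain P_BITMAP packets, index 0 the
                      * compressed ones; INFO_bm_xfer_stats() reports them in
                      * that order */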
4132                 c.packets[pi->cmd == P_BITMAP]++;
4133                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
4134
4135                 if (err <= 0) {
4136                         if (err < 0)
4137                                 goto out;
4138                         break;
4139                 }
4140                 err = drbd_recv_header(mdev->tconn, pi);
4141                 if (err)
4142                         goto out;
4143         }
4144
4145         INFO_bm_xfer_stats(mdev, "receive", &c);
4146
4147         if (mdev->state.conn == C_WF_BITMAP_T) {
4148                 enum drbd_state_rv rv;
4149
4150                 err = drbd_send_bitmap(mdev);
4151                 if (err)
4152                         goto out;
4153                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4154                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4155                 D_ASSERT(rv == SS_SUCCESS);
4156         } else if (mdev->state.conn != C_WF_BITMAP_S) {
4157                 /* admin may have requested C_DISCONNECTING,
4158                  * other threads may have noticed network errors */
4159                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
4160                     drbd_conn_str(mdev->state.conn));
4161         }
4162         err = 0;
4163
4164  out:
4165         drbd_bm_unlock(mdev);
4166         if (!err && mdev->state.conn == C_WF_BITMAP_S)
4167                 drbd_start_resync(mdev, C_SYNC_SOURCE);
4168         return err;
4169 }
4170
4171 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4172 {
4173         conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
4174                  pi->cmd, pi->size);
4175
4176         return ignore_remaining_packet(tconn, pi);
4177 }
4178
4179 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
4180 {
4181         /* Make sure we've acked all the TCP data associated
4182          * with the data requests being unplugged */
4183         drbd_tcp_quickack(tconn->data.socket);
4184
4185         return 0;
4186 }
4187
4188 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
4189 {
4190         struct drbd_conf *mdev;
4191         struct p_block_desc *p = pi->data;
4192
4193         mdev = vnr_to_mdev(tconn, pi->vnr);
4194         if (!mdev)
4195                 return -EIO;
4196
4197         switch (mdev->state.conn) {
4198         case C_WF_SYNC_UUID:
4199         case C_WF_BITMAP_T:
4200         case C_BEHIND:
4201                 break;
4202         default:
4203                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4204                                 drbd_conn_str(mdev->state.conn));
4205         }
4206
4207         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4208
4209         return 0;
4210 }
4211
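     /* One entry per packet type on the data socket: expect_payload says
      * whether the packet may carry payload beyond the fixed sub header of
      * pkt_size bytes; fn is the handler drbdd() dispatches to. */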
4212 struct data_cmd {
4213         int expect_payload;
4214         size_t pkt_size;
4215         int (*fn)(struct drbd_tconn *, struct packet_info *);
4216 };
4217
4218 static struct data_cmd drbd_cmd_handler[] = {
4219         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4220         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4221         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply },
4222         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier },
4223         [P_BITMAP]          = { 1, 0, receive_bitmap },
4224         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap },
4225         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4226         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4227         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4228         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4229         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4230         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4231         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4232         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4233         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4234         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4235         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4236         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4237         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4238         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4239         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4240         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4241         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4242         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4243 };
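     /* Note: entries with a pkt_size of 0 get no sub header pre-read by
      * drbdd(); those handlers are expected to pull in their (variable
      * sized) payload themselves. */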
4244
4245 static void drbdd(struct drbd_tconn *tconn)
4246 {
4247         struct packet_info pi;
4248         size_t shs; /* sub header size */
4249         int err;
4250
4251         while (get_t_state(&tconn->receiver) == RUNNING) {
4252                 struct data_cmd *cmd;
4253
4254                 drbd_thread_current_set_cpu(&tconn->receiver);
4255                 if (drbd_recv_header(tconn, &pi))
4256                         goto err_out;
4257
4258                 cmd = &drbd_cmd_handler[pi.cmd];
4259                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4260                         conn_err(tconn, "Unexpected data packet %s (0x%04x)\n",
4261                                  cmdname(pi.cmd), pi.cmd);
4262                         goto err_out;
4263                 }
4264
4265                 shs = cmd->pkt_size;
4266                 if (pi.size > shs && !cmd->expect_payload) {
4267                         conn_err(tconn, "No payload expected %s l:%d\n",
4268                                  cmdname(pi.cmd), pi.size);
4269                         goto err_out;
4270                 }
4271
4272                 if (shs) {
4273                         err = drbd_recv_all_warn(tconn, pi.data, shs);
4274                         if (err)
4275                                 goto err_out;
4276                         pi.size -= shs;
4277                 }
4278
4279                 err = cmd->fn(tconn, &pi);
4280                 if (err) {
4281                         conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4282                                  cmdname(pi.cmd), err, pi.size);
4283                         goto err_out;
4284                 }
4285         }
4286         return;
4287
4288     err_out:
4289         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4290 }
4291
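     /* Flush the connection's worker queue: queue a barrier work item whose
      * callback (w_prev_work_done) completes barr.done, then wait for it.
      * Once it has run, everything queued before it has been processed. */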
4292 void conn_flush_workqueue(struct drbd_tconn *tconn)
4293 {
4294         struct drbd_wq_barrier barr;
4295
4296         barr.w.cb = w_prev_work_done;
4297         barr.w.tconn = tconn;
4298         init_completion(&barr.done);
4299         drbd_queue_work(&tconn->data.work, &barr.w);
4300         wait_for_completion(&barr.done);
4301 }
4302
4303 static void conn_disconnect(struct drbd_tconn *tconn)
4304 {
4305         struct drbd_conf *mdev;
4306         enum drbd_conns oc;
4307         int vnr, rv = SS_UNKNOWN_ERROR;
4308
4309         if (tconn->cstate == C_STANDALONE)
4310                 return;
4311
4312         /* The asender does not clean up anything; it must not interfere, either. */
4313         drbd_thread_stop(&tconn->asender);
4314         drbd_free_sock(tconn);
4315
4316         rcu_read_lock();
4317         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
4318                 kref_get(&mdev->kref);
4319                 rcu_read_unlock();
4320                 drbd_disconnected(mdev);
4321                 kref_put(&mdev->kref, &drbd_minor_destroy);
4322                 rcu_read_lock();
4323         }
4324         rcu_read_unlock();
4325
4326         conn_info(tconn, "Connection closed\n");
4327
4328         if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4329                 conn_try_outdate_peer_async(tconn);
4330
4331         spin_lock_irq(&tconn->req_lock);
4332         oc = tconn->cstate;
4333         if (oc >= C_UNCONNECTED)
4334                 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4335
4336         spin_unlock_irq(&tconn->req_lock);
4337
4338         if (oc == C_DISCONNECTING)
4339                 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4340 }
4341
4342 static int drbd_disconnected(struct drbd_conf *mdev)
4343 {
4344         enum drbd_fencing_p fp;
4345         unsigned int i;
4346
4347         /* wait for current activity to cease. */
4348         spin_lock_irq(&mdev->tconn->req_lock);
4349         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4350         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4351         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4352         spin_unlock_irq(&mdev->tconn->req_lock);
4353
4354         /* We do not have data structures that would allow us to
4355          * get the rs_pending_cnt down to 0 again.
4356          *  * On C_SYNC_TARGET we do not have any data structures describing
4357          *    the pending RSDataRequests we have sent.
4358          *  * On C_SYNC_SOURCE there is no data structure that tracks
4359          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4360          *  And no, it is not the sum of the reference counts in the
4361          *  resync_LRU. The resync_LRU tracks the whole operation including
4362          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4363          *  on the fly. */
4364         drbd_rs_cancel_all(mdev);
4365         mdev->rs_total = 0;
4366         mdev->rs_failed = 0;
4367         atomic_set(&mdev->rs_pending_cnt, 0);
4368         wake_up(&mdev->misc_wait);
4369
4370         del_timer_sync(&mdev->resync_timer);
4371         resync_timer_fn((unsigned long)mdev);
4372
4373         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4374          * w_make_resync_request etc. which may still be on the worker queue
4375          * to be "canceled" */
4376         drbd_flush_workqueue(mdev);
4377
4378         drbd_finish_peer_reqs(mdev);
4379
4380         kfree(mdev->p_uuid);
4381         mdev->p_uuid = NULL;
4382
4383         if (!drbd_suspended(mdev))
4384                 tl_clear(mdev->tconn);
4385
4386         drbd_md_sync(mdev);
4387
4388         fp = FP_DONT_CARE;
4389         if (get_ldev(mdev)) {
4390                 rcu_read_lock();
4391                 fp = rcu_dereference(mdev->ldev->disk_conf)->fencing;
4392                 rcu_read_unlock();
4393                 put_ldev(mdev);
4394         }
4395
4396         /* serialize with bitmap writeout triggered by the state change,
4397          * if any. */
4398         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4399
4400         /* tcp_close and release of sendpage pages can be deferred.  I don't
4401          * want to use SO_LINGER, because apparently it can be deferred for
4402          * more than 20 seconds (longest time I checked).
4403          *
4404          * Actually we don't care for exactly when the network stack does its
4405          * put_page(), but release our reference on these pages right here.
4406          */
4407         i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4408         if (i)
4409                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4410         i = atomic_read(&mdev->pp_in_use_by_net);
4411         if (i)
4412                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4413         i = atomic_read(&mdev->pp_in_use);
4414         if (i)
4415                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4416
4417         D_ASSERT(list_empty(&mdev->read_ee));
4418         D_ASSERT(list_empty(&mdev->active_ee));
4419         D_ASSERT(list_empty(&mdev->sync_ee));
4420         D_ASSERT(list_empty(&mdev->done_ee));
4421
4422         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4423         atomic_set(&mdev->current_epoch->epoch_size, 0);
4424         D_ASSERT(list_empty(&mdev->current_epoch->list));
4425
4426         return 0;
4427 }
4428
4429 /*
4430  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4431  * we can agree on is stored in agreed_pro_version.
4432  *
4433  * feature flags and the reserved array should be enough room for future
4434  * enhancements of the handshake protocol, and possible plugins...
4435  *
4436  * for now, they are expected to be zero, but ignored.
4437  */
4438 static int drbd_send_features(struct drbd_tconn *tconn)
4439 {
4440         struct drbd_socket *sock;
4441         struct p_connection_features *p;
4442
4443         sock = &tconn->data;
4444         p = conn_prepare_command(tconn, sock);
4445         if (!p)
4446                 return -EIO;
4447         memset(p, 0, sizeof(*p));
4448         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4449         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4450         return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4451 }
4452
4453 /*
4454  * return values:
4455  *   1 yes, we have a valid connection
4456  *   0 oops, did not work out, please try again
4457  *  -1 peer talks different language,
4458  *     no point in trying again, please go standalone.
4459  */
4460 static int drbd_do_features(struct drbd_tconn *tconn)
4461 {
4462         /* ASSERT current == tconn->receiver ... */
4463         struct p_connection_features *p;
4464         const int expect = sizeof(struct p_connection_features);
4465         struct packet_info pi;
4466         int err;
4467
4468         err = drbd_send_features(tconn);
4469         if (err)
4470                 return 0;
4471
4472         err = drbd_recv_header(tconn, &pi);
4473         if (err)
4474                 return 0;
4475
4476         if (pi.cmd != P_CONNECTION_FEATURES) {
4477                 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4478                          cmdname(pi.cmd), pi.cmd);
4479                 return -1;
4480         }
4481
4482         if (pi.size != expect) {
4483                 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4484                      expect, pi.size);
4485                 return -1;
4486         }
4487
4488         p = pi.data;
4489         err = drbd_recv_all_warn(tconn, p, expect);
4490         if (err)
4491                 return 0;
4492
4493         p->protocol_min = be32_to_cpu(p->protocol_min);
4494         p->protocol_max = be32_to_cpu(p->protocol_max);
4495         if (p->protocol_max == 0)
4496                 p->protocol_max = p->protocol_min;
4497
4498         if (PRO_VERSION_MAX < p->protocol_min ||
4499             PRO_VERSION_MIN > p->protocol_max)
4500                 goto incompat;
4501
4502         tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
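             /* illustrative example: if we support versions 86..100 and the
              * peer announces 95..110, the ranges overlap and we agree on
              * min(100, 110) = 100; a peer announcing 101..110 would have
              * been rejected above as incompatible */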
4503
4504         conn_info(tconn, "Handshake successful: "
4505              "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4506
4507         return 1;
4508
4509  incompat:
4510         conn_err(tconn, "incompatible DRBD dialects: "
4511             "I support %d-%d, peer supports %d-%d\n",
4512             PRO_VERSION_MIN, PRO_VERSION_MAX,
4513             p->protocol_min, p->protocol_max);
4514         return -1;
4515 }
4516
4517 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4518 static int drbd_do_auth(struct drbd_tconn *tconn)
4519 {
4520         conn_err(tconn, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4521         conn_err(tconn, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4522         return -1;
4523 }
4524 #else
4525 #define CHALLENGE_LEN 64
4526
4527 /* Return value:
4528         1 - auth succeeded,
4529         0 - failed, try again (network error),
4530         -1 - auth failed, don't try again.
4531 */
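     /* Rough shape of the exchange implemented below:
      *   1. send our random challenge                    (P_AUTH_CHALLENGE)
      *   2. receive the peer's challenge                 (P_AUTH_CHALLENGE)
      *   3. send HMAC(shared secret, peer's challenge)   (P_AUTH_RESPONSE)
      *   4. receive HMAC(shared secret, our challenge)   (P_AUTH_RESPONSE)
      *   5. compare it against our own HMAC over my_challenge.
      */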
4532
4533 static int drbd_do_auth(struct drbd_tconn *tconn)
4534 {
4535         struct drbd_socket *sock;
4536         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4537         struct scatterlist sg;
4538         char *response = NULL;
4539         char *right_response = NULL;
4540         char *peers_ch = NULL;
4541         unsigned int key_len;
4542         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4543         unsigned int resp_size;
4544         struct hash_desc desc;
4545         struct packet_info pi;
4546         struct net_conf *nc;
4547         int err, rv;
4548
4549         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4550
4551         rcu_read_lock();
4552         nc = rcu_dereference(tconn->net_conf);
4553         key_len = strlen(nc->shared_secret);
4554         memcpy(secret, nc->shared_secret, key_len);
4555         rcu_read_unlock();
4556
4557         desc.tfm = tconn->cram_hmac_tfm;
4558         desc.flags = 0;
4559
4560         rv = crypto_hash_setkey(tconn->cram_hmac_tfm, (u8 *)secret, key_len);
4561         if (rv) {
4562                 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4563                 rv = -1;
4564                 goto fail;
4565         }
4566
4567         get_random_bytes(my_challenge, CHALLENGE_LEN);
4568
4569         sock = &tconn->data;
4570         if (!conn_prepare_command(tconn, sock)) {
4571                 rv = 0;
4572                 goto fail;
4573         }
4574         rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4575                                 my_challenge, CHALLENGE_LEN);
4576         if (!rv)
4577                 goto fail;
4578
4579         err = drbd_recv_header(tconn, &pi);
4580         if (err) {
4581                 rv = 0;
4582                 goto fail;
4583         }
4584
4585         if (pi.cmd != P_AUTH_CHALLENGE) {
4586                 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4587                          cmdname(pi.cmd), pi.cmd);
4588                 rv = 0;
4589                 goto fail;
4590         }
4591
4592         if (pi.size > CHALLENGE_LEN * 2) {
4593                 conn_err(tconn, "AuthChallenge payload too big.\n");
4594                 rv = -1;
4595                 goto fail;
4596         }
4597
4598         peers_ch = kmalloc(pi.size, GFP_NOIO);
4599         if (peers_ch == NULL) {
4600                 conn_err(tconn, "kmalloc of peers_ch failed\n");
4601                 rv = -1;
4602                 goto fail;
4603         }
4604
4605         err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4606         if (err) {
4607                 rv = 0;
4608                 goto fail;
4609         }
4610
4611         resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4612         response = kmalloc(resp_size, GFP_NOIO);
4613         if (response == NULL) {
4614                 conn_err(tconn, "kmalloc of response failed\n");
4615                 rv = -1;
4616                 goto fail;
4617         }
4618
4619         sg_init_table(&sg, 1);
4620         sg_set_buf(&sg, peers_ch, pi.size);
4621
4622         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4623         if (rv) {
4624                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4625                 rv = -1;
4626                 goto fail;
4627         }
4628
4629         if (!conn_prepare_command(tconn, sock)) {
4630                 rv = 0;
4631                 goto fail;
4632         }
4633         rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4634                                 response, resp_size);
4635         if (!rv)
4636                 goto fail;
4637
4638         err = drbd_recv_header(tconn, &pi);
4639         if (err) {
4640                 rv = 0;
4641                 goto fail;
4642         }
4643
4644         if (pi.cmd != P_AUTH_RESPONSE) {
4645                 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4646                          cmdname(pi.cmd), pi.cmd);
4647                 rv = 0;
4648                 goto fail;
4649         }
4650
4651         if (pi.size != resp_size) {
4652                 conn_err(tconn, "AuthResponse payload has wrong size\n");
4653                 rv = 0;
4654                 goto fail;
4655         }
4656
4657         err = drbd_recv_all_warn(tconn, response, resp_size);
4658         if (err) {
4659                 rv = 0;
4660                 goto fail;
4661         }
4662
4663         right_response = kmalloc(resp_size, GFP_NOIO);
4664         if (right_response == NULL) {
4665                 conn_err(tconn, "kmalloc of right_response failed\n");
4666                 rv = -1;
4667                 goto fail;
4668         }
4669
4670         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4671
4672         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4673         if (rv) {
4674                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4675                 rv = -1;
4676                 goto fail;
4677         }
4678
4679         rv = !memcmp(response, right_response, resp_size);
4680
4681         if (rv)
4682                 conn_info(tconn, "Peer authenticated using %d bytes HMAC\n",
4683                      resp_size);
4684         else
4685                 rv = -1;
4686
4687  fail:
4688         kfree(peers_ch);
4689         kfree(response);
4690         kfree(right_response);
4691
4692         return rv;
4693 }
4694 #endif
4695
4696 int drbdd_init(struct drbd_thread *thi)
4697 {
4698         struct drbd_tconn *tconn = thi->tconn;
4699         int h;
4700
4701         conn_info(tconn, "receiver (re)started\n");
4702
4703         do {
4704                 h = conn_connect(tconn);
4705                 if (h == 0) {
4706                         conn_disconnect(tconn);
4707                         schedule_timeout_interruptible(HZ);
4708                 }
4709                 if (h == -1) {
4710                         conn_warn(tconn, "Discarding network configuration.\n");
4711                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4712                 }
4713         } while (h == 0);
4714
4715         if (h > 0)
4716                 drbdd(tconn);
4717
4718         conn_disconnect(tconn);
4719
4720         conn_info(tconn, "receiver terminated\n");
4721         return 0;
4722 }
4723
4724 /* ********* acknowledge sender ******** */
4725
4726 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4727 {
4728         struct p_req_state_reply *p = pi->data;
4729         int retcode = be32_to_cpu(p->retcode);
4730
4731         if (retcode >= SS_SUCCESS) {
4732                 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4733         } else {
4734                 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4735                 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4736                          drbd_set_st_err_str(retcode), retcode);
4737         }
4738         wake_up(&tconn->ping_wait);
4739
4740         return 0;
4741 }
4742
4743 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4744 {
4745         struct drbd_conf *mdev;
4746         struct p_req_state_reply *p = pi->data;
4747         int retcode = be32_to_cpu(p->retcode);
4748
4749         mdev = vnr_to_mdev(tconn, pi->vnr);
4750         if (!mdev)
4751                 return -EIO;
4752
4753         if (retcode >= SS_SUCCESS) {
4754                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4755         } else {
4756                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4757                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4758                         drbd_set_st_err_str(retcode), retcode);
4759         }
4760         wake_up(&mdev->state_wait);
4761
4762         return 0;
4763 }
4764
4765 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4766 {
4767         return drbd_send_ping_ack(tconn);
4768
4769 }
4770
4771 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4772 {
4773         /* restore idle timeout */
4774         tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4775         if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4776                 wake_up(&tconn->ping_wait);
4777
4778         return 0;
4779 }
4780
4781 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4782 {
4783         struct drbd_conf *mdev;
4784         struct p_block_ack *p = pi->data;
4785         sector_t sector = be64_to_cpu(p->sector);
4786         int blksize = be32_to_cpu(p->blksize);
4787
4788         mdev = vnr_to_mdev(tconn, pi->vnr);
4789         if (!mdev)
4790                 return -EIO;
4791
4792         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4793
4794         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4795
4796         if (get_ldev(mdev)) {
4797                 drbd_rs_complete_io(mdev, sector);
4798                 drbd_set_in_sync(mdev, sector, blksize);
4799                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4800                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4801                 put_ldev(mdev);
4802         }
4803         dec_rs_pending(mdev);
4804         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4805
4806         return 0;
4807 }
4808
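     /* Look up the request identified by (id, sector) in the given tree and
      * feed the event 'what' into its state machine; if that completes the
      * master bio, complete it outside the req_lock. */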
4809 static int
4810 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4811                               struct rb_root *root, const char *func,
4812                               enum drbd_req_event what, bool missing_ok)
4813 {
4814         struct drbd_request *req;
4815         struct bio_and_error m;
4816
4817         spin_lock_irq(&mdev->tconn->req_lock);
4818         req = find_request(mdev, root, id, sector, missing_ok, func);
4819         if (unlikely(!req)) {
4820                 spin_unlock_irq(&mdev->tconn->req_lock);
4821                 return -EIO;
4822         }
4823         __req_mod(req, what, &m);
4824         spin_unlock_irq(&mdev->tconn->req_lock);
4825
4826         if (m.bio)
4827                 complete_master_bio(mdev, &m);
4828         return 0;
4829 }
4830
4831 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4832 {
4833         struct drbd_conf *mdev;
4834         struct p_block_ack *p = pi->data;
4835         sector_t sector = be64_to_cpu(p->sector);
4836         int blksize = be32_to_cpu(p->blksize);
4837         enum drbd_req_event what;
4838
4839         mdev = vnr_to_mdev(tconn, pi->vnr);
4840         if (!mdev)
4841                 return -EIO;
4842
4843         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4844
4845         if (p->block_id == ID_SYNCER) {
4846                 drbd_set_in_sync(mdev, sector, blksize);
4847                 dec_rs_pending(mdev);
4848                 return 0;
4849         }
4850         switch (pi->cmd) {
4851         case P_RS_WRITE_ACK:
4852                 what = WRITE_ACKED_BY_PEER_AND_SIS;
4853                 break;
4854         case P_WRITE_ACK:
4855                 what = WRITE_ACKED_BY_PEER;
4856                 break;
4857         case P_RECV_ACK:
4858                 what = RECV_ACKED_BY_PEER;
4859                 break;
4860         case P_DISCARD_WRITE:
4861                 what = DISCARD_WRITE;
4862                 break;
4863         case P_RETRY_WRITE:
4864                 what = POSTPONE_WRITE;
4865                 break;
4866         default:
4867                 BUG();
4868         }
4869
4870         return validate_req_change_req_state(mdev, p->block_id, sector,
4871                                              &mdev->write_requests, __func__,
4872                                              what, false);
4873 }
4874
4875 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
4876 {
4877         struct drbd_conf *mdev;
4878         struct p_block_ack *p = pi->data;
4879         sector_t sector = be64_to_cpu(p->sector);
4880         int size = be32_to_cpu(p->blksize);
4881         int err;
4882
4883         mdev = vnr_to_mdev(tconn, pi->vnr);
4884         if (!mdev)
4885                 return -EIO;
4886
4887         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4888
4889         if (p->block_id == ID_SYNCER) {
4890                 dec_rs_pending(mdev);
4891                 drbd_rs_failed_io(mdev, sector, size);
4892                 return 0;
4893         }
4894
4895         err = validate_req_change_req_state(mdev, p->block_id, sector,
4896                                             &mdev->write_requests, __func__,
4897                                             NEG_ACKED, true);
4898         if (err) {
4899                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4900                    The master bio might already be completed, therefore the
4901                    request is no longer in the collision hash. */
4902                 /* In Protocol B we might already have got a P_RECV_ACK
4903                    but then get a P_NEG_ACK afterwards. */
4904                 drbd_set_out_of_sync(mdev, sector, size);
4905         }
4906         return 0;
4907 }
4908
4909 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4910 {
4911         struct drbd_conf *mdev;
4912         struct p_block_ack *p = pi->data;
4913         sector_t sector = be64_to_cpu(p->sector);
4914
4915         mdev = vnr_to_mdev(tconn, pi->vnr);
4916         if (!mdev)
4917                 return -EIO;
4918
4919         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4920
4921         dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4922             (unsigned long long)sector, be32_to_cpu(p->blksize));
4923
4924         return validate_req_change_req_state(mdev, p->block_id, sector,
4925                                              &mdev->read_requests, __func__,
4926                                              NEG_ACKED, false);
4927 }
4928
4929 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4930 {
4931         struct drbd_conf *mdev;
4932         sector_t sector;
4933         int size;
4934         struct p_block_ack *p = pi->data;
4935
4936         mdev = vnr_to_mdev(tconn, pi->vnr);
4937         if (!mdev)
4938                 return -EIO;
4939
4940         sector = be64_to_cpu(p->sector);
4941         size = be32_to_cpu(p->blksize);
4942
4943         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4944
4945         dec_rs_pending(mdev);
4946
4947         if (get_ldev_if_state(mdev, D_FAILED)) {
4948                 drbd_rs_complete_io(mdev, sector);
4949                 switch (pi->cmd) {
4950                 case P_NEG_RS_DREPLY:
4951                         drbd_rs_failed_io(mdev, sector, size); /* fall through */
4952                 case P_RS_CANCEL:
4953                         break;
4954                 default:
4955                         BUG();
4956                 }
4957                 put_ldev(mdev);
4958         }
4959
4960         return 0;
4961 }
4962
4963 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
4964 {
4965         struct drbd_conf *mdev;
4966         struct p_barrier_ack *p = pi->data;
4967
4968         mdev = vnr_to_mdev(tconn, pi->vnr);
4969         if (!mdev)
4970                 return -EIO;
4971
4972         tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
4973
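             /* In Ahead mode: if no application writes are in flight any more
              * by the time this barrier ack arrives, arm the timer that
              * switches us back from Ahead to SyncSource. */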
4974         if (mdev->state.conn == C_AHEAD &&
4975             atomic_read(&mdev->ap_in_flight) == 0 &&
4976             !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4977                 mdev->start_resync_timer.expires = jiffies + HZ;
4978                 add_timer(&mdev->start_resync_timer);
4979         }
4980
4981         return 0;
4982 }
4983
4984 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
4985 {
4986         struct drbd_conf *mdev;
4987         struct p_block_ack *p = pi->data;
4988         struct drbd_work *w;
4989         sector_t sector;
4990         int size;
4991
4992         mdev = vnr_to_mdev(tconn, pi->vnr);
4993         if (!mdev)
4994                 return -EIO;
4995
4996         sector = be64_to_cpu(p->sector);
4997         size = be32_to_cpu(p->blksize);
4998
4999         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5000
5001         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5002                 drbd_ov_out_of_sync_found(mdev, sector, size);
5003         else
5004                 ov_out_of_sync_print(mdev);
5005
5006         if (!get_ldev(mdev))
5007                 return 0;
5008
5009         drbd_rs_complete_io(mdev, sector);
5010         dec_rs_pending(mdev);
5011
5012         --mdev->ov_left;
5013
5014         /* let's advance progress step marks only for every other megabyte */
5015         if ((mdev->ov_left & 0x200) == 0x200)
5016                 drbd_advance_rs_marks(mdev, mdev->ov_left);
5017
5018         if (mdev->ov_left == 0) {
5019                 w = kmalloc(sizeof(*w), GFP_NOIO);
5020                 if (w) {
5021                         w->cb = w_ov_finished;
5022                         w->mdev = mdev;
5023                         drbd_queue_work_front(&mdev->tconn->data.work, w);
5024                 } else {
5025                         dev_err(DEV, "kmalloc(w) failed.\n");
5026                         ov_out_of_sync_print(mdev);
5027                         drbd_resync_finished(mdev);
5028                 }
5029         }
5030         put_ldev(mdev);
5031         return 0;
5032 }
5033
5034 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
5035 {
5036         return 0;
5037 }
5038
5039 static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
5040 {
5041         struct drbd_conf *mdev;
5042         int vnr, not_empty = 0;
5043
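             /* Work through the done_ee lists of all volumes; repeat if new
              * entries showed up meanwhile, so that we only return 0 once all
              * of those lists are empty. */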
5044         do {
5045                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5046                 flush_signals(current);
5047
5048                 rcu_read_lock();
5049                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5050                         kref_get(&mdev->kref);
5051                         rcu_read_unlock();
5052                         if (drbd_finish_peer_reqs(mdev)) {
5053                                 kref_put(&mdev->kref, &drbd_minor_destroy);
5054                                 return 1;
5055                         }
5056                         kref_put(&mdev->kref, &drbd_minor_destroy);
5057                         rcu_read_lock();
5058                 }
5059                 set_bit(SIGNAL_ASENDER, &tconn->flags);
5060
5061                 spin_lock_irq(&tconn->req_lock);
5062                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5063                         not_empty = !list_empty(&mdev->done_ee);
5064                         if (not_empty)
5065                                 break;
5066                 }
5067                 spin_unlock_irq(&tconn->req_lock);
5068                 rcu_read_unlock();
5069         } while (not_empty);
5070
5071         return 0;
5072 }
5073
5074 struct asender_cmd {
5075         size_t pkt_size;
5076         int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
5077 };
5078
5079 static struct asender_cmd asender_tbl[] = {
5080         [P_PING]            = { 0, got_Ping },
5081         [P_PING_ACK]        = { 0, got_PingAck },
5082         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5083         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5084         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5085         [P_DISCARD_WRITE]   = { sizeof(struct p_block_ack), got_BlockAck },
5086         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5087         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5088         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5089         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5090         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5091         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5092         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5093         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5094         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5095         [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5096         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5097 };
5098
5099 int drbd_asender(struct drbd_thread *thi)
5100 {
5101         struct drbd_tconn *tconn = thi->tconn;
5102         struct asender_cmd *cmd = NULL;
5103         struct packet_info pi;
5104         int rv;
5105         void *buf    = tconn->meta.rbuf;
5106         int received = 0;
5107         unsigned int header_size = drbd_header_size(tconn);
5108         int expect   = header_size;
5109         bool ping_timeout_active = false;
5110         struct net_conf *nc;
5111         int ping_timeo, tcp_cork, ping_int;
5112
5113         current->policy = SCHED_RR;  /* Make this a realtime task! */
5114         current->rt_priority = 2;    /* more important than all other tasks */
5115
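             /* Receive state machine: first collect header_size bytes, decode
              * the header to learn the command and its fixed size, then keep
              * receiving until 'received' == 'expect' and dispatch through
              * asender_tbl. */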
5116         while (get_t_state(thi) == RUNNING) {
5117                 drbd_thread_current_set_cpu(thi);
5118
5119                 rcu_read_lock();
5120                 nc = rcu_dereference(tconn->net_conf);
5121                 ping_timeo = nc->ping_timeo;
5122                 tcp_cork = nc->tcp_cork;
5123                 ping_int = nc->ping_int;
5124                 rcu_read_unlock();
5125
5126                 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
5127                         if (drbd_send_ping(tconn)) {
5128                                 conn_err(tconn, "drbd_send_ping has failed\n");
5129                                 goto reconnect;
5130                         }
5131                         tconn->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5132                         ping_timeout_active = true;
5133                 }
5134
5135                 /* TODO: conditionally cork; it may hurt latency if we cork without
5136                    much to send */
5137                 if (tcp_cork)
5138                         drbd_tcp_cork(tconn->meta.socket);
5139                 if (tconn_finish_peer_reqs(tconn)) {
5140                         conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
5141                         goto reconnect;
5142                 }
5143                 /* but unconditionally uncork unless disabled */
5144                 if (tcp_cork)
5145                         drbd_tcp_uncork(tconn->meta.socket);
5146
5147                 /* short circuit, recv_msg would return EINTR anyway. */
5148                 if (signal_pending(current))
5149                         continue;
5150
5151                 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
5152                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5153
5154                 flush_signals(current);
5155
5156                 /* Note:
5157                  * -EINTR        (on meta) we got a signal
5158                  * -EAGAIN       (on meta) rcvtimeo expired
5159                  * -ECONNRESET   other side closed the connection
5160                  * -ERESTARTSYS  (on data) we got a signal
5161                  * rv <  0       other than above: unexpected error!
5162                  * rv == expected: full header or command
5163                  * rv <  expected: "woken" by signal during receive
5164                  * rv == 0       : "connection shut down by peer"
5165                  */
5166                 if (likely(rv > 0)) {
5167                         received += rv;
5168                         buf      += rv;
5169                 } else if (rv == 0) {
5170                         conn_err(tconn, "meta connection shut down by peer.\n");
5171                         goto reconnect;
5172                 } else if (rv == -EAGAIN) {
5173                         /* If the data socket received something meanwhile,
5174                          * that is good enough: peer is still alive. */
5175                         if (time_after(tconn->last_received,
5176                                 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
5177                                 continue;
5178                         if (ping_timeout_active) {
5179                                 conn_err(tconn, "PingAck did not arrive in time.\n");
5180                                 goto reconnect;
5181                         }
5182                         set_bit(SEND_PING, &tconn->flags);
5183                         continue;
5184                 } else if (rv == -EINTR) {
5185                         continue;
5186                 } else {
5187                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
5188                         goto reconnect;
5189                 }
5190
5191                 if (received == expect && cmd == NULL) {
5192                         if (decode_header(tconn, tconn->meta.rbuf, &pi))
5193                                 goto reconnect;
5194                         cmd = &asender_tbl[pi.cmd];
5195                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5196                                 conn_err(tconn, "Unexpected meta packet %s (0x%04x)\n",
5197                                          cmdname(pi.cmd), pi.cmd);
5198                                 goto disconnect;
5199                         }
5200                         expect = header_size + cmd->pkt_size;
5201                         if (pi.size != expect - header_size) {
5202                                 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
5203                                         pi.cmd, pi.size);
5204                                 goto reconnect;
5205                         }
5206                 }
5207                 if (received == expect) {
5208                         bool err;
5209
5210                         err = cmd->fn(tconn, &pi);
5211                         if (err) {
5212                                 conn_err(tconn, "%pf failed\n", cmd->fn);
5213                                 goto reconnect;
5214                         }
5215
5216                         tconn->last_received = jiffies;
5217
5218                         if (cmd == &asender_tbl[P_PING_ACK]) {
5219                                 /* restore idle timeout */
5220                                 tconn->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5221                                 ping_timeout_active = false;
5222                         }
5223
5224                         buf      = tconn->meta.rbuf;
5225                         received = 0;
5226                         expect   = header_size;
5227                         cmd      = NULL;
5228                 }
5229         }
5230
5231         if (0) {
5232 reconnect:
5233                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5234         }
5235         if (0) {
5236 disconnect:
5237                 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
5238         }
5239         clear_bit(SIGNAL_ASENDER, &tconn->flags);
5240
5241         conn_info(tconn, "asender terminated\n");
5242
5243         return 0;
5244 }