sched/headers: Prepare to move signal wakeup & sigpending methods from <linux/sched...
[platform/kernel/linux-starfive.git] / drivers / block / drbd / drbd_receiver.c
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <linux/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <uapi/linux/sched/types.h>
40 #include <linux/sched/signal.h>
41 #include <linux/pkt_sched.h>
42 #define __KERNEL_SYSCALLS__
43 #include <linux/unistd.h>
44 #include <linux/vmalloc.h>
45 #include <linux/random.h>
46 #include <linux/string.h>
47 #include <linux/scatterlist.h>
48 #include "drbd_int.h"
49 #include "drbd_protocol.h"
50 #include "drbd_req.h"
51 #include "drbd_vli.h"
52
53 #define PRO_FEATURES (DRBD_FF_TRIM|DRBD_FF_THIN_RESYNC|DRBD_FF_WSAME)
54
55 struct packet_info {
56         enum drbd_packet cmd;
57         unsigned int size;
58         unsigned int vnr;
59         void *data;
60 };
61
62 enum finish_epoch {
63         FE_STILL_LIVE,
64         FE_DESTROYED,
65         FE_RECYCLED,
66 };
67
68 static int drbd_do_features(struct drbd_connection *connection);
69 static int drbd_do_auth(struct drbd_connection *connection);
70 static int drbd_disconnected(struct drbd_peer_device *);
71 static void conn_wait_active_ee_empty(struct drbd_connection *connection);
72 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
73 static int e_end_block(struct drbd_work *, int);
74
75
76 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
77
78 /*
79  * some helper functions to deal with single linked page lists,
80  * page->private being our "next" pointer.
81  */
82
83 /* If at least n pages are linked at head, get n pages off.
84  * Otherwise, don't modify head, and return NULL.
85  * Locking is the responsibility of the caller.
86  */
87 static struct page *page_chain_del(struct page **head, int n)
88 {
89         struct page *page;
90         struct page *tmp;
91
92         BUG_ON(!n);
93         BUG_ON(!head);
94
95         page = *head;
96
97         if (!page)
98                 return NULL;
99
100         while (page) {
101                 tmp = page_chain_next(page);
102                 if (--n == 0)
103                         break; /* found sufficient pages */
104                 if (tmp == NULL)
105                         /* insufficient pages, don't use any of them. */
106                         return NULL;
107                 page = tmp;
108         }
109
110         /* add end of list marker for the returned list */
111         set_page_private(page, 0);
112         /* actual return value, and adjustment of head */
113         page = *head;
114         *head = tmp;
115         return page;
116 }
117
118 /* may be used outside of locks to find the tail of a (usually short)
119  * "private" page chain, before adding it back to a global chain head
120  * with page_chain_add() under a spinlock. */
121 static struct page *page_chain_tail(struct page *page, int *len)
122 {
123         struct page *tmp;
124         int i = 1;
125         while ((tmp = page_chain_next(page)))
126                 ++i, page = tmp;
127         if (len)
128                 *len = i;
129         return page;
130 }
131
132 static int page_chain_free(struct page *page)
133 {
134         struct page *tmp;
135         int i = 0;
136         page_chain_for_each_safe(page, tmp) {
137                 put_page(page);
138                 ++i;
139         }
140         return i;
141 }
142
143 static void page_chain_add(struct page **head,
144                 struct page *chain_first, struct page *chain_last)
145 {
146 #if 1
147         struct page *tmp;
148         tmp = page_chain_tail(chain_first, NULL);
149         BUG_ON(tmp != chain_last);
150 #endif
151
152         /* add chain to head */
153         set_page_private(chain_last, (unsigned long)*head);
154         *head = chain_first;
155 }
156
157 static struct page *__drbd_alloc_pages(struct drbd_device *device,
158                                        unsigned int number)
159 {
160         struct page *page = NULL;
161         struct page *tmp = NULL;
162         unsigned int i = 0;
163
164         /* Yes, testing drbd_pp_vacant outside the lock is racy.
165          * So what. It saves a spin_lock. */
166         if (drbd_pp_vacant >= number) {
167                 spin_lock(&drbd_pp_lock);
168                 page = page_chain_del(&drbd_pp_pool, number);
169                 if (page)
170                         drbd_pp_vacant -= number;
171                 spin_unlock(&drbd_pp_lock);
172                 if (page)
173                         return page;
174         }
175
176         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
177          * "criss-cross" setup, that might cause write-out on some other DRBD,
178          * which in turn might block on the other node at this very place.  */
179         for (i = 0; i < number; i++) {
180                 tmp = alloc_page(GFP_TRY);
181                 if (!tmp)
182                         break;
183                 set_page_private(tmp, (unsigned long)page);
184                 page = tmp;
185         }
186
187         if (i == number)
188                 return page;
189
190         /* Not enough pages immediately available this time.
191          * No need to jump around here, drbd_alloc_pages will retry this
192          * function "soon". */
193         if (page) {
194                 tmp = page_chain_tail(page, NULL);
195                 spin_lock(&drbd_pp_lock);
196                 page_chain_add(&drbd_pp_pool, page, tmp);
197                 drbd_pp_vacant += i;
198                 spin_unlock(&drbd_pp_lock);
199         }
200         return NULL;
201 }
202
203 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
204                                            struct list_head *to_be_freed)
205 {
206         struct drbd_peer_request *peer_req, *tmp;
207
208         /* The EEs are always appended to the end of the list. Since
209            they are sent in order over the wire, they have to finish
210            in order. As soon as we see the first not finished we can
211            stop to examine the list... */
212
213         list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
214                 if (drbd_peer_req_has_active_page(peer_req))
215                         break;
216                 list_move(&peer_req->w.list, to_be_freed);
217         }
218 }
219
220 static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
221 {
222         LIST_HEAD(reclaimed);
223         struct drbd_peer_request *peer_req, *t;
224
225         spin_lock_irq(&device->resource->req_lock);
226         reclaim_finished_net_peer_reqs(device, &reclaimed);
227         spin_unlock_irq(&device->resource->req_lock);
228         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
229                 drbd_free_net_peer_req(device, peer_req);
230 }
231
232 static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
233 {
234         struct drbd_peer_device *peer_device;
235         int vnr;
236
237         rcu_read_lock();
238         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
239                 struct drbd_device *device = peer_device->device;
240                 if (!atomic_read(&device->pp_in_use_by_net))
241                         continue;
242
243                 kref_get(&device->kref);
244                 rcu_read_unlock();
245                 drbd_reclaim_net_peer_reqs(device);
246                 kref_put(&device->kref, drbd_destroy_device);
247                 rcu_read_lock();
248         }
249         rcu_read_unlock();
250 }
251
252 /**
253  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
254  * @device:     DRBD device.
255  * @number:     number of pages requested
256  * @retry:      whether to retry, if not enough pages are available right now
257  *
258  * Tries to allocate number pages, first from our own page pool, then from
259  * the kernel.
260  * Possibly retry until DRBD frees sufficient pages somewhere else.
261  *
262  * If this allocation would exceed the max_buffers setting, we throttle
263  * allocation (schedule_timeout) to give the system some room to breathe.
264  *
265  * We do not use max-buffers as hard limit, because it could lead to
266  * congestion and further to a distributed deadlock during online-verify or
267  * (checksum based) resync, if the max-buffers, socket buffer sizes and
268  * resync-rate settings are mis-configured.
269  *
270  * Returns a page chain linked via page->private.
271  */
272 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
273                               bool retry)
274 {
275         struct drbd_device *device = peer_device->device;
276         struct page *page = NULL;
277         struct net_conf *nc;
278         DEFINE_WAIT(wait);
279         unsigned int mxb;
280
281         rcu_read_lock();
282         nc = rcu_dereference(peer_device->connection->net_conf);
283         mxb = nc ? nc->max_buffers : 1000000;
284         rcu_read_unlock();
285
286         if (atomic_read(&device->pp_in_use) < mxb)
287                 page = __drbd_alloc_pages(device, number);
288
289         /* Try to keep the fast path fast, but occasionally we need
290          * to reclaim the pages we lended to the network stack. */
291         if (page && atomic_read(&device->pp_in_use_by_net) > 512)
292                 drbd_reclaim_net_peer_reqs(device);
293
294         while (page == NULL) {
295                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
296
297                 drbd_reclaim_net_peer_reqs(device);
298
299                 if (atomic_read(&device->pp_in_use) < mxb) {
300                         page = __drbd_alloc_pages(device, number);
301                         if (page)
302                                 break;
303                 }
304
305                 if (!retry)
306                         break;
307
308                 if (signal_pending(current)) {
309                         drbd_warn(device, "drbd_alloc_pages interrupted!\n");
310                         break;
311                 }
312
313                 if (schedule_timeout(HZ/10) == 0)
314                         mxb = UINT_MAX;
315         }
316         finish_wait(&drbd_pp_wait, &wait);
317
318         if (page)
319                 atomic_add(number, &device->pp_in_use);
320         return page;
321 }
322
323 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
324  * Is also used from inside an other spin_lock_irq(&resource->req_lock);
325  * Either links the page chain back to the global pool,
326  * or returns all pages to the system. */
327 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
328 {
329         atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
330         int i;
331
332         if (page == NULL)
333                 return;
334
335         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
336                 i = page_chain_free(page);
337         else {
338                 struct page *tmp;
339                 tmp = page_chain_tail(page, &i);
340                 spin_lock(&drbd_pp_lock);
341                 page_chain_add(&drbd_pp_pool, page, tmp);
342                 drbd_pp_vacant += i;
343                 spin_unlock(&drbd_pp_lock);
344         }
345         i = atomic_sub_return(i, a);
346         if (i < 0)
347                 drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
348                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
349         wake_up(&drbd_pp_wait);
350 }
351
352 /*
353 You need to hold the req_lock:
354  _drbd_wait_ee_list_empty()
355
356 You must not have the req_lock:
357  drbd_free_peer_req()
358  drbd_alloc_peer_req()
359  drbd_free_peer_reqs()
360  drbd_ee_fix_bhs()
361  drbd_finish_peer_reqs()
362  drbd_clear_done_ee()
363  drbd_wait_ee_list_empty()
364 */
365
366 /* normal: payload_size == request size (bi_size)
367  * w_same: payload_size == logical_block_size
368  * trim: payload_size == 0 */
369 struct drbd_peer_request *
370 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
371                     unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local)
372 {
373         struct drbd_device *device = peer_device->device;
374         struct drbd_peer_request *peer_req;
375         struct page *page = NULL;
376         unsigned nr_pages = (payload_size + PAGE_SIZE -1) >> PAGE_SHIFT;
377
378         if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
379                 return NULL;
380
381         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
382         if (!peer_req) {
383                 if (!(gfp_mask & __GFP_NOWARN))
384                         drbd_err(device, "%s: allocation failed\n", __func__);
385                 return NULL;
386         }
387
388         if (nr_pages) {
389                 page = drbd_alloc_pages(peer_device, nr_pages,
390                                         gfpflags_allow_blocking(gfp_mask));
391                 if (!page)
392                         goto fail;
393         }
394
395         memset(peer_req, 0, sizeof(*peer_req));
396         INIT_LIST_HEAD(&peer_req->w.list);
397         drbd_clear_interval(&peer_req->i);
398         peer_req->i.size = request_size;
399         peer_req->i.sector = sector;
400         peer_req->submit_jif = jiffies;
401         peer_req->peer_device = peer_device;
402         peer_req->pages = page;
403         /*
404          * The block_id is opaque to the receiver.  It is not endianness
405          * converted, and sent back to the sender unchanged.
406          */
407         peer_req->block_id = id;
408
409         return peer_req;
410
411  fail:
412         mempool_free(peer_req, drbd_ee_mempool);
413         return NULL;
414 }
415
416 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
417                        int is_net)
418 {
419         might_sleep();
420         if (peer_req->flags & EE_HAS_DIGEST)
421                 kfree(peer_req->digest);
422         drbd_free_pages(device, peer_req->pages, is_net);
423         D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
424         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
425         if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
426                 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
427                 drbd_al_complete_io(device, &peer_req->i);
428         }
429         mempool_free(peer_req, drbd_ee_mempool);
430 }
431
432 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
433 {
434         LIST_HEAD(work_list);
435         struct drbd_peer_request *peer_req, *t;
436         int count = 0;
437         int is_net = list == &device->net_ee;
438
439         spin_lock_irq(&device->resource->req_lock);
440         list_splice_init(list, &work_list);
441         spin_unlock_irq(&device->resource->req_lock);
442
443         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
444                 __drbd_free_peer_req(device, peer_req, is_net);
445                 count++;
446         }
447         return count;
448 }
449
450 /*
451  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
452  */
453 static int drbd_finish_peer_reqs(struct drbd_device *device)
454 {
455         LIST_HEAD(work_list);
456         LIST_HEAD(reclaimed);
457         struct drbd_peer_request *peer_req, *t;
458         int err = 0;
459
460         spin_lock_irq(&device->resource->req_lock);
461         reclaim_finished_net_peer_reqs(device, &reclaimed);
462         list_splice_init(&device->done_ee, &work_list);
463         spin_unlock_irq(&device->resource->req_lock);
464
465         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
466                 drbd_free_net_peer_req(device, peer_req);
467
468         /* possible callbacks here:
469          * e_end_block, and e_end_resync_block, e_send_superseded.
470          * all ignore the last argument.
471          */
472         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
473                 int err2;
474
475                 /* list_del not necessary, next/prev members not touched */
476                 err2 = peer_req->w.cb(&peer_req->w, !!err);
477                 if (!err)
478                         err = err2;
479                 drbd_free_peer_req(device, peer_req);
480         }
481         wake_up(&device->ee_wait);
482
483         return err;
484 }
485
486 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
487                                      struct list_head *head)
488 {
489         DEFINE_WAIT(wait);
490
491         /* avoids spin_lock/unlock
492          * and calling prepare_to_wait in the fast path */
493         while (!list_empty(head)) {
494                 prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
495                 spin_unlock_irq(&device->resource->req_lock);
496                 io_schedule();
497                 finish_wait(&device->ee_wait, &wait);
498                 spin_lock_irq(&device->resource->req_lock);
499         }
500 }
501
502 static void drbd_wait_ee_list_empty(struct drbd_device *device,
503                                     struct list_head *head)
504 {
505         spin_lock_irq(&device->resource->req_lock);
506         _drbd_wait_ee_list_empty(device, head);
507         spin_unlock_irq(&device->resource->req_lock);
508 }
509
510 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
511 {
512         struct kvec iov = {
513                 .iov_base = buf,
514                 .iov_len = size,
515         };
516         struct msghdr msg = {
517                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
518         };
519         return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
520 }
521
522 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
523 {
524         int rv;
525
526         rv = drbd_recv_short(connection->data.socket, buf, size, 0);
527
528         if (rv < 0) {
529                 if (rv == -ECONNRESET)
530                         drbd_info(connection, "sock was reset by peer\n");
531                 else if (rv != -ERESTARTSYS)
532                         drbd_err(connection, "sock_recvmsg returned %d\n", rv);
533         } else if (rv == 0) {
534                 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
535                         long t;
536                         rcu_read_lock();
537                         t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
538                         rcu_read_unlock();
539
540                         t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
541
542                         if (t)
543                                 goto out;
544                 }
545                 drbd_info(connection, "sock was shut down by peer\n");
546         }
547
548         if (rv != size)
549                 conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
550
551 out:
552         return rv;
553 }
554
555 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
556 {
557         int err;
558
559         err = drbd_recv(connection, buf, size);
560         if (err != size) {
561                 if (err >= 0)
562                         err = -EIO;
563         } else
564                 err = 0;
565         return err;
566 }
567
568 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
569 {
570         int err;
571
572         err = drbd_recv_all(connection, buf, size);
573         if (err && !signal_pending(current))
574                 drbd_warn(connection, "short read (expected size %d)\n", (int)size);
575         return err;
576 }
577
578 /* quoting tcp(7):
579  *   On individual connections, the socket buffer size must be set prior to the
580  *   listen(2) or connect(2) calls in order to have it take effect.
581  * This is our wrapper to do so.
582  */
583 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
584                 unsigned int rcv)
585 {
586         /* open coded SO_SNDBUF, SO_RCVBUF */
587         if (snd) {
588                 sock->sk->sk_sndbuf = snd;
589                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
590         }
591         if (rcv) {
592                 sock->sk->sk_rcvbuf = rcv;
593                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
594         }
595 }
596
597 static struct socket *drbd_try_connect(struct drbd_connection *connection)
598 {
599         const char *what;
600         struct socket *sock;
601         struct sockaddr_in6 src_in6;
602         struct sockaddr_in6 peer_in6;
603         struct net_conf *nc;
604         int err, peer_addr_len, my_addr_len;
605         int sndbuf_size, rcvbuf_size, connect_int;
606         int disconnect_on_error = 1;
607
608         rcu_read_lock();
609         nc = rcu_dereference(connection->net_conf);
610         if (!nc) {
611                 rcu_read_unlock();
612                 return NULL;
613         }
614         sndbuf_size = nc->sndbuf_size;
615         rcvbuf_size = nc->rcvbuf_size;
616         connect_int = nc->connect_int;
617         rcu_read_unlock();
618
619         my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
620         memcpy(&src_in6, &connection->my_addr, my_addr_len);
621
622         if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
623                 src_in6.sin6_port = 0;
624         else
625                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
626
627         peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
628         memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
629
630         what = "sock_create_kern";
631         err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
632                                SOCK_STREAM, IPPROTO_TCP, &sock);
633         if (err < 0) {
634                 sock = NULL;
635                 goto out;
636         }
637
638         sock->sk->sk_rcvtimeo =
639         sock->sk->sk_sndtimeo = connect_int * HZ;
640         drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
641
642        /* explicitly bind to the configured IP as source IP
643         *  for the outgoing connections.
644         *  This is needed for multihomed hosts and to be
645         *  able to use lo: interfaces for drbd.
646         * Make sure to use 0 as port number, so linux selects
647         *  a free one dynamically.
648         */
649         what = "bind before connect";
650         err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
651         if (err < 0)
652                 goto out;
653
654         /* connect may fail, peer not yet available.
655          * stay C_WF_CONNECTION, don't go Disconnecting! */
656         disconnect_on_error = 0;
657         what = "connect";
658         err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
659
660 out:
661         if (err < 0) {
662                 if (sock) {
663                         sock_release(sock);
664                         sock = NULL;
665                 }
666                 switch (-err) {
667                         /* timeout, busy, signal pending */
668                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
669                 case EINTR: case ERESTARTSYS:
670                         /* peer not (yet) available, network problem */
671                 case ECONNREFUSED: case ENETUNREACH:
672                 case EHOSTDOWN:    case EHOSTUNREACH:
673                         disconnect_on_error = 0;
674                         break;
675                 default:
676                         drbd_err(connection, "%s failed, err = %d\n", what, err);
677                 }
678                 if (disconnect_on_error)
679                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
680         }
681
682         return sock;
683 }
684
685 struct accept_wait_data {
686         struct drbd_connection *connection;
687         struct socket *s_listen;
688         struct completion door_bell;
689         void (*original_sk_state_change)(struct sock *sk);
690
691 };
692
693 static void drbd_incoming_connection(struct sock *sk)
694 {
695         struct accept_wait_data *ad = sk->sk_user_data;
696         void (*state_change)(struct sock *sk);
697
698         state_change = ad->original_sk_state_change;
699         if (sk->sk_state == TCP_ESTABLISHED)
700                 complete(&ad->door_bell);
701         state_change(sk);
702 }
703
704 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
705 {
706         int err, sndbuf_size, rcvbuf_size, my_addr_len;
707         struct sockaddr_in6 my_addr;
708         struct socket *s_listen;
709         struct net_conf *nc;
710         const char *what;
711
712         rcu_read_lock();
713         nc = rcu_dereference(connection->net_conf);
714         if (!nc) {
715                 rcu_read_unlock();
716                 return -EIO;
717         }
718         sndbuf_size = nc->sndbuf_size;
719         rcvbuf_size = nc->rcvbuf_size;
720         rcu_read_unlock();
721
722         my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
723         memcpy(&my_addr, &connection->my_addr, my_addr_len);
724
725         what = "sock_create_kern";
726         err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
727                                SOCK_STREAM, IPPROTO_TCP, &s_listen);
728         if (err) {
729                 s_listen = NULL;
730                 goto out;
731         }
732
733         s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
734         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
735
736         what = "bind before listen";
737         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
738         if (err < 0)
739                 goto out;
740
741         ad->s_listen = s_listen;
742         write_lock_bh(&s_listen->sk->sk_callback_lock);
743         ad->original_sk_state_change = s_listen->sk->sk_state_change;
744         s_listen->sk->sk_state_change = drbd_incoming_connection;
745         s_listen->sk->sk_user_data = ad;
746         write_unlock_bh(&s_listen->sk->sk_callback_lock);
747
748         what = "listen";
749         err = s_listen->ops->listen(s_listen, 5);
750         if (err < 0)
751                 goto out;
752
753         return 0;
754 out:
755         if (s_listen)
756                 sock_release(s_listen);
757         if (err < 0) {
758                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
759                         drbd_err(connection, "%s failed, err = %d\n", what, err);
760                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
761                 }
762         }
763
764         return -EIO;
765 }
766
767 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
768 {
769         write_lock_bh(&sk->sk_callback_lock);
770         sk->sk_state_change = ad->original_sk_state_change;
771         sk->sk_user_data = NULL;
772         write_unlock_bh(&sk->sk_callback_lock);
773 }
774
775 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
776 {
777         int timeo, connect_int, err = 0;
778         struct socket *s_estab = NULL;
779         struct net_conf *nc;
780
781         rcu_read_lock();
782         nc = rcu_dereference(connection->net_conf);
783         if (!nc) {
784                 rcu_read_unlock();
785                 return NULL;
786         }
787         connect_int = nc->connect_int;
788         rcu_read_unlock();
789
790         timeo = connect_int * HZ;
791         /* 28.5% random jitter */
792         timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
793
794         err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
795         if (err <= 0)
796                 return NULL;
797
798         err = kernel_accept(ad->s_listen, &s_estab, 0);
799         if (err < 0) {
800                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
801                         drbd_err(connection, "accept failed, err = %d\n", err);
802                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
803                 }
804         }
805
806         if (s_estab)
807                 unregister_state_change(s_estab->sk, ad);
808
809         return s_estab;
810 }
811
812 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
813
814 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
815                              enum drbd_packet cmd)
816 {
817         if (!conn_prepare_command(connection, sock))
818                 return -EIO;
819         return conn_send_command(connection, sock, cmd, 0, NULL, 0);
820 }
821
822 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
823 {
824         unsigned int header_size = drbd_header_size(connection);
825         struct packet_info pi;
826         struct net_conf *nc;
827         int err;
828
829         rcu_read_lock();
830         nc = rcu_dereference(connection->net_conf);
831         if (!nc) {
832                 rcu_read_unlock();
833                 return -EIO;
834         }
835         sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
836         rcu_read_unlock();
837
838         err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
839         if (err != header_size) {
840                 if (err >= 0)
841                         err = -EIO;
842                 return err;
843         }
844         err = decode_header(connection, connection->data.rbuf, &pi);
845         if (err)
846                 return err;
847         return pi.cmd;
848 }
849
850 /**
851  * drbd_socket_okay() - Free the socket if its connection is not okay
852  * @sock:       pointer to the pointer to the socket.
853  */
854 static bool drbd_socket_okay(struct socket **sock)
855 {
856         int rr;
857         char tb[4];
858
859         if (!*sock)
860                 return false;
861
862         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
863
864         if (rr > 0 || rr == -EAGAIN) {
865                 return true;
866         } else {
867                 sock_release(*sock);
868                 *sock = NULL;
869                 return false;
870         }
871 }
872
873 static bool connection_established(struct drbd_connection *connection,
874                                    struct socket **sock1,
875                                    struct socket **sock2)
876 {
877         struct net_conf *nc;
878         int timeout;
879         bool ok;
880
881         if (!*sock1 || !*sock2)
882                 return false;
883
884         rcu_read_lock();
885         nc = rcu_dereference(connection->net_conf);
886         timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
887         rcu_read_unlock();
888         schedule_timeout_interruptible(timeout);
889
890         ok = drbd_socket_okay(sock1);
891         ok = drbd_socket_okay(sock2) && ok;
892
893         return ok;
894 }
895
896 /* Gets called if a connection is established, or if a new minor gets created
897    in a connection */
898 int drbd_connected(struct drbd_peer_device *peer_device)
899 {
900         struct drbd_device *device = peer_device->device;
901         int err;
902
903         atomic_set(&device->packet_seq, 0);
904         device->peer_seq = 0;
905
906         device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
907                 &peer_device->connection->cstate_mutex :
908                 &device->own_state_mutex;
909
910         err = drbd_send_sync_param(peer_device);
911         if (!err)
912                 err = drbd_send_sizes(peer_device, 0, 0);
913         if (!err)
914                 err = drbd_send_uuids(peer_device);
915         if (!err)
916                 err = drbd_send_current_state(peer_device);
917         clear_bit(USE_DEGR_WFC_T, &device->flags);
918         clear_bit(RESIZE_PENDING, &device->flags);
919         atomic_set(&device->ap_in_flight, 0);
920         mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
921         return err;
922 }
923
924 /*
925  * return values:
926  *   1 yes, we have a valid connection
927  *   0 oops, did not work out, please try again
928  *  -1 peer talks different language,
929  *     no point in trying again, please go standalone.
930  *  -2 We do not have a network config...
931  */
932 static int conn_connect(struct drbd_connection *connection)
933 {
934         struct drbd_socket sock, msock;
935         struct drbd_peer_device *peer_device;
936         struct net_conf *nc;
937         int vnr, timeout, h;
938         bool discard_my_data, ok;
939         enum drbd_state_rv rv;
940         struct accept_wait_data ad = {
941                 .connection = connection,
942                 .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
943         };
944
945         clear_bit(DISCONNECT_SENT, &connection->flags);
946         if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
947                 return -2;
948
949         mutex_init(&sock.mutex);
950         sock.sbuf = connection->data.sbuf;
951         sock.rbuf = connection->data.rbuf;
952         sock.socket = NULL;
953         mutex_init(&msock.mutex);
954         msock.sbuf = connection->meta.sbuf;
955         msock.rbuf = connection->meta.rbuf;
956         msock.socket = NULL;
957
958         /* Assume that the peer only understands protocol 80 until we know better.  */
959         connection->agreed_pro_version = 80;
960
961         if (prepare_listen_socket(connection, &ad))
962                 return 0;
963
964         do {
965                 struct socket *s;
966
967                 s = drbd_try_connect(connection);
968                 if (s) {
969                         if (!sock.socket) {
970                                 sock.socket = s;
971                                 send_first_packet(connection, &sock, P_INITIAL_DATA);
972                         } else if (!msock.socket) {
973                                 clear_bit(RESOLVE_CONFLICTS, &connection->flags);
974                                 msock.socket = s;
975                                 send_first_packet(connection, &msock, P_INITIAL_META);
976                         } else {
977                                 drbd_err(connection, "Logic error in conn_connect()\n");
978                                 goto out_release_sockets;
979                         }
980                 }
981
982                 if (connection_established(connection, &sock.socket, &msock.socket))
983                         break;
984
985 retry:
986                 s = drbd_wait_for_connect(connection, &ad);
987                 if (s) {
988                         int fp = receive_first_packet(connection, s);
989                         drbd_socket_okay(&sock.socket);
990                         drbd_socket_okay(&msock.socket);
991                         switch (fp) {
992                         case P_INITIAL_DATA:
993                                 if (sock.socket) {
994                                         drbd_warn(connection, "initial packet S crossed\n");
995                                         sock_release(sock.socket);
996                                         sock.socket = s;
997                                         goto randomize;
998                                 }
999                                 sock.socket = s;
1000                                 break;
1001                         case P_INITIAL_META:
1002                                 set_bit(RESOLVE_CONFLICTS, &connection->flags);
1003                                 if (msock.socket) {
1004                                         drbd_warn(connection, "initial packet M crossed\n");
1005                                         sock_release(msock.socket);
1006                                         msock.socket = s;
1007                                         goto randomize;
1008                                 }
1009                                 msock.socket = s;
1010                                 break;
1011                         default:
1012                                 drbd_warn(connection, "Error receiving initial packet\n");
1013                                 sock_release(s);
1014 randomize:
1015                                 if (prandom_u32() & 1)
1016                                         goto retry;
1017                         }
1018                 }
1019
1020                 if (connection->cstate <= C_DISCONNECTING)
1021                         goto out_release_sockets;
1022                 if (signal_pending(current)) {
1023                         flush_signals(current);
1024                         smp_rmb();
1025                         if (get_t_state(&connection->receiver) == EXITING)
1026                                 goto out_release_sockets;
1027                 }
1028
1029                 ok = connection_established(connection, &sock.socket, &msock.socket);
1030         } while (!ok);
1031
1032         if (ad.s_listen)
1033                 sock_release(ad.s_listen);
1034
1035         sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1036         msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1037
1038         sock.socket->sk->sk_allocation = GFP_NOIO;
1039         msock.socket->sk->sk_allocation = GFP_NOIO;
1040
1041         sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1042         msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1043
1044         /* NOT YET ...
1045          * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1046          * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1047          * first set it to the P_CONNECTION_FEATURES timeout,
1048          * which we set to 4x the configured ping_timeout. */
1049         rcu_read_lock();
1050         nc = rcu_dereference(connection->net_conf);
1051
1052         sock.socket->sk->sk_sndtimeo =
1053         sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1054
1055         msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1056         timeout = nc->timeout * HZ / 10;
1057         discard_my_data = nc->discard_my_data;
1058         rcu_read_unlock();
1059
1060         msock.socket->sk->sk_sndtimeo = timeout;
1061
1062         /* we don't want delays.
1063          * we use TCP_CORK where appropriate, though */
1064         drbd_tcp_nodelay(sock.socket);
1065         drbd_tcp_nodelay(msock.socket);
1066
1067         connection->data.socket = sock.socket;
1068         connection->meta.socket = msock.socket;
1069         connection->last_received = jiffies;
1070
1071         h = drbd_do_features(connection);
1072         if (h <= 0)
1073                 return h;
1074
1075         if (connection->cram_hmac_tfm) {
1076                 /* drbd_request_state(device, NS(conn, WFAuth)); */
1077                 switch (drbd_do_auth(connection)) {
1078                 case -1:
1079                         drbd_err(connection, "Authentication of peer failed\n");
1080                         return -1;
1081                 case 0:
1082                         drbd_err(connection, "Authentication of peer failed, trying again.\n");
1083                         return 0;
1084                 }
1085         }
1086
1087         connection->data.socket->sk->sk_sndtimeo = timeout;
1088         connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1089
1090         if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1091                 return -1;
1092
1093         /* Prevent a race between resync-handshake and
1094          * being promoted to Primary.
1095          *
1096          * Grab and release the state mutex, so we know that any current
1097          * drbd_set_role() is finished, and any incoming drbd_set_role
1098          * will see the STATE_SENT flag, and wait for it to be cleared.
1099          */
1100         idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1101                 mutex_lock(peer_device->device->state_mutex);
1102
1103         set_bit(STATE_SENT, &connection->flags);
1104
1105         idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1106                 mutex_unlock(peer_device->device->state_mutex);
1107
1108         rcu_read_lock();
1109         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1110                 struct drbd_device *device = peer_device->device;
1111                 kref_get(&device->kref);
1112                 rcu_read_unlock();
1113
1114                 if (discard_my_data)
1115                         set_bit(DISCARD_MY_DATA, &device->flags);
1116                 else
1117                         clear_bit(DISCARD_MY_DATA, &device->flags);
1118
1119                 drbd_connected(peer_device);
1120                 kref_put(&device->kref, drbd_destroy_device);
1121                 rcu_read_lock();
1122         }
1123         rcu_read_unlock();
1124
1125         rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1126         if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1127                 clear_bit(STATE_SENT, &connection->flags);
1128                 return 0;
1129         }
1130
1131         drbd_thread_start(&connection->ack_receiver);
1132         /* opencoded create_singlethread_workqueue(),
1133          * to be able to use format string arguments */
1134         connection->ack_sender =
1135                 alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
1136         if (!connection->ack_sender) {
1137                 drbd_err(connection, "Failed to create workqueue ack_sender\n");
1138                 return 0;
1139         }
1140
1141         mutex_lock(&connection->resource->conf_update);
1142         /* The discard_my_data flag is a single-shot modifier to the next
1143          * connection attempt, the handshake of which is now well underway.
1144          * No need for rcu style copying of the whole struct
1145          * just to clear a single value. */
1146         connection->net_conf->discard_my_data = 0;
1147         mutex_unlock(&connection->resource->conf_update);
1148
1149         return h;
1150
1151 out_release_sockets:
1152         if (ad.s_listen)
1153                 sock_release(ad.s_listen);
1154         if (sock.socket)
1155                 sock_release(sock.socket);
1156         if (msock.socket)
1157                 sock_release(msock.socket);
1158         return -1;
1159 }
1160
1161 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1162 {
1163         unsigned int header_size = drbd_header_size(connection);
1164
1165         if (header_size == sizeof(struct p_header100) &&
1166             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1167                 struct p_header100 *h = header;
1168                 if (h->pad != 0) {
1169                         drbd_err(connection, "Header padding is not zero\n");
1170                         return -EINVAL;
1171                 }
1172                 pi->vnr = be16_to_cpu(h->volume);
1173                 pi->cmd = be16_to_cpu(h->command);
1174                 pi->size = be32_to_cpu(h->length);
1175         } else if (header_size == sizeof(struct p_header95) &&
1176                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1177                 struct p_header95 *h = header;
1178                 pi->cmd = be16_to_cpu(h->command);
1179                 pi->size = be32_to_cpu(h->length);
1180                 pi->vnr = 0;
1181         } else if (header_size == sizeof(struct p_header80) &&
1182                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1183                 struct p_header80 *h = header;
1184                 pi->cmd = be16_to_cpu(h->command);
1185                 pi->size = be16_to_cpu(h->length);
1186                 pi->vnr = 0;
1187         } else {
1188                 drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1189                          be32_to_cpu(*(__be32 *)header),
1190                          connection->agreed_pro_version);
1191                 return -EINVAL;
1192         }
1193         pi->data = header + header_size;
1194         return 0;
1195 }
1196
1197 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1198 {
1199         void *buffer = connection->data.rbuf;
1200         int err;
1201
1202         err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1203         if (err)
1204                 return err;
1205
1206         err = decode_header(connection, buffer, pi);
1207         connection->last_received = jiffies;
1208
1209         return err;
1210 }
1211
1212 /* This is blkdev_issue_flush, but asynchronous.
1213  * We want to submit to all component volumes in parallel,
1214  * then wait for all completions.
1215  */
1216 struct issue_flush_context {
1217         atomic_t pending;
1218         int error;
1219         struct completion done;
1220 };
1221 struct one_flush_context {
1222         struct drbd_device *device;
1223         struct issue_flush_context *ctx;
1224 };
1225
1226 void one_flush_endio(struct bio *bio)
1227 {
1228         struct one_flush_context *octx = bio->bi_private;
1229         struct drbd_device *device = octx->device;
1230         struct issue_flush_context *ctx = octx->ctx;
1231
1232         if (bio->bi_error) {
1233                 ctx->error = bio->bi_error;
1234                 drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_error);
1235         }
1236         kfree(octx);
1237         bio_put(bio);
1238
1239         clear_bit(FLUSH_PENDING, &device->flags);
1240         put_ldev(device);
1241         kref_put(&device->kref, drbd_destroy_device);
1242
1243         if (atomic_dec_and_test(&ctx->pending))
1244                 complete(&ctx->done);
1245 }
1246
1247 static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
1248 {
1249         struct bio *bio = bio_alloc(GFP_NOIO, 0);
1250         struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
1251         if (!bio || !octx) {
1252                 drbd_warn(device, "Could not allocate a bio, CANNOT ISSUE FLUSH\n");
1253                 /* FIXME: what else can I do now?  disconnecting or detaching
1254                  * really does not help to improve the state of the world, either.
1255                  */
1256                 kfree(octx);
1257                 if (bio)
1258                         bio_put(bio);
1259
1260                 ctx->error = -ENOMEM;
1261                 put_ldev(device);
1262                 kref_put(&device->kref, drbd_destroy_device);
1263                 return;
1264         }
1265
1266         octx->device = device;
1267         octx->ctx = ctx;
1268         bio->bi_bdev = device->ldev->backing_bdev;
1269         bio->bi_private = octx;
1270         bio->bi_end_io = one_flush_endio;
1271         bio->bi_opf = REQ_OP_FLUSH | REQ_PREFLUSH;
1272
1273         device->flush_jif = jiffies;
1274         set_bit(FLUSH_PENDING, &device->flags);
1275         atomic_inc(&ctx->pending);
1276         submit_bio(bio);
1277 }
1278
1279 static void drbd_flush(struct drbd_connection *connection)
1280 {
1281         if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1282                 struct drbd_peer_device *peer_device;
1283                 struct issue_flush_context ctx;
1284                 int vnr;
1285
1286                 atomic_set(&ctx.pending, 1);
1287                 ctx.error = 0;
1288                 init_completion(&ctx.done);
1289
1290                 rcu_read_lock();
1291                 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1292                         struct drbd_device *device = peer_device->device;
1293
1294                         if (!get_ldev(device))
1295                                 continue;
1296                         kref_get(&device->kref);
1297                         rcu_read_unlock();
1298
1299                         submit_one_flush(device, &ctx);
1300
1301                         rcu_read_lock();
1302                 }
1303                 rcu_read_unlock();
1304
1305                 /* Do we want to add a timeout,
1306                  * if disk-timeout is set? */
1307                 if (!atomic_dec_and_test(&ctx.pending))
1308                         wait_for_completion(&ctx.done);
1309
1310                 if (ctx.error) {
1311                         /* would rather check on EOPNOTSUPP, but that is not reliable.
1312                          * don't try again for ANY return value != 0
1313                          * if (rv == -EOPNOTSUPP) */
1314                         /* Any error is already reported by bio_endio callback. */
1315                         drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1316                 }
1317         }
1318 }
1319
1320 /**
1321  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1322  * @device:     DRBD device.
1323  * @epoch:      Epoch object.
1324  * @ev:         Epoch event.
1325  */
1326 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1327                                                struct drbd_epoch *epoch,
1328                                                enum epoch_event ev)
1329 {
1330         int epoch_size;
1331         struct drbd_epoch *next_epoch;
1332         enum finish_epoch rv = FE_STILL_LIVE;
1333
1334         spin_lock(&connection->epoch_lock);
1335         do {
1336                 next_epoch = NULL;
1337
1338                 epoch_size = atomic_read(&epoch->epoch_size);
1339
1340                 switch (ev & ~EV_CLEANUP) {
1341                 case EV_PUT:
1342                         atomic_dec(&epoch->active);
1343                         break;
1344                 case EV_GOT_BARRIER_NR:
1345                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1346                         break;
1347                 case EV_BECAME_LAST:
1348                         /* nothing to do*/
1349                         break;
1350                 }
1351
1352                 if (epoch_size != 0 &&
1353                     atomic_read(&epoch->active) == 0 &&
1354                     (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1355                         if (!(ev & EV_CLEANUP)) {
1356                                 spin_unlock(&connection->epoch_lock);
1357                                 drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1358                                 spin_lock(&connection->epoch_lock);
1359                         }
1360 #if 0
1361                         /* FIXME: dec unacked on connection, once we have
1362                          * something to count pending connection packets in. */
1363                         if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1364                                 dec_unacked(epoch->connection);
1365 #endif
1366
1367                         if (connection->current_epoch != epoch) {
1368                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1369                                 list_del(&epoch->list);
1370                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1371                                 connection->epochs--;
1372                                 kfree(epoch);
1373
1374                                 if (rv == FE_STILL_LIVE)
1375                                         rv = FE_DESTROYED;
1376                         } else {
1377                                 epoch->flags = 0;
1378                                 atomic_set(&epoch->epoch_size, 0);
1379                                 /* atomic_set(&epoch->active, 0); is already zero */
1380                                 if (rv == FE_STILL_LIVE)
1381                                         rv = FE_RECYCLED;
1382                         }
1383                 }
1384
1385                 if (!next_epoch)
1386                         break;
1387
1388                 epoch = next_epoch;
1389         } while (1);
1390
1391         spin_unlock(&connection->epoch_lock);
1392
1393         return rv;
1394 }
1395
1396 static enum write_ordering_e
1397 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1398 {
1399         struct disk_conf *dc;
1400
1401         dc = rcu_dereference(bdev->disk_conf);
1402
1403         if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1404                 wo = WO_DRAIN_IO;
1405         if (wo == WO_DRAIN_IO && !dc->disk_drain)
1406                 wo = WO_NONE;
1407
1408         return wo;
1409 }
1410
1411 /**
1412  * drbd_bump_write_ordering() - Fall back to an other write ordering method
1413  * @connection: DRBD connection.
1414  * @wo:         Write ordering method to try.
1415  */
1416 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1417                               enum write_ordering_e wo)
1418 {
1419         struct drbd_device *device;
1420         enum write_ordering_e pwo;
1421         int vnr;
1422         static char *write_ordering_str[] = {
1423                 [WO_NONE] = "none",
1424                 [WO_DRAIN_IO] = "drain",
1425                 [WO_BDEV_FLUSH] = "flush",
1426         };
1427
1428         pwo = resource->write_ordering;
1429         if (wo != WO_BDEV_FLUSH)
1430                 wo = min(pwo, wo);
1431         rcu_read_lock();
1432         idr_for_each_entry(&resource->devices, device, vnr) {
1433                 if (get_ldev(device)) {
1434                         wo = max_allowed_wo(device->ldev, wo);
1435                         if (device->ldev == bdev)
1436                                 bdev = NULL;
1437                         put_ldev(device);
1438                 }
1439         }
1440
1441         if (bdev)
1442                 wo = max_allowed_wo(bdev, wo);
1443
1444         rcu_read_unlock();
1445
1446         resource->write_ordering = wo;
1447         if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1448                 drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1449 }
1450
1451 /*
1452  * We *may* ignore the discard-zeroes-data setting, if so configured.
1453  *
1454  * Assumption is that it "discard_zeroes_data=0" is only because the backend
1455  * may ignore partial unaligned discards.
1456  *
1457  * LVM/DM thin as of at least
1458  *   LVM version:     2.02.115(2)-RHEL7 (2015-01-28)
1459  *   Library version: 1.02.93-RHEL7 (2015-01-28)
1460  *   Driver version:  4.29.0
1461  * still behaves this way.
1462  *
1463  * For unaligned (wrt. alignment and granularity) or too small discards,
1464  * we zero-out the initial (and/or) trailing unaligned partial chunks,
1465  * but discard all the aligned full chunks.
1466  *
1467  * At least for LVM/DM thin, the result is effectively "discard_zeroes_data=1".
1468  */
1469 int drbd_issue_discard_or_zero_out(struct drbd_device *device, sector_t start, unsigned int nr_sectors, bool discard)
1470 {
1471         struct block_device *bdev = device->ldev->backing_bdev;
1472         struct request_queue *q = bdev_get_queue(bdev);
1473         sector_t tmp, nr;
1474         unsigned int max_discard_sectors, granularity;
1475         int alignment;
1476         int err = 0;
1477
1478         if (!discard)
1479                 goto zero_out;
1480
1481         /* Zero-sector (unknown) and one-sector granularities are the same.  */
1482         granularity = max(q->limits.discard_granularity >> 9, 1U);
1483         alignment = (bdev_discard_alignment(bdev) >> 9) % granularity;
1484
1485         max_discard_sectors = min(q->limits.max_discard_sectors, (1U << 22));
1486         max_discard_sectors -= max_discard_sectors % granularity;
1487         if (unlikely(!max_discard_sectors))
1488                 goto zero_out;
1489
1490         if (nr_sectors < granularity)
1491                 goto zero_out;
1492
1493         tmp = start;
1494         if (sector_div(tmp, granularity) != alignment) {
1495                 if (nr_sectors < 2*granularity)
1496                         goto zero_out;
1497                 /* start + gran - (start + gran - align) % gran */
1498                 tmp = start + granularity - alignment;
1499                 tmp = start + granularity - sector_div(tmp, granularity);
1500
1501                 nr = tmp - start;
1502                 err |= blkdev_issue_zeroout(bdev, start, nr, GFP_NOIO, 0);
1503                 nr_sectors -= nr;
1504                 start = tmp;
1505         }
1506         while (nr_sectors >= granularity) {
1507                 nr = min_t(sector_t, nr_sectors, max_discard_sectors);
1508                 err |= blkdev_issue_discard(bdev, start, nr, GFP_NOIO, 0);
1509                 nr_sectors -= nr;
1510                 start += nr;
1511         }
1512  zero_out:
1513         if (nr_sectors) {
1514                 err |= blkdev_issue_zeroout(bdev, start, nr_sectors, GFP_NOIO, 0);
1515         }
1516         return err != 0;
1517 }
1518
1519 static bool can_do_reliable_discards(struct drbd_device *device)
1520 {
1521         struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
1522         struct disk_conf *dc;
1523         bool can_do;
1524
1525         if (!blk_queue_discard(q))
1526                 return false;
1527
1528         if (q->limits.discard_zeroes_data)
1529                 return true;
1530
1531         rcu_read_lock();
1532         dc = rcu_dereference(device->ldev->disk_conf);
1533         can_do = dc->discard_zeroes_if_aligned;
1534         rcu_read_unlock();
1535         return can_do;
1536 }
1537
1538 static void drbd_issue_peer_discard(struct drbd_device *device, struct drbd_peer_request *peer_req)
1539 {
1540         /* If the backend cannot discard, or does not guarantee
1541          * read-back zeroes in discarded ranges, we fall back to
1542          * zero-out.  Unless configuration specifically requested
1543          * otherwise. */
1544         if (!can_do_reliable_discards(device))
1545                 peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT;
1546
1547         if (drbd_issue_discard_or_zero_out(device, peer_req->i.sector,
1548             peer_req->i.size >> 9, !(peer_req->flags & EE_IS_TRIM_USE_ZEROOUT)))
1549                 peer_req->flags |= EE_WAS_ERROR;
1550         drbd_endio_write_sec_final(peer_req);
1551 }
1552
1553 static void drbd_issue_peer_wsame(struct drbd_device *device,
1554                                   struct drbd_peer_request *peer_req)
1555 {
1556         struct block_device *bdev = device->ldev->backing_bdev;
1557         sector_t s = peer_req->i.sector;
1558         sector_t nr = peer_req->i.size >> 9;
1559         if (blkdev_issue_write_same(bdev, s, nr, GFP_NOIO, peer_req->pages))
1560                 peer_req->flags |= EE_WAS_ERROR;
1561         drbd_endio_write_sec_final(peer_req);
1562 }
1563
1564
1565 /**
1566  * drbd_submit_peer_request()
1567  * @device:     DRBD device.
1568  * @peer_req:   peer request
1569  * @rw:         flag field, see bio->bi_opf
1570  *
1571  * May spread the pages to multiple bios,
1572  * depending on bio_add_page restrictions.
1573  *
1574  * Returns 0 if all bios have been submitted,
1575  * -ENOMEM if we could not allocate enough bios,
1576  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1577  *  single page to an empty bio (which should never happen and likely indicates
1578  *  that the lower level IO stack is in some way broken). This has been observed
1579  *  on certain Xen deployments.
1580  */
1581 /* TODO allocate from our own bio_set. */
1582 int drbd_submit_peer_request(struct drbd_device *device,
1583                              struct drbd_peer_request *peer_req,
1584                              const unsigned op, const unsigned op_flags,
1585                              const int fault_type)
1586 {
1587         struct bio *bios = NULL;
1588         struct bio *bio;
1589         struct page *page = peer_req->pages;
1590         sector_t sector = peer_req->i.sector;
1591         unsigned data_size = peer_req->i.size;
1592         unsigned n_bios = 0;
1593         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1594         int err = -ENOMEM;
1595
1596         /* TRIM/DISCARD: for now, always use the helper function
1597          * blkdev_issue_zeroout(..., discard=true).
1598          * It's synchronous, but it does the right thing wrt. bio splitting.
1599          * Correctness first, performance later.  Next step is to code an
1600          * asynchronous variant of the same.
1601          */
1602         if (peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) {
1603                 /* wait for all pending IO completions, before we start
1604                  * zeroing things out. */
1605                 conn_wait_active_ee_empty(peer_req->peer_device->connection);
1606                 /* add it to the active list now,
1607                  * so we can find it to present it in debugfs */
1608                 peer_req->submit_jif = jiffies;
1609                 peer_req->flags |= EE_SUBMITTED;
1610
1611                 /* If this was a resync request from receive_rs_deallocated(),
1612                  * it is already on the sync_ee list */
1613                 if (list_empty(&peer_req->w.list)) {
1614                         spin_lock_irq(&device->resource->req_lock);
1615                         list_add_tail(&peer_req->w.list, &device->active_ee);
1616                         spin_unlock_irq(&device->resource->req_lock);
1617                 }
1618
1619                 if (peer_req->flags & EE_IS_TRIM)
1620                         drbd_issue_peer_discard(device, peer_req);
1621                 else /* EE_WRITE_SAME */
1622                         drbd_issue_peer_wsame(device, peer_req);
1623                 return 0;
1624         }
1625
1626         /* In most cases, we will only need one bio.  But in case the lower
1627          * level restrictions happen to be different at this offset on this
1628          * side than those of the sending peer, we may need to submit the
1629          * request in more than one bio.
1630          *
1631          * Plain bio_alloc is good enough here, this is no DRBD internally
1632          * generated bio, but a bio allocated on behalf of the peer.
1633          */
1634 next_bio:
1635         bio = bio_alloc(GFP_NOIO, nr_pages);
1636         if (!bio) {
1637                 drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1638                 goto fail;
1639         }
1640         /* > peer_req->i.sector, unless this is the first bio */
1641         bio->bi_iter.bi_sector = sector;
1642         bio->bi_bdev = device->ldev->backing_bdev;
1643         bio_set_op_attrs(bio, op, op_flags);
1644         bio->bi_private = peer_req;
1645         bio->bi_end_io = drbd_peer_request_endio;
1646
1647         bio->bi_next = bios;
1648         bios = bio;
1649         ++n_bios;
1650
1651         page_chain_for_each(page) {
1652                 unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1653                 if (!bio_add_page(bio, page, len, 0))
1654                         goto next_bio;
1655                 data_size -= len;
1656                 sector += len >> 9;
1657                 --nr_pages;
1658         }
1659         D_ASSERT(device, data_size == 0);
1660         D_ASSERT(device, page == NULL);
1661
1662         atomic_set(&peer_req->pending_bios, n_bios);
1663         /* for debugfs: update timestamp, mark as submitted */
1664         peer_req->submit_jif = jiffies;
1665         peer_req->flags |= EE_SUBMITTED;
1666         do {
1667                 bio = bios;
1668                 bios = bios->bi_next;
1669                 bio->bi_next = NULL;
1670
1671                 drbd_generic_make_request(device, fault_type, bio);
1672         } while (bios);
1673         return 0;
1674
1675 fail:
1676         while (bios) {
1677                 bio = bios;
1678                 bios = bios->bi_next;
1679                 bio_put(bio);
1680         }
1681         return err;
1682 }
1683
1684 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1685                                              struct drbd_peer_request *peer_req)
1686 {
1687         struct drbd_interval *i = &peer_req->i;
1688
1689         drbd_remove_interval(&device->write_requests, i);
1690         drbd_clear_interval(i);
1691
1692         /* Wake up any processes waiting for this peer request to complete.  */
1693         if (i->waiting)
1694                 wake_up(&device->misc_wait);
1695 }
1696
1697 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1698 {
1699         struct drbd_peer_device *peer_device;
1700         int vnr;
1701
1702         rcu_read_lock();
1703         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1704                 struct drbd_device *device = peer_device->device;
1705
1706                 kref_get(&device->kref);
1707                 rcu_read_unlock();
1708                 drbd_wait_ee_list_empty(device, &device->active_ee);
1709                 kref_put(&device->kref, drbd_destroy_device);
1710                 rcu_read_lock();
1711         }
1712         rcu_read_unlock();
1713 }
1714
1715 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1716 {
1717         int rv;
1718         struct p_barrier *p = pi->data;
1719         struct drbd_epoch *epoch;
1720
1721         /* FIXME these are unacked on connection,
1722          * not a specific (peer)device.
1723          */
1724         connection->current_epoch->barrier_nr = p->barrier;
1725         connection->current_epoch->connection = connection;
1726         rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1727
1728         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1729          * the activity log, which means it would not be resynced in case the
1730          * R_PRIMARY crashes now.
1731          * Therefore we must send the barrier_ack after the barrier request was
1732          * completed. */
1733         switch (connection->resource->write_ordering) {
1734         case WO_NONE:
1735                 if (rv == FE_RECYCLED)
1736                         return 0;
1737
1738                 /* receiver context, in the writeout path of the other node.
1739                  * avoid potential distributed deadlock */
1740                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1741                 if (epoch)
1742                         break;
1743                 else
1744                         drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1745                         /* Fall through */
1746
1747         case WO_BDEV_FLUSH:
1748         case WO_DRAIN_IO:
1749                 conn_wait_active_ee_empty(connection);
1750                 drbd_flush(connection);
1751
1752                 if (atomic_read(&connection->current_epoch->epoch_size)) {
1753                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1754                         if (epoch)
1755                                 break;
1756                 }
1757
1758                 return 0;
1759         default:
1760                 drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1761                          connection->resource->write_ordering);
1762                 return -EIO;
1763         }
1764
1765         epoch->flags = 0;
1766         atomic_set(&epoch->epoch_size, 0);
1767         atomic_set(&epoch->active, 0);
1768
1769         spin_lock(&connection->epoch_lock);
1770         if (atomic_read(&connection->current_epoch->epoch_size)) {
1771                 list_add(&epoch->list, &connection->current_epoch->list);
1772                 connection->current_epoch = epoch;
1773                 connection->epochs++;
1774         } else {
1775                 /* The current_epoch got recycled while we allocated this one... */
1776                 kfree(epoch);
1777         }
1778         spin_unlock(&connection->epoch_lock);
1779
1780         return 0;
1781 }
1782
1783 /* quick wrapper in case payload size != request_size (write same) */
1784 static void drbd_csum_ee_size(struct crypto_ahash *h,
1785                               struct drbd_peer_request *r, void *d,
1786                               unsigned int payload_size)
1787 {
1788         unsigned int tmp = r->i.size;
1789         r->i.size = payload_size;
1790         drbd_csum_ee(h, r, d);
1791         r->i.size = tmp;
1792 }
1793
1794 /* used from receive_RSDataReply (recv_resync_read)
1795  * and from receive_Data.
1796  * data_size: actual payload ("data in")
1797  *      for normal writes that is bi_size.
1798  *      for discards, that is zero.
1799  *      for write same, it is logical_block_size.
1800  * both trim and write same have the bi_size ("data len to be affected")
1801  * as extra argument in the packet header.
1802  */
1803 static struct drbd_peer_request *
1804 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1805               struct packet_info *pi) __must_hold(local)
1806 {
1807         struct drbd_device *device = peer_device->device;
1808         const sector_t capacity = drbd_get_capacity(device->this_bdev);
1809         struct drbd_peer_request *peer_req;
1810         struct page *page;
1811         int digest_size, err;
1812         unsigned int data_size = pi->size, ds;
1813         void *dig_in = peer_device->connection->int_dig_in;
1814         void *dig_vv = peer_device->connection->int_dig_vv;
1815         unsigned long *data;
1816         struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1817         struct p_trim *wsame = (pi->cmd == P_WSAME) ? pi->data : NULL;
1818
1819         digest_size = 0;
1820         if (!trim && peer_device->connection->peer_integrity_tfm) {
1821                 digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1822                 /*
1823                  * FIXME: Receive the incoming digest into the receive buffer
1824                  *        here, together with its struct p_data?
1825                  */
1826                 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1827                 if (err)
1828                         return NULL;
1829                 data_size -= digest_size;
1830         }
1831
1832         /* assume request_size == data_size, but special case trim and wsame. */
1833         ds = data_size;
1834         if (trim) {
1835                 if (!expect(data_size == 0))
1836                         return NULL;
1837                 ds = be32_to_cpu(trim->size);
1838         } else if (wsame) {
1839                 if (data_size != queue_logical_block_size(device->rq_queue)) {
1840                         drbd_err(peer_device, "data size (%u) != drbd logical block size (%u)\n",
1841                                 data_size, queue_logical_block_size(device->rq_queue));
1842                         return NULL;
1843                 }
1844                 if (data_size != bdev_logical_block_size(device->ldev->backing_bdev)) {
1845                         drbd_err(peer_device, "data size (%u) != backend logical block size (%u)\n",
1846                                 data_size, bdev_logical_block_size(device->ldev->backing_bdev));
1847                         return NULL;
1848                 }
1849                 ds = be32_to_cpu(wsame->size);
1850         }
1851
1852         if (!expect(IS_ALIGNED(ds, 512)))
1853                 return NULL;
1854         if (trim || wsame) {
1855                 if (!expect(ds <= (DRBD_MAX_BBIO_SECTORS << 9)))
1856                         return NULL;
1857         } else if (!expect(ds <= DRBD_MAX_BIO_SIZE))
1858                 return NULL;
1859
1860         /* even though we trust out peer,
1861          * we sometimes have to double check. */
1862         if (sector + (ds>>9) > capacity) {
1863                 drbd_err(device, "request from peer beyond end of local disk: "
1864                         "capacity: %llus < sector: %llus + size: %u\n",
1865                         (unsigned long long)capacity,
1866                         (unsigned long long)sector, ds);
1867                 return NULL;
1868         }
1869
1870         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1871          * "criss-cross" setup, that might cause write-out on some other DRBD,
1872          * which in turn might block on the other node at this very place.  */
1873         peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO);
1874         if (!peer_req)
1875                 return NULL;
1876
1877         peer_req->flags |= EE_WRITE;
1878         if (trim) {
1879                 peer_req->flags |= EE_IS_TRIM;
1880                 return peer_req;
1881         }
1882         if (wsame)
1883                 peer_req->flags |= EE_WRITE_SAME;
1884
1885         /* receive payload size bytes into page chain */
1886         ds = data_size;
1887         page = peer_req->pages;
1888         page_chain_for_each(page) {
1889                 unsigned len = min_t(int, ds, PAGE_SIZE);
1890                 data = kmap(page);
1891                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1892                 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1893                         drbd_err(device, "Fault injection: Corrupting data on receive\n");
1894                         data[0] = data[0] ^ (unsigned long)-1;
1895                 }
1896                 kunmap(page);
1897                 if (err) {
1898                         drbd_free_peer_req(device, peer_req);
1899                         return NULL;
1900                 }
1901                 ds -= len;
1902         }
1903
1904         if (digest_size) {
1905                 drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size);
1906                 if (memcmp(dig_in, dig_vv, digest_size)) {
1907                         drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1908                                 (unsigned long long)sector, data_size);
1909                         drbd_free_peer_req(device, peer_req);
1910                         return NULL;
1911                 }
1912         }
1913         device->recv_cnt += data_size >> 9;
1914         return peer_req;
1915 }
1916
1917 /* drbd_drain_block() just takes a data block
1918  * out of the socket input buffer, and discards it.
1919  */
1920 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1921 {
1922         struct page *page;
1923         int err = 0;
1924         void *data;
1925
1926         if (!data_size)
1927                 return 0;
1928
1929         page = drbd_alloc_pages(peer_device, 1, 1);
1930
1931         data = kmap(page);
1932         while (data_size) {
1933                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1934
1935                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1936                 if (err)
1937                         break;
1938                 data_size -= len;
1939         }
1940         kunmap(page);
1941         drbd_free_pages(peer_device->device, page, 0);
1942         return err;
1943 }
1944
1945 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1946                            sector_t sector, int data_size)
1947 {
1948         struct bio_vec bvec;
1949         struct bvec_iter iter;
1950         struct bio *bio;
1951         int digest_size, err, expect;
1952         void *dig_in = peer_device->connection->int_dig_in;
1953         void *dig_vv = peer_device->connection->int_dig_vv;
1954
1955         digest_size = 0;
1956         if (peer_device->connection->peer_integrity_tfm) {
1957                 digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1958                 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1959                 if (err)
1960                         return err;
1961                 data_size -= digest_size;
1962         }
1963
1964         /* optimistically update recv_cnt.  if receiving fails below,
1965          * we disconnect anyways, and counters will be reset. */
1966         peer_device->device->recv_cnt += data_size>>9;
1967
1968         bio = req->master_bio;
1969         D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1970
1971         bio_for_each_segment(bvec, bio, iter) {
1972                 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1973                 expect = min_t(int, data_size, bvec.bv_len);
1974                 err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1975                 kunmap(bvec.bv_page);
1976                 if (err)
1977                         return err;
1978                 data_size -= expect;
1979         }
1980
1981         if (digest_size) {
1982                 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1983                 if (memcmp(dig_in, dig_vv, digest_size)) {
1984                         drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1985                         return -EINVAL;
1986                 }
1987         }
1988
1989         D_ASSERT(peer_device->device, data_size == 0);
1990         return 0;
1991 }
1992
1993 /*
1994  * e_end_resync_block() is called in ack_sender context via
1995  * drbd_finish_peer_reqs().
1996  */
1997 static int e_end_resync_block(struct drbd_work *w, int unused)
1998 {
1999         struct drbd_peer_request *peer_req =
2000                 container_of(w, struct drbd_peer_request, w);
2001         struct drbd_peer_device *peer_device = peer_req->peer_device;
2002         struct drbd_device *device = peer_device->device;
2003         sector_t sector = peer_req->i.sector;
2004         int err;
2005
2006         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2007
2008         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2009                 drbd_set_in_sync(device, sector, peer_req->i.size);
2010                 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
2011         } else {
2012                 /* Record failure to sync */
2013                 drbd_rs_failed_io(device, sector, peer_req->i.size);
2014
2015                 err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2016         }
2017         dec_unacked(device);
2018
2019         return err;
2020 }
2021
2022 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
2023                             struct packet_info *pi) __releases(local)
2024 {
2025         struct drbd_device *device = peer_device->device;
2026         struct drbd_peer_request *peer_req;
2027
2028         peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
2029         if (!peer_req)
2030                 goto fail;
2031
2032         dec_rs_pending(device);
2033
2034         inc_unacked(device);
2035         /* corresponding dec_unacked() in e_end_resync_block()
2036          * respective _drbd_clear_done_ee */
2037
2038         peer_req->w.cb = e_end_resync_block;
2039         peer_req->submit_jif = jiffies;
2040
2041         spin_lock_irq(&device->resource->req_lock);
2042         list_add_tail(&peer_req->w.list, &device->sync_ee);
2043         spin_unlock_irq(&device->resource->req_lock);
2044
2045         atomic_add(pi->size >> 9, &device->rs_sect_ev);
2046         if (drbd_submit_peer_request(device, peer_req, REQ_OP_WRITE, 0,
2047                                      DRBD_FAULT_RS_WR) == 0)
2048                 return 0;
2049
2050         /* don't care for the reason here */
2051         drbd_err(device, "submit failed, triggering re-connect\n");
2052         spin_lock_irq(&device->resource->req_lock);
2053         list_del(&peer_req->w.list);
2054         spin_unlock_irq(&device->resource->req_lock);
2055
2056         drbd_free_peer_req(device, peer_req);
2057 fail:
2058         put_ldev(device);
2059         return -EIO;
2060 }
2061
2062 static struct drbd_request *
2063 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
2064              sector_t sector, bool missing_ok, const char *func)
2065 {
2066         struct drbd_request *req;
2067
2068         /* Request object according to our peer */
2069         req = (struct drbd_request *)(unsigned long)id;
2070         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
2071                 return req;
2072         if (!missing_ok) {
2073                 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
2074                         (unsigned long)id, (unsigned long long)sector);
2075         }
2076         return NULL;
2077 }
2078
2079 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
2080 {
2081         struct drbd_peer_device *peer_device;
2082         struct drbd_device *device;
2083         struct drbd_request *req;
2084         sector_t sector;
2085         int err;
2086         struct p_data *p = pi->data;
2087
2088         peer_device = conn_peer_device(connection, pi->vnr);
2089         if (!peer_device)
2090                 return -EIO;
2091         device = peer_device->device;
2092
2093         sector = be64_to_cpu(p->sector);
2094
2095         spin_lock_irq(&device->resource->req_lock);
2096         req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
2097         spin_unlock_irq(&device->resource->req_lock);
2098         if (unlikely(!req))
2099                 return -EIO;
2100
2101         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
2102          * special casing it there for the various failure cases.
2103          * still no race with drbd_fail_pending_reads */
2104         err = recv_dless_read(peer_device, req, sector, pi->size);
2105         if (!err)
2106                 req_mod(req, DATA_RECEIVED);
2107         /* else: nothing. handled from drbd_disconnect...
2108          * I don't think we may complete this just yet
2109          * in case we are "on-disconnect: freeze" */
2110
2111         return err;
2112 }
2113
2114 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
2115 {
2116         struct drbd_peer_device *peer_device;
2117         struct drbd_device *device;
2118         sector_t sector;
2119         int err;
2120         struct p_data *p = pi->data;
2121
2122         peer_device = conn_peer_device(connection, pi->vnr);
2123         if (!peer_device)
2124                 return -EIO;
2125         device = peer_device->device;
2126
2127         sector = be64_to_cpu(p->sector);
2128         D_ASSERT(device, p->block_id == ID_SYNCER);
2129
2130         if (get_ldev(device)) {
2131                 /* data is submitted to disk within recv_resync_read.
2132                  * corresponding put_ldev done below on error,
2133                  * or in drbd_peer_request_endio. */
2134                 err = recv_resync_read(peer_device, sector, pi);
2135         } else {
2136                 if (__ratelimit(&drbd_ratelimit_state))
2137                         drbd_err(device, "Can not write resync data to local disk.\n");
2138
2139                 err = drbd_drain_block(peer_device, pi->size);
2140
2141                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2142         }
2143
2144         atomic_add(pi->size >> 9, &device->rs_sect_in);
2145
2146         return err;
2147 }
2148
2149 static void restart_conflicting_writes(struct drbd_device *device,
2150                                        sector_t sector, int size)
2151 {
2152         struct drbd_interval *i;
2153         struct drbd_request *req;
2154
2155         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2156                 if (!i->local)
2157                         continue;
2158                 req = container_of(i, struct drbd_request, i);
2159                 if (req->rq_state & RQ_LOCAL_PENDING ||
2160                     !(req->rq_state & RQ_POSTPONED))
2161                         continue;
2162                 /* as it is RQ_POSTPONED, this will cause it to
2163                  * be queued on the retry workqueue. */
2164                 __req_mod(req, CONFLICT_RESOLVED, NULL);
2165         }
2166 }
2167
2168 /*
2169  * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
2170  */
2171 static int e_end_block(struct drbd_work *w, int cancel)
2172 {
2173         struct drbd_peer_request *peer_req =
2174                 container_of(w, struct drbd_peer_request, w);
2175         struct drbd_peer_device *peer_device = peer_req->peer_device;
2176         struct drbd_device *device = peer_device->device;
2177         sector_t sector = peer_req->i.sector;
2178         int err = 0, pcmd;
2179
2180         if (peer_req->flags & EE_SEND_WRITE_ACK) {
2181                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2182                         pcmd = (device->state.conn >= C_SYNC_SOURCE &&
2183                                 device->state.conn <= C_PAUSED_SYNC_T &&
2184                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
2185                                 P_RS_WRITE_ACK : P_WRITE_ACK;
2186                         err = drbd_send_ack(peer_device, pcmd, peer_req);
2187                         if (pcmd == P_RS_WRITE_ACK)
2188                                 drbd_set_in_sync(device, sector, peer_req->i.size);
2189                 } else {
2190                         err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2191                         /* we expect it to be marked out of sync anyways...
2192                          * maybe assert this?  */
2193                 }
2194                 dec_unacked(device);
2195         }
2196
2197         /* we delete from the conflict detection hash _after_ we sent out the
2198          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
2199         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
2200                 spin_lock_irq(&device->resource->req_lock);
2201                 D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
2202                 drbd_remove_epoch_entry_interval(device, peer_req);
2203                 if (peer_req->flags & EE_RESTART_REQUESTS)
2204                         restart_conflicting_writes(device, sector, peer_req->i.size);
2205                 spin_unlock_irq(&device->resource->req_lock);
2206         } else
2207                 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2208
2209         drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
2210
2211         return err;
2212 }
2213
2214 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2215 {
2216         struct drbd_peer_request *peer_req =
2217                 container_of(w, struct drbd_peer_request, w);
2218         struct drbd_peer_device *peer_device = peer_req->peer_device;
2219         int err;
2220
2221         err = drbd_send_ack(peer_device, ack, peer_req);
2222         dec_unacked(peer_device->device);
2223
2224         return err;
2225 }
2226
2227 static int e_send_superseded(struct drbd_work *w, int unused)
2228 {
2229         return e_send_ack(w, P_SUPERSEDED);
2230 }
2231
2232 static int e_send_retry_write(struct drbd_work *w, int unused)
2233 {
2234         struct drbd_peer_request *peer_req =
2235                 container_of(w, struct drbd_peer_request, w);
2236         struct drbd_connection *connection = peer_req->peer_device->connection;
2237
2238         return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2239                              P_RETRY_WRITE : P_SUPERSEDED);
2240 }
2241
2242 static bool seq_greater(u32 a, u32 b)
2243 {
2244         /*
2245          * We assume 32-bit wrap-around here.
2246          * For 24-bit wrap-around, we would have to shift:
2247          *  a <<= 8; b <<= 8;
2248          */
2249         return (s32)a - (s32)b > 0;
2250 }
2251
2252 static u32 seq_max(u32 a, u32 b)
2253 {
2254         return seq_greater(a, b) ? a : b;
2255 }
2256
2257 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2258 {
2259         struct drbd_device *device = peer_device->device;
2260         unsigned int newest_peer_seq;
2261
2262         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2263                 spin_lock(&device->peer_seq_lock);
2264                 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2265                 device->peer_seq = newest_peer_seq;
2266                 spin_unlock(&device->peer_seq_lock);
2267                 /* wake up only if we actually changed device->peer_seq */
2268                 if (peer_seq == newest_peer_seq)
2269                         wake_up(&device->seq_wait);
2270         }
2271 }
2272
2273 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2274 {
2275         return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2276 }
2277
2278 /* maybe change sync_ee into interval trees as well? */
2279 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2280 {
2281         struct drbd_peer_request *rs_req;
2282         bool rv = false;
2283
2284         spin_lock_irq(&device->resource->req_lock);
2285         list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2286                 if (overlaps(peer_req->i.sector, peer_req->i.size,
2287                              rs_req->i.sector, rs_req->i.size)) {
2288                         rv = true;
2289                         break;
2290                 }
2291         }
2292         spin_unlock_irq(&device->resource->req_lock);
2293
2294         return rv;
2295 }
2296
2297 /* Called from receive_Data.
2298  * Synchronize packets on sock with packets on msock.
2299  *
2300  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2301  * packet traveling on msock, they are still processed in the order they have
2302  * been sent.
2303  *
2304  * Note: we don't care for Ack packets overtaking P_DATA packets.
2305  *
2306  * In case packet_seq is larger than device->peer_seq number, there are
2307  * outstanding packets on the msock. We wait for them to arrive.
2308  * In case we are the logically next packet, we update device->peer_seq
2309  * ourselves. Correctly handles 32bit wrap around.
2310  *
2311  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2312  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2313  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2314  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
2315  *
2316  * returns 0 if we may process the packet,
2317  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2318 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2319 {
2320         struct drbd_device *device = peer_device->device;
2321         DEFINE_WAIT(wait);
2322         long timeout;
2323         int ret = 0, tp;
2324
2325         if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2326                 return 0;
2327
2328         spin_lock(&device->peer_seq_lock);
2329         for (;;) {
2330                 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2331                         device->peer_seq = seq_max(device->peer_seq, peer_seq);
2332                         break;
2333                 }
2334
2335                 if (signal_pending(current)) {
2336                         ret = -ERESTARTSYS;
2337                         break;
2338                 }
2339
2340                 rcu_read_lock();
2341                 tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2342                 rcu_read_unlock();
2343
2344                 if (!tp)
2345                         break;
2346
2347                 /* Only need to wait if two_primaries is enabled */
2348                 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2349                 spin_unlock(&device->peer_seq_lock);
2350                 rcu_read_lock();
2351                 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2352                 rcu_read_unlock();
2353                 timeout = schedule_timeout(timeout);
2354                 spin_lock(&device->peer_seq_lock);
2355                 if (!timeout) {
2356                         ret = -ETIMEDOUT;
2357                         drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2358                         break;
2359                 }
2360         }
2361         spin_unlock(&device->peer_seq_lock);
2362         finish_wait(&device->seq_wait, &wait);
2363         return ret;
2364 }
2365
2366 /* see also bio_flags_to_wire()
2367  * DRBD_REQ_*, because we need to semantically map the flags to data packet
2368  * flags and back. We may replicate to other kernel versions. */
2369 static unsigned long wire_flags_to_bio_flags(u32 dpf)
2370 {
2371         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2372                 (dpf & DP_FUA ? REQ_FUA : 0) |
2373                 (dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
2374 }
2375
2376 static unsigned long wire_flags_to_bio_op(u32 dpf)
2377 {
2378         if (dpf & DP_DISCARD)
2379                 return REQ_OP_DISCARD;
2380         else
2381                 return REQ_OP_WRITE;
2382 }
2383
2384 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2385                                     unsigned int size)
2386 {
2387         struct drbd_interval *i;
2388
2389     repeat:
2390         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2391                 struct drbd_request *req;
2392                 struct bio_and_error m;
2393
2394                 if (!i->local)
2395                         continue;
2396                 req = container_of(i, struct drbd_request, i);
2397                 if (!(req->rq_state & RQ_POSTPONED))
2398                         continue;
2399                 req->rq_state &= ~RQ_POSTPONED;
2400                 __req_mod(req, NEG_ACKED, &m);
2401                 spin_unlock_irq(&device->resource->req_lock);
2402                 if (m.bio)
2403                         complete_master_bio(device, &m);
2404                 spin_lock_irq(&device->resource->req_lock);
2405                 goto repeat;
2406         }
2407 }
2408
2409 static int handle_write_conflicts(struct drbd_device *device,
2410                                   struct drbd_peer_request *peer_req)
2411 {
2412         struct drbd_connection *connection = peer_req->peer_device->connection;
2413         bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2414         sector_t sector = peer_req->i.sector;
2415         const unsigned int size = peer_req->i.size;
2416         struct drbd_interval *i;
2417         bool equal;
2418         int err;
2419
2420         /*
2421          * Inserting the peer request into the write_requests tree will prevent
2422          * new conflicting local requests from being added.
2423          */
2424         drbd_insert_interval(&device->write_requests, &peer_req->i);
2425
2426     repeat:
2427         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2428                 if (i == &peer_req->i)
2429                         continue;
2430                 if (i->completed)
2431                         continue;
2432
2433                 if (!i->local) {
2434                         /*
2435                          * Our peer has sent a conflicting remote request; this
2436                          * should not happen in a two-node setup.  Wait for the
2437                          * earlier peer request to complete.
2438                          */
2439                         err = drbd_wait_misc(device, i);
2440                         if (err)
2441                                 goto out;
2442                         goto repeat;
2443                 }
2444
2445                 equal = i->sector == sector && i->size == size;
2446                 if (resolve_conflicts) {
2447                         /*
2448                          * If the peer request is fully contained within the
2449                          * overlapping request, it can be considered overwritten
2450                          * and thus superseded; otherwise, it will be retried
2451                          * once all overlapping requests have completed.
2452                          */
2453                         bool superseded = i->sector <= sector && i->sector +
2454                                        (i->size >> 9) >= sector + (size >> 9);
2455
2456                         if (!equal)
2457                                 drbd_alert(device, "Concurrent writes detected: "
2458                                                "local=%llus +%u, remote=%llus +%u, "
2459                                                "assuming %s came first\n",
2460                                           (unsigned long long)i->sector, i->size,
2461                                           (unsigned long long)sector, size,
2462                                           superseded ? "local" : "remote");
2463
2464                         peer_req->w.cb = superseded ? e_send_superseded :
2465                                                    e_send_retry_write;
2466                         list_add_tail(&peer_req->w.list, &device->done_ee);
2467                         queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);
2468
2469                         err = -ENOENT;
2470                         goto out;
2471                 } else {
2472                         struct drbd_request *req =
2473                                 container_of(i, struct drbd_request, i);
2474
2475                         if (!equal)
2476                                 drbd_alert(device, "Concurrent writes detected: "
2477                                                "local=%llus +%u, remote=%llus +%u\n",
2478                                           (unsigned long long)i->sector, i->size,
2479                                           (unsigned long long)sector, size);
2480
2481                         if (req->rq_state & RQ_LOCAL_PENDING ||
2482                             !(req->rq_state & RQ_POSTPONED)) {
2483                                 /*
2484                                  * Wait for the node with the discard flag to
2485                                  * decide if this request has been superseded
2486                                  * or needs to be retried.
2487                                  * Requests that have been superseded will
2488                                  * disappear from the write_requests tree.
2489                                  *
2490                                  * In addition, wait for the conflicting
2491                                  * request to finish locally before submitting
2492                                  * the conflicting peer request.
2493                                  */
2494                                 err = drbd_wait_misc(device, &req->i);
2495                                 if (err) {
2496                                         _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2497                                         fail_postponed_requests(device, sector, size);
2498                                         goto out;
2499                                 }
2500                                 goto repeat;
2501                         }
2502                         /*
2503                          * Remember to restart the conflicting requests after
2504                          * the new peer request has completed.
2505                          */
2506                         peer_req->flags |= EE_RESTART_REQUESTS;
2507                 }
2508         }
2509         err = 0;
2510
2511     out:
2512         if (err)
2513                 drbd_remove_epoch_entry_interval(device, peer_req);
2514         return err;
2515 }
2516
2517 /* mirrored write */
2518 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2519 {
2520         struct drbd_peer_device *peer_device;
2521         struct drbd_device *device;
2522         struct net_conf *nc;
2523         sector_t sector;
2524         struct drbd_peer_request *peer_req;
2525         struct p_data *p = pi->data;
2526         u32 peer_seq = be32_to_cpu(p->seq_num);
2527         int op, op_flags;
2528         u32 dp_flags;
2529         int err, tp;
2530
2531         peer_device = conn_peer_device(connection, pi->vnr);
2532         if (!peer_device)
2533                 return -EIO;
2534         device = peer_device->device;
2535
2536         if (!get_ldev(device)) {
2537                 int err2;
2538
2539                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2540                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2541                 atomic_inc(&connection->current_epoch->epoch_size);
2542                 err2 = drbd_drain_block(peer_device, pi->size);
2543                 if (!err)
2544                         err = err2;
2545                 return err;
2546         }
2547
2548         /*
2549          * Corresponding put_ldev done either below (on various errors), or in
2550          * drbd_peer_request_endio, if we successfully submit the data at the
2551          * end of this function.
2552          */
2553
2554         sector = be64_to_cpu(p->sector);
2555         peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2556         if (!peer_req) {
2557                 put_ldev(device);
2558                 return -EIO;
2559         }
2560
2561         peer_req->w.cb = e_end_block;
2562         peer_req->submit_jif = jiffies;
2563         peer_req->flags |= EE_APPLICATION;
2564
2565         dp_flags = be32_to_cpu(p->dp_flags);
2566         op = wire_flags_to_bio_op(dp_flags);
2567         op_flags = wire_flags_to_bio_flags(dp_flags);
2568         if (pi->cmd == P_TRIM) {
2569                 D_ASSERT(peer_device, peer_req->i.size > 0);
2570                 D_ASSERT(peer_device, op == REQ_OP_DISCARD);
2571                 D_ASSERT(peer_device, peer_req->pages == NULL);
2572         } else if (peer_req->pages == NULL) {
2573                 D_ASSERT(device, peer_req->i.size == 0);
2574                 D_ASSERT(device, dp_flags & DP_FLUSH);
2575         }
2576
2577         if (dp_flags & DP_MAY_SET_IN_SYNC)
2578                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2579
2580         spin_lock(&connection->epoch_lock);
2581         peer_req->epoch = connection->current_epoch;
2582         atomic_inc(&peer_req->epoch->epoch_size);
2583         atomic_inc(&peer_req->epoch->active);
2584         spin_unlock(&connection->epoch_lock);
2585
2586         rcu_read_lock();
2587         nc = rcu_dereference(peer_device->connection->net_conf);
2588         tp = nc->two_primaries;
2589         if (peer_device->connection->agreed_pro_version < 100) {
2590                 switch (nc->wire_protocol) {
2591                 case DRBD_PROT_C:
2592                         dp_flags |= DP_SEND_WRITE_ACK;
2593                         break;
2594                 case DRBD_PROT_B:
2595                         dp_flags |= DP_SEND_RECEIVE_ACK;
2596                         break;
2597                 }
2598         }
2599         rcu_read_unlock();
2600
2601         if (dp_flags & DP_SEND_WRITE_ACK) {
2602                 peer_req->flags |= EE_SEND_WRITE_ACK;
2603                 inc_unacked(device);
2604                 /* corresponding dec_unacked() in e_end_block()
2605                  * respective _drbd_clear_done_ee */
2606         }
2607
2608         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2609                 /* I really don't like it that the receiver thread
2610                  * sends on the msock, but anyways */
2611                 drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
2612         }
2613
2614         if (tp) {
2615                 /* two primaries implies protocol C */
2616                 D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2617                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2618                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2619                 if (err)
2620                         goto out_interrupted;
2621                 spin_lock_irq(&device->resource->req_lock);
2622                 err = handle_write_conflicts(device, peer_req);
2623                 if (err) {
2624                         spin_unlock_irq(&device->resource->req_lock);
2625                         if (err == -ENOENT) {
2626                                 put_ldev(device);
2627                                 return 0;
2628                         }
2629                         goto out_interrupted;
2630                 }
2631         } else {
2632                 update_peer_seq(peer_device, peer_seq);
2633                 spin_lock_irq(&device->resource->req_lock);
2634         }
2635         /* TRIM and WRITE_SAME are processed synchronously,
2636          * we wait for all pending requests, respectively wait for
2637          * active_ee to become empty in drbd_submit_peer_request();
2638          * better not add ourselves here. */
2639         if ((peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) == 0)
2640                 list_add_tail(&peer_req->w.list, &device->active_ee);
2641         spin_unlock_irq(&device->resource->req_lock);
2642
2643         if (device->state.conn == C_SYNC_TARGET)
2644                 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2645
2646         if (device->state.pdsk < D_INCONSISTENT) {
2647                 /* In case we have the only disk of the cluster, */
2648                 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2649                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2650                 drbd_al_begin_io(device, &peer_req->i);
2651                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2652         }
2653
2654         err = drbd_submit_peer_request(device, peer_req, op, op_flags,
2655                                        DRBD_FAULT_DT_WR);
2656         if (!err)
2657                 return 0;
2658
2659         /* don't care for the reason here */
2660         drbd_err(device, "submit failed, triggering re-connect\n");
2661         spin_lock_irq(&device->resource->req_lock);
2662         list_del(&peer_req->w.list);
2663         drbd_remove_epoch_entry_interval(device, peer_req);
2664         spin_unlock_irq(&device->resource->req_lock);
2665         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2666                 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2667                 drbd_al_complete_io(device, &peer_req->i);
2668         }
2669
2670 out_interrupted:
2671         drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
2672         put_ldev(device);
2673         drbd_free_peer_req(device, peer_req);
2674         return err;
2675 }
2676
2677 /* We may throttle resync, if the lower device seems to be busy,
2678  * and current sync rate is above c_min_rate.
2679  *
2680  * To decide whether or not the lower device is busy, we use a scheme similar
2681  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2682  * (more than 64 sectors) of activity we cannot account for with our own resync
2683  * activity, it obviously is "busy".
2684  *
2685  * The current sync rate used here uses only the most recent two step marks,
2686  * to have a short time average so we can react faster.
2687  */
2688 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2689                 bool throttle_if_app_is_waiting)
2690 {
2691         struct lc_element *tmp;
2692         bool throttle = drbd_rs_c_min_rate_throttle(device);
2693
2694         if (!throttle || throttle_if_app_is_waiting)
2695                 return throttle;
2696
2697         spin_lock_irq(&device->al_lock);
2698         tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2699         if (tmp) {
2700                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2701                 if (test_bit(BME_PRIORITY, &bm_ext->flags))
2702                         throttle = false;
2703                 /* Do not slow down if app IO is already waiting for this extent,
2704                  * and our progress is necessary for application IO to complete. */
2705         }
2706         spin_unlock_irq(&device->al_lock);
2707
2708         return throttle;
2709 }
2710
2711 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2712 {
2713         struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2714         unsigned long db, dt, dbdt;
2715         unsigned int c_min_rate;
2716         int curr_events;
2717
2718         rcu_read_lock();
2719         c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2720         rcu_read_unlock();
2721
2722         /* feature disabled? */
2723         if (c_min_rate == 0)
2724                 return false;
2725
2726         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2727                       (int)part_stat_read(&disk->part0, sectors[1]) -
2728                         atomic_read(&device->rs_sect_ev);
2729
2730         if (atomic_read(&device->ap_actlog_cnt)
2731             || curr_events - device->rs_last_events > 64) {
2732                 unsigned long rs_left;
2733                 int i;
2734
2735                 device->rs_last_events = curr_events;
2736
2737                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2738                  * approx. */
2739                 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2740
2741                 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2742                         rs_left = device->ov_left;
2743                 else
2744                         rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2745
2746                 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2747                 if (!dt)
2748                         dt++;
2749                 db = device->rs_mark_left[i] - rs_left;
2750                 dbdt = Bit2KB(db/dt);
2751
2752                 if (dbdt > c_min_rate)
2753                         return true;
2754         }
2755         return false;
2756 }
2757
2758 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2759 {
2760         struct drbd_peer_device *peer_device;
2761         struct drbd_device *device;
2762         sector_t sector;
2763         sector_t capacity;
2764         struct drbd_peer_request *peer_req;
2765         struct digest_info *di = NULL;
2766         int size, verb;
2767         unsigned int fault_type;
2768         struct p_block_req *p = pi->data;
2769
2770         peer_device = conn_peer_device(connection, pi->vnr);
2771         if (!peer_device)
2772                 return -EIO;
2773         device = peer_device->device;
2774         capacity = drbd_get_capacity(device->this_bdev);
2775
2776         sector = be64_to_cpu(p->sector);
2777         size   = be32_to_cpu(p->blksize);
2778
2779         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2780                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2781                                 (unsigned long long)sector, size);
2782                 return -EINVAL;
2783         }
2784         if (sector + (size>>9) > capacity) {
2785                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2786                                 (unsigned long long)sector, size);
2787                 return -EINVAL;
2788         }
2789
2790         if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2791                 verb = 1;
2792                 switch (pi->cmd) {
2793                 case P_DATA_REQUEST:
2794                         drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2795                         break;
2796                 case P_RS_THIN_REQ:
2797                 case P_RS_DATA_REQUEST:
2798                 case P_CSUM_RS_REQUEST:
2799                 case P_OV_REQUEST:
2800                         drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2801                         break;
2802                 case P_OV_REPLY:
2803                         verb = 0;
2804                         dec_rs_pending(device);
2805                         drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2806                         break;
2807                 default:
2808                         BUG();
2809                 }
2810                 if (verb && __ratelimit(&drbd_ratelimit_state))
2811                         drbd_err(device, "Can not satisfy peer's read request, "
2812                             "no local data.\n");
2813
2814                 /* drain possibly payload */
2815                 return drbd_drain_block(peer_device, pi->size);
2816         }
2817
2818         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2819          * "criss-cross" setup, that might cause write-out on some other DRBD,
2820          * which in turn might block on the other node at this very place.  */
2821         peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2822                         size, GFP_NOIO);
2823         if (!peer_req) {
2824                 put_ldev(device);
2825                 return -ENOMEM;
2826         }
2827
2828         switch (pi->cmd) {
2829         case P_DATA_REQUEST:
2830                 peer_req->w.cb = w_e_end_data_req;
2831                 fault_type = DRBD_FAULT_DT_RD;
2832                 /* application IO, don't drbd_rs_begin_io */
2833                 peer_req->flags |= EE_APPLICATION;
2834                 goto submit;
2835
2836         case P_RS_THIN_REQ:
2837                 /* If at some point in the future we have a smart way to
2838                    find out if this data block is completely deallocated,
2839                    then we would do something smarter here than reading
2840                    the block... */
2841                 peer_req->flags |= EE_RS_THIN_REQ;
2842         case P_RS_DATA_REQUEST:
2843                 peer_req->w.cb = w_e_end_rsdata_req;
2844                 fault_type = DRBD_FAULT_RS_RD;
2845                 /* used in the sector offset progress display */
2846                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2847                 break;
2848
2849         case P_OV_REPLY:
2850         case P_CSUM_RS_REQUEST:
2851                 fault_type = DRBD_FAULT_RS_RD;
2852                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2853                 if (!di)
2854                         goto out_free_e;
2855
2856                 di->digest_size = pi->size;
2857                 di->digest = (((char *)di)+sizeof(struct digest_info));
2858
2859                 peer_req->digest = di;
2860                 peer_req->flags |= EE_HAS_DIGEST;
2861
2862                 if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2863                         goto out_free_e;
2864
2865                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2866                         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2867                         peer_req->w.cb = w_e_end_csum_rs_req;
2868                         /* used in the sector offset progress display */
2869                         device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2870                         /* remember to report stats in drbd_resync_finished */
2871                         device->use_csums = true;
2872                 } else if (pi->cmd == P_OV_REPLY) {
2873                         /* track progress, we may need to throttle */
2874                         atomic_add(size >> 9, &device->rs_sect_in);
2875                         peer_req->w.cb = w_e_end_ov_reply;
2876                         dec_rs_pending(device);
2877                         /* drbd_rs_begin_io done when we sent this request,
2878                          * but accounting still needs to be done. */
2879                         goto submit_for_resync;
2880                 }
2881                 break;
2882
2883         case P_OV_REQUEST:
2884                 if (device->ov_start_sector == ~(sector_t)0 &&
2885                     peer_device->connection->agreed_pro_version >= 90) {
2886                         unsigned long now = jiffies;
2887                         int i;
2888                         device->ov_start_sector = sector;
2889                         device->ov_position = sector;
2890                         device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2891                         device->rs_total = device->ov_left;
2892                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2893                                 device->rs_mark_left[i] = device->ov_left;
2894                                 device->rs_mark_time[i] = now;
2895                         }
2896                         drbd_info(device, "Online Verify start sector: %llu\n",
2897                                         (unsigned long long)sector);
2898                 }
2899                 peer_req->w.cb = w_e_end_ov_req;
2900                 fault_type = DRBD_FAULT_RS_RD;
2901                 break;
2902
2903         default:
2904                 BUG();
2905         }
2906
2907         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2908          * wrt the receiver, but it is not as straightforward as it may seem.
2909          * Various places in the resync start and stop logic assume resync
2910          * requests are processed in order, requeuing this on the worker thread
2911          * introduces a bunch of new code for synchronization between threads.
2912          *
2913          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2914          * "forever", throttling after drbd_rs_begin_io will lock that extent
2915          * for application writes for the same time.  For now, just throttle
2916          * here, where the rest of the code expects the receiver to sleep for
2917          * a while, anyways.
2918          */
2919
2920         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2921          * this defers syncer requests for some time, before letting at least
2922          * on request through.  The resync controller on the receiving side
2923          * will adapt to the incoming rate accordingly.
2924          *
2925          * We cannot throttle here if remote is Primary/SyncTarget:
2926          * we would also throttle its application reads.
2927          * In that case, throttling is done on the SyncTarget only.
2928          */
2929
2930         /* Even though this may be a resync request, we do add to "read_ee";
2931          * "sync_ee" is only used for resync WRITEs.
2932          * Add to list early, so debugfs can find this request
2933          * even if we have to sleep below. */
2934         spin_lock_irq(&device->resource->req_lock);
2935         list_add_tail(&peer_req->w.list, &device->read_ee);
2936         spin_unlock_irq(&device->resource->req_lock);
2937
2938         update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2939         if (device->state.peer != R_PRIMARY
2940         && drbd_rs_should_slow_down(device, sector, false))
2941                 schedule_timeout_uninterruptible(HZ/10);
2942         update_receiver_timing_details(connection, drbd_rs_begin_io);
2943         if (drbd_rs_begin_io(device, sector))
2944                 goto out_free_e;
2945
2946 submit_for_resync:
2947         atomic_add(size >> 9, &device->rs_sect_ev);
2948
2949 submit:
2950         update_receiver_timing_details(connection, drbd_submit_peer_request);
2951         inc_unacked(device);
2952         if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
2953                                      fault_type) == 0)
2954                 return 0;
2955
2956         /* don't care for the reason here */
2957         drbd_err(device, "submit failed, triggering re-connect\n");
2958
2959 out_free_e:
2960         spin_lock_irq(&device->resource->req_lock);
2961         list_del(&peer_req->w.list);
2962         spin_unlock_irq(&device->resource->req_lock);
2963         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2964
2965         put_ldev(device);
2966         drbd_free_peer_req(device, peer_req);
2967         return -EIO;
2968 }
2969
2970 /**
2971  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
2972  */
2973 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2974 {
2975         struct drbd_device *device = peer_device->device;
2976         int self, peer, rv = -100;
2977         unsigned long ch_self, ch_peer;
2978         enum drbd_after_sb_p after_sb_0p;
2979
2980         self = device->ldev->md.uuid[UI_BITMAP] & 1;
2981         peer = device->p_uuid[UI_BITMAP] & 1;
2982
2983         ch_peer = device->p_uuid[UI_SIZE];
2984         ch_self = device->comm_bm_set;
2985
2986         rcu_read_lock();
2987         after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2988         rcu_read_unlock();
2989         switch (after_sb_0p) {
2990         case ASB_CONSENSUS:
2991         case ASB_DISCARD_SECONDARY:
2992         case ASB_CALL_HELPER:
2993         case ASB_VIOLENTLY:
2994                 drbd_err(device, "Configuration error.\n");
2995                 break;
2996         case ASB_DISCONNECT:
2997                 break;
2998         case ASB_DISCARD_YOUNGER_PRI:
2999                 if (self == 0 && peer == 1) {
3000                         rv = -1;
3001                         break;
3002                 }
3003                 if (self == 1 && peer == 0) {
3004                         rv =  1;
3005                         break;
3006                 }
3007                 /* Else fall through to one of the other strategies... */
3008         case ASB_DISCARD_OLDER_PRI:
3009                 if (self == 0 && peer == 1) {
3010                         rv = 1;
3011                         break;
3012                 }
3013                 if (self == 1 && peer == 0) {
3014                         rv = -1;
3015                         break;
3016                 }
3017                 /* Else fall through to one of the other strategies... */
3018                 drbd_warn(device, "Discard younger/older primary did not find a decision\n"
3019                      "Using discard-least-changes instead\n");
3020         case ASB_DISCARD_ZERO_CHG:
3021                 if (ch_peer == 0 && ch_self == 0) {
3022                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3023                                 ? -1 : 1;
3024                         break;
3025                 } else {
3026                         if (ch_peer == 0) { rv =  1; break; }
3027                         if (ch_self == 0) { rv = -1; break; }
3028                 }
3029                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
3030                         break;
3031         case ASB_DISCARD_LEAST_CHG:
3032                 if      (ch_self < ch_peer)
3033                         rv = -1;
3034                 else if (ch_self > ch_peer)
3035                         rv =  1;
3036                 else /* ( ch_self == ch_peer ) */
3037                      /* Well, then use something else. */
3038                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3039                                 ? -1 : 1;
3040                 break;
3041         case ASB_DISCARD_LOCAL:
3042                 rv = -1;
3043                 break;
3044         case ASB_DISCARD_REMOTE:
3045                 rv =  1;
3046         }
3047
3048         return rv;
3049 }
3050
3051 /**
3052  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
3053  */
3054 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
3055 {
3056         struct drbd_device *device = peer_device->device;
3057         int hg, rv = -100;
3058         enum drbd_after_sb_p after_sb_1p;
3059
3060         rcu_read_lock();
3061         after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
3062         rcu_read_unlock();
3063         switch (after_sb_1p) {
3064         case ASB_DISCARD_YOUNGER_PRI:
3065         case ASB_DISCARD_OLDER_PRI:
3066         case ASB_DISCARD_LEAST_CHG:
3067         case ASB_DISCARD_LOCAL:
3068         case ASB_DISCARD_REMOTE:
3069         case ASB_DISCARD_ZERO_CHG:
3070                 drbd_err(device, "Configuration error.\n");
3071                 break;
3072         case ASB_DISCONNECT:
3073                 break;
3074         case ASB_CONSENSUS:
3075                 hg = drbd_asb_recover_0p(peer_device);
3076                 if (hg == -1 && device->state.role == R_SECONDARY)
3077                         rv = hg;
3078                 if (hg == 1  && device->state.role == R_PRIMARY)
3079                         rv = hg;
3080                 break;
3081         case ASB_VIOLENTLY:
3082                 rv = drbd_asb_recover_0p(peer_device);
3083                 break;
3084         case ASB_DISCARD_SECONDARY:
3085                 return device->state.role == R_PRIMARY ? 1 : -1;
3086         case ASB_CALL_HELPER:
3087                 hg = drbd_asb_recover_0p(peer_device);
3088                 if (hg == -1 && device->state.role == R_PRIMARY) {
3089                         enum drbd_state_rv rv2;
3090
3091                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3092                           * we might be here in C_WF_REPORT_PARAMS which is transient.
3093                           * we do not need to wait for the after state change work either. */
3094                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3095                         if (rv2 != SS_SUCCESS) {
3096                                 drbd_khelper(device, "pri-lost-after-sb");
3097                         } else {
3098                                 drbd_warn(device, "Successfully gave up primary role.\n");
3099                                 rv = hg;
3100                         }
3101                 } else
3102                         rv = hg;
3103         }
3104
3105         return rv;
3106 }
3107
3108 /**
3109  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
3110  */
3111 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
3112 {
3113         struct drbd_device *device = peer_device->device;
3114         int hg, rv = -100;
3115         enum drbd_after_sb_p after_sb_2p;
3116
3117         rcu_read_lock();
3118         after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
3119         rcu_read_unlock();
3120         switch (after_sb_2p) {
3121         case ASB_DISCARD_YOUNGER_PRI:
3122         case ASB_DISCARD_OLDER_PRI:
3123         case ASB_DISCARD_LEAST_CHG:
3124         case ASB_DISCARD_LOCAL:
3125         case ASB_DISCARD_REMOTE:
3126         case ASB_CONSENSUS:
3127         case ASB_DISCARD_SECONDARY:
3128         case ASB_DISCARD_ZERO_CHG:
3129                 drbd_err(device, "Configuration error.\n");
3130                 break;
3131         case ASB_VIOLENTLY:
3132                 rv = drbd_asb_recover_0p(peer_device);
3133                 break;
3134         case ASB_DISCONNECT:
3135                 break;
3136         case ASB_CALL_HELPER:
3137                 hg = drbd_asb_recover_0p(peer_device);
3138                 if (hg == -1) {
3139                         enum drbd_state_rv rv2;
3140
3141                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3142                           * we might be here in C_WF_REPORT_PARAMS which is transient.
3143                           * we do not need to wait for the after state change work either. */
3144                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3145                         if (rv2 != SS_SUCCESS) {
3146                                 drbd_khelper(device, "pri-lost-after-sb");
3147                         } else {
3148                                 drbd_warn(device, "Successfully gave up primary role.\n");
3149                                 rv = hg;
3150                         }
3151                 } else
3152                         rv = hg;
3153         }
3154
3155         return rv;
3156 }
3157
3158 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
3159                            u64 bits, u64 flags)
3160 {
3161         if (!uuid) {
3162                 drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
3163                 return;
3164         }
3165         drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
3166              text,
3167              (unsigned long long)uuid[UI_CURRENT],
3168              (unsigned long long)uuid[UI_BITMAP],
3169              (unsigned long long)uuid[UI_HISTORY_START],
3170              (unsigned long long)uuid[UI_HISTORY_END],
3171              (unsigned long long)bits,
3172              (unsigned long long)flags);
3173 }
3174
3175 /*
3176   100   after split brain try auto recover
3177     2   C_SYNC_SOURCE set BitMap
3178     1   C_SYNC_SOURCE use BitMap
3179     0   no Sync
3180    -1   C_SYNC_TARGET use BitMap
3181    -2   C_SYNC_TARGET set BitMap
3182  -100   after split brain, disconnect
3183 -1000   unrelated data
3184 -1091   requires proto 91
3185 -1096   requires proto 96
3186  */
3187
3188 static int drbd_uuid_compare(struct drbd_device *const device, enum drbd_role const peer_role, int *rule_nr) __must_hold(local)
3189 {
3190         struct drbd_peer_device *const peer_device = first_peer_device(device);
3191         struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
3192         u64 self, peer;
3193         int i, j;
3194
3195         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3196         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3197
3198         *rule_nr = 10;
3199         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
3200                 return 0;
3201
3202         *rule_nr = 20;
3203         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
3204              peer != UUID_JUST_CREATED)
3205                 return -2;
3206
3207         *rule_nr = 30;
3208         if (self != UUID_JUST_CREATED &&
3209             (peer == UUID_JUST_CREATED || peer == (u64)0))
3210                 return 2;
3211
3212         if (self == peer) {
3213                 int rct, dc; /* roles at crash time */
3214
3215                 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
3216
3217                         if (connection->agreed_pro_version < 91)
3218                                 return -1091;
3219
3220                         if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
3221                             (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
3222                                 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
3223                                 drbd_uuid_move_history(device);
3224                                 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
3225                                 device->ldev->md.uuid[UI_BITMAP] = 0;
3226
3227                                 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3228                                                device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3229                                 *rule_nr = 34;
3230                         } else {
3231                                 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
3232                                 *rule_nr = 36;
3233                         }
3234
3235                         return 1;
3236                 }
3237
3238                 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
3239
3240                         if (connection->agreed_pro_version < 91)
3241                                 return -1091;
3242
3243                         if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3244                             (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
3245                                 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3246
3247                                 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3248                                 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3249                                 device->p_uuid[UI_BITMAP] = 0UL;
3250
3251                                 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3252                                 *rule_nr = 35;
3253                         } else {
3254                                 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3255                                 *rule_nr = 37;
3256                         }
3257
3258                         return -1;
3259                 }
3260
3261                 /* Common power [off|failure] */
3262                 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3263                         (device->p_uuid[UI_FLAGS] & 2);
3264                 /* lowest bit is set when we were primary,
3265                  * next bit (weight 2) is set when peer was primary */
3266                 *rule_nr = 40;
3267
3268                 /* Neither has the "crashed primary" flag set,
3269                  * only a replication link hickup. */
3270                 if (rct == 0)
3271                         return 0;
3272
3273                 /* Current UUID equal and no bitmap uuid; does not necessarily
3274                  * mean this was a "simultaneous hard crash", maybe IO was
3275                  * frozen, so no UUID-bump happened.
3276                  * This is a protocol change, overload DRBD_FF_WSAME as flag
3277                  * for "new-enough" peer DRBD version. */
3278                 if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) {
3279                         *rule_nr = 41;
3280                         if (!(connection->agreed_features & DRBD_FF_WSAME)) {
3281                                 drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n");
3282                                 return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8));
3283                         }
3284                         if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) {
3285                                 /* At least one has the "crashed primary" bit set,
3286                                  * both are primary now, but neither has rotated its UUIDs?
3287                                  * "Can not happen." */
3288                                 drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n");
3289                                 return -100;
3290                         }
3291                         if (device->state.role == R_PRIMARY)
3292                                 return 1;
3293                         return -1;
3294                 }
3295
3296                 /* Both are secondary.
3297                  * Really looks like recovery from simultaneous hard crash.
3298                  * Check which had been primary before, and arbitrate. */
3299                 switch (rct) {
3300                 case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */
3301                 case 1: /*  self_pri && !peer_pri */ return 1;
3302                 case 2: /* !self_pri &&  peer_pri */ return -1;
3303                 case 3: /*  self_pri &&  peer_pri */
3304                         dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3305                         return dc ? -1 : 1;
3306                 }
3307         }
3308
3309         *rule_nr = 50;
3310         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3311         if (self == peer)
3312                 return -1;
3313
3314         *rule_nr = 51;
3315         peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3316         if (self == peer) {
3317                 if (connection->agreed_pro_version < 96 ?
3318                     (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3319                     (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3320                     peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3321                         /* The last P_SYNC_UUID did not get though. Undo the last start of
3322                            resync as sync source modifications of the peer's UUIDs. */
3323
3324                         if (connection->agreed_pro_version < 91)
3325                                 return -1091;
3326
3327                         device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3328                         device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3329
3330                         drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3331                         drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3332
3333                         return -1;
3334                 }
3335         }
3336
3337         *rule_nr = 60;
3338         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3339         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3340                 peer = device->p_uuid[i] & ~((u64)1);
3341                 if (self == peer)
3342                         return -2;
3343         }
3344
3345         *rule_nr = 70;
3346         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3347         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3348         if (self == peer)
3349                 return 1;
3350
3351         *rule_nr = 71;
3352         self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3353         if (self == peer) {
3354                 if (connection->agreed_pro_version < 96 ?
3355                     (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3356                     (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3357                     self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3358                         /* The last P_SYNC_UUID did not get though. Undo the last start of
3359                            resync as sync source modifications of our UUIDs. */
3360
3361                         if (connection->agreed_pro_version < 91)
3362                                 return -1091;
3363
3364                         __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3365                         __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3366
3367                         drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3368                         drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3369                                        device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3370
3371                         return 1;
3372                 }
3373         }
3374
3375
3376         *rule_nr = 80;
3377         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3378         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3379                 self = device->ldev->md.uuid[i] & ~((u64)1);
3380                 if (self == peer)
3381                         return 2;
3382         }
3383
3384         *rule_nr = 90;
3385         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3386         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3387         if (self == peer && self != ((u64)0))
3388                 return 100;
3389
3390         *rule_nr = 100;
3391         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3392                 self = device->ldev->md.uuid[i] & ~((u64)1);
3393                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3394                         peer = device->p_uuid[j] & ~((u64)1);
3395                         if (self == peer)
3396                                 return -100;
3397                 }
3398         }
3399
3400         return -1000;
3401 }
3402
3403 /* drbd_sync_handshake() returns the new conn state on success, or
3404    CONN_MASK (-1) on failure.
3405  */
3406 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3407                                            enum drbd_role peer_role,
3408                                            enum drbd_disk_state peer_disk) __must_hold(local)
3409 {
3410         struct drbd_device *device = peer_device->device;
3411         enum drbd_conns rv = C_MASK;
3412         enum drbd_disk_state mydisk;
3413         struct net_conf *nc;
3414         int hg, rule_nr, rr_conflict, tentative;
3415
3416         mydisk = device->state.disk;
3417         if (mydisk == D_NEGOTIATING)
3418                 mydisk = device->new_state_tmp.disk;
3419
3420         drbd_info(device, "drbd_sync_handshake:\n");
3421
3422         spin_lock_irq(&device->ldev->md.uuid_lock);
3423         drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3424         drbd_uuid_dump(device, "peer", device->p_uuid,
3425                        device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3426
3427         hg = drbd_uuid_compare(device, peer_role, &rule_nr);
3428         spin_unlock_irq(&device->ldev->md.uuid_lock);
3429
3430         drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3431
3432         if (hg == -1000) {
3433                 drbd_alert(device, "Unrelated data, aborting!\n");
3434                 return C_MASK;
3435         }
3436         if (hg < -0x10000) {
3437                 int proto, fflags;
3438                 hg = -hg;
3439                 proto = hg & 0xff;
3440                 fflags = (hg >> 8) & 0xff;
3441                 drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n",
3442                                         proto, fflags);
3443                 return C_MASK;
3444         }
3445         if (hg < -1000) {
3446                 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3447                 return C_MASK;
3448         }
3449
3450         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3451             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3452                 int f = (hg == -100) || abs(hg) == 2;
3453                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
3454                 if (f)
3455                         hg = hg*2;
3456                 drbd_info(device, "Becoming sync %s due to disk states.\n",
3457                      hg > 0 ? "source" : "target");
3458         }
3459
3460         if (abs(hg) == 100)
3461                 drbd_khelper(device, "initial-split-brain");
3462
3463         rcu_read_lock();
3464         nc = rcu_dereference(peer_device->connection->net_conf);
3465
3466         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3467                 int pcount = (device->state.role == R_PRIMARY)
3468                            + (peer_role == R_PRIMARY);
3469                 int forced = (hg == -100);
3470
3471                 switch (pcount) {
3472                 case 0:
3473                         hg = drbd_asb_recover_0p(peer_device);
3474                         break;
3475                 case 1:
3476                         hg = drbd_asb_recover_1p(peer_device);
3477                         break;
3478                 case 2:
3479                         hg = drbd_asb_recover_2p(peer_device);
3480                         break;
3481                 }
3482                 if (abs(hg) < 100) {
3483                         drbd_warn(device, "Split-Brain detected, %d primaries, "
3484                              "automatically solved. Sync from %s node\n",
3485                              pcount, (hg < 0) ? "peer" : "this");
3486                         if (forced) {
3487                                 drbd_warn(device, "Doing a full sync, since"
3488                                      " UUIDs where ambiguous.\n");
3489                                 hg = hg*2;
3490                         }
3491                 }
3492         }
3493
3494         if (hg == -100) {
3495                 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3496                         hg = -1;
3497                 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3498                         hg = 1;
3499
3500                 if (abs(hg) < 100)
3501                         drbd_warn(device, "Split-Brain detected, manually solved. "
3502                              "Sync from %s node\n",
3503                              (hg < 0) ? "peer" : "this");
3504         }
3505         rr_conflict = nc->rr_conflict;
3506         tentative = nc->tentative;
3507         rcu_read_unlock();
3508
3509         if (hg == -100) {
3510                 /* FIXME this log message is not correct if we end up here
3511                  * after an attempted attach on a diskless node.
3512                  * We just refuse to attach -- well, we drop the "connection"
3513                  * to that disk, in a way... */
3514                 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3515                 drbd_khelper(device, "split-brain");
3516                 return C_MASK;
3517         }
3518
3519         if (hg > 0 && mydisk <= D_INCONSISTENT) {
3520                 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3521                 return C_MASK;
3522         }
3523
3524         if (hg < 0 && /* by intention we do not use mydisk here. */
3525             device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3526                 switch (rr_conflict) {
3527                 case ASB_CALL_HELPER:
3528                         drbd_khelper(device, "pri-lost");
3529                         /* fall through */
3530                 case ASB_DISCONNECT:
3531                         drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3532                         return C_MASK;
3533                 case ASB_VIOLENTLY:
3534                         drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3535                              "assumption\n");
3536                 }
3537         }
3538
3539         if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3540                 if (hg == 0)
3541                         drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3542                 else
3543                         drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3544                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3545                                  abs(hg) >= 2 ? "full" : "bit-map based");
3546                 return C_MASK;
3547         }
3548
3549         if (abs(hg) >= 2) {
3550                 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3551                 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3552                                         BM_LOCKED_SET_ALLOWED))
3553                         return C_MASK;
3554         }
3555
3556         if (hg > 0) { /* become sync source. */
3557                 rv = C_WF_BITMAP_S;
3558         } else if (hg < 0) { /* become sync target */
3559                 rv = C_WF_BITMAP_T;
3560         } else {
3561                 rv = C_CONNECTED;
3562                 if (drbd_bm_total_weight(device)) {
3563                         drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3564                              drbd_bm_total_weight(device));
3565                 }
3566         }
3567
3568         return rv;
3569 }
3570
3571 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3572 {
3573         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3574         if (peer == ASB_DISCARD_REMOTE)
3575                 return ASB_DISCARD_LOCAL;
3576
3577         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3578         if (peer == ASB_DISCARD_LOCAL)
3579                 return ASB_DISCARD_REMOTE;
3580
3581         /* everything else is valid if they are equal on both sides. */
3582         return peer;
3583 }
3584
3585 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3586 {
3587         struct p_protocol *p = pi->data;
3588         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3589         int p_proto, p_discard_my_data, p_two_primaries, cf;
3590         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3591         char integrity_alg[SHARED_SECRET_MAX] = "";
3592         struct crypto_ahash *peer_integrity_tfm = NULL;
3593         void *int_dig_in = NULL, *int_dig_vv = NULL;
3594
3595         p_proto         = be32_to_cpu(p->protocol);
3596         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3597         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3598         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3599         p_two_primaries = be32_to_cpu(p->two_primaries);
3600         cf              = be32_to_cpu(p->conn_flags);
3601         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3602
3603         if (connection->agreed_pro_version >= 87) {
3604                 int err;
3605
3606                 if (pi->size > sizeof(integrity_alg))
3607                         return -EIO;
3608                 err = drbd_recv_all(connection, integrity_alg, pi->size);
3609                 if (err)
3610                         return err;
3611                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3612         }
3613
3614         if (pi->cmd != P_PROTOCOL_UPDATE) {
3615                 clear_bit(CONN_DRY_RUN, &connection->flags);
3616
3617                 if (cf & CF_DRY_RUN)
3618                         set_bit(CONN_DRY_RUN, &connection->flags);
3619
3620                 rcu_read_lock();
3621                 nc = rcu_dereference(connection->net_conf);
3622
3623                 if (p_proto != nc->wire_protocol) {
3624                         drbd_err(connection, "incompatible %s settings\n", "protocol");
3625                         goto disconnect_rcu_unlock;
3626                 }
3627
3628                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3629                         drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3630                         goto disconnect_rcu_unlock;
3631                 }
3632
3633                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3634                         drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3635                         goto disconnect_rcu_unlock;
3636                 }
3637
3638                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3639                         drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3640                         goto disconnect_rcu_unlock;
3641                 }
3642
3643                 if (p_discard_my_data && nc->discard_my_data) {
3644                         drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3645                         goto disconnect_rcu_unlock;
3646                 }
3647
3648                 if (p_two_primaries != nc->two_primaries) {
3649                         drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3650                         goto disconnect_rcu_unlock;
3651                 }
3652
3653                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3654                         drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3655                         goto disconnect_rcu_unlock;
3656                 }
3657
3658                 rcu_read_unlock();
3659         }
3660
3661         if (integrity_alg[0]) {
3662                 int hash_size;
3663
3664                 /*
3665                  * We can only change the peer data integrity algorithm
3666                  * here.  Changing our own data integrity algorithm
3667                  * requires that we send a P_PROTOCOL_UPDATE packet at
3668                  * the same time; otherwise, the peer has no way to
3669                  * tell between which packets the algorithm should
3670                  * change.
3671                  */
3672
3673                 peer_integrity_tfm = crypto_alloc_ahash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3674                 if (IS_ERR(peer_integrity_tfm)) {
3675                         peer_integrity_tfm = NULL;
3676                         drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3677                                  integrity_alg);
3678                         goto disconnect;
3679                 }
3680
3681                 hash_size = crypto_ahash_digestsize(peer_integrity_tfm);
3682                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3683                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3684                 if (!(int_dig_in && int_dig_vv)) {
3685                         drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3686                         goto disconnect;
3687                 }
3688         }
3689
3690         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3691         if (!new_net_conf) {
3692                 drbd_err(connection, "Allocation of new net_conf failed\n");
3693                 goto disconnect;
3694         }
3695
3696         mutex_lock(&connection->data.mutex);
3697         mutex_lock(&connection->resource->conf_update);
3698         old_net_conf = connection->net_conf;
3699         *new_net_conf = *old_net_conf;
3700
3701         new_net_conf->wire_protocol = p_proto;
3702         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3703         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3704         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3705         new_net_conf->two_primaries = p_two_primaries;
3706
3707         rcu_assign_pointer(connection->net_conf, new_net_conf);
3708         mutex_unlock(&connection->resource->conf_update);
3709         mutex_unlock(&connection->data.mutex);
3710
3711         crypto_free_ahash(connection->peer_integrity_tfm);
3712         kfree(connection->int_dig_in);
3713         kfree(connection->int_dig_vv);
3714         connection->peer_integrity_tfm = peer_integrity_tfm;
3715         connection->int_dig_in = int_dig_in;
3716         connection->int_dig_vv = int_dig_vv;
3717
3718         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3719                 drbd_info(connection, "peer data-integrity-alg: %s\n",
3720                           integrity_alg[0] ? integrity_alg : "(none)");
3721
3722         synchronize_rcu();
3723         kfree(old_net_conf);
3724         return 0;
3725
3726 disconnect_rcu_unlock:
3727         rcu_read_unlock();
3728 disconnect:
3729         crypto_free_ahash(peer_integrity_tfm);
3730         kfree(int_dig_in);
3731         kfree(int_dig_vv);
3732         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3733         return -EIO;
3734 }
3735
3736 /* helper function
3737  * input: alg name, feature name
3738  * return: NULL (alg name was "")
3739  *         ERR_PTR(error) if something goes wrong
3740  *         or the crypto hash ptr, if it worked out ok. */
3741 static struct crypto_ahash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3742                 const char *alg, const char *name)
3743 {
3744         struct crypto_ahash *tfm;
3745
3746         if (!alg[0])
3747                 return NULL;
3748
3749         tfm = crypto_alloc_ahash(alg, 0, CRYPTO_ALG_ASYNC);
3750         if (IS_ERR(tfm)) {
3751                 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3752                         alg, name, PTR_ERR(tfm));
3753                 return tfm;
3754         }
3755         return tfm;
3756 }
3757
3758 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3759 {
3760         void *buffer = connection->data.rbuf;
3761         int size = pi->size;
3762
3763         while (size) {
3764                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3765                 s = drbd_recv(connection, buffer, s);
3766                 if (s <= 0) {
3767                         if (s < 0)
3768                                 return s;
3769                         break;
3770                 }
3771                 size -= s;
3772         }
3773         if (size)
3774                 return -EIO;
3775         return 0;
3776 }
3777
3778 /*
3779  * config_unknown_volume  -  device configuration command for unknown volume
3780  *
3781  * When a device is added to an existing connection, the node on which the
3782  * device is added first will send configuration commands to its peer but the
3783  * peer will not know about the device yet.  It will warn and ignore these
3784  * commands.  Once the device is added on the second node, the second node will
3785  * send the same device configuration commands, but in the other direction.
3786  *
3787  * (We can also end up here if drbd is misconfigured.)
3788  */
3789 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3790 {
3791         drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3792                   cmdname(pi->cmd), pi->vnr);
3793         return ignore_remaining_packet(connection, pi);
3794 }
3795
3796 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3797 {
3798         struct drbd_peer_device *peer_device;
3799         struct drbd_device *device;
3800         struct p_rs_param_95 *p;
3801         unsigned int header_size, data_size, exp_max_sz;
3802         struct crypto_ahash *verify_tfm = NULL;
3803         struct crypto_ahash *csums_tfm = NULL;
3804         struct net_conf *old_net_conf, *new_net_conf = NULL;
3805         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3806         const int apv = connection->agreed_pro_version;
3807         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3808         int fifo_size = 0;
3809         int err;
3810
3811         peer_device = conn_peer_device(connection, pi->vnr);
3812         if (!peer_device)
3813                 return config_unknown_volume(connection, pi);
3814         device = peer_device->device;
3815
3816         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3817                     : apv == 88 ? sizeof(struct p_rs_param)
3818                                         + SHARED_SECRET_MAX
3819                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3820                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3821
3822         if (pi->size > exp_max_sz) {
3823                 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3824                     pi->size, exp_max_sz);
3825                 return -EIO;
3826         }
3827
3828         if (apv <= 88) {
3829                 header_size = sizeof(struct p_rs_param);
3830                 data_size = pi->size - header_size;
3831         } else if (apv <= 94) {
3832                 header_size = sizeof(struct p_rs_param_89);
3833                 data_size = pi->size - header_size;
3834                 D_ASSERT(device, data_size == 0);
3835         } else {
3836                 header_size = sizeof(struct p_rs_param_95);
3837                 data_size = pi->size - header_size;
3838                 D_ASSERT(device, data_size == 0);
3839         }
3840
3841         /* initialize verify_alg and csums_alg */
3842         p = pi->data;
3843         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3844
3845         err = drbd_recv_all(peer_device->connection, p, header_size);
3846         if (err)
3847                 return err;
3848
3849         mutex_lock(&connection->resource->conf_update);
3850         old_net_conf = peer_device->connection->net_conf;
3851         if (get_ldev(device)) {
3852                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3853                 if (!new_disk_conf) {
3854                         put_ldev(device);
3855                         mutex_unlock(&connection->resource->conf_update);
3856                         drbd_err(device, "Allocation of new disk_conf failed\n");
3857                         return -ENOMEM;
3858                 }
3859
3860                 old_disk_conf = device->ldev->disk_conf;
3861                 *new_disk_conf = *old_disk_conf;
3862
3863                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3864         }
3865
3866         if (apv >= 88) {
3867                 if (apv == 88) {
3868                         if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3869                                 drbd_err(device, "verify-alg of wrong size, "
3870                                         "peer wants %u, accepting only up to %u byte\n",
3871                                         data_size, SHARED_SECRET_MAX);
3872                                 err = -EIO;
3873                                 goto reconnect;
3874                         }
3875
3876                         err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3877                         if (err)
3878                                 goto reconnect;
3879                         /* we expect NUL terminated string */
3880                         /* but just in case someone tries to be evil */
3881                         D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3882                         p->verify_alg[data_size-1] = 0;
3883
3884                 } else /* apv >= 89 */ {
3885                         /* we still expect NUL terminated strings */
3886                         /* but just in case someone tries to be evil */
3887                         D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3888                         D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3889                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3890                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3891                 }
3892
3893                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3894                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3895                                 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3896                                     old_net_conf->verify_alg, p->verify_alg);
3897                                 goto disconnect;
3898                         }
3899                         verify_tfm = drbd_crypto_alloc_digest_safe(device,
3900                                         p->verify_alg, "verify-alg");
3901                         if (IS_ERR(verify_tfm)) {
3902                                 verify_tfm = NULL;
3903                                 goto disconnect;
3904                         }
3905                 }
3906
3907                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3908                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3909                                 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3910                                     old_net_conf->csums_alg, p->csums_alg);
3911                                 goto disconnect;
3912                         }
3913                         csums_tfm = drbd_crypto_alloc_digest_safe(device,
3914                                         p->csums_alg, "csums-alg");
3915                         if (IS_ERR(csums_tfm)) {
3916                                 csums_tfm = NULL;
3917                                 goto disconnect;
3918                         }
3919                 }
3920
3921                 if (apv > 94 && new_disk_conf) {
3922                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3923                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3924                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3925                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3926
3927                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3928                         if (fifo_size != device->rs_plan_s->size) {
3929                                 new_plan = fifo_alloc(fifo_size);
3930                                 if (!new_plan) {
3931                                         drbd_err(device, "kmalloc of fifo_buffer failed");
3932                                         put_ldev(device);
3933                                         goto disconnect;
3934                                 }
3935                         }
3936                 }
3937
3938                 if (verify_tfm || csums_tfm) {
3939                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3940                         if (!new_net_conf) {
3941                                 drbd_err(device, "Allocation of new net_conf failed\n");
3942                                 goto disconnect;
3943                         }
3944
3945                         *new_net_conf = *old_net_conf;
3946
3947                         if (verify_tfm) {
3948                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3949                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3950                                 crypto_free_ahash(peer_device->connection->verify_tfm);
3951                                 peer_device->connection->verify_tfm = verify_tfm;
3952                                 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3953                         }
3954                         if (csums_tfm) {
3955                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3956                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3957                                 crypto_free_ahash(peer_device->connection->csums_tfm);
3958                                 peer_device->connection->csums_tfm = csums_tfm;
3959                                 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3960                         }
3961                         rcu_assign_pointer(connection->net_conf, new_net_conf);
3962                 }
3963         }
3964
3965         if (new_disk_conf) {
3966                 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3967                 put_ldev(device);
3968         }
3969
3970         if (new_plan) {
3971                 old_plan = device->rs_plan_s;
3972                 rcu_assign_pointer(device->rs_plan_s, new_plan);
3973         }
3974
3975         mutex_unlock(&connection->resource->conf_update);
3976         synchronize_rcu();
3977         if (new_net_conf)
3978                 kfree(old_net_conf);
3979         kfree(old_disk_conf);
3980         kfree(old_plan);
3981
3982         return 0;
3983
3984 reconnect:
3985         if (new_disk_conf) {
3986                 put_ldev(device);
3987                 kfree(new_disk_conf);
3988         }
3989         mutex_unlock(&connection->resource->conf_update);
3990         return -EIO;
3991
3992 disconnect:
3993         kfree(new_plan);
3994         if (new_disk_conf) {
3995                 put_ldev(device);
3996                 kfree(new_disk_conf);
3997         }
3998         mutex_unlock(&connection->resource->conf_update);
3999         /* just for completeness: actually not needed,
4000          * as this is not reached if csums_tfm was ok. */
4001         crypto_free_ahash(csums_tfm);
4002         /* but free the verify_tfm again, if csums_tfm did not work out */
4003         crypto_free_ahash(verify_tfm);
4004         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4005         return -EIO;
4006 }
4007
4008 /* warn if the arguments differ by more than 12.5% */
4009 static void warn_if_differ_considerably(struct drbd_device *device,
4010         const char *s, sector_t a, sector_t b)
4011 {
4012         sector_t d;
4013         if (a == 0 || b == 0)
4014                 return;
4015         d = (a > b) ? (a - b) : (b - a);
4016         if (d > (a>>3) || d > (b>>3))
4017                 drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
4018                      (unsigned long long)a, (unsigned long long)b);
4019 }
4020
4021 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
4022 {
4023         struct drbd_peer_device *peer_device;
4024         struct drbd_device *device;
4025         struct p_sizes *p = pi->data;
4026         struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL;
4027         enum determine_dev_size dd = DS_UNCHANGED;
4028         sector_t p_size, p_usize, p_csize, my_usize;
4029         int ldsc = 0; /* local disk size changed */
4030         enum dds_flags ddsf;
4031
4032         peer_device = conn_peer_device(connection, pi->vnr);
4033         if (!peer_device)
4034                 return config_unknown_volume(connection, pi);
4035         device = peer_device->device;
4036
4037         p_size = be64_to_cpu(p->d_size);
4038         p_usize = be64_to_cpu(p->u_size);
4039         p_csize = be64_to_cpu(p->c_size);
4040
4041         /* just store the peer's disk size for now.
4042          * we still need to figure out whether we accept that. */
4043         device->p_size = p_size;
4044
4045         if (get_ldev(device)) {
4046                 sector_t new_size, cur_size;
4047                 rcu_read_lock();
4048                 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
4049                 rcu_read_unlock();
4050
4051                 warn_if_differ_considerably(device, "lower level device sizes",
4052                            p_size, drbd_get_max_capacity(device->ldev));
4053                 warn_if_differ_considerably(device, "user requested size",
4054                                             p_usize, my_usize);
4055
4056                 /* if this is the first connect, or an otherwise expected
4057                  * param exchange, choose the minimum */
4058                 if (device->state.conn == C_WF_REPORT_PARAMS)
4059                         p_usize = min_not_zero(my_usize, p_usize);
4060
4061                 /* Never shrink a device with usable data during connect.
4062                    But allow online shrinking if we are connected. */
4063                 new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0);
4064                 cur_size = drbd_get_capacity(device->this_bdev);
4065                 if (new_size < cur_size &&
4066                     device->state.disk >= D_OUTDATED &&
4067                     device->state.conn < C_CONNECTED) {
4068                         drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n",
4069                                         (unsigned long long)new_size, (unsigned long long)cur_size);
4070                         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4071                         put_ldev(device);
4072                         return -EIO;
4073                 }
4074
4075                 if (my_usize != p_usize) {
4076                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
4077
4078                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
4079                         if (!new_disk_conf) {
4080                                 drbd_err(device, "Allocation of new disk_conf failed\n");
4081                                 put_ldev(device);
4082                                 return -ENOMEM;
4083                         }
4084
4085                         mutex_lock(&connection->resource->conf_update);
4086                         old_disk_conf = device->ldev->disk_conf;
4087                         *new_disk_conf = *old_disk_conf;
4088                         new_disk_conf->disk_size = p_usize;
4089
4090                         rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
4091                         mutex_unlock(&connection->resource->conf_update);
4092                         synchronize_rcu();
4093                         kfree(old_disk_conf);
4094
4095                         drbd_info(device, "Peer sets u_size to %lu sectors\n",
4096                                  (unsigned long)my_usize);
4097                 }
4098
4099                 put_ldev(device);
4100         }
4101
4102         device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
4103         /* Leave drbd_reconsider_queue_parameters() before drbd_determine_dev_size().
4104            In case we cleared the QUEUE_FLAG_DISCARD from our queue in
4105            drbd_reconsider_queue_parameters(), we can be sure that after
4106            drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
4107
4108         ddsf = be16_to_cpu(p->dds_flags);
4109         if (get_ldev(device)) {
4110                 drbd_reconsider_queue_parameters(device, device->ldev, o);
4111                 dd = drbd_determine_dev_size(device, ddsf, NULL);
4112                 put_ldev(device);
4113                 if (dd == DS_ERROR)
4114                         return -EIO;
4115                 drbd_md_sync(device);
4116         } else {
4117                 /*
4118                  * I am diskless, need to accept the peer's *current* size.
4119                  * I must NOT accept the peers backing disk size,
4120                  * it may have been larger than mine all along...
4121                  *
4122                  * At this point, the peer knows more about my disk, or at
4123                  * least about what we last agreed upon, than myself.
4124                  * So if his c_size is less than his d_size, the most likely
4125                  * reason is that *my* d_size was smaller last time we checked.
4126                  *
4127                  * However, if he sends a zero current size,
4128                  * take his (user-capped or) backing disk size anyways.
4129                  */
4130                 drbd_reconsider_queue_parameters(device, NULL, o);
4131                 drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size);
4132         }
4133
4134         if (get_ldev(device)) {
4135                 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
4136                         device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
4137                         ldsc = 1;
4138                 }
4139
4140                 put_ldev(device);
4141         }
4142
4143         if (device->state.conn > C_WF_REPORT_PARAMS) {
4144                 if (be64_to_cpu(p->c_size) !=
4145                     drbd_get_capacity(device->this_bdev) || ldsc) {
4146                         /* we have different sizes, probably peer
4147                          * needs to know my new size... */
4148                         drbd_send_sizes(peer_device, 0, ddsf);
4149                 }
4150                 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
4151                     (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
4152                         if (device->state.pdsk >= D_INCONSISTENT &&
4153                             device->state.disk >= D_INCONSISTENT) {
4154                                 if (ddsf & DDSF_NO_RESYNC)
4155                                         drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
4156                                 else
4157                                         resync_after_online_grow(device);
4158                         } else
4159                                 set_bit(RESYNC_AFTER_NEG, &device->flags);
4160                 }
4161         }
4162
4163         return 0;
4164 }
4165
4166 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
4167 {
4168         struct drbd_peer_device *peer_device;
4169         struct drbd_device *device;
4170         struct p_uuids *p = pi->data;
4171         u64 *p_uuid;
4172         int i, updated_uuids = 0;
4173
4174         peer_device = conn_peer_device(connection, pi->vnr);
4175         if (!peer_device)
4176                 return config_unknown_volume(connection, pi);
4177         device = peer_device->device;
4178
4179         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
4180         if (!p_uuid) {
4181                 drbd_err(device, "kmalloc of p_uuid failed\n");
4182                 return false;
4183         }
4184
4185         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
4186                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
4187
4188         kfree(device->p_uuid);
4189         device->p_uuid = p_uuid;
4190
4191         if (device->state.conn < C_CONNECTED &&
4192             device->state.disk < D_INCONSISTENT &&
4193             device->state.role == R_PRIMARY &&
4194             (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
4195                 drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
4196                     (unsigned long long)device->ed_uuid);
4197                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4198                 return -EIO;
4199         }
4200
4201         if (get_ldev(device)) {
4202                 int skip_initial_sync =
4203                         device->state.conn == C_CONNECTED &&
4204                         peer_device->connection->agreed_pro_version >= 90 &&
4205                         device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
4206                         (p_uuid[UI_FLAGS] & 8);
4207                 if (skip_initial_sync) {
4208                         drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
4209                         drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
4210                                         "clear_n_write from receive_uuids",
4211                                         BM_LOCKED_TEST_ALLOWED);
4212                         _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
4213                         _drbd_uuid_set(device, UI_BITMAP, 0);
4214                         _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
4215                                         CS_VERBOSE, NULL);
4216                         drbd_md_sync(device);
4217                         updated_uuids = 1;
4218                 }
4219                 put_ldev(device);
4220         } else if (device->state.disk < D_INCONSISTENT &&
4221                    device->state.role == R_PRIMARY) {
4222                 /* I am a diskless primary, the peer just created a new current UUID
4223                    for me. */
4224                 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4225         }
4226
4227         /* Before we test for the disk state, we should wait until an eventually
4228            ongoing cluster wide state change is finished. That is important if
4229            we are primary and are detaching from our disk. We need to see the
4230            new disk state... */
4231         mutex_lock(device->state_mutex);
4232         mutex_unlock(device->state_mutex);
4233         if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
4234                 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4235
4236         if (updated_uuids)
4237                 drbd_print_uuids(device, "receiver updated UUIDs to");
4238
4239         return 0;
4240 }
4241
4242 /**
4243  * convert_state() - Converts the peer's view of the cluster state to our point of view
4244  * @ps:         The state as seen by the peer.
4245  */
4246 static union drbd_state convert_state(union drbd_state ps)
4247 {
4248         union drbd_state ms;
4249
4250         static enum drbd_conns c_tab[] = {
4251                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
4252                 [C_CONNECTED] = C_CONNECTED,
4253
4254                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
4255                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
4256                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
4257                 [C_VERIFY_S]       = C_VERIFY_T,
4258                 [C_MASK]   = C_MASK,
4259         };
4260
4261         ms.i = ps.i;
4262
4263         ms.conn = c_tab[ps.conn];
4264         ms.peer = ps.role;
4265         ms.role = ps.peer;
4266         ms.pdsk = ps.disk;
4267         ms.disk = ps.pdsk;
4268         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
4269
4270         return ms;
4271 }
4272
4273 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4274 {
4275         struct drbd_peer_device *peer_device;
4276         struct drbd_device *device;
4277         struct p_req_state *p = pi->data;
4278         union drbd_state mask, val;
4279         enum drbd_state_rv rv;
4280
4281         peer_device = conn_peer_device(connection, pi->vnr);
4282         if (!peer_device)
4283                 return -EIO;
4284         device = peer_device->device;
4285
4286         mask.i = be32_to_cpu(p->mask);
4287         val.i = be32_to_cpu(p->val);
4288
4289         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4290             mutex_is_locked(device->state_mutex)) {
4291                 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4292                 return 0;
4293         }
4294
4295         mask = convert_state(mask);
4296         val = convert_state(val);
4297
4298         rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4299         drbd_send_sr_reply(peer_device, rv);
4300
4301         drbd_md_sync(device);
4302
4303         return 0;
4304 }
4305
4306 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4307 {
4308         struct p_req_state *p = pi->data;
4309         union drbd_state mask, val;
4310         enum drbd_state_rv rv;
4311
4312         mask.i = be32_to_cpu(p->mask);
4313         val.i = be32_to_cpu(p->val);
4314
4315         if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4316             mutex_is_locked(&connection->cstate_mutex)) {
4317                 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4318                 return 0;
4319         }
4320
4321         mask = convert_state(mask);
4322         val = convert_state(val);
4323
4324         rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4325         conn_send_sr_reply(connection, rv);
4326
4327         return 0;
4328 }
4329
4330 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4331 {
4332         struct drbd_peer_device *peer_device;
4333         struct drbd_device *device;
4334         struct p_state *p = pi->data;
4335         union drbd_state os, ns, peer_state;
4336         enum drbd_disk_state real_peer_disk;
4337         enum chg_state_flags cs_flags;
4338         int rv;
4339
4340         peer_device = conn_peer_device(connection, pi->vnr);
4341         if (!peer_device)
4342                 return config_unknown_volume(connection, pi);
4343         device = peer_device->device;
4344
4345         peer_state.i = be32_to_cpu(p->state);
4346
4347         real_peer_disk = peer_state.disk;
4348         if (peer_state.disk == D_NEGOTIATING) {
4349                 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4350                 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4351         }
4352
4353         spin_lock_irq(&device->resource->req_lock);
4354  retry:
4355         os = ns = drbd_read_state(device);
4356         spin_unlock_irq(&device->resource->req_lock);
4357
4358         /* If some other part of the code (ack_receiver thread, timeout)
4359          * already decided to close the connection again,
4360          * we must not "re-establish" it here. */
4361         if (os.conn <= C_TEAR_DOWN)
4362                 return -ECONNRESET;
4363
4364         /* If this is the "end of sync" confirmation, usually the peer disk
4365          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4366          * set) resync started in PausedSyncT, or if the timing of pause-/
4367          * unpause-sync events has been "just right", the peer disk may
4368          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4369          */
4370         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4371             real_peer_disk == D_UP_TO_DATE &&
4372             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4373                 /* If we are (becoming) SyncSource, but peer is still in sync
4374                  * preparation, ignore its uptodate-ness to avoid flapping, it
4375                  * will change to inconsistent once the peer reaches active
4376                  * syncing states.
4377                  * It may have changed syncer-paused flags, however, so we
4378                  * cannot ignore this completely. */
4379                 if (peer_state.conn > C_CONNECTED &&
4380                     peer_state.conn < C_SYNC_SOURCE)
4381                         real_peer_disk = D_INCONSISTENT;
4382
4383                 /* if peer_state changes to connected at the same time,
4384                  * it explicitly notifies us that it finished resync.
4385                  * Maybe we should finish it up, too? */
4386                 else if (os.conn >= C_SYNC_SOURCE &&
4387                          peer_state.conn == C_CONNECTED) {
4388                         if (drbd_bm_total_weight(device) <= device->rs_failed)
4389                                 drbd_resync_finished(device);
4390                         return 0;
4391                 }
4392         }
4393
4394         /* explicit verify finished notification, stop sector reached. */
4395         if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4396             peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4397                 ov_out_of_sync_print(device);
4398                 drbd_resync_finished(device);
4399                 return 0;
4400         }
4401
4402         /* peer says his disk is inconsistent, while we think it is uptodate,
4403          * and this happens while the peer still thinks we have a sync going on,
4404          * but we think we are already done with the sync.
4405          * We ignore this to avoid flapping pdsk.
4406          * This should not happen, if the peer is a recent version of drbd. */
4407         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4408             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4409                 real_peer_disk = D_UP_TO_DATE;
4410
4411         if (ns.conn == C_WF_REPORT_PARAMS)
4412                 ns.conn = C_CONNECTED;
4413
4414         if (peer_state.conn == C_AHEAD)
4415                 ns.conn = C_BEHIND;
4416
4417         if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4418             get_ldev_if_state(device, D_NEGOTIATING)) {
4419                 int cr; /* consider resync */
4420
4421                 /* if we established a new connection */
4422                 cr  = (os.conn < C_CONNECTED);
4423                 /* if we had an established connection
4424                  * and one of the nodes newly attaches a disk */
4425                 cr |= (os.conn == C_CONNECTED &&
4426                        (peer_state.disk == D_NEGOTIATING ||
4427                         os.disk == D_NEGOTIATING));
4428                 /* if we have both been inconsistent, and the peer has been
4429                  * forced to be UpToDate with --overwrite-data */
4430                 cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4431                 /* if we had been plain connected, and the admin requested to
4432                  * start a sync by "invalidate" or "invalidate-remote" */
4433                 cr |= (os.conn == C_CONNECTED &&
4434                                 (peer_state.conn >= C_STARTING_SYNC_S &&
4435                                  peer_state.conn <= C_WF_BITMAP_T));
4436
4437                 if (cr)
4438                         ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4439
4440                 put_ldev(device);
4441                 if (ns.conn == C_MASK) {
4442                         ns.conn = C_CONNECTED;
4443                         if (device->state.disk == D_NEGOTIATING) {
4444                                 drbd_force_state(device, NS(disk, D_FAILED));
4445                         } else if (peer_state.disk == D_NEGOTIATING) {
4446                                 drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4447                                 peer_state.disk = D_DISKLESS;
4448                                 real_peer_disk = D_DISKLESS;
4449                         } else {
4450                                 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4451                                         return -EIO;
4452                                 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4453                                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4454                                 return -EIO;
4455                         }
4456                 }
4457         }
4458
4459         spin_lock_irq(&device->resource->req_lock);
4460         if (os.i != drbd_read_state(device).i)
4461                 goto retry;
4462         clear_bit(CONSIDER_RESYNC, &device->flags);
4463         ns.peer = peer_state.role;
4464         ns.pdsk = real_peer_disk;
4465         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4466         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4467                 ns.disk = device->new_state_tmp.disk;
4468         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4469         if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4470             test_bit(NEW_CUR_UUID, &device->flags)) {
4471                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4472                    for temporal network outages! */
4473                 spin_unlock_irq(&device->resource->req_lock);
4474                 drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4475                 tl_clear(peer_device->connection);
4476                 drbd_uuid_new_current(device);
4477                 clear_bit(NEW_CUR_UUID, &device->flags);
4478                 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4479                 return -EIO;
4480         }
4481         rv = _drbd_set_state(device, ns, cs_flags, NULL);
4482         ns = drbd_read_state(device);
4483         spin_unlock_irq(&device->resource->req_lock);
4484
4485         if (rv < SS_SUCCESS) {
4486                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4487                 return -EIO;
4488         }
4489
4490         if (os.conn > C_WF_REPORT_PARAMS) {
4491                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4492                     peer_state.disk != D_NEGOTIATING ) {
4493                         /* we want resync, peer has not yet decided to sync... */
4494                         /* Nowadays only used when forcing a node into primary role and
4495                            setting its disk to UpToDate with that */
4496                         drbd_send_uuids(peer_device);
4497                         drbd_send_current_state(peer_device);
4498                 }
4499         }
4500
4501         clear_bit(DISCARD_MY_DATA, &device->flags);
4502
4503         drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4504
4505         return 0;
4506 }
4507
4508 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4509 {
4510         struct drbd_peer_device *peer_device;
4511         struct drbd_device *device;
4512         struct p_rs_uuid *p = pi->data;
4513
4514         peer_device = conn_peer_device(connection, pi->vnr);
4515         if (!peer_device)
4516                 return -EIO;
4517         device = peer_device->device;
4518
4519         wait_event(device->misc_wait,
4520                    device->state.conn == C_WF_SYNC_UUID ||
4521                    device->state.conn == C_BEHIND ||
4522                    device->state.conn < C_CONNECTED ||
4523                    device->state.disk < D_NEGOTIATING);
4524
4525         /* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4526
4527         /* Here the _drbd_uuid_ functions are right, current should
4528            _not_ be rotated into the history */
4529         if (get_ldev_if_state(device, D_NEGOTIATING)) {
4530                 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4531                 _drbd_uuid_set(device, UI_BITMAP, 0UL);
4532
4533                 drbd_print_uuids(device, "updated sync uuid");
4534                 drbd_start_resync(device, C_SYNC_TARGET);
4535
4536                 put_ldev(device);
4537         } else
4538                 drbd_err(device, "Ignoring SyncUUID packet!\n");
4539
4540         return 0;
4541 }
4542
4543 /**
4544  * receive_bitmap_plain
4545  *
4546  * Return 0 when done, 1 when another iteration is needed, and a negative error
4547  * code upon failure.
4548  */
4549 static int
4550 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4551                      unsigned long *p, struct bm_xfer_ctx *c)
4552 {
4553         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4554                                  drbd_header_size(peer_device->connection);
4555         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4556                                        c->bm_words - c->word_offset);
4557         unsigned int want = num_words * sizeof(*p);
4558         int err;
4559
4560         if (want != size) {
4561                 drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4562                 return -EIO;
4563         }
4564         if (want == 0)
4565                 return 0;
4566         err = drbd_recv_all(peer_device->connection, p, want);
4567         if (err)
4568                 return err;
4569
4570         drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4571
4572         c->word_offset += num_words;
4573         c->bit_offset = c->word_offset * BITS_PER_LONG;
4574         if (c->bit_offset > c->bm_bits)
4575                 c->bit_offset = c->bm_bits;
4576
4577         return 1;
4578 }
4579
4580 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4581 {
4582         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4583 }
4584
4585 static int dcbp_get_start(struct p_compressed_bm *p)
4586 {
4587         return (p->encoding & 0x80) != 0;
4588 }
4589
4590 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4591 {
4592         return (p->encoding >> 4) & 0x7;
4593 }
4594
4595 /**
4596  * recv_bm_rle_bits
4597  *
4598  * Return 0 when done, 1 when another iteration is needed, and a negative error
4599  * code upon failure.
4600  */
4601 static int
4602 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4603                 struct p_compressed_bm *p,
4604                  struct bm_xfer_ctx *c,
4605                  unsigned int len)
4606 {
4607         struct bitstream bs;
4608         u64 look_ahead;
4609         u64 rl;
4610         u64 tmp;
4611         unsigned long s = c->bit_offset;
4612         unsigned long e;
4613         int toggle = dcbp_get_start(p);
4614         int have;
4615         int bits;
4616
4617         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4618
4619         bits = bitstream_get_bits(&bs, &look_ahead, 64);
4620         if (bits < 0)
4621                 return -EIO;
4622
4623         for (have = bits; have > 0; s += rl, toggle = !toggle) {
4624                 bits = vli_decode_bits(&rl, look_ahead);
4625                 if (bits <= 0)
4626                         return -EIO;
4627
4628                 if (toggle) {
4629                         e = s + rl -1;
4630                         if (e >= c->bm_bits) {
4631                                 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4632                                 return -EIO;
4633                         }
4634                         _drbd_bm_set_bits(peer_device->device, s, e);
4635                 }
4636
4637                 if (have < bits) {
4638                         drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4639                                 have, bits, look_ahead,
4640                                 (unsigned int)(bs.cur.b - p->code),
4641                                 (unsigned int)bs.buf_len);
4642                         return -EIO;
4643                 }
4644                 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4645                 if (likely(bits < 64))
4646                         look_ahead >>= bits;
4647                 else
4648                         look_ahead = 0;
4649                 have -= bits;
4650
4651                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4652                 if (bits < 0)
4653                         return -EIO;
4654                 look_ahead |= tmp << have;
4655                 have += bits;
4656         }
4657
4658         c->bit_offset = s;
4659         bm_xfer_ctx_bit_to_word_offset(c);
4660
4661         return (s != c->bm_bits);
4662 }
4663
4664 /**
4665  * decode_bitmap_c
4666  *
4667  * Return 0 when done, 1 when another iteration is needed, and a negative error
4668  * code upon failure.
4669  */
4670 static int
4671 decode_bitmap_c(struct drbd_peer_device *peer_device,
4672                 struct p_compressed_bm *p,
4673                 struct bm_xfer_ctx *c,
4674                 unsigned int len)
4675 {
4676         if (dcbp_get_code(p) == RLE_VLI_Bits)
4677                 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4678
4679         /* other variants had been implemented for evaluation,
4680          * but have been dropped as this one turned out to be "best"
4681          * during all our tests. */
4682
4683         drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4684         conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4685         return -EIO;
4686 }
4687
4688 void INFO_bm_xfer_stats(struct drbd_device *device,
4689                 const char *direction, struct bm_xfer_ctx *c)
4690 {
4691         /* what would it take to transfer it "plaintext" */
4692         unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4693         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4694         unsigned int plain =
4695                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4696                 c->bm_words * sizeof(unsigned long);
4697         unsigned int total = c->bytes[0] + c->bytes[1];
4698         unsigned int r;
4699
4700         /* total can not be zero. but just in case: */
4701         if (total == 0)
4702                 return;
4703
4704         /* don't report if not compressed */
4705         if (total >= plain)
4706                 return;
4707
4708         /* total < plain. check for overflow, still */
4709         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4710                                     : (1000 * total / plain);
4711
4712         if (r > 1000)
4713                 r = 1000;
4714
4715         r = 1000 - r;
4716         drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4717              "total %u; compression: %u.%u%%\n",
4718                         direction,
4719                         c->bytes[1], c->packets[1],
4720                         c->bytes[0], c->packets[0],
4721                         total, r/10, r % 10);
4722 }
4723
4724 /* Since we are processing the bitfield from lower addresses to higher,
4725    it does not matter if the process it in 32 bit chunks or 64 bit
4726    chunks as long as it is little endian. (Understand it as byte stream,
4727    beginning with the lowest byte...) If we would use big endian
4728    we would need to process it from the highest address to the lowest,
4729    in order to be agnostic to the 32 vs 64 bits issue.
4730
4731    returns 0 on failure, 1 if we successfully received it. */
4732 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4733 {
4734         struct drbd_peer_device *peer_device;
4735         struct drbd_device *device;
4736         struct bm_xfer_ctx c;
4737         int err;
4738
4739         peer_device = conn_peer_device(connection, pi->vnr);
4740         if (!peer_device)
4741                 return -EIO;
4742         device = peer_device->device;
4743
4744         drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4745         /* you are supposed to send additional out-of-sync information
4746          * if you actually set bits during this phase */
4747
4748         c = (struct bm_xfer_ctx) {
4749                 .bm_bits = drbd_bm_bits(device),
4750                 .bm_words = drbd_bm_words(device),
4751         };
4752
4753         for(;;) {
4754                 if (pi->cmd == P_BITMAP)
4755                         err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4756                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4757                         /* MAYBE: sanity check that we speak proto >= 90,
4758                          * and the feature is enabled! */
4759                         struct p_compressed_bm *p = pi->data;
4760
4761                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4762                                 drbd_err(device, "ReportCBitmap packet too large\n");
4763                                 err = -EIO;
4764                                 goto out;
4765                         }
4766                         if (pi->size <= sizeof(*p)) {
4767                                 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4768                                 err = -EIO;
4769                                 goto out;
4770                         }
4771                         err = drbd_recv_all(peer_device->connection, p, pi->size);
4772                         if (err)
4773                                goto out;
4774                         err = decode_bitmap_c(peer_device, p, &c, pi->size);
4775                 } else {
4776                         drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4777                         err = -EIO;
4778                         goto out;
4779                 }
4780
4781                 c.packets[pi->cmd == P_BITMAP]++;
4782                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4783
4784                 if (err <= 0) {
4785                         if (err < 0)
4786                                 goto out;
4787                         break;
4788                 }
4789                 err = drbd_recv_header(peer_device->connection, pi);
4790                 if (err)
4791                         goto out;
4792         }
4793
4794         INFO_bm_xfer_stats(device, "receive", &c);
4795
4796         if (device->state.conn == C_WF_BITMAP_T) {
4797                 enum drbd_state_rv rv;
4798
4799                 err = drbd_send_bitmap(device);
4800                 if (err)
4801                         goto out;
4802                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4803                 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4804                 D_ASSERT(device, rv == SS_SUCCESS);
4805         } else if (device->state.conn != C_WF_BITMAP_S) {
4806                 /* admin may have requested C_DISCONNECTING,
4807                  * other threads may have noticed network errors */
4808                 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4809                     drbd_conn_str(device->state.conn));
4810         }
4811         err = 0;
4812
4813  out:
4814         drbd_bm_unlock(device);
4815         if (!err && device->state.conn == C_WF_BITMAP_S)
4816                 drbd_start_resync(device, C_SYNC_SOURCE);
4817         return err;
4818 }
4819
4820 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4821 {
4822         drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4823                  pi->cmd, pi->size);
4824
4825         return ignore_remaining_packet(connection, pi);
4826 }
4827
4828 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4829 {
4830         /* Make sure we've acked all the TCP data associated
4831          * with the data requests being unplugged */
4832         drbd_tcp_quickack(connection->data.socket);
4833
4834         return 0;
4835 }
4836
4837 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4838 {
4839         struct drbd_peer_device *peer_device;
4840         struct drbd_device *device;
4841         struct p_block_desc *p = pi->data;
4842
4843         peer_device = conn_peer_device(connection, pi->vnr);
4844         if (!peer_device)
4845                 return -EIO;
4846         device = peer_device->device;
4847
4848         switch (device->state.conn) {
4849         case C_WF_SYNC_UUID:
4850         case C_WF_BITMAP_T:
4851         case C_BEHIND:
4852                         break;
4853         default:
4854                 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4855                                 drbd_conn_str(device->state.conn));
4856         }
4857
4858         drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4859
4860         return 0;
4861 }
4862
4863 static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi)
4864 {
4865         struct drbd_peer_device *peer_device;
4866         struct p_block_desc *p = pi->data;
4867         struct drbd_device *device;
4868         sector_t sector;
4869         int size, err = 0;
4870
4871         peer_device = conn_peer_device(connection, pi->vnr);
4872         if (!peer_device)
4873                 return -EIO;
4874         device = peer_device->device;
4875
4876         sector = be64_to_cpu(p->sector);
4877         size = be32_to_cpu(p->blksize);
4878
4879         dec_rs_pending(device);
4880
4881         if (get_ldev(device)) {
4882                 struct drbd_peer_request *peer_req;
4883                 const int op = REQ_OP_DISCARD;
4884
4885                 peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector,
4886                                                size, 0, GFP_NOIO);
4887                 if (!peer_req) {
4888                         put_ldev(device);
4889                         return -ENOMEM;
4890                 }
4891
4892                 peer_req->w.cb = e_end_resync_block;
4893                 peer_req->submit_jif = jiffies;
4894                 peer_req->flags |= EE_IS_TRIM;
4895
4896                 spin_lock_irq(&device->resource->req_lock);
4897                 list_add_tail(&peer_req->w.list, &device->sync_ee);
4898                 spin_unlock_irq(&device->resource->req_lock);
4899
4900                 atomic_add(pi->size >> 9, &device->rs_sect_ev);
4901                 err = drbd_submit_peer_request(device, peer_req, op, 0, DRBD_FAULT_RS_WR);
4902
4903                 if (err) {
4904                         spin_lock_irq(&device->resource->req_lock);
4905                         list_del(&peer_req->w.list);
4906                         spin_unlock_irq(&device->resource->req_lock);
4907
4908                         drbd_free_peer_req(device, peer_req);
4909                         put_ldev(device);
4910                         err = 0;
4911                         goto fail;
4912                 }
4913
4914                 inc_unacked(device);
4915
4916                 /* No put_ldev() here. Gets called in drbd_endio_write_sec_final(),
4917                    as well as drbd_rs_complete_io() */
4918         } else {
4919         fail:
4920                 drbd_rs_complete_io(device, sector);
4921                 drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER);
4922         }
4923
4924         atomic_add(size >> 9, &device->rs_sect_in);
4925
4926         return err;
4927 }
4928
4929 struct data_cmd {
4930         int expect_payload;
4931         unsigned int pkt_size;
4932         int (*fn)(struct drbd_connection *, struct packet_info *);
4933 };
4934
4935 static struct data_cmd drbd_cmd_handler[] = {
4936         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4937         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4938         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4939         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4940         [P_BITMAP]          = { 1, 0, receive_bitmap } ,
4941         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4942         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4943         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4944         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4945         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4946         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4947         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4948         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4949         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4950         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4951         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4952         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4953         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4954         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4955         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4956         [P_RS_THIN_REQ]     = { 0, sizeof(struct p_block_req), receive_DataRequest },
4957         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4958         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4959         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4960         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4961         [P_TRIM]            = { 0, sizeof(struct p_trim), receive_Data },
4962         [P_RS_DEALLOCATED]  = { 0, sizeof(struct p_block_desc), receive_rs_deallocated },
4963         [P_WSAME]           = { 1, sizeof(struct p_wsame), receive_Data },
4964 };
4965
4966 static void drbdd(struct drbd_connection *connection)
4967 {
4968         struct packet_info pi;
4969         size_t shs; /* sub header size */
4970         int err;
4971
4972         while (get_t_state(&connection->receiver) == RUNNING) {
4973                 struct data_cmd const *cmd;
4974
4975                 drbd_thread_current_set_cpu(&connection->receiver);
4976                 update_receiver_timing_details(connection, drbd_recv_header);
4977                 if (drbd_recv_header(connection, &pi))
4978                         goto err_out;
4979
4980                 cmd = &drbd_cmd_handler[pi.cmd];
4981                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4982                         drbd_err(connection, "Unexpected data packet %s (0x%04x)",
4983                                  cmdname(pi.cmd), pi.cmd);
4984                         goto err_out;
4985                 }
4986
4987                 shs = cmd->pkt_size;
4988                 if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME)
4989                         shs += sizeof(struct o_qlim);
4990                 if (pi.size > shs && !cmd->expect_payload) {
4991                         drbd_err(connection, "No payload expected %s l:%d\n",
4992                                  cmdname(pi.cmd), pi.size);
4993                         goto err_out;
4994                 }
4995                 if (pi.size < shs) {
4996                         drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n",
4997                                  cmdname(pi.cmd), (int)shs, pi.size);
4998                         goto err_out;
4999                 }
5000
5001                 if (shs) {
5002                         update_receiver_timing_details(connection, drbd_recv_all_warn);
5003                         err = drbd_recv_all_warn(connection, pi.data, shs);
5004                         if (err)
5005                                 goto err_out;
5006                         pi.size -= shs;
5007                 }
5008
5009                 update_receiver_timing_details(connection, cmd->fn);
5010                 err = cmd->fn(connection, &pi);
5011                 if (err) {
5012                         drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
5013                                  cmdname(pi.cmd), err, pi.size);
5014                         goto err_out;
5015                 }
5016         }
5017         return;
5018
5019     err_out:
5020         conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
5021 }
5022
5023 static void conn_disconnect(struct drbd_connection *connection)
5024 {
5025         struct drbd_peer_device *peer_device;
5026         enum drbd_conns oc;
5027         int vnr;
5028
5029         if (connection->cstate == C_STANDALONE)
5030                 return;
5031
5032         /* We are about to start the cleanup after connection loss.
5033          * Make sure drbd_make_request knows about that.
5034          * Usually we should be in some network failure state already,
5035          * but just in case we are not, we fix it up here.
5036          */
5037         conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5038
5039         /* ack_receiver does not clean up anything. it must not interfere, either */
5040         drbd_thread_stop(&connection->ack_receiver);
5041         if (connection->ack_sender) {
5042                 destroy_workqueue(connection->ack_sender);
5043                 connection->ack_sender = NULL;
5044         }
5045         drbd_free_sock(connection);
5046
5047         rcu_read_lock();
5048         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5049                 struct drbd_device *device = peer_device->device;
5050                 kref_get(&device->kref);
5051                 rcu_read_unlock();
5052                 drbd_disconnected(peer_device);
5053                 kref_put(&device->kref, drbd_destroy_device);
5054                 rcu_read_lock();
5055         }
5056         rcu_read_unlock();
5057
5058         if (!list_empty(&connection->current_epoch->list))
5059                 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
5060         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
5061         atomic_set(&connection->current_epoch->epoch_size, 0);
5062         connection->send.seen_any_write_yet = false;
5063
5064         drbd_info(connection, "Connection closed\n");
5065
5066         if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
5067                 conn_try_outdate_peer_async(connection);
5068
5069         spin_lock_irq(&connection->resource->req_lock);
5070         oc = connection->cstate;
5071         if (oc >= C_UNCONNECTED)
5072                 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
5073
5074         spin_unlock_irq(&connection->resource->req_lock);
5075
5076         if (oc == C_DISCONNECTING)
5077                 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
5078 }
5079
5080 static int drbd_disconnected(struct drbd_peer_device *peer_device)
5081 {
5082         struct drbd_device *device = peer_device->device;
5083         unsigned int i;
5084
5085         /* wait for current activity to cease. */
5086         spin_lock_irq(&device->resource->req_lock);
5087         _drbd_wait_ee_list_empty(device, &device->active_ee);
5088         _drbd_wait_ee_list_empty(device, &device->sync_ee);
5089         _drbd_wait_ee_list_empty(device, &device->read_ee);
5090         spin_unlock_irq(&device->resource->req_lock);
5091
5092         /* We do not have data structures that would allow us to
5093          * get the rs_pending_cnt down to 0 again.
5094          *  * On C_SYNC_TARGET we do not have any data structures describing
5095          *    the pending RSDataRequest's we have sent.
5096          *  * On C_SYNC_SOURCE there is no data structure that tracks
5097          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
5098          *  And no, it is not the sum of the reference counts in the
5099          *  resync_LRU. The resync_LRU tracks the whole operation including
5100          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
5101          *  on the fly. */
5102         drbd_rs_cancel_all(device);
5103         device->rs_total = 0;
5104         device->rs_failed = 0;
5105         atomic_set(&device->rs_pending_cnt, 0);
5106         wake_up(&device->misc_wait);
5107
5108         del_timer_sync(&device->resync_timer);
5109         resync_timer_fn((unsigned long)device);
5110
5111         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
5112          * w_make_resync_request etc. which may still be on the worker queue
5113          * to be "canceled" */
5114         drbd_flush_workqueue(&peer_device->connection->sender_work);
5115
5116         drbd_finish_peer_reqs(device);
5117
5118         /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
5119            might have issued a work again. The one before drbd_finish_peer_reqs() is
5120            necessary to reclain net_ee in drbd_finish_peer_reqs(). */
5121         drbd_flush_workqueue(&peer_device->connection->sender_work);
5122
5123         /* need to do it again, drbd_finish_peer_reqs() may have populated it
5124          * again via drbd_try_clear_on_disk_bm(). */
5125         drbd_rs_cancel_all(device);
5126
5127         kfree(device->p_uuid);
5128         device->p_uuid = NULL;
5129
5130         if (!drbd_suspended(device))
5131                 tl_clear(peer_device->connection);
5132
5133         drbd_md_sync(device);
5134
5135         if (get_ldev(device)) {
5136                 drbd_bitmap_io(device, &drbd_bm_write_copy_pages,
5137                                 "write from disconnected", BM_LOCKED_CHANGE_ALLOWED);
5138                 put_ldev(device);
5139         }
5140
5141         /* tcp_close and release of sendpage pages can be deferred.  I don't
5142          * want to use SO_LINGER, because apparently it can be deferred for
5143          * more than 20 seconds (longest time I checked).
5144          *
5145          * Actually we don't care for exactly when the network stack does its
5146          * put_page(), but release our reference on these pages right here.
5147          */
5148         i = drbd_free_peer_reqs(device, &device->net_ee);
5149         if (i)
5150                 drbd_info(device, "net_ee not empty, killed %u entries\n", i);
5151         i = atomic_read(&device->pp_in_use_by_net);
5152         if (i)
5153                 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
5154         i = atomic_read(&device->pp_in_use);
5155         if (i)
5156                 drbd_info(device, "pp_in_use = %d, expected 0\n", i);
5157
5158         D_ASSERT(device, list_empty(&device->read_ee));
5159         D_ASSERT(device, list_empty(&device->active_ee));
5160         D_ASSERT(device, list_empty(&device->sync_ee));
5161         D_ASSERT(device, list_empty(&device->done_ee));
5162
5163         return 0;
5164 }
5165
5166 /*
5167  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
5168  * we can agree on is stored in agreed_pro_version.
5169  *
5170  * feature flags and the reserved array should be enough room for future
5171  * enhancements of the handshake protocol, and possible plugins...
5172  *
5173  * for now, they are expected to be zero, but ignored.
5174  */
5175 static int drbd_send_features(struct drbd_connection *connection)
5176 {
5177         struct drbd_socket *sock;
5178         struct p_connection_features *p;
5179
5180         sock = &connection->data;
5181         p = conn_prepare_command(connection, sock);
5182         if (!p)
5183                 return -EIO;
5184         memset(p, 0, sizeof(*p));
5185         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
5186         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
5187         p->feature_flags = cpu_to_be32(PRO_FEATURES);
5188         return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
5189 }
5190
5191 /*
5192  * return values:
5193  *   1 yes, we have a valid connection
5194  *   0 oops, did not work out, please try again
5195  *  -1 peer talks different language,
5196  *     no point in trying again, please go standalone.
5197  */
5198 static int drbd_do_features(struct drbd_connection *connection)
5199 {
5200         /* ASSERT current == connection->receiver ... */
5201         struct p_connection_features *p;
5202         const int expect = sizeof(struct p_connection_features);
5203         struct packet_info pi;
5204         int err;
5205
5206         err = drbd_send_features(connection);
5207         if (err)
5208                 return 0;
5209
5210         err = drbd_recv_header(connection, &pi);
5211         if (err)
5212                 return 0;
5213
5214         if (pi.cmd != P_CONNECTION_FEATURES) {
5215                 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
5216                          cmdname(pi.cmd), pi.cmd);
5217                 return -1;
5218         }
5219
5220         if (pi.size != expect) {
5221                 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
5222                      expect, pi.size);
5223                 return -1;
5224         }
5225
5226         p = pi.data;
5227         err = drbd_recv_all_warn(connection, p, expect);
5228         if (err)
5229                 return 0;
5230
5231         p->protocol_min = be32_to_cpu(p->protocol_min);
5232         p->protocol_max = be32_to_cpu(p->protocol_max);
5233         if (p->protocol_max == 0)
5234                 p->protocol_max = p->protocol_min;
5235
5236         if (PRO_VERSION_MAX < p->protocol_min ||
5237             PRO_VERSION_MIN > p->protocol_max)
5238                 goto incompat;
5239
5240         connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
5241         connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
5242
5243         drbd_info(connection, "Handshake successful: "
5244              "Agreed network protocol version %d\n", connection->agreed_pro_version);
5245
5246         drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s.\n",
5247                   connection->agreed_features,
5248                   connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "",
5249                   connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "",
5250                   connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" :
5251                   connection->agreed_features ? "" : " none");
5252
5253         return 1;
5254
5255  incompat:
5256         drbd_err(connection, "incompatible DRBD dialects: "
5257             "I support %d-%d, peer supports %d-%d\n",
5258             PRO_VERSION_MIN, PRO_VERSION_MAX,
5259             p->protocol_min, p->protocol_max);
5260         return -1;
5261 }
5262
5263 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
5264 static int drbd_do_auth(struct drbd_connection *connection)
5265 {
5266         drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
5267         drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
5268         return -1;
5269 }
5270 #else
5271 #define CHALLENGE_LEN 64
5272
5273 /* Return value:
5274         1 - auth succeeded,
5275         0 - failed, try again (network error),
5276         -1 - auth failed, don't try again.
5277 */
5278
5279 static int drbd_do_auth(struct drbd_connection *connection)
5280 {
5281         struct drbd_socket *sock;
5282         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
5283         char *response = NULL;
5284         char *right_response = NULL;
5285         char *peers_ch = NULL;
5286         unsigned int key_len;
5287         char secret[SHARED_SECRET_MAX]; /* 64 byte */
5288         unsigned int resp_size;
5289         SHASH_DESC_ON_STACK(desc, connection->cram_hmac_tfm);
5290         struct packet_info pi;
5291         struct net_conf *nc;
5292         int err, rv;
5293
5294         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
5295
5296         rcu_read_lock();
5297         nc = rcu_dereference(connection->net_conf);
5298         key_len = strlen(nc->shared_secret);
5299         memcpy(secret, nc->shared_secret, key_len);
5300         rcu_read_unlock();
5301
5302         desc->tfm = connection->cram_hmac_tfm;
5303         desc->flags = 0;
5304
5305         rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
5306         if (rv) {
5307                 drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
5308                 rv = -1;
5309                 goto fail;
5310         }
5311
5312         get_random_bytes(my_challenge, CHALLENGE_LEN);
5313
5314         sock = &connection->data;
5315         if (!conn_prepare_command(connection, sock)) {
5316                 rv = 0;
5317                 goto fail;
5318         }
5319         rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
5320                                 my_challenge, CHALLENGE_LEN);
5321         if (!rv)
5322                 goto fail;
5323
5324         err = drbd_recv_header(connection, &pi);
5325         if (err) {
5326                 rv = 0;
5327                 goto fail;
5328         }
5329
5330         if (pi.cmd != P_AUTH_CHALLENGE) {
5331                 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
5332                          cmdname(pi.cmd), pi.cmd);
5333                 rv = 0;
5334                 goto fail;
5335         }
5336
5337         if (pi.size > CHALLENGE_LEN * 2) {
5338                 drbd_err(connection, "expected AuthChallenge payload too big.\n");
5339                 rv = -1;
5340                 goto fail;
5341         }
5342
5343         if (pi.size < CHALLENGE_LEN) {
5344                 drbd_err(connection, "AuthChallenge payload too small.\n");
5345                 rv = -1;
5346                 goto fail;
5347         }
5348
5349         peers_ch = kmalloc(pi.size, GFP_NOIO);
5350         if (peers_ch == NULL) {
5351                 drbd_err(connection, "kmalloc of peers_ch failed\n");
5352                 rv = -1;
5353                 goto fail;
5354         }
5355
5356         err = drbd_recv_all_warn(connection, peers_ch, pi.size);
5357         if (err) {
5358                 rv = 0;
5359                 goto fail;
5360         }
5361
5362         if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
5363                 drbd_err(connection, "Peer presented the same challenge!\n");
5364                 rv = -1;
5365                 goto fail;
5366         }
5367
5368         resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
5369         response = kmalloc(resp_size, GFP_NOIO);
5370         if (response == NULL) {
5371                 drbd_err(connection, "kmalloc of response failed\n");
5372                 rv = -1;
5373                 goto fail;
5374         }
5375
5376         rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
5377         if (rv) {
5378                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5379                 rv = -1;
5380                 goto fail;
5381         }
5382
5383         if (!conn_prepare_command(connection, sock)) {
5384                 rv = 0;
5385                 goto fail;
5386         }
5387         rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5388                                 response, resp_size);
5389         if (!rv)
5390                 goto fail;
5391
5392         err = drbd_recv_header(connection, &pi);
5393         if (err) {
5394                 rv = 0;
5395                 goto fail;
5396         }
5397
5398         if (pi.cmd != P_AUTH_RESPONSE) {
5399                 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5400                          cmdname(pi.cmd), pi.cmd);
5401                 rv = 0;
5402                 goto fail;
5403         }
5404
5405         if (pi.size != resp_size) {
5406                 drbd_err(connection, "expected AuthResponse payload of wrong size\n");
5407                 rv = 0;
5408                 goto fail;
5409         }
5410
5411         err = drbd_recv_all_warn(connection, response , resp_size);
5412         if (err) {
5413                 rv = 0;
5414                 goto fail;
5415         }
5416
5417         right_response = kmalloc(resp_size, GFP_NOIO);
5418         if (right_response == NULL) {
5419                 drbd_err(connection, "kmalloc of right_response failed\n");
5420                 rv = -1;
5421                 goto fail;
5422         }
5423
5424         rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
5425                                  right_response);
5426         if (rv) {
5427                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5428                 rv = -1;
5429                 goto fail;
5430         }
5431
5432         rv = !memcmp(response, right_response, resp_size);
5433
5434         if (rv)
5435                 drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5436                      resp_size);
5437         else
5438                 rv = -1;
5439
5440  fail:
5441         kfree(peers_ch);
5442         kfree(response);
5443         kfree(right_response);
5444         shash_desc_zero(desc);
5445
5446         return rv;
5447 }
5448 #endif
5449
5450 int drbd_receiver(struct drbd_thread *thi)
5451 {
5452         struct drbd_connection *connection = thi->connection;
5453         int h;
5454
5455         drbd_info(connection, "receiver (re)started\n");
5456
5457         do {
5458                 h = conn_connect(connection);
5459                 if (h == 0) {
5460                         conn_disconnect(connection);
5461                         schedule_timeout_interruptible(HZ);
5462                 }
5463                 if (h == -1) {
5464                         drbd_warn(connection, "Discarding network configuration.\n");
5465                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5466                 }
5467         } while (h == 0);
5468
5469         if (h > 0)
5470                 drbdd(connection);
5471
5472         conn_disconnect(connection);
5473
5474         drbd_info(connection, "receiver terminated\n");
5475         return 0;
5476 }
5477
5478 /* ********* acknowledge sender ******** */
5479
5480 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5481 {
5482         struct p_req_state_reply *p = pi->data;
5483         int retcode = be32_to_cpu(p->retcode);
5484
5485         if (retcode >= SS_SUCCESS) {
5486                 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5487         } else {
5488                 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5489                 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5490                          drbd_set_st_err_str(retcode), retcode);
5491         }
5492         wake_up(&connection->ping_wait);
5493
5494         return 0;
5495 }
5496
5497 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5498 {
5499         struct drbd_peer_device *peer_device;
5500         struct drbd_device *device;
5501         struct p_req_state_reply *p = pi->data;
5502         int retcode = be32_to_cpu(p->retcode);
5503
5504         peer_device = conn_peer_device(connection, pi->vnr);
5505         if (!peer_device)
5506                 return -EIO;
5507         device = peer_device->device;
5508
5509         if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5510                 D_ASSERT(device, connection->agreed_pro_version < 100);
5511                 return got_conn_RqSReply(connection, pi);
5512         }
5513
5514         if (retcode >= SS_SUCCESS) {
5515                 set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5516         } else {
5517                 set_bit(CL_ST_CHG_FAIL, &device->flags);
5518                 drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5519                         drbd_set_st_err_str(retcode), retcode);
5520         }
5521         wake_up(&device->state_wait);
5522
5523         return 0;
5524 }
5525
5526 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5527 {
5528         return drbd_send_ping_ack(connection);
5529
5530 }
5531
5532 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5533 {
5534         /* restore idle timeout */
5535         connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5536         if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5537                 wake_up(&connection->ping_wait);
5538
5539         return 0;
5540 }
5541
5542 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5543 {
5544         struct drbd_peer_device *peer_device;
5545         struct drbd_device *device;
5546         struct p_block_ack *p = pi->data;
5547         sector_t sector = be64_to_cpu(p->sector);
5548         int blksize = be32_to_cpu(p->blksize);
5549
5550         peer_device = conn_peer_device(connection, pi->vnr);
5551         if (!peer_device)
5552                 return -EIO;
5553         device = peer_device->device;
5554
5555         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5556
5557         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5558
5559         if (get_ldev(device)) {
5560                 drbd_rs_complete_io(device, sector);
5561                 drbd_set_in_sync(device, sector, blksize);
5562                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5563                 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5564                 put_ldev(device);
5565         }
5566         dec_rs_pending(device);
5567         atomic_add(blksize >> 9, &device->rs_sect_in);
5568
5569         return 0;
5570 }
5571
5572 static int
5573 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5574                               struct rb_root *root, const char *func,
5575                               enum drbd_req_event what, bool missing_ok)
5576 {
5577         struct drbd_request *req;
5578         struct bio_and_error m;
5579
5580         spin_lock_irq(&device->resource->req_lock);
5581         req = find_request(device, root, id, sector, missing_ok, func);
5582         if (unlikely(!req)) {
5583                 spin_unlock_irq(&device->resource->req_lock);
5584                 return -EIO;
5585         }
5586         __req_mod(req, what, &m);
5587         spin_unlock_irq(&device->resource->req_lock);
5588
5589         if (m.bio)
5590                 complete_master_bio(device, &m);
5591         return 0;
5592 }
5593
5594 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5595 {
5596         struct drbd_peer_device *peer_device;
5597         struct drbd_device *device;
5598         struct p_block_ack *p = pi->data;
5599         sector_t sector = be64_to_cpu(p->sector);
5600         int blksize = be32_to_cpu(p->blksize);
5601         enum drbd_req_event what;
5602
5603         peer_device = conn_peer_device(connection, pi->vnr);
5604         if (!peer_device)
5605                 return -EIO;
5606         device = peer_device->device;
5607
5608         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5609
5610         if (p->block_id == ID_SYNCER) {
5611                 drbd_set_in_sync(device, sector, blksize);
5612                 dec_rs_pending(device);
5613                 return 0;
5614         }
5615         switch (pi->cmd) {
5616         case P_RS_WRITE_ACK:
5617                 what = WRITE_ACKED_BY_PEER_AND_SIS;
5618                 break;
5619         case P_WRITE_ACK:
5620                 what = WRITE_ACKED_BY_PEER;
5621                 break;
5622         case P_RECV_ACK:
5623                 what = RECV_ACKED_BY_PEER;
5624                 break;
5625         case P_SUPERSEDED:
5626                 what = CONFLICT_RESOLVED;
5627                 break;
5628         case P_RETRY_WRITE:
5629                 what = POSTPONE_WRITE;
5630                 break;
5631         default:
5632                 BUG();
5633         }
5634
5635         return validate_req_change_req_state(device, p->block_id, sector,
5636                                              &device->write_requests, __func__,
5637                                              what, false);
5638 }
5639
5640 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5641 {
5642         struct drbd_peer_device *peer_device;
5643         struct drbd_device *device;
5644         struct p_block_ack *p = pi->data;
5645         sector_t sector = be64_to_cpu(p->sector);
5646         int size = be32_to_cpu(p->blksize);
5647         int err;
5648
5649         peer_device = conn_peer_device(connection, pi->vnr);
5650         if (!peer_device)
5651                 return -EIO;
5652         device = peer_device->device;
5653
5654         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5655
5656         if (p->block_id == ID_SYNCER) {
5657                 dec_rs_pending(device);
5658                 drbd_rs_failed_io(device, sector, size);
5659                 return 0;
5660         }
5661
5662         err = validate_req_change_req_state(device, p->block_id, sector,
5663                                             &device->write_requests, __func__,
5664                                             NEG_ACKED, true);
5665         if (err) {
5666                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5667                    The master bio might already be completed, therefore the
5668                    request is no longer in the collision hash. */
5669                 /* In Protocol B we might already have got a P_RECV_ACK
5670                    but then get a P_NEG_ACK afterwards. */
5671                 drbd_set_out_of_sync(device, sector, size);
5672         }
5673         return 0;
5674 }
5675
5676 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5677 {
5678         struct drbd_peer_device *peer_device;
5679         struct drbd_device *device;
5680         struct p_block_ack *p = pi->data;
5681         sector_t sector = be64_to_cpu(p->sector);
5682
5683         peer_device = conn_peer_device(connection, pi->vnr);
5684         if (!peer_device)
5685                 return -EIO;
5686         device = peer_device->device;
5687
5688         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5689
5690         drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5691             (unsigned long long)sector, be32_to_cpu(p->blksize));
5692
5693         return validate_req_change_req_state(device, p->block_id, sector,
5694                                              &device->read_requests, __func__,
5695                                              NEG_ACKED, false);
5696 }
5697
5698 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5699 {
5700         struct drbd_peer_device *peer_device;
5701         struct drbd_device *device;
5702         sector_t sector;
5703         int size;
5704         struct p_block_ack *p = pi->data;
5705
5706         peer_device = conn_peer_device(connection, pi->vnr);
5707         if (!peer_device)
5708                 return -EIO;
5709         device = peer_device->device;
5710
5711         sector = be64_to_cpu(p->sector);
5712         size = be32_to_cpu(p->blksize);
5713
5714         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5715
5716         dec_rs_pending(device);
5717
5718         if (get_ldev_if_state(device, D_FAILED)) {
5719                 drbd_rs_complete_io(device, sector);
5720                 switch (pi->cmd) {
5721                 case P_NEG_RS_DREPLY:
5722                         drbd_rs_failed_io(device, sector, size);
5723                 case P_RS_CANCEL:
5724                         break;
5725                 default:
5726                         BUG();
5727                 }
5728                 put_ldev(device);
5729         }
5730
5731         return 0;
5732 }
5733
5734 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5735 {
5736         struct p_barrier_ack *p = pi->data;
5737         struct drbd_peer_device *peer_device;
5738         int vnr;
5739
5740         tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5741
5742         rcu_read_lock();
5743         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5744                 struct drbd_device *device = peer_device->device;
5745
5746                 if (device->state.conn == C_AHEAD &&
5747                     atomic_read(&device->ap_in_flight) == 0 &&
5748                     !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5749                         device->start_resync_timer.expires = jiffies + HZ;
5750                         add_timer(&device->start_resync_timer);
5751                 }
5752         }
5753         rcu_read_unlock();
5754
5755         return 0;
5756 }
5757
5758 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5759 {
5760         struct drbd_peer_device *peer_device;
5761         struct drbd_device *device;
5762         struct p_block_ack *p = pi->data;
5763         struct drbd_device_work *dw;
5764         sector_t sector;
5765         int size;
5766
5767         peer_device = conn_peer_device(connection, pi->vnr);
5768         if (!peer_device)
5769                 return -EIO;
5770         device = peer_device->device;
5771
5772         sector = be64_to_cpu(p->sector);
5773         size = be32_to_cpu(p->blksize);
5774
5775         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5776
5777         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5778                 drbd_ov_out_of_sync_found(device, sector, size);
5779         else
5780                 ov_out_of_sync_print(device);
5781
5782         if (!get_ldev(device))
5783                 return 0;
5784
5785         drbd_rs_complete_io(device, sector);
5786         dec_rs_pending(device);
5787
5788         --device->ov_left;
5789
5790         /* let's advance progress step marks only for every other megabyte */
5791         if ((device->ov_left & 0x200) == 0x200)
5792                 drbd_advance_rs_marks(device, device->ov_left);
5793
5794         if (device->ov_left == 0) {
5795                 dw = kmalloc(sizeof(*dw), GFP_NOIO);
5796                 if (dw) {
5797                         dw->w.cb = w_ov_finished;
5798                         dw->device = device;
5799                         drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5800                 } else {
5801                         drbd_err(device, "kmalloc(dw) failed.");
5802                         ov_out_of_sync_print(device);
5803                         drbd_resync_finished(device);
5804                 }
5805         }
5806         put_ldev(device);
5807         return 0;
5808 }
5809
5810 static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5811 {
5812         return 0;
5813 }
5814
5815 struct meta_sock_cmd {
5816         size_t pkt_size;
5817         int (*fn)(struct drbd_connection *connection, struct packet_info *);
5818 };
5819
5820 static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5821 {
5822         long t;
5823         struct net_conf *nc;
5824
5825         rcu_read_lock();
5826         nc = rcu_dereference(connection->net_conf);
5827         t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5828         rcu_read_unlock();
5829
5830         t *= HZ;
5831         if (ping_timeout)
5832                 t /= 10;
5833
5834         connection->meta.socket->sk->sk_rcvtimeo = t;
5835 }
5836
5837 static void set_ping_timeout(struct drbd_connection *connection)
5838 {
5839         set_rcvtimeo(connection, 1);
5840 }
5841
5842 static void set_idle_timeout(struct drbd_connection *connection)
5843 {
5844         set_rcvtimeo(connection, 0);
5845 }
5846
5847 static struct meta_sock_cmd ack_receiver_tbl[] = {
5848         [P_PING]            = { 0, got_Ping },
5849         [P_PING_ACK]        = { 0, got_PingAck },
5850         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5851         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5852         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5853         [P_SUPERSEDED]   = { sizeof(struct p_block_ack), got_BlockAck },
5854         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5855         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5856         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5857         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5858         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5859         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5860         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5861         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5862         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5863         [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5864         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5865 };
5866
5867 int drbd_ack_receiver(struct drbd_thread *thi)
5868 {
5869         struct drbd_connection *connection = thi->connection;
5870         struct meta_sock_cmd *cmd = NULL;
5871         struct packet_info pi;
5872         unsigned long pre_recv_jif;
5873         int rv;
5874         void *buf    = connection->meta.rbuf;
5875         int received = 0;
5876         unsigned int header_size = drbd_header_size(connection);
5877         int expect   = header_size;
5878         bool ping_timeout_active = false;
5879         struct sched_param param = { .sched_priority = 2 };
5880
5881         rv = sched_setscheduler(current, SCHED_RR, &param);
5882         if (rv < 0)
5883                 drbd_err(connection, "drbd_ack_receiver: ERROR set priority, ret=%d\n", rv);
5884
5885         while (get_t_state(thi) == RUNNING) {
5886                 drbd_thread_current_set_cpu(thi);
5887
5888                 conn_reclaim_net_peer_reqs(connection);
5889
5890                 if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5891                         if (drbd_send_ping(connection)) {
5892                                 drbd_err(connection, "drbd_send_ping has failed\n");
5893                                 goto reconnect;
5894                         }
5895                         set_ping_timeout(connection);
5896                         ping_timeout_active = true;
5897                 }
5898
5899                 pre_recv_jif = jiffies;
5900                 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5901
5902                 /* Note:
5903                  * -EINTR        (on meta) we got a signal
5904                  * -EAGAIN       (on meta) rcvtimeo expired
5905                  * -ECONNRESET   other side closed the connection
5906                  * -ERESTARTSYS  (on data) we got a signal
5907                  * rv <  0       other than above: unexpected error!
5908                  * rv == expected: full header or command
5909                  * rv <  expected: "woken" by signal during receive
5910                  * rv == 0       : "connection shut down by peer"
5911                  */
5912                 if (likely(rv > 0)) {
5913                         received += rv;
5914                         buf      += rv;
5915                 } else if (rv == 0) {
5916                         if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5917                                 long t;
5918                                 rcu_read_lock();
5919                                 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5920                                 rcu_read_unlock();
5921
5922                                 t = wait_event_timeout(connection->ping_wait,
5923                                                        connection->cstate < C_WF_REPORT_PARAMS,
5924                                                        t);
5925                                 if (t)
5926                                         break;
5927                         }
5928                         drbd_err(connection, "meta connection shut down by peer.\n");
5929                         goto reconnect;
5930                 } else if (rv == -EAGAIN) {
5931                         /* If the data socket received something meanwhile,
5932                          * that is good enough: peer is still alive. */
5933                         if (time_after(connection->last_received, pre_recv_jif))
5934                                 continue;
5935                         if (ping_timeout_active) {
5936                                 drbd_err(connection, "PingAck did not arrive in time.\n");
5937                                 goto reconnect;
5938                         }
5939                         set_bit(SEND_PING, &connection->flags);
5940                         continue;
5941                 } else if (rv == -EINTR) {
5942                         /* maybe drbd_thread_stop(): the while condition will notice.
5943                          * maybe woken for send_ping: we'll send a ping above,
5944                          * and change the rcvtimeo */
5945                         flush_signals(current);
5946                         continue;
5947                 } else {
5948                         drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5949                         goto reconnect;
5950                 }
5951
5952                 if (received == expect && cmd == NULL) {
5953                         if (decode_header(connection, connection->meta.rbuf, &pi))
5954                                 goto reconnect;
5955                         cmd = &ack_receiver_tbl[pi.cmd];
5956                         if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
5957                                 drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5958                                          cmdname(pi.cmd), pi.cmd);
5959                                 goto disconnect;
5960                         }
5961                         expect = header_size + cmd->pkt_size;
5962                         if (pi.size != expect - header_size) {
5963                                 drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5964                                         pi.cmd, pi.size);
5965                                 goto reconnect;
5966                         }
5967                 }
5968                 if (received == expect) {
5969                         bool err;
5970
5971                         err = cmd->fn(connection, &pi);
5972                         if (err) {
5973                                 drbd_err(connection, "%pf failed\n", cmd->fn);
5974                                 goto reconnect;
5975                         }
5976
5977                         connection->last_received = jiffies;
5978
5979                         if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
5980                                 set_idle_timeout(connection);
5981                                 ping_timeout_active = false;
5982                         }
5983
5984                         buf      = connection->meta.rbuf;
5985                         received = 0;
5986                         expect   = header_size;
5987                         cmd      = NULL;
5988                 }
5989         }
5990
5991         if (0) {
5992 reconnect:
5993                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5994                 conn_md_sync(connection);
5995         }
5996         if (0) {
5997 disconnect:
5998                 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5999         }
6000
6001         drbd_info(connection, "ack_receiver terminated\n");
6002
6003         return 0;
6004 }
6005
6006 void drbd_send_acks_wf(struct work_struct *ws)
6007 {
6008         struct drbd_peer_device *peer_device =
6009                 container_of(ws, struct drbd_peer_device, send_acks_work);
6010         struct drbd_connection *connection = peer_device->connection;
6011         struct drbd_device *device = peer_device->device;
6012         struct net_conf *nc;
6013         int tcp_cork, err;
6014
6015         rcu_read_lock();
6016         nc = rcu_dereference(connection->net_conf);
6017         tcp_cork = nc->tcp_cork;
6018         rcu_read_unlock();
6019
6020         if (tcp_cork)
6021                 drbd_tcp_cork(connection->meta.socket);
6022
6023         err = drbd_finish_peer_reqs(device);
6024         kref_put(&device->kref, drbd_destroy_device);
6025         /* get is in drbd_endio_write_sec_final(). That is necessary to keep the
6026            struct work_struct send_acks_work alive, which is in the peer_device object */
6027
6028         if (err) {
6029                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6030                 return;
6031         }
6032
6033         if (tcp_cork)
6034                 drbd_tcp_uncork(connection->meta.socket);
6035
6036         return;
6037 }