4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 #include <linux/module.h>
28 #include <asm/uaccess.h>
31 #include <linux/drbd.h>
33 #include <linux/file.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
64 static int drbd_do_features(struct drbd_tconn *tconn);
65 static int drbd_do_auth(struct drbd_tconn *tconn);
66 static int drbd_disconnected(struct drbd_conf *mdev);
68 static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *, struct drbd_epoch *, enum epoch_event);
69 static int e_end_block(struct drbd_work *, int);
72 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
75 * some helper functions to deal with single linked page lists,
76 * page->private being our "next" pointer.
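/*
 * A sketch of the chain primitives assumed here (the real definitions
 * live elsewhere, presumably drbd_int.h), purely for illustration:
 *
 *	static inline struct page *page_chain_next(struct page *page)
 *	{
 *		return (struct page *)page_private(page);
 *	}
 *	#define page_chain_for_each(page) \
 *		for (; page; page = page_chain_next(page))
 *
 * A chain thus ends at the page whose page_private() is 0; see the
 * end-of-list marker set in page_chain_del() below.
 */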
79 /* If at least n pages are linked at head, get n pages off.
80 * Otherwise, don't modify head, and return NULL.
81 * Locking is the responsibility of the caller.
83 static struct page *page_chain_del(struct page **head, int n)
97 tmp = page_chain_next(page);
99 break; /* found sufficient pages */
101 /* insufficient pages, don't use any of them. */
106 /* add end of list marker for the returned list */
107 set_page_private(page, 0);
108 /* actual return value, and adjustment of head */
114 /* may be used outside of locks to find the tail of a (usually short)
115 * "private" page chain, before adding it back to a global chain head
116 * with page_chain_add() under a spinlock. */
117 static struct page *page_chain_tail(struct page *page, int *len)
121 while ((tmp = page_chain_next(page)))
128 static int page_chain_free(struct page *page)
132 page_chain_for_each_safe(page, tmp) {
139 static void page_chain_add(struct page **head,
140 struct page *chain_first, struct page *chain_last)
144 tmp = page_chain_tail(chain_first, NULL);
145 BUG_ON(tmp != chain_last);
148 /* add chain to head */
149 set_page_private(chain_last, (unsigned long)*head);
153 static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
156 struct page *page = NULL;
157 struct page *tmp = NULL;
160 /* Yes, testing drbd_pp_vacant outside the lock is racy.
161 * So what. It saves a spin_lock. */
162 if (drbd_pp_vacant >= number) {
163 spin_lock(&drbd_pp_lock);
164 page = page_chain_del(&drbd_pp_pool, number);
166 drbd_pp_vacant -= number;
167 spin_unlock(&drbd_pp_lock);
172 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
173 * "criss-cross" setup, that might cause write-out on some other DRBD,
174 * which in turn might block on the other node at this very place. */
175 for (i = 0; i < number; i++) {
176 tmp = alloc_page(GFP_TRY);
179 set_page_private(tmp, (unsigned long)page);
186 /* Not enough pages immediately available this time.
187 * No need to jump around here, drbd_alloc_pages will retry this
188 * function "soon". */
190 tmp = page_chain_tail(page, NULL);
191 spin_lock(&drbd_pp_lock);
192 page_chain_add(&drbd_pp_pool, page, tmp);
194 spin_unlock(&drbd_pp_lock);
199 static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
200 struct list_head *to_be_freed)
202 struct drbd_peer_request *peer_req;
203 struct list_head *le, *tle;
205 /* The EEs are always appended to the end of the list. Since
206 they are sent in order over the wire, they have to finish
207 in order. As soon as we see the first one that has not finished, we can
208 stop examining the list... */
210 list_for_each_safe(le, tle, &mdev->net_ee) {
211 peer_req = list_entry(le, struct drbd_peer_request, w.list);
212 if (drbd_peer_req_has_active_page(peer_req))
214 list_move(le, to_be_freed);
218 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
220 LIST_HEAD(reclaimed);
221 struct drbd_peer_request *peer_req, *t;
223 spin_lock_irq(&mdev->tconn->req_lock);
224 reclaim_finished_net_peer_reqs(mdev, &reclaimed);
225 spin_unlock_irq(&mdev->tconn->req_lock);
227 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228 drbd_free_net_peer_req(mdev, peer_req);
232 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
233 * @mdev: DRBD device.
234 * @number: number of pages requested
235 * @retry: whether to retry, if not enough pages are available right now
237 * Tries to allocate number pages, first from our own page pool, then from
238 * the kernel, unless this allocation would exceed the max_buffers setting.
239 * Possibly retry until DRBD frees sufficient pages somewhere else.
241 * Returns a page chain linked via page->private.
243 struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
246 struct page *page = NULL;
251 /* Yes, we may run up to @number over max_buffers. If we
252 * follow it strictly, the admin will get it wrong anyways. */
254 nc = rcu_dereference(mdev->tconn->net_conf);
255 mxb = nc ? nc->max_buffers : 1000000;
258 if (atomic_read(&mdev->pp_in_use) < mxb)
259 page = __drbd_alloc_pages(mdev, number);
261 while (page == NULL) {
262 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
264 drbd_kick_lo_and_reclaim_net(mdev);
266 if (atomic_read(&mdev->pp_in_use) < mxb) {
267 page = __drbd_alloc_pages(mdev, number);
275 if (signal_pending(current)) {
276 dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
282 finish_wait(&drbd_pp_wait, &wait);
285 atomic_add(number, &mdev->pp_in_use);
289 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
290 * It is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
291 * Either links the page chain back to the global pool,
292 * or returns all pages to the system. */
293 static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
295 atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
298 if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
299 i = page_chain_free(page);
302 tmp = page_chain_tail(page, &i);
303 spin_lock(&drbd_pp_lock);
304 page_chain_add(&drbd_pp_pool, page, tmp);
306 spin_unlock(&drbd_pp_lock);
308 i = atomic_sub_return(i, a);
310 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
311 is_net ? "pp_in_use_by_net" : "pp_in_use", i);
312 wake_up(&drbd_pp_wait);
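/*
 * Minimal usage sketch (hypothetical caller, for illustration only):
 * drbd_alloc_pages() and drbd_free_pages() pair up around the pp_in_use
 * accounting; each page in the returned chain holds one PAGE_SIZE chunk.
 *
 *	struct page *chain = drbd_alloc_pages(mdev, nr_pages, 1);
 *	struct page *p;
 *	if (chain) {
 *		for (p = chain; p; p = page_chain_next(p)) {
 *			void *data = kmap(p);
 *			memset(data, 0, PAGE_SIZE);
 *			kunmap(p);
 *		}
 *		drbd_free_pages(mdev, chain, 0);
 *	}
 */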
316 You need to hold the req_lock:
317 _drbd_wait_ee_list_empty()
319 You must not have the req_lock:
321 drbd_alloc_peer_req()
322 drbd_free_peer_reqs()
324 drbd_finish_peer_reqs()
326 drbd_wait_ee_list_empty()
329 struct drbd_peer_request *
330 drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
331 unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
333 struct drbd_peer_request *peer_req;
335 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
337 if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
340 peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
342 if (!(gfp_mask & __GFP_NOWARN))
343 dev_err(DEV, "%s: allocation failed\n", __func__);
347 page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
351 drbd_clear_interval(&peer_req->i);
352 peer_req->i.size = data_size;
353 peer_req->i.sector = sector;
354 peer_req->i.local = false;
355 peer_req->i.waiting = false;
357 peer_req->epoch = NULL;
358 peer_req->w.mdev = mdev;
359 peer_req->pages = page;
360 atomic_set(&peer_req->pending_bios, 0);
363 * The block_id is opaque to the receiver. It is not endianness
364 * converted, and sent back to the sender unchanged.
366 peer_req->block_id = id;
371 mempool_free(peer_req, drbd_ee_mempool);
375 void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
378 if (peer_req->flags & EE_HAS_DIGEST)
379 kfree(peer_req->digest);
380 drbd_free_pages(mdev, peer_req->pages, is_net);
381 D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
382 D_ASSERT(drbd_interval_empty(&peer_req->i));
383 mempool_free(peer_req, drbd_ee_mempool);
386 int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
388 LIST_HEAD(work_list);
389 struct drbd_peer_request *peer_req, *t;
391 int is_net = list == &mdev->net_ee;
393 spin_lock_irq(&mdev->tconn->req_lock);
394 list_splice_init(list, &work_list);
395 spin_unlock_irq(&mdev->tconn->req_lock);
397 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
398 __drbd_free_peer_req(mdev, peer_req, is_net);
405 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
407 static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
409 LIST_HEAD(work_list);
410 LIST_HEAD(reclaimed);
411 struct drbd_peer_request *peer_req, *t;
414 spin_lock_irq(&mdev->tconn->req_lock);
415 reclaim_finished_net_peer_reqs(mdev, &reclaimed);
416 list_splice_init(&mdev->done_ee, &work_list);
417 spin_unlock_irq(&mdev->tconn->req_lock);
419 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
420 drbd_free_net_peer_req(mdev, peer_req);
422 /* possible callbacks here:
423 * e_end_block, e_end_resync_block, and e_send_discard_write;
424 * all ignore the last argument.
426 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
429 /* list_del not necessary, next/prev members not touched */
430 err2 = peer_req->w.cb(&peer_req->w, !!err);
433 drbd_free_peer_req(mdev, peer_req);
435 wake_up(&mdev->ee_wait);
440 static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
441 struct list_head *head)
445 /* avoids spin_lock/unlock
446 * and calling prepare_to_wait in the fast path */
447 while (!list_empty(head)) {
448 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
449 spin_unlock_irq(&mdev->tconn->req_lock);
451 finish_wait(&mdev->ee_wait, &wait);
452 spin_lock_irq(&mdev->tconn->req_lock);
456 static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
457 struct list_head *head)
459 spin_lock_irq(&mdev->tconn->req_lock);
460 _drbd_wait_ee_list_empty(mdev, head);
461 spin_unlock_irq(&mdev->tconn->req_lock);
464 /* see also kernel_accept, which is only present since 2.6.18.
465 * We also want to log exactly which part of it failed. */
466 static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
468 struct sock *sk = sock->sk;
472 err = sock->ops->listen(sock, 5);
476 *what = "sock_create_lite";
477 err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
483 err = sock->ops->accept(sock, *newsock, 0);
485 sock_release(*newsock);
489 (*newsock)->ops = sock->ops;
495 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
502 struct msghdr msg = {
504 .msg_iov = (struct iovec *)&iov,
505 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
511 rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
517 static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
524 struct msghdr msg = {
526 .msg_iov = (struct iovec *)&iov,
527 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
535 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
540 * ECONNRESET other side closed the connection
541 * ERESTARTSYS (on sock) we got a signal
545 if (rv == -ECONNRESET)
546 conn_info(tconn, "sock was reset by peer\n");
547 else if (rv != -ERESTARTSYS)
548 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
550 } else if (rv == 0) {
551 conn_info(tconn, "sock was shut down by peer\n");
554 /* signal came in, or peer/link went down,
555 * after we read a partial message
557 /* D_ASSERT(signal_pending(current)); */
565 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
570 static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
574 err = drbd_recv(tconn, buf, size);
583 static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
587 err = drbd_recv_all(tconn, buf, size);
588 if (err && !signal_pending(current))
589 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
594 * On individual connections, the socket buffer size must be set prior to the
595 * listen(2) or connect(2) calls in order to have it take effect.
596 * This is our wrapper to do so.
598 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
601 /* open coded SO_SNDBUF, SO_RCVBUF */
603 sock->sk->sk_sndbuf = snd;
604 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
607 sock->sk->sk_rcvbuf = rcv;
608 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
612 static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
616 struct sockaddr_in6 src_in6;
617 struct sockaddr_in6 peer_in6;
619 int err, peer_addr_len, my_addr_len;
620 int sndbuf_size, rcvbuf_size, connect_int;
621 int disconnect_on_error = 1;
624 nc = rcu_dereference(tconn->net_conf);
629 sndbuf_size = nc->sndbuf_size;
630 rcvbuf_size = nc->rcvbuf_size;
631 connect_int = nc->connect_int;
634 my_addr_len = min_t(int, tconn->my_addr_len, sizeof(src_in6));
635 memcpy(&src_in6, &tconn->my_addr, my_addr_len);
637 if (((struct sockaddr *)&tconn->my_addr)->sa_family == AF_INET6)
638 src_in6.sin6_port = 0;
640 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
642 peer_addr_len = min_t(int, tconn->peer_addr_len, sizeof(src_in6));
643 memcpy(&peer_in6, &tconn->peer_addr, peer_addr_len);
645 what = "sock_create_kern";
646 err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
647 SOCK_STREAM, IPPROTO_TCP, &sock);
653 sock->sk->sk_rcvtimeo =
654 sock->sk->sk_sndtimeo = connect_int * HZ;
655 drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
657 /* explicitly bind to the configured IP as source IP
658 * for the outgoing connections.
659 * This is needed for multihomed hosts and to be
660 * able to use lo: interfaces for drbd.
661 * Make sure to use 0 as port number, so linux selects
662 * a free one dynamically.
664 what = "bind before connect";
665 err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
669 /* connect may fail, peer not yet available.
670 * stay C_WF_CONNECTION, don't go Disconnecting! */
671 disconnect_on_error = 0;
673 err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
682 /* timeout, busy, signal pending */
683 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
684 case EINTR: case ERESTARTSYS:
685 /* peer not (yet) available, network problem */
686 case ECONNREFUSED: case ENETUNREACH:
687 case EHOSTDOWN: case EHOSTUNREACH:
688 disconnect_on_error = 0;
691 conn_err(tconn, "%s failed, err = %d\n", what, err);
693 if (disconnect_on_error)
694 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
700 static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
702 int timeo, err, my_addr_len;
703 int sndbuf_size, rcvbuf_size, connect_int;
704 struct socket *s_estab = NULL, *s_listen;
705 struct sockaddr_in6 my_addr;
710 nc = rcu_dereference(tconn->net_conf);
715 sndbuf_size = nc->sndbuf_size;
716 rcvbuf_size = nc->rcvbuf_size;
717 connect_int = nc->connect_int;
720 my_addr_len = min_t(int, tconn->my_addr_len, sizeof(struct sockaddr_in6));
721 memcpy(&my_addr, &tconn->my_addr, my_addr_len);
723 what = "sock_create_kern";
724 err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
725 SOCK_STREAM, IPPROTO_TCP, &s_listen);
731 timeo = connect_int * HZ;
732 timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* +/- timeo/7, i.e. 28.5% total random jitter */
734 s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
735 s_listen->sk->sk_rcvtimeo = timeo;
736 s_listen->sk->sk_sndtimeo = timeo;
737 drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
739 what = "bind before listen";
740 err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
744 err = drbd_accept(&what, s_listen, &s_estab);
748 sock_release(s_listen);
750 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
751 conn_err(tconn, "%s failed, err = %d\n", what, err);
752 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
759 static int decode_header(struct drbd_tconn *, void *, struct packet_info *);
761 static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
762 enum drbd_packet cmd)
764 if (!conn_prepare_command(tconn, sock))
766 return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
769 static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
771 unsigned int header_size = drbd_header_size(tconn);
772 struct packet_info pi;
775 err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
776 if (err != header_size) {
781 err = decode_header(tconn, tconn->data.rbuf, &pi);
788 * drbd_socket_okay() - Free the socket if its connection is not okay
789 * @sock: pointer to the pointer to the socket.
791 static int drbd_socket_okay(struct socket **sock)
799 rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
801 if (rr > 0 || rr == -EAGAIN) {
809 /* Gets called if a connection is established, or if a new minor gets created
811 int drbd_connected(struct drbd_conf *mdev)
815 atomic_set(&mdev->packet_seq, 0);
818 mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
819 &mdev->tconn->cstate_mutex :
820 &mdev->own_state_mutex;
822 err = drbd_send_sync_param(mdev);
824 err = drbd_send_sizes(mdev, 0, 0);
826 err = drbd_send_uuids(mdev);
828 err = drbd_send_current_state(mdev);
829 clear_bit(USE_DEGR_WFC_T, &mdev->flags);
830 clear_bit(RESIZE_PENDING, &mdev->flags);
831 mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
837 * 1 yes, we have a valid connection
838 * 0 oops, did not work out, please try again
839 * -1 peer talks different language,
840 * no point in trying again, please go standalone.
841 * -2 We do not have a network config...
843 static int conn_connect(struct drbd_tconn *tconn)
845 struct drbd_socket sock, msock;
846 struct drbd_conf *mdev;
848 int vnr, timeout, try, h, ok;
849 bool discard_my_data;
851 if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
854 mutex_init(&sock.mutex);
855 sock.sbuf = tconn->data.sbuf;
856 sock.rbuf = tconn->data.rbuf;
858 mutex_init(&msock.mutex);
859 msock.sbuf = tconn->meta.sbuf;
860 msock.rbuf = tconn->meta.rbuf;
863 clear_bit(DISCARD_CONCURRENT, &tconn->flags);
865 /* Assume that the peer only understands protocol 80 until we know better. */
866 tconn->agreed_pro_version = 80;
872 /* 3 tries, this should take less than a second! */
873 s = drbd_try_connect(tconn);
876 /* give the other side time to call bind() & listen() */
877 schedule_timeout_interruptible(HZ / 10);
883 send_first_packet(tconn, &sock, P_INITIAL_DATA);
884 } else if (!msock.socket) {
886 send_first_packet(tconn, &msock, P_INITIAL_META);
888 conn_err(tconn, "Logic error in conn_connect()\n");
889 goto out_release_sockets;
893 if (sock.socket && msock.socket) {
895 nc = rcu_dereference(tconn->net_conf);
896 timeout = nc->ping_timeo * HZ / 10;
898 schedule_timeout_interruptible(timeout);
899 ok = drbd_socket_okay(&sock.socket);
900 ok = drbd_socket_okay(&msock.socket) && ok;
906 s = drbd_wait_for_connect(tconn);
908 try = receive_first_packet(tconn, s);
909 drbd_socket_okay(&sock.socket);
910 drbd_socket_okay(&msock.socket);
914 conn_warn(tconn, "initial packet S crossed\n");
915 sock_release(sock.socket);
921 conn_warn(tconn, "initial packet M crossed\n");
922 sock_release(msock.socket);
925 set_bit(DISCARD_CONCURRENT, &tconn->flags);
928 conn_warn(tconn, "Error receiving initial packet\n");
935 if (tconn->cstate <= C_DISCONNECTING)
936 goto out_release_sockets;
937 if (signal_pending(current)) {
938 flush_signals(current);
940 if (get_t_state(&tconn->receiver) == EXITING)
941 goto out_release_sockets;
944 if (sock.socket && msock.socket) {
945 ok = drbd_socket_okay(&sock.socket);
946 ok = drbd_socket_okay(&msock.socket) && ok;
952 sock.socket->sk->sk_reuse = 1; /* SO_REUSEADDR */
953 msock.socket->sk->sk_reuse = 1; /* SO_REUSEADDR */
955 sock.socket->sk->sk_allocation = GFP_NOIO;
956 msock.socket->sk->sk_allocation = GFP_NOIO;
958 sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
959 msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
962 * sock.socket->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
963 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
964 * first set it to the P_CONNECTION_FEATURES timeout,
965 * which we set to 4x the configured ping_timeout. */
967 nc = rcu_dereference(tconn->net_conf);
969 sock.socket->sk->sk_sndtimeo =
970 sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
972 msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
973 timeout = nc->timeout * HZ / 10;
974 discard_my_data = nc->discard_my_data;
977 msock.socket->sk->sk_sndtimeo = timeout;
979 /* we don't want delays.
980 * we use TCP_CORK where appropriate, though */
981 drbd_tcp_nodelay(sock.socket);
982 drbd_tcp_nodelay(msock.socket);
984 tconn->data.socket = sock.socket;
985 tconn->meta.socket = msock.socket;
986 tconn->last_received = jiffies;
988 h = drbd_do_features(tconn);
992 if (tconn->cram_hmac_tfm) {
993 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
994 switch (drbd_do_auth(tconn)) {
996 conn_err(tconn, "Authentication of peer failed\n");
999 conn_err(tconn, "Authentication of peer failed, trying again.\n");
1004 tconn->data.socket->sk->sk_sndtimeo = timeout;
1005 tconn->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1007 if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
1011 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1012 kref_get(&mdev->kref);
1015 if (discard_my_data)
1016 set_bit(DISCARD_MY_DATA, &mdev->flags);
1018 clear_bit(DISCARD_MY_DATA, &mdev->flags);
1020 drbd_connected(mdev);
1021 kref_put(&mdev->kref, &drbd_minor_destroy);
1026 if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
1029 drbd_thread_start(&tconn->asender);
1031 mutex_lock(&tconn->conf_update);
1032 /* The discard_my_data flag is a single-shot modifier to the next
1033 * connection attempt, the handshake of which is now well underway.
1034 * No need for rcu style copying of the whole struct
1035 * just to clear a single value. */
1036 tconn->net_conf->discard_my_data = 0;
1037 mutex_unlock(&tconn->conf_update);
1041 out_release_sockets:
1043 sock_release(sock.socket);
1045 sock_release(msock.socket);
1049 static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
1051 unsigned int header_size = drbd_header_size(tconn);
1053 if (header_size == sizeof(struct p_header100) &&
1054 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1055 struct p_header100 *h = header;
1057 conn_err(tconn, "Header padding is not zero\n");
1060 pi->vnr = be16_to_cpu(h->volume);
1061 pi->cmd = be16_to_cpu(h->command);
1062 pi->size = be32_to_cpu(h->length);
1063 } else if (header_size == sizeof(struct p_header95) &&
1064 *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1065 struct p_header95 *h = header;
1066 pi->cmd = be16_to_cpu(h->command);
1067 pi->size = be32_to_cpu(h->length);
1069 } else if (header_size == sizeof(struct p_header80) &&
1070 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1071 struct p_header80 *h = header;
1072 pi->cmd = be16_to_cpu(h->command);
1073 pi->size = be16_to_cpu(h->length);
1076 conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
1077 be32_to_cpu(*(__be32 *)header),
1078 tconn->agreed_pro_version);
1081 pi->data = header + header_size;
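/*
 * For reference, a sketch of the three wire-header layouts decoded above
 * (field order inferred from the accesses here; all fields big-endian):
 *
 *	struct p_header80  { __be32 magic; __be16 command; __be16 length; };
 *	struct p_header95  { __be16 magic; __be16 command; __be32 length; };
 *	struct p_header100 { __be32 magic; __be16 volume; __be16 command;
 *			     __be32 length; __be32 pad; };
 *
 * drbd_header_size() picks the size from the agreed protocol version
 * (>= 100, >= 95, or 80), and the magic double-checks that both sides
 * really agree on the same header format.
 */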
1085 static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
1087 void *buffer = tconn->data.rbuf;
1090 err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
1094 err = decode_header(tconn, buffer, pi);
1095 tconn->last_received = jiffies;
1100 static void drbd_flush(struct drbd_tconn *tconn)
1103 struct drbd_conf *mdev;
1106 if (tconn->write_ordering >= WO_bdev_flush) {
1108 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1109 if (!get_ldev(mdev))
1111 kref_get(&mdev->kref);
1114 rv = blkdev_issue_flush(mdev->ldev->backing_bdev,
1117 dev_info(DEV, "local disk flush failed with status %d\n", rv);
1118 /* would rather check on EOPNOTSUPP, but that is not reliable.
1119 * don't try again for ANY return value != 0
1120 * if (rv == -EOPNOTSUPP) */
1121 drbd_bump_write_ordering(tconn, WO_drain_io);
1124 kref_put(&mdev->kref, &drbd_minor_destroy);
1135 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishes it.
1136 * @tconn: DRBD connection.
1137 * @epoch: Epoch object.
1140 static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *tconn,
1141 struct drbd_epoch *epoch,
1142 enum epoch_event ev)
1145 struct drbd_epoch *next_epoch;
1146 enum finish_epoch rv = FE_STILL_LIVE;
1148 spin_lock(&tconn->epoch_lock);
1152 epoch_size = atomic_read(&epoch->epoch_size);
1154 switch (ev & ~EV_CLEANUP) {
1156 atomic_dec(&epoch->active);
1158 case EV_GOT_BARRIER_NR:
1159 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1161 case EV_BECAME_LAST:
1166 if (epoch_size != 0 &&
1167 atomic_read(&epoch->active) == 0 &&
1168 (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1169 if (!(ev & EV_CLEANUP)) {
1170 spin_unlock(&tconn->epoch_lock);
1171 drbd_send_b_ack(epoch->mdev, epoch->barrier_nr, epoch_size);
1172 spin_lock(&tconn->epoch_lock);
1174 if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1175 dec_unacked(epoch->mdev);
1177 if (tconn->current_epoch != epoch) {
1178 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1179 list_del(&epoch->list);
1180 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1184 if (rv == FE_STILL_LIVE)
1188 atomic_set(&epoch->epoch_size, 0);
1189 /* atomic_set(&epoch->active, 0); is already zero */
1190 if (rv == FE_STILL_LIVE)
1201 spin_unlock(&tconn->epoch_lock);
1207 * drbd_bump_write_ordering() - Fall back to another write ordering method
1208 * @tconn: DRBD connection.
1209 * @wo: Write ordering method to try.
1211 void drbd_bump_write_ordering(struct drbd_tconn *tconn, enum write_ordering_e wo)
1213 struct disk_conf *dc;
1214 struct drbd_conf *mdev;
1215 enum write_ordering_e pwo;
1217 static char *write_ordering_str[] = {
1219 [WO_drain_io] = "drain",
1220 [WO_bdev_flush] = "flush",
1223 pwo = tconn->write_ordering;
1226 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1227 if (!get_ldev(mdev))
1229 dc = rcu_dereference(mdev->ldev->disk_conf);
1231 if (wo == WO_bdev_flush && !dc->disk_flushes)
1233 if (wo == WO_drain_io && !dc->disk_drain)
1238 tconn->write_ordering = wo;
1239 if (pwo != tconn->write_ordering || wo == WO_bdev_flush)
1240 conn_info(tconn, "Method to ensure write ordering: %s\n", write_ordering_str[tconn->write_ordering]);
1244 * drbd_submit_peer_request()
1245 * @mdev: DRBD device.
1246 * @peer_req: peer request
1247 * @rw: flag field, see bio->bi_rw
1249 * May spread the pages to multiple bios,
1250 * depending on bio_add_page restrictions.
1252 * Returns 0 if all bios have been submitted,
1253 * -ENOMEM if we could not allocate enough bios,
1254 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1255 * single page to an empty bio (which should never happen and likely indicates
1256 * that the lower level IO stack is in some way broken). This has been observed
1257 * on certain Xen deployments.
1259 /* TODO allocate from our own bio_set. */
1260 int drbd_submit_peer_request(struct drbd_conf *mdev,
1261 struct drbd_peer_request *peer_req,
1262 const unsigned rw, const int fault_type)
1264 struct bio *bios = NULL;
1266 struct page *page = peer_req->pages;
1267 sector_t sector = peer_req->i.sector;
1268 unsigned ds = peer_req->i.size;
1269 unsigned n_bios = 0;
1270 unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1273 /* In most cases, we will only need one bio. But in case the lower
1274 * level restrictions happen to be different at this offset on this
1275 * side than those of the sending peer, we may need to submit the
1276 * request in more than one bio.
1278 * Plain bio_alloc is good enough here; this is not a DRBD-internally
1279 * generated bio, but one allocated on behalf of the peer.
1282 bio = bio_alloc(GFP_NOIO, nr_pages);
1284 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1287 /* > peer_req->i.sector, unless this is the first bio */
1288 bio->bi_sector = sector;
1289 bio->bi_bdev = mdev->ldev->backing_bdev;
1291 bio->bi_private = peer_req;
1292 bio->bi_end_io = drbd_peer_request_endio;
1294 bio->bi_next = bios;
1298 page_chain_for_each(page) {
1299 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1300 if (!bio_add_page(bio, page, len, 0)) {
1301 /* A single page must always be possible!
1302 * But in case it fails anyways,
1303 * we deal with it, and complain (below). */
1304 if (bio->bi_vcnt == 0) {
1306 "bio_add_page failed for len=%u, "
1307 "bi_vcnt=0 (bi_sector=%llu)\n",
1308 len, (unsigned long long)bio->bi_sector);
1318 D_ASSERT(page == NULL);
1321 atomic_set(&peer_req->pending_bios, n_bios);
1324 bios = bios->bi_next;
1325 bio->bi_next = NULL;
1327 drbd_generic_make_request(mdev, fault_type, bio);
1334 bios = bios->bi_next;
1340 static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1341 struct drbd_peer_request *peer_req)
1343 struct drbd_interval *i = &peer_req->i;
1345 drbd_remove_interval(&mdev->write_requests, i);
1346 drbd_clear_interval(i);
1348 /* Wake up any processes waiting for this peer request to complete. */
1350 wake_up(&mdev->misc_wait);
1353 void conn_wait_active_ee_empty(struct drbd_tconn *tconn)
1355 struct drbd_conf *mdev;
1359 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1360 kref_get(&mdev->kref);
1362 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1363 kref_put(&mdev->kref, &drbd_minor_destroy);
1369 static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1371 struct drbd_conf *mdev;
1373 struct p_barrier *p = pi->data;
1374 struct drbd_epoch *epoch;
1376 mdev = vnr_to_mdev(tconn, pi->vnr);
1382 tconn->current_epoch->barrier_nr = p->barrier;
1383 tconn->current_epoch->mdev = mdev;
1384 rv = drbd_may_finish_epoch(tconn, tconn->current_epoch, EV_GOT_BARRIER_NR);
1386 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1387 * the activity log, which means it would not be resynced in case the
1388 * R_PRIMARY crashes now.
1389 * Therefore we must send the barrier_ack after the barrier request was
1391 switch (tconn->write_ordering) {
1393 if (rv == FE_RECYCLED)
1396 /* receiver context, in the writeout path of the other node.
1397 * avoid potential distributed deadlock */
1398 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1402 dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1407 conn_wait_active_ee_empty(tconn);
1410 if (atomic_read(&tconn->current_epoch->epoch_size)) {
1411 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1416 epoch = tconn->current_epoch;
1417 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1419 D_ASSERT(atomic_read(&epoch->active) == 0);
1420 D_ASSERT(epoch->flags == 0);
1424 dev_err(DEV, "Strangeness in tconn->write_ordering %d\n", tconn->write_ordering);
1429 atomic_set(&epoch->epoch_size, 0);
1430 atomic_set(&epoch->active, 0);
1432 spin_lock(&tconn->epoch_lock);
1433 if (atomic_read(&tconn->current_epoch->epoch_size)) {
1434 list_add(&epoch->list, &tconn->current_epoch->list);
1435 tconn->current_epoch = epoch;
1438 /* The current_epoch got recycled while we allocated this one... */
1441 spin_unlock(&tconn->epoch_lock);
1446 /* used from receive_RSDataReply (recv_resync_read)
1447 * and from receive_Data */
1448 static struct drbd_peer_request *
1449 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1450 int data_size) __must_hold(local)
1452 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1453 struct drbd_peer_request *peer_req;
1456 void *dig_in = mdev->tconn->int_dig_in;
1457 void *dig_vv = mdev->tconn->int_dig_vv;
1458 unsigned long *data;
1461 if (mdev->tconn->peer_integrity_tfm) {
1462 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1464 * FIXME: Receive the incoming digest into the receive buffer
1465 * here, together with its struct p_data?
1467 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1473 if (!expect(data_size != 0))
1475 if (!expect(IS_ALIGNED(data_size, 512)))
1477 if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1480 /* even though we trust our peer,
1481 * we sometimes have to double check. */
1482 if (sector + (data_size>>9) > capacity) {
1483 dev_err(DEV, "request from peer beyond end of local disk: "
1484 "capacity: %llus < sector: %llus + size: %u\n",
1485 (unsigned long long)capacity,
1486 (unsigned long long)sector, data_size);
1490 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1491 * "criss-cross" setup, that might cause write-out on some other DRBD,
1492 * which in turn might block on the other node at this very place. */
1493 peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
1498 page = peer_req->pages;
1499 page_chain_for_each(page) {
1500 unsigned len = min_t(int, ds, PAGE_SIZE);
1502 err = drbd_recv_all_warn(mdev->tconn, data, len);
1503 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1504 dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1505 data[0] = data[0] ^ (unsigned long)-1;
1509 drbd_free_peer_req(mdev, peer_req);
1516 drbd_csum_ee(mdev, mdev->tconn->peer_integrity_tfm, peer_req, dig_vv);
1517 if (memcmp(dig_in, dig_vv, dgs)) {
1518 dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1519 (unsigned long long)sector, data_size);
1520 drbd_free_peer_req(mdev, peer_req);
1524 mdev->recv_cnt += data_size>>9;
1528 /* drbd_drain_block() just takes a data block
1529 * out of the socket input buffer, and discards it.
1531 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1540 page = drbd_alloc_pages(mdev, 1, 1);
1544 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1546 err = drbd_recv_all_warn(mdev->tconn, data, len);
1552 drbd_free_pages(mdev, page, 0);
1556 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1557 sector_t sector, int data_size)
1559 struct bio_vec *bvec;
1561 int dgs, err, i, expect;
1562 void *dig_in = mdev->tconn->int_dig_in;
1563 void *dig_vv = mdev->tconn->int_dig_vv;
1566 if (mdev->tconn->peer_integrity_tfm) {
1567 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1568 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1574 /* optimistically update recv_cnt. if receiving fails below,
1575 * we disconnect anyways, and counters will be reset. */
1576 mdev->recv_cnt += data_size>>9;
1578 bio = req->master_bio;
1579 D_ASSERT(sector == bio->bi_sector);
1581 bio_for_each_segment(bvec, bio, i) {
1582 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1583 expect = min_t(int, data_size, bvec->bv_len);
1584 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1585 kunmap(bvec->bv_page);
1588 data_size -= expect;
1592 drbd_csum_bio(mdev, mdev->tconn->peer_integrity_tfm, bio, dig_vv);
1593 if (memcmp(dig_in, dig_vv, dgs)) {
1594 dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1599 D_ASSERT(data_size == 0);
1604 * e_end_resync_block() is called in asender context via
1605 * drbd_finish_peer_reqs().
1607 static int e_end_resync_block(struct drbd_work *w, int unused)
1609 struct drbd_peer_request *peer_req =
1610 container_of(w, struct drbd_peer_request, w);
1611 struct drbd_conf *mdev = w->mdev;
1612 sector_t sector = peer_req->i.sector;
1615 D_ASSERT(drbd_interval_empty(&peer_req->i));
1617 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1618 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1619 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1621 /* Record failure to sync */
1622 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1624 err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1631 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1633 struct drbd_peer_request *peer_req;
1635 peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1639 dec_rs_pending(mdev);
1642 /* corresponding dec_unacked() in e_end_resync_block(),
1643 * or respectively in _drbd_clear_done_ee */
1645 peer_req->w.cb = e_end_resync_block;
1647 spin_lock_irq(&mdev->tconn->req_lock);
1648 list_add(&peer_req->w.list, &mdev->sync_ee);
1649 spin_unlock_irq(&mdev->tconn->req_lock);
1651 atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1652 if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1655 /* don't care for the reason here */
1656 dev_err(DEV, "submit failed, triggering re-connect\n");
1657 spin_lock_irq(&mdev->tconn->req_lock);
1658 list_del(&peer_req->w.list);
1659 spin_unlock_irq(&mdev->tconn->req_lock);
1661 drbd_free_peer_req(mdev, peer_req);
1667 static struct drbd_request *
1668 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1669 sector_t sector, bool missing_ok, const char *func)
1671 struct drbd_request *req;
1673 /* Request object according to our peer */
1674 req = (struct drbd_request *)(unsigned long)id;
1675 if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1678 dev_err(DEV, "%s: failed to find request 0x%lx, sector %llus\n", func,
1679 (unsigned long)id, (unsigned long long)sector);
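/*
 * This round trip works because the block_id we put on the wire is simply
 * our request pointer, echoed back unchanged by the peer (see the
 * "block_id is opaque to the receiver" note in drbd_alloc_peer_req());
 * on the sending side, presumably something like
 *
 *	p->block_id = (unsigned long)req;
 *
 * so no lookup table is needed, and drbd_contains_interval() above guards
 * against a stale or corrupted id before the pointer is trusted.
 */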
1684 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1686 struct drbd_conf *mdev;
1687 struct drbd_request *req;
1690 struct p_data *p = pi->data;
1692 mdev = vnr_to_mdev(tconn, pi->vnr);
1696 sector = be64_to_cpu(p->sector);
1698 spin_lock_irq(&mdev->tconn->req_lock);
1699 req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1700 spin_unlock_irq(&mdev->tconn->req_lock);
1704 /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1705 * special casing it there for the various failure cases.
1706 * still no race with drbd_fail_pending_reads */
1707 err = recv_dless_read(mdev, req, sector, pi->size);
1709 req_mod(req, DATA_RECEIVED);
1710 /* else: nothing. handled from drbd_disconnect...
1711 * I don't think we may complete this just yet
1712 * in case we are "on-disconnect: freeze" */
1717 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1719 struct drbd_conf *mdev;
1722 struct p_data *p = pi->data;
1724 mdev = vnr_to_mdev(tconn, pi->vnr);
1728 sector = be64_to_cpu(p->sector);
1729 D_ASSERT(p->block_id == ID_SYNCER);
1731 if (get_ldev(mdev)) {
1732 /* data is submitted to disk within recv_resync_read.
1733 * corresponding put_ldev done below on error,
1734 * or in drbd_peer_request_endio. */
1735 err = recv_resync_read(mdev, sector, pi->size);
1737 if (__ratelimit(&drbd_ratelimit_state))
1738 dev_err(DEV, "Can not write resync data to local disk.\n");
1740 err = drbd_drain_block(mdev, pi->size);
1742 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1745 atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1750 static int w_restart_write(struct drbd_work *w, int cancel)
1752 struct drbd_request *req = container_of(w, struct drbd_request, w);
1753 struct drbd_conf *mdev = w->mdev;
1755 unsigned long start_time;
1756 unsigned long flags;
1758 spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1759 if (!expect(req->rq_state & RQ_POSTPONED)) {
1760 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1763 bio = req->master_bio;
1764 start_time = req->start_time;
1765 /* Postponed requests will not have their master_bio completed! */
1766 __req_mod(req, DISCARD_WRITE, NULL);
1767 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1769 while (__drbd_make_request(mdev, bio, start_time))
1774 static void restart_conflicting_writes(struct drbd_conf *mdev,
1775 sector_t sector, int size)
1777 struct drbd_interval *i;
1778 struct drbd_request *req;
1780 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1783 req = container_of(i, struct drbd_request, i);
1784 if (req->rq_state & RQ_LOCAL_PENDING ||
1785 !(req->rq_state & RQ_POSTPONED))
1787 if (expect(list_empty(&req->w.list))) {
1789 req->w.cb = w_restart_write;
1790 drbd_queue_work(&mdev->tconn->data.work, &req->w);
1796 * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1798 static int e_end_block(struct drbd_work *w, int cancel)
1800 struct drbd_peer_request *peer_req =
1801 container_of(w, struct drbd_peer_request, w);
1802 struct drbd_conf *mdev = w->mdev;
1803 sector_t sector = peer_req->i.sector;
1806 if (peer_req->flags & EE_SEND_WRITE_ACK) {
1807 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1808 pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1809 mdev->state.conn <= C_PAUSED_SYNC_T &&
1810 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1811 P_RS_WRITE_ACK : P_WRITE_ACK;
1812 err = drbd_send_ack(mdev, pcmd, peer_req);
1813 if (pcmd == P_RS_WRITE_ACK)
1814 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1816 err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1817 /* we expect it to be marked out of sync anyways...
1818 * maybe assert this? */
1822 /* we delete from the conflict detection hash _after_ we sent out the
1823 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
1824 if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1825 spin_lock_irq(&mdev->tconn->req_lock);
1826 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1827 drbd_remove_epoch_entry_interval(mdev, peer_req);
1828 if (peer_req->flags & EE_RESTART_REQUESTS)
1829 restart_conflicting_writes(mdev, sector, peer_req->i.size);
1830 spin_unlock_irq(&mdev->tconn->req_lock);
1832 D_ASSERT(drbd_interval_empty(&peer_req->i));
1834 drbd_may_finish_epoch(mdev->tconn, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1839 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1841 struct drbd_conf *mdev = w->mdev;
1842 struct drbd_peer_request *peer_req =
1843 container_of(w, struct drbd_peer_request, w);
1846 err = drbd_send_ack(mdev, ack, peer_req);
1852 static int e_send_discard_write(struct drbd_work *w, int unused)
1854 return e_send_ack(w, P_DISCARD_WRITE);
1857 static int e_send_retry_write(struct drbd_work *w, int unused)
1859 struct drbd_tconn *tconn = w->mdev->tconn;
1861 return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1862 P_RETRY_WRITE : P_DISCARD_WRITE);
1865 static bool seq_greater(u32 a, u32 b)
1868 * We assume 32-bit wrap-around here.
1869 * For 24-bit wrap-around, we would have to shift:
1872 return (s32)a - (s32)b > 0;
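/*
 * Worked example of the wrap-safe comparison: with a == 0x00000002 and
 * b == 0xfffffffe, the sequence counter has wrapped, yet
 * (s32)a - (s32)b == 2 - (-2) == 4 > 0, so a is correctly considered
 * "greater". A plain a > b would get this wrong (2 > 0xfffffffe is false).
 * This works as long as the two values are less than 2^31 apart.
 */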
1875 static u32 seq_max(u32 a, u32 b)
1877 return seq_greater(a, b) ? a : b;
1880 static bool need_peer_seq(struct drbd_conf *mdev)
1882 struct drbd_tconn *tconn = mdev->tconn;
1886 * We only need to keep track of the last packet_seq number of our peer
1887 * if we are in dual-primary mode and we have the discard flag set; see
1888 * handle_write_conflicts().
1892 tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
1895 return tp && test_bit(DISCARD_CONCURRENT, &tconn->flags);
1898 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1900 unsigned int newest_peer_seq;
1902 if (need_peer_seq(mdev)) {
1903 spin_lock(&mdev->peer_seq_lock);
1904 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1905 mdev->peer_seq = newest_peer_seq;
1906 spin_unlock(&mdev->peer_seq_lock);
1907 /* wake up only if we actually changed mdev->peer_seq */
1908 if (peer_seq == newest_peer_seq)
1909 wake_up(&mdev->seq_wait);
1913 /* Called from receive_Data.
1914 * Synchronize packets on sock with packets on msock.
1916 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1917 * packet traveling on msock, they are still processed in the order they have
1920 * Note: we don't care for Ack packets overtaking P_DATA packets.
1922 * In case packet_seq is larger than mdev->peer_seq number, there are
1923 * outstanding packets on the msock. We wait for them to arrive.
1924 * In case we are the logically next packet, we update mdev->peer_seq
1925 * ourselves. Correctly handles 32bit wrap around.
1927 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1928 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1929 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1930 * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1932 * returns 0 if we may process the packet,
1933 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1934 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1940 if (!need_peer_seq(mdev))
1943 spin_lock(&mdev->peer_seq_lock);
1945 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1946 mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1950 if (signal_pending(current)) {
1954 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1955 spin_unlock(&mdev->peer_seq_lock);
1957 timeout = rcu_dereference(mdev->tconn->net_conf)->ping_timeo*HZ/10;
1959 timeout = schedule_timeout(timeout);
1960 spin_lock(&mdev->peer_seq_lock);
1963 dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1967 spin_unlock(&mdev->peer_seq_lock);
1968 finish_wait(&mdev->seq_wait, &wait);
1972 /* see also bio_flags_to_wire()
1973 * DRBD_REQ_*, because we need to semantically map the flags to data packet
1974 * flags and back. We may replicate to other kernel versions. */
1975 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1977 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1978 (dpf & DP_FUA ? REQ_FUA : 0) |
1979 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1980 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
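/*
 * The sender's bio_flags_to_wire() (in drbd_main.c) is roughly the inverse
 * of the mapping above, modulo protocol-version checks; a sketch:
 *
 *	static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
 *	{
 *		return (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
 *		       (bi_rw & REQ_FUA ? DP_FUA : 0) |
 *		       (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
 *		       (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
 *	}
 *
 * Mapping to DP_* instead of sending REQ_* bits raw keeps the wire format
 * stable even when the two nodes run different kernel versions.
 */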
1983 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1986 struct drbd_interval *i;
1989 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1990 struct drbd_request *req;
1991 struct bio_and_error m;
1995 req = container_of(i, struct drbd_request, i);
1996 if (!(req->rq_state & RQ_POSTPONED))
1998 req->rq_state &= ~RQ_POSTPONED;
1999 __req_mod(req, NEG_ACKED, &m);
2000 spin_unlock_irq(&mdev->tconn->req_lock);
2002 complete_master_bio(mdev, &m);
2003 spin_lock_irq(&mdev->tconn->req_lock);
2008 static int handle_write_conflicts(struct drbd_conf *mdev,
2009 struct drbd_peer_request *peer_req)
2011 struct drbd_tconn *tconn = mdev->tconn;
2012 bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
2013 sector_t sector = peer_req->i.sector;
2014 const unsigned int size = peer_req->i.size;
2015 struct drbd_interval *i;
2020 * Inserting the peer request into the write_requests tree will prevent
2021 * new conflicting local requests from being added.
2023 drbd_insert_interval(&mdev->write_requests, &peer_req->i);
2026 drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
2027 if (i == &peer_req->i)
2032 * Our peer has sent a conflicting remote request; this
2033 * should not happen in a two-node setup. Wait for the
2034 * earlier peer request to complete.
2036 err = drbd_wait_misc(mdev, i);
2042 equal = i->sector == sector && i->size == size;
2043 if (resolve_conflicts) {
2045 * If the peer request is fully contained within the
2046 * overlapping request, it can be discarded; otherwise,
2047 * it will be retried once all overlapping requests
2050 bool discard = i->sector <= sector && i->sector +
2051 (i->size >> 9) >= sector + (size >> 9);
2054 dev_alert(DEV, "Concurrent writes detected: "
2055 "local=%llus +%u, remote=%llus +%u, "
2056 "assuming %s came first\n",
2057 (unsigned long long)i->sector, i->size,
2058 (unsigned long long)sector, size,
2059 discard ? "local" : "remote");
2062 peer_req->w.cb = discard ? e_send_discard_write :
2064 list_add_tail(&peer_req->w.list, &mdev->done_ee);
2065 wake_asender(mdev->tconn);
2070 struct drbd_request *req =
2071 container_of(i, struct drbd_request, i);
2074 dev_alert(DEV, "Concurrent writes detected: "
2075 "local=%llus +%u, remote=%llus +%u\n",
2076 (unsigned long long)i->sector, i->size,
2077 (unsigned long long)sector, size);
2079 if (req->rq_state & RQ_LOCAL_PENDING ||
2080 !(req->rq_state & RQ_POSTPONED)) {
2082 * Wait for the node with the discard flag to
2083 * decide if this request will be discarded or
2084 * retried. Requests that are discarded will
2085 * disappear from the write_requests tree.
2087 * In addition, wait for the conflicting
2088 * request to finish locally before submitting
2089 * the conflicting peer request.
2091 err = drbd_wait_misc(mdev, &req->i);
2093 _conn_request_state(mdev->tconn,
2094 NS(conn, C_TIMEOUT),
2096 fail_postponed_requests(mdev, sector, size);
2102 * Remember to restart the conflicting requests after
2103 * the new peer request has completed.
2105 peer_req->flags |= EE_RESTART_REQUESTS;
2112 drbd_remove_epoch_entry_interval(mdev, peer_req);
2116 /* mirrored write */
2117 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2119 struct drbd_conf *mdev;
2121 struct drbd_peer_request *peer_req;
2122 struct p_data *p = pi->data;
2123 u32 peer_seq = be32_to_cpu(p->seq_num);
2128 mdev = vnr_to_mdev(tconn, pi->vnr);
2132 if (!get_ldev(mdev)) {
2135 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2136 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2137 atomic_inc(&tconn->current_epoch->epoch_size);
2138 err2 = drbd_drain_block(mdev, pi->size);
2145 * Corresponding put_ldev done either below (on various errors), or in
2146 * drbd_peer_request_endio, if we successfully submit the data at the
2147 * end of this function.
2150 sector = be64_to_cpu(p->sector);
2151 peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2157 peer_req->w.cb = e_end_block;
2159 dp_flags = be32_to_cpu(p->dp_flags);
2160 rw |= wire_flags_to_bio(mdev, dp_flags);
2162 if (dp_flags & DP_MAY_SET_IN_SYNC)
2163 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2165 spin_lock(&tconn->epoch_lock);
2166 peer_req->epoch = tconn->current_epoch;
2167 atomic_inc(&peer_req->epoch->epoch_size);
2168 atomic_inc(&peer_req->epoch->active);
2169 spin_unlock(&tconn->epoch_lock);
2172 tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
2175 peer_req->flags |= EE_IN_INTERVAL_TREE;
2176 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2178 goto out_interrupted;
2179 spin_lock_irq(&mdev->tconn->req_lock);
2180 err = handle_write_conflicts(mdev, peer_req);
2182 spin_unlock_irq(&mdev->tconn->req_lock);
2183 if (err == -ENOENT) {
2187 goto out_interrupted;
2190 spin_lock_irq(&mdev->tconn->req_lock);
2191 list_add(&peer_req->w.list, &mdev->active_ee);
2192 spin_unlock_irq(&mdev->tconn->req_lock);
2194 if (mdev->tconn->agreed_pro_version < 100) {
2196 switch (rcu_dereference(mdev->tconn->net_conf)->wire_protocol) {
2198 dp_flags |= DP_SEND_WRITE_ACK;
2201 dp_flags |= DP_SEND_RECEIVE_ACK;
2207 if (dp_flags & DP_SEND_WRITE_ACK) {
2208 peer_req->flags |= EE_SEND_WRITE_ACK;
2210 /* corresponding dec_unacked() in e_end_block(),
2211 * or respectively in _drbd_clear_done_ee */
2214 if (dp_flags & DP_SEND_RECEIVE_ACK) {
2215 /* I really don't like it that the receiver thread
2216 * sends on the msock, but anyways */
2217 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2220 if (mdev->state.pdsk < D_INCONSISTENT) {
2221 /* In case we have the only disk of the cluster, */
2222 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2223 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2224 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2225 drbd_al_begin_io(mdev, &peer_req->i);
2228 err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2232 /* don't care for the reason here */
2233 dev_err(DEV, "submit failed, triggering re-connect\n");
2234 spin_lock_irq(&mdev->tconn->req_lock);
2235 list_del(&peer_req->w.list);
2236 drbd_remove_epoch_entry_interval(mdev, peer_req);
2237 spin_unlock_irq(&mdev->tconn->req_lock);
2238 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2239 drbd_al_complete_io(mdev, &peer_req->i);
2242 drbd_may_finish_epoch(tconn, peer_req->epoch, EV_PUT + EV_CLEANUP);
2244 drbd_free_peer_req(mdev, peer_req);
2248 /* We may throttle resync, if the lower device seems to be busy,
2249 * and current sync rate is above c_min_rate.
2251 * To decide whether or not the lower device is busy, we use a scheme similar
2252 * to MD RAID's is_mddev_idle(): if the partition stats reveal "significant"
2253 * activity (more than 64 sectors) that we cannot account for with our own
2254 * resync activity, the device obviously is "busy".
2256 * The current sync rate used here uses only the most recent two step marks,
2257 * to have a short time average so we can react faster.
2259 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2261 struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2262 unsigned long db, dt, dbdt;
2263 struct lc_element *tmp;
2266 unsigned int c_min_rate;
2269 c_min_rate = rcu_dereference(mdev->ldev->disk_conf)->c_min_rate;
2272 /* feature disabled? */
2273 if (c_min_rate == 0)
2276 spin_lock_irq(&mdev->al_lock);
2277 tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2279 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2280 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2281 spin_unlock_irq(&mdev->al_lock);
2284 /* Do not slow down if app IO is already waiting for this extent */
2286 spin_unlock_irq(&mdev->al_lock);
2288 curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2289 (int)part_stat_read(&disk->part0, sectors[1]) -
2290 atomic_read(&mdev->rs_sect_ev);
2292 if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2293 unsigned long rs_left;
2296 mdev->rs_last_events = curr_events;
2298 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2300 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2302 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2303 rs_left = mdev->ov_left;
2305 rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2307 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2310 db = mdev->rs_mark_left[i] - rs_left;
2311 dbdt = Bit2KB(db/dt);
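/*
 * Worked example (illustrative numbers, assuming the usual 4 KiB covered
 * per bitmap bit): if the last two sync marks are dt = 3 seconds apart
 * and db = 3072 bitmap bits were cleared in between, then
 * dbdt = Bit2KB(3072 / 3) = 1024 << 2 = 4096 KB/sec; with
 * c_min_rate = 4000 we would consider slowing down.
 */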
2313 if (dbdt > c_min_rate)
2320 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2322 struct drbd_conf *mdev;
2325 struct drbd_peer_request *peer_req;
2326 struct digest_info *di = NULL;
2328 unsigned int fault_type;
2329 struct p_block_req *p = pi->data;
2331 mdev = vnr_to_mdev(tconn, pi->vnr);
2334 capacity = drbd_get_capacity(mdev->this_bdev);
2336 sector = be64_to_cpu(p->sector);
2337 size = be32_to_cpu(p->blksize);
2339 if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2340 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2341 (unsigned long long)sector, size);
2344 if (sector + (size>>9) > capacity) {
2345 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2346 (unsigned long long)sector, size);
2350 if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2353 case P_DATA_REQUEST:
2354 drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2356 case P_RS_DATA_REQUEST:
2357 case P_CSUM_RS_REQUEST:
2359 drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2363 dec_rs_pending(mdev);
2364 drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2369 if (verb && __ratelimit(&drbd_ratelimit_state))
2370 dev_err(DEV, "Can not satisfy peer's read request, "
2371 "no local data.\n");
2373 /* drain possible payload */
2374 return drbd_drain_block(mdev, pi->size);
2377 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2378 * "criss-cross" setup, that might cause write-out on some other DRBD,
2379 * which in turn might block on the other node at this very place. */
2380 peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2387 case P_DATA_REQUEST:
2388 peer_req->w.cb = w_e_end_data_req;
2389 fault_type = DRBD_FAULT_DT_RD;
2390 /* application IO, don't drbd_rs_begin_io */
2393 case P_RS_DATA_REQUEST:
2394 peer_req->w.cb = w_e_end_rsdata_req;
2395 fault_type = DRBD_FAULT_RS_RD;
2396 /* used in the sector offset progress display */
2397 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2401 case P_CSUM_RS_REQUEST:
2402 fault_type = DRBD_FAULT_RS_RD;
2403 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2407 di->digest_size = pi->size;
2408 di->digest = (((char *)di)+sizeof(struct digest_info));
2410 peer_req->digest = di;
2411 peer_req->flags |= EE_HAS_DIGEST;
2413 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2416 if (pi->cmd == P_CSUM_RS_REQUEST) {
2417 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2418 peer_req->w.cb = w_e_end_csum_rs_req;
2419 /* used in the sector offset progress display */
2420 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2421 } else if (pi->cmd == P_OV_REPLY) {
2422 /* track progress, we may need to throttle */
2423 atomic_add(size >> 9, &mdev->rs_sect_in);
2424 peer_req->w.cb = w_e_end_ov_reply;
2425 dec_rs_pending(mdev);
2426 /* drbd_rs_begin_io done when we sent this request,
2427 * but accounting still needs to be done. */
2428 goto submit_for_resync;
2433 if (mdev->ov_start_sector == ~(sector_t)0 &&
2434 mdev->tconn->agreed_pro_version >= 90) {
2435 unsigned long now = jiffies;
2437 mdev->ov_start_sector = sector;
2438 mdev->ov_position = sector;
2439 mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2440 mdev->rs_total = mdev->ov_left;
2441 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2442 mdev->rs_mark_left[i] = mdev->ov_left;
2443 mdev->rs_mark_time[i] = now;
2445 dev_info(DEV, "Online Verify start sector: %llu\n",
2446 (unsigned long long)sector);
2448 peer_req->w.cb = w_e_end_ov_req;
2449 fault_type = DRBD_FAULT_RS_RD;
2456 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2457 * wrt the receiver, but it is not as straightforward as it may seem.
2458 * Various places in the resync start and stop logic assume resync
2459 * requests are processed in order, requeuing this on the worker thread
2460 * introduces a bunch of new code for synchronization between threads.
2462 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2463 * "forever", throttling after drbd_rs_begin_io will lock that extent
2464 * for application writes for the same time. For now, just throttle
2465 * here, where the rest of the code expects the receiver to sleep for a while anyway. */
2469 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2470 * this defers syncer requests for some time, before letting at least
2471 * one request through. The resync controller on the receiving side
2472 * will adapt to the incoming rate accordingly.
2474 * We cannot throttle here if remote is Primary/SyncTarget:
2475 * we would also throttle its application reads.
2476 * In that case, throttling is done on the SyncTarget only.
2478 if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2479 schedule_timeout_uninterruptible(HZ/10);
2480 if (drbd_rs_begin_io(mdev, sector))
2484 atomic_add(size >> 9, &mdev->rs_sect_ev);
2488 spin_lock_irq(&mdev->tconn->req_lock);
2489 list_add_tail(&peer_req->w.list, &mdev->read_ee);
2490 spin_unlock_irq(&mdev->tconn->req_lock);
2492 if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2495 /* don't care for the reason here */
2496 dev_err(DEV, "submit failed, triggering re-connect\n");
2497 spin_lock_irq(&mdev->tconn->req_lock);
2498 list_del(&peer_req->w.list);
2499 spin_unlock_irq(&mdev->tconn->req_lock);
2500 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2504 drbd_free_peer_req(mdev, peer_req);
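/* Reading aid for the after-split-brain recovery handlers below (a sketch
 * derived from the switch bodies): a positive return value means the peer's
 * data gets discarded (we become sync source), a negative value means our
 * data gets discarded (we become sync target), and -100 means no automatic
 * decision was possible. E.g. with after-sb-0pri set to
 * discard-least-changes, ch_self = 3 and ch_peer = 12 yield -1: we modified
 * fewer blocks, so our changes are the ones thrown away. */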
2508 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2510 int self, peer, rv = -100;
2511 unsigned long ch_self, ch_peer;
2512 enum drbd_after_sb_p after_sb_0p;
2514 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2515 peer = mdev->p_uuid[UI_BITMAP] & 1;
2517 ch_peer = mdev->p_uuid[UI_SIZE];
2518 ch_self = mdev->comm_bm_set;
2521 after_sb_0p = rcu_dereference(mdev->tconn->net_conf)->after_sb_0p;
2523 switch (after_sb_0p) {
2525 case ASB_DISCARD_SECONDARY:
2526 case ASB_CALL_HELPER:
2528 dev_err(DEV, "Configuration error.\n");
2530 case ASB_DISCONNECT:
2532 case ASB_DISCARD_YOUNGER_PRI:
2533 if (self == 0 && peer == 1) {
2537 if (self == 1 && peer == 0) {
2541 /* Else fall through to one of the other strategies... */
2542 case ASB_DISCARD_OLDER_PRI:
2543 if (self == 0 && peer == 1) {
2547 if (self == 1 && peer == 0) {
2551 /* Else fall through to one of the other strategies... */
2552 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2553 "Using discard-least-changes instead\n");
2554 case ASB_DISCARD_ZERO_CHG:
2555 if (ch_peer == 0 && ch_self == 0) {
2556 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2560 if (ch_peer == 0) { rv = 1; break; }
2561 if (ch_self == 0) { rv = -1; break; }
2563 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2565 case ASB_DISCARD_LEAST_CHG:
2566 if (ch_self < ch_peer)
2568 else if (ch_self > ch_peer)
2570 else /* ( ch_self == ch_peer ) */
2571 /* Well, then use something else. */
2572 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2575 case ASB_DISCARD_LOCAL:
2578 case ASB_DISCARD_REMOTE:
2585 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2588 enum drbd_after_sb_p after_sb_1p;
2591 after_sb_1p = rcu_dereference(mdev->tconn->net_conf)->after_sb_1p;
2593 switch (after_sb_1p) {
2594 case ASB_DISCARD_YOUNGER_PRI:
2595 case ASB_DISCARD_OLDER_PRI:
2596 case ASB_DISCARD_LEAST_CHG:
2597 case ASB_DISCARD_LOCAL:
2598 case ASB_DISCARD_REMOTE:
2599 case ASB_DISCARD_ZERO_CHG:
2600 dev_err(DEV, "Configuration error.\n");
2602 case ASB_DISCONNECT:
2605 hg = drbd_asb_recover_0p(mdev);
2606 if (hg == -1 && mdev->state.role == R_SECONDARY)
2608 if (hg == 1 && mdev->state.role == R_PRIMARY)
2612 rv = drbd_asb_recover_0p(mdev);
2614 case ASB_DISCARD_SECONDARY:
2615 return mdev->state.role == R_PRIMARY ? 1 : -1;
2616 case ASB_CALL_HELPER:
2617 hg = drbd_asb_recover_0p(mdev);
2618 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2619 enum drbd_state_rv rv2;
2621 drbd_set_role(mdev, R_SECONDARY, 0);
2622 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2623 * we might be here in C_WF_REPORT_PARAMS which is transient.
2624 * we do not need to wait for the after state change work either. */
2625 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2626 if (rv2 != SS_SUCCESS) {
2627 drbd_khelper(mdev, "pri-lost-after-sb");
2629 dev_warn(DEV, "Successfully gave up primary role.\n");
2639 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2642 enum drbd_after_sb_p after_sb_2p;
2645 after_sb_2p = rcu_dereference(mdev->tconn->net_conf)->after_sb_2p;
2647 switch (after_sb_2p) {
2648 case ASB_DISCARD_YOUNGER_PRI:
2649 case ASB_DISCARD_OLDER_PRI:
2650 case ASB_DISCARD_LEAST_CHG:
2651 case ASB_DISCARD_LOCAL:
2652 case ASB_DISCARD_REMOTE:
2654 case ASB_DISCARD_SECONDARY:
2655 case ASB_DISCARD_ZERO_CHG:
2656 dev_err(DEV, "Configuration error.\n");
2659 rv = drbd_asb_recover_0p(mdev);
2661 case ASB_DISCONNECT:
2663 case ASB_CALL_HELPER:
2664 hg = drbd_asb_recover_0p(mdev);
2666 enum drbd_state_rv rv2;
2668 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2669 * we might be here in C_WF_REPORT_PARAMS which is transient.
2670 * we do not need to wait for the after state change work either. */
2671 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2672 if (rv2 != SS_SUCCESS) {
2673 drbd_khelper(mdev, "pri-lost-after-sb");
2675 dev_warn(DEV, "Successfully gave up primary role.\n");
2685 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2686 u64 bits, u64 flags)
2689 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2692 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2694 (unsigned long long)uuid[UI_CURRENT],
2695 (unsigned long long)uuid[UI_BITMAP],
2696 (unsigned long long)uuid[UI_HISTORY_START],
2697 (unsigned long long)uuid[UI_HISTORY_END],
2698 (unsigned long long)bits,
2699 (unsigned long long)flags);
2703 100 after split brain, try auto recover
2704 2 C_SYNC_SOURCE set BitMap
2705 1 C_SYNC_SOURCE use BitMap
2707 -1 C_SYNC_TARGET use BitMap
2708 -2 C_SYNC_TARGET set BitMap
2709 -100 after split brain, disconnect
2710 -1000 unrelated data
2711 -1091 requires proto 91
2712 -1096 requires proto 96
2714 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2719 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2720 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2723 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2727 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2728 peer != UUID_JUST_CREATED)
2732 if (self != UUID_JUST_CREATED &&
2733 (peer == UUID_JUST_CREATED || peer == (u64)0))
2737 int rct, dc; /* roles at crash time */
2739 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2741 if (mdev->tconn->agreed_pro_version < 91)
2744 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2745 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2746 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2747 drbd_uuid_set_bm(mdev, 0UL);
2749 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2750 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2753 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2760 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2762 if (mdev->tconn->agreed_pro_version < 91)
2765 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2766 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2767 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2769 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2770 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2771 mdev->p_uuid[UI_BITMAP] = 0UL;
2773 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2776 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2783 /* Common power [off|failure] */
2784 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2785 (mdev->p_uuid[UI_FLAGS] & 2);
2786 /* lowest bit is set when we were primary,
2787 * next bit (weight 2) is set when peer was primary */
2791 case 0: /* !self_pri && !peer_pri */ return 0;
2792 case 1: /* self_pri && !peer_pri */ return 1;
2793 case 2: /* !self_pri && peer_pri */ return -1;
2794 case 3: /* self_pri && peer_pri */
2795 dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2801 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2806 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2808 if (mdev->tconn->agreed_pro_version < 96 ?
2809 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2810 (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2811 peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2812 /* The last P_SYNC_UUID did not get through. Undo the modifications
2813 the peer made to its UUIDs at its last start of a resync as sync source. */
2815 if (mdev->tconn->agreed_pro_version < 91)
2818 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2819 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2821 dev_info(DEV, "Did not got last syncUUID packet, corrected:\n");
2822 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2829 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2830 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2831 peer = mdev->p_uuid[i] & ~((u64)1);
2837 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2838 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2843 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2845 if (mdev->tconn->agreed_pro_version < 96 ?
2846 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2847 (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2848 self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2849 /* The last P_SYNC_UUID did not get through. Undo the modifications
2850 we made to our UUIDs at our last start of a resync as sync source. */
2852 if (mdev->tconn->agreed_pro_version < 91)
2855 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2856 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2858 dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2859 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2860 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2868 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2869 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2870 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2876 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2877 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2878 if (self == peer && self != ((u64)0))
2882 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2883 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2884 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2885 peer = mdev->p_uuid[j] & ~((u64)1);
2894 /* drbd_sync_handshake() returns the new conn state on success, or
2895 C_MASK on failure.
2897 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2898 enum drbd_disk_state peer_disk) __must_hold(local)
2900 enum drbd_conns rv = C_MASK;
2901 enum drbd_disk_state mydisk;
2902 struct net_conf *nc;
2903 int hg, rule_nr, rr_conflict, tentative;
2905 mydisk = mdev->state.disk;
2906 if (mydisk == D_NEGOTIATING)
2907 mydisk = mdev->new_state_tmp.disk;
2909 dev_info(DEV, "drbd_sync_handshake:\n");
2910 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2911 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2912 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2914 hg = drbd_uuid_compare(mdev, &rule_nr);
2916 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2919 dev_alert(DEV, "Unrelated data, aborting!\n");
2923 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2927 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2928 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2929 int f = (hg == -100) || abs(hg) == 2;
2930 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2933 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2934 hg > 0 ? "source" : "target");
2938 drbd_khelper(mdev, "initial-split-brain");
2941 nc = rcu_dereference(mdev->tconn->net_conf);
2943 if (hg == 100 || (hg == -100 && nc->always_asbp)) {
2944 int pcount = (mdev->state.role == R_PRIMARY)
2945 + (peer_role == R_PRIMARY);
2946 int forced = (hg == -100);
2950 hg = drbd_asb_recover_0p(mdev);
2953 hg = drbd_asb_recover_1p(mdev);
2956 hg = drbd_asb_recover_2p(mdev);
2959 if (abs(hg) < 100) {
2960 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2961 "automatically solved. Sync from %s node\n",
2962 pcount, (hg < 0) ? "peer" : "this");
2964 dev_warn(DEV, "Doing a full sync, since"
2965 " UUIDs where ambiguous.\n");
2972 if (test_bit(DISCARD_MY_DATA, &mdev->flags) && !(mdev->p_uuid[UI_FLAGS]&1))
2974 if (!test_bit(DISCARD_MY_DATA, &mdev->flags) && (mdev->p_uuid[UI_FLAGS]&1))
2978 dev_warn(DEV, "Split-Brain detected, manually solved. "
2979 "Sync from %s node\n",
2980 (hg < 0) ? "peer" : "this");
2982 rr_conflict = nc->rr_conflict;
2983 tentative = nc->tentative;
2987 /* FIXME this log message is not correct if we end up here
2988 * after an attempted attach on a diskless node.
2989 * We just refuse to attach -- well, we drop the "connection"
2990 * to that disk, in a way... */
2991 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2992 drbd_khelper(mdev, "split-brain");
2996 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2997 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
3001 if (hg < 0 && /* by intention we do not use mydisk here. */
3002 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
3003 switch (rr_conflict) {
3004 case ASB_CALL_HELPER:
3005 drbd_khelper(mdev, "pri-lost");
3007 case ASB_DISCONNECT:
3008 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
3011 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
3016 if (tentative || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
3018 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
3020 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
3021 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3022 abs(hg) >= 2 ? "full" : "bit-map based");
3027 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3028 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3029 BM_LOCKED_SET_ALLOWED))
3033 if (hg > 0) { /* become sync source. */
3035 } else if (hg < 0) { /* become sync target */
3039 if (drbd_bm_total_weight(mdev)) {
3040 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
3041 drbd_bm_total_weight(mdev));
3048 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3050 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3051 if (peer == ASB_DISCARD_REMOTE)
3052 return ASB_DISCARD_LOCAL;
3054 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3055 if (peer == ASB_DISCARD_LOCAL)
3056 return ASB_DISCARD_REMOTE;
3058 /* everything else is valid if they are equal on both sides. */
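/* Example: if the peer has after-sb-0pri set to discard-remote, the
 * equivalent setting from our point of view is discard-local; both nodes
 * must agree on whose data gets discarded, not on the literal keyword. */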
3062 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
3064 struct p_protocol *p = pi->data;
3065 enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3066 int p_proto, p_discard_my_data, p_two_primaries, cf;
3067 struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3068 char integrity_alg[SHARED_SECRET_MAX] = "";
3069 struct crypto_hash *peer_integrity_tfm = NULL;
3070 void *int_dig_in = NULL, *int_dig_vv = NULL;
3072 p_proto = be32_to_cpu(p->protocol);
3073 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
3074 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
3075 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
3076 p_two_primaries = be32_to_cpu(p->two_primaries);
3077 cf = be32_to_cpu(p->conn_flags);
3078 p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3080 if (tconn->agreed_pro_version >= 87) {
3083 if (pi->size > sizeof(integrity_alg))
3085 err = drbd_recv_all(tconn, integrity_alg, pi->size);
3088 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3091 if (pi->cmd != P_PROTOCOL_UPDATE) {
3092 clear_bit(CONN_DRY_RUN, &tconn->flags);
3094 if (cf & CF_DRY_RUN)
3095 set_bit(CONN_DRY_RUN, &tconn->flags);
3098 nc = rcu_dereference(tconn->net_conf);
3100 if (p_proto != nc->wire_protocol) {
3101 conn_err(tconn, "incompatible %s settings\n", "protocol");
3102 goto disconnect_rcu_unlock;
3105 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3106 conn_err(tconn, "incompatible %s settings\n", "after-sb-0pri");
3107 goto disconnect_rcu_unlock;
3110 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3111 conn_err(tconn, "incompatible %s settings\n", "after-sb-1pri");
3112 goto disconnect_rcu_unlock;
3115 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3116 conn_err(tconn, "incompatible %s settings\n", "after-sb-2pri");
3117 goto disconnect_rcu_unlock;
3120 if (p_discard_my_data && nc->discard_my_data) {
3121 conn_err(tconn, "incompatible %s settings\n", "discard-my-data");
3122 goto disconnect_rcu_unlock;
3125 if (p_two_primaries != nc->two_primaries) {
3126 conn_err(tconn, "incompatible %s settings\n", "allow-two-primaries");
3127 goto disconnect_rcu_unlock;
3130 if (strcmp(integrity_alg, nc->integrity_alg)) {
3131 conn_err(tconn, "incompatible %s settings\n", "data-integrity-alg");
3132 goto disconnect_rcu_unlock;
3138 if (integrity_alg[0]) {
3142 * We can only change the peer data integrity algorithm
3143 * here. Changing our own data integrity algorithm
3144 * requires that we send a P_PROTOCOL_UPDATE packet at
3145 * the same time; otherwise, the peer has no way to tell
3146 * at which packet the algorithm should change. */
3150 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3151 if (!peer_integrity_tfm) {
3152 conn_err(tconn, "peer data-integrity-alg %s not supported\n",
3157 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3158 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3159 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3160 if (!(int_dig_in && int_dig_vv)) {
3161 conn_err(tconn, "Allocation of buffers for data integrity checking failed\n");
3166 new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3167 if (!new_net_conf) {
3168 conn_err(tconn, "Allocation of new net_conf failed\n");
3172 mutex_lock(&tconn->data.mutex);
3173 mutex_lock(&tconn->conf_update);
3174 old_net_conf = tconn->net_conf;
3175 *new_net_conf = *old_net_conf;
3177 new_net_conf->wire_protocol = p_proto;
3178 new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3179 new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3180 new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3181 new_net_conf->two_primaries = p_two_primaries;
3183 rcu_assign_pointer(tconn->net_conf, new_net_conf);
3184 mutex_unlock(&tconn->conf_update);
3185 mutex_unlock(&tconn->data.mutex);
3187 crypto_free_hash(tconn->peer_integrity_tfm);
3188 kfree(tconn->int_dig_in);
3189 kfree(tconn->int_dig_vv);
3190 tconn->peer_integrity_tfm = peer_integrity_tfm;
3191 tconn->int_dig_in = int_dig_in;
3192 tconn->int_dig_vv = int_dig_vv;
3194 if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3195 conn_info(tconn, "peer data-integrity-alg: %s\n",
3196 integrity_alg[0] ? integrity_alg : "(none)");
3199 kfree(old_net_conf);
3202 disconnect_rcu_unlock:
3205 crypto_free_hash(peer_integrity_tfm);
3208 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3213 * input: alg name, feature name
3214 * return: NULL (alg name was "")
3215 * ERR_PTR(error) if something goes wrong
3216 * or the crypto hash ptr, if it worked out ok. */
3217 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
3218 const char *alg, const char *name)
3220 struct crypto_hash *tfm;
3225 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3227 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3228 alg, name, PTR_ERR(tfm));
3234 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3236 void *buffer = tconn->data.rbuf;
3237 int size = pi->size;
3240 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3241 s = drbd_recv(tconn, buffer, s);
3255 * config_unknown_volume - device configuration command for unknown volume
3257 * When a device is added to an existing connection, the node on which the
3258 * device is added first will send configuration commands to its peer but the
3259 * peer will not know about the device yet. It will warn and ignore these
3260 * commands. Once the device is added on the second node, the second node will
3261 * send the same device configuration commands, but in the other direction.
3263 * (We can also end up here if drbd is misconfigured.)
3265 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3267 conn_warn(tconn, "%s packet received for volume %u, which is not configured locally\n",
3268 cmdname(pi->cmd), pi->vnr);
3269 return ignore_remaining_packet(tconn, pi);
3272 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3274 struct drbd_conf *mdev;
3275 struct p_rs_param_95 *p;
3276 unsigned int header_size, data_size, exp_max_sz;
3277 struct crypto_hash *verify_tfm = NULL;
3278 struct crypto_hash *csums_tfm = NULL;
3279 struct net_conf *old_net_conf, *new_net_conf = NULL;
3280 struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3281 const int apv = tconn->agreed_pro_version;
3282 struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3286 mdev = vnr_to_mdev(tconn, pi->vnr);
3288 return config_unknown_volume(tconn, pi);
3290 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
3291 : apv == 88 ? sizeof(struct p_rs_param)
3293 : apv <= 94 ? sizeof(struct p_rs_param_89)
3294 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3296 if (pi->size > exp_max_sz) {
3297 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3298 pi->size, exp_max_sz);
3303 header_size = sizeof(struct p_rs_param);
3304 data_size = pi->size - header_size;
3305 } else if (apv <= 94) {
3306 header_size = sizeof(struct p_rs_param_89);
3307 data_size = pi->size - header_size;
3308 D_ASSERT(data_size == 0);
3310 header_size = sizeof(struct p_rs_param_95);
3311 data_size = pi->size - header_size;
3312 D_ASSERT(data_size == 0);
3315 /* initialize verify_alg and csums_alg */
3317 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3319 err = drbd_recv_all(mdev->tconn, p, header_size);
3323 mutex_lock(&mdev->tconn->conf_update);
3324 old_net_conf = mdev->tconn->net_conf;
3325 if (get_ldev(mdev)) {
3326 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3327 if (!new_disk_conf) {
3329 mutex_unlock(&mdev->tconn->conf_update);
3330 dev_err(DEV, "Allocation of new disk_conf failed\n");
3334 old_disk_conf = mdev->ldev->disk_conf;
3335 *new_disk_conf = *old_disk_conf;
3337 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3342 if (data_size > SHARED_SECRET_MAX) {
3343 dev_err(DEV, "verify-alg too long, "
3344 "peer wants %u, accepting only %u byte\n",
3345 data_size, SHARED_SECRET_MAX);
3350 err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3353 /* we expect NUL terminated string */
3354 /* but just in case someone tries to be evil */
3355 D_ASSERT(p->verify_alg[data_size-1] == 0);
3356 p->verify_alg[data_size-1] = 0;
3358 } else /* apv >= 89 */ {
3359 /* we still expect NUL terminated strings */
3360 /* but just in case someone tries to be evil */
3361 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3362 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3363 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3364 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3367 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3368 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3369 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3370 old_net_conf->verify_alg, p->verify_alg);
3373 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3374 p->verify_alg, "verify-alg");
3375 if (IS_ERR(verify_tfm)) {
3381 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3382 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3383 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3384 old_net_conf->csums_alg, p->csums_alg);
3387 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3388 p->csums_alg, "csums-alg");
3389 if (IS_ERR(csums_tfm)) {
3395 if (apv > 94 && new_disk_conf) {
3396 new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3397 new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3398 new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3399 new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3401 fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
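/* c_plan_ahead is configured in units of 0.1 seconds and the resync
 * controller runs every SLEEP_TIME (HZ/10) jiffies, so this works out to
 * one fifo slot per controller step: e.g. c_plan_ahead = 20 (a 2 second
 * plan-ahead window) gives fifo_size = 20. */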
3402 if (fifo_size != mdev->rs_plan_s->size) {
3403 new_plan = fifo_alloc(fifo_size);
3405 dev_err(DEV, "kmalloc of fifo_buffer failed");
3412 if (verify_tfm || csums_tfm) {
3413 new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3414 if (!new_net_conf) {
3415 dev_err(DEV, "Allocation of new net_conf failed\n");
3419 *new_net_conf = *old_net_conf;
3422 strcpy(new_net_conf->verify_alg, p->verify_alg);
3423 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3424 crypto_free_hash(mdev->tconn->verify_tfm);
3425 mdev->tconn->verify_tfm = verify_tfm;
3426 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3429 strcpy(new_net_conf->csums_alg, p->csums_alg);
3430 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3431 crypto_free_hash(mdev->tconn->csums_tfm);
3432 mdev->tconn->csums_tfm = csums_tfm;
3433 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3435 rcu_assign_pointer(tconn->net_conf, new_net_conf);
3439 if (new_disk_conf) {
3440 rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3445 old_plan = mdev->rs_plan_s;
3446 rcu_assign_pointer(mdev->rs_plan_s, new_plan);
3449 mutex_unlock(&mdev->tconn->conf_update);
3452 kfree(old_net_conf);
3453 kfree(old_disk_conf);
3459 if (new_disk_conf) {
3461 kfree(new_disk_conf);
3463 mutex_unlock(&mdev->tconn->conf_update);
3468 if (new_disk_conf) {
3470 kfree(new_disk_conf);
3472 mutex_unlock(&mdev->tconn->conf_update);
3473 /* just for completeness: actually not needed,
3474 * as this is not reached if csums_tfm was ok. */
3475 crypto_free_hash(csums_tfm);
3476 /* but free the verify_tfm again, if csums_tfm did not work out */
3477 crypto_free_hash(verify_tfm);
3478 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3482 /* warn if the arguments differ by more than 12.5% */
3483 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3484 const char *s, sector_t a, sector_t b)
3487 if (a == 0 || b == 0)
3489 d = (a > b) ? (a - b) : (b - a);
3490 if (d > (a>>3) || d > (b>>3))
3491 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3492 (unsigned long long)a, (unsigned long long)b);
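/* Example: a = 1000 and b = 880 sectors differ by d = 120, which exceeds
 * b>>3 = 110 (12.5% of b), so the warning fires; a = 1000 vs. b = 900
 * (d = 100) stays below both thresholds and is silent. */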
3495 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3497 struct drbd_conf *mdev;
3498 struct p_sizes *p = pi->data;
3499 enum determine_dev_size dd = unchanged;
3500 sector_t p_size, p_usize, my_usize;
3501 int ldsc = 0; /* local disk size changed */
3502 enum dds_flags ddsf;
3504 mdev = vnr_to_mdev(tconn, pi->vnr);
3506 return config_unknown_volume(tconn, pi);
3508 p_size = be64_to_cpu(p->d_size);
3509 p_usize = be64_to_cpu(p->u_size);
3511 /* just store the peer's disk size for now.
3512 * we still need to figure out whether we accept that. */
3513 mdev->p_size = p_size;
3515 if (get_ldev(mdev)) {
3517 my_usize = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
3520 warn_if_differ_considerably(mdev, "lower level device sizes",
3521 p_size, drbd_get_max_capacity(mdev->ldev));
3522 warn_if_differ_considerably(mdev, "user requested size",
3525 /* if this is the first connect, or an otherwise expected
3526 * param exchange, choose the minimum */
3527 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3528 p_usize = min_not_zero(my_usize, p_usize);
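/* min_not_zero() treats 0 as "use the whole backing device". Example:
 * my_usize = 0 and p_usize = 20971520 (10 GiB in sectors) agree on
 * 20971520, while my_usize = 2097152 and p_usize = 20971520 agree on
 * the smaller 2097152. */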
3530 /* Never shrink a device with usable data during connect.
3531 But allow online shrinking if we are connected. */
3532 if (drbd_new_dev_size(mdev, mdev->ldev, p_usize, 0) <
3533 drbd_get_capacity(mdev->this_bdev) &&
3534 mdev->state.disk >= D_OUTDATED &&
3535 mdev->state.conn < C_CONNECTED) {
3536 dev_err(DEV, "The peer's disk size is too small!\n");
3537 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3542 if (my_usize != p_usize) {
3543 struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3545 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3546 if (!new_disk_conf) {
3547 dev_err(DEV, "Allocation of new disk_conf failed\n");
3552 mutex_lock(&mdev->tconn->conf_update);
3553 old_disk_conf = mdev->ldev->disk_conf;
3554 *new_disk_conf = *old_disk_conf;
3555 new_disk_conf->disk_size = p_usize;
3557 rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3558 mutex_unlock(&mdev->tconn->conf_update);
3560 kfree(old_disk_conf);
3562 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3563 (unsigned long)my_usize);
3569 ddsf = be16_to_cpu(p->dds_flags);
3570 if (get_ldev(mdev)) {
3571 dd = drbd_determine_dev_size(mdev, ddsf);
3573 if (dd == dev_size_error)
3577 /* I am diskless, need to accept the peer's size. */
3578 drbd_set_my_capacity(mdev, p_size);
3581 mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3582 drbd_reconsider_max_bio_size(mdev);
3584 if (get_ldev(mdev)) {
3585 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3586 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3593 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3594 if (be64_to_cpu(p->c_size) !=
3595 drbd_get_capacity(mdev->this_bdev) || ldsc) {
3596 /* we have different sizes, probably peer
3597 * needs to know my new size... */
3598 drbd_send_sizes(mdev, 0, ddsf);
3600 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3601 (dd == grew && mdev->state.conn == C_CONNECTED)) {
3602 if (mdev->state.pdsk >= D_INCONSISTENT &&
3603 mdev->state.disk >= D_INCONSISTENT) {
3604 if (ddsf & DDSF_NO_RESYNC)
3605 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3607 resync_after_online_grow(mdev);
3609 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3616 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3618 struct drbd_conf *mdev;
3619 struct p_uuids *p = pi->data;
3621 int i, updated_uuids = 0;
3623 mdev = vnr_to_mdev(tconn, pi->vnr);
3625 return config_unknown_volume(tconn, pi);
3627 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3629 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3630 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3632 kfree(mdev->p_uuid);
3633 mdev->p_uuid = p_uuid;
3635 if (mdev->state.conn < C_CONNECTED &&
3636 mdev->state.disk < D_INCONSISTENT &&
3637 mdev->state.role == R_PRIMARY &&
3638 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3639 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3640 (unsigned long long)mdev->ed_uuid);
3641 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3645 if (get_ldev(mdev)) {
3646 int skip_initial_sync =
3647 mdev->state.conn == C_CONNECTED &&
3648 mdev->tconn->agreed_pro_version >= 90 &&
3649 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3650 (p_uuid[UI_FLAGS] & 8);
3651 if (skip_initial_sync) {
3652 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3653 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3654 "clear_n_write from receive_uuids",
3655 BM_LOCKED_TEST_ALLOWED);
3656 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3657 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3658 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3664 } else if (mdev->state.disk < D_INCONSISTENT &&
3665 mdev->state.role == R_PRIMARY) {
3666 /* I am a diskless primary, the peer just created a new current UUID for me. */
3668 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3671 /* Before we test the disk state, we should wait until a possibly
3672 ongoing cluster-wide state change has finished. That is important if
3673 we are primary and are detaching from our disk. We need to see the
3674 new disk state... */
3675 mutex_lock(mdev->state_mutex);
3676 mutex_unlock(mdev->state_mutex);
3677 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3678 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3681 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3687 * convert_state() - Converts the peer's view of the cluster state to our point of view
3688 * @ps: The state as seen by the peer.
3690 static union drbd_state convert_state(union drbd_state ps)
3692 union drbd_state ms;
3694 static enum drbd_conns c_tab[] = {
3695 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3696 [C_CONNECTED] = C_CONNECTED,
3698 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3699 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3700 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3701 [C_VERIFY_S] = C_VERIFY_T,
3707 ms.conn = c_tab[ps.conn];
3712 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
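/* Example: the peer reports C_STARTING_SYNC_S ("I am starting as sync
 * source"); seen from this node that same situation is C_STARTING_SYNC_T.
 * Symmetric states such as C_CONNECTED map to themselves. */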
3717 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3719 struct drbd_conf *mdev;
3720 struct p_req_state *p = pi->data;
3721 union drbd_state mask, val;
3722 enum drbd_state_rv rv;
3724 mdev = vnr_to_mdev(tconn, pi->vnr);
3728 mask.i = be32_to_cpu(p->mask);
3729 val.i = be32_to_cpu(p->val);
3731 if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3732 mutex_is_locked(mdev->state_mutex)) {
3733 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3737 mask = convert_state(mask);
3738 val = convert_state(val);
3740 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3741 drbd_send_sr_reply(mdev, rv);
3748 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3750 struct p_req_state *p = pi->data;
3751 union drbd_state mask, val;
3752 enum drbd_state_rv rv;
3754 mask.i = be32_to_cpu(p->mask);
3755 val.i = be32_to_cpu(p->val);
3757 if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3758 mutex_is_locked(&tconn->cstate_mutex)) {
3759 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3763 mask = convert_state(mask);
3764 val = convert_state(val);
3766 rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3767 conn_send_sr_reply(tconn, rv);
3772 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3774 struct drbd_conf *mdev;
3775 struct p_state *p = pi->data;
3776 union drbd_state os, ns, peer_state;
3777 enum drbd_disk_state real_peer_disk;
3778 enum chg_state_flags cs_flags;
3781 mdev = vnr_to_mdev(tconn, pi->vnr);
3783 return config_unknown_volume(tconn, pi);
3785 peer_state.i = be32_to_cpu(p->state);
3787 real_peer_disk = peer_state.disk;
3788 if (peer_state.disk == D_NEGOTIATING) {
3789 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3790 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3793 spin_lock_irq(&mdev->tconn->req_lock);
3795 os = ns = drbd_read_state(mdev);
3796 spin_unlock_irq(&mdev->tconn->req_lock);
3798 /* If some other part of the code (asender thread, timeout)
3799 * already decided to close the connection again,
3800 * we must not "re-establish" it here. */
3801 if (os.conn <= C_TEAR_DOWN)
3804 /* If this is the "end of sync" confirmation, usually the peer disk
3805 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
3806 * set) resync started in PausedSyncT, or if the timing of pause-/
3807 * unpause-sync events has been "just right", the peer disk may
3808 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3810 if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3811 real_peer_disk == D_UP_TO_DATE &&
3812 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3813 /* If we are (becoming) SyncSource, but peer is still in sync
3814 * preparation, ignore its uptodate-ness to avoid flapping, it
3815 * will change to inconsistent once the peer reaches an active syncing state.
3817 * It may have changed syncer-paused flags, however, so we
3818 * cannot ignore this completely. */
3819 if (peer_state.conn > C_CONNECTED &&
3820 peer_state.conn < C_SYNC_SOURCE)
3821 real_peer_disk = D_INCONSISTENT;
3823 /* if peer_state changes to connected at the same time,
3824 * it explicitly notifies us that it finished resync.
3825 * Maybe we should finish it up, too? */
3826 else if (os.conn >= C_SYNC_SOURCE &&
3827 peer_state.conn == C_CONNECTED) {
3828 if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3829 drbd_resync_finished(mdev);
3834 /* peer says his disk is inconsistent, while we think it is uptodate,
3835 * and this happens while the peer still thinks we have a sync going on,
3836 * but we think we are already done with the sync.
3837 * We ignore this to avoid flapping pdsk.
3838 * This should not happen, if the peer is a recent version of drbd. */
3839 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3840 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3841 real_peer_disk = D_UP_TO_DATE;
3843 if (ns.conn == C_WF_REPORT_PARAMS)
3844 ns.conn = C_CONNECTED;
3846 if (peer_state.conn == C_AHEAD)
3849 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3850 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3851 int cr; /* consider resync */
3853 /* if we established a new connection */
3854 cr = (os.conn < C_CONNECTED);
3855 /* if we had an established connection
3856 * and one of the nodes newly attaches a disk */
3857 cr |= (os.conn == C_CONNECTED &&
3858 (peer_state.disk == D_NEGOTIATING ||
3859 os.disk == D_NEGOTIATING));
3860 /* if we have both been inconsistent, and the peer has been
3861 * forced to be UpToDate with --overwrite-data */
3862 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3863 /* if we had been plain connected, and the admin requested to
3864 * start a sync by "invalidate" or "invalidate-remote" */
3865 cr |= (os.conn == C_CONNECTED &&
3866 (peer_state.conn >= C_STARTING_SYNC_S &&
3867 peer_state.conn <= C_WF_BITMAP_T));
3870 ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3873 if (ns.conn == C_MASK) {
3874 ns.conn = C_CONNECTED;
3875 if (mdev->state.disk == D_NEGOTIATING) {
3876 drbd_force_state(mdev, NS(disk, D_FAILED));
3877 } else if (peer_state.disk == D_NEGOTIATING) {
3878 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3879 peer_state.disk = D_DISKLESS;
3880 real_peer_disk = D_DISKLESS;
3882 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3884 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3885 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3891 spin_lock_irq(&mdev->tconn->req_lock);
3892 if (os.i != drbd_read_state(mdev).i)
3894 clear_bit(CONSIDER_RESYNC, &mdev->flags);
3895 ns.peer = peer_state.role;
3896 ns.pdsk = real_peer_disk;
3897 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3898 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3899 ns.disk = mdev->new_state_tmp.disk;
3900 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3901 if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3902 test_bit(NEW_CUR_UUID, &mdev->flags)) {
3903 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3904 for temporary network outages! */
3905 spin_unlock_irq(&mdev->tconn->req_lock);
3906 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3907 tl_clear(mdev->tconn);
3908 drbd_uuid_new_current(mdev);
3909 clear_bit(NEW_CUR_UUID, &mdev->flags);
3910 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3913 rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3914 ns = drbd_read_state(mdev);
3915 spin_unlock_irq(&mdev->tconn->req_lock);
3917 if (rv < SS_SUCCESS) {
3918 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3922 if (os.conn > C_WF_REPORT_PARAMS) {
3923 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3924 peer_state.disk != D_NEGOTIATING) {
3925 /* we want resync, peer has not yet decided to sync... */
3926 /* Nowadays only used when forcing a node into primary role and
3927 setting its disk to UpToDate with that */
3928 drbd_send_uuids(mdev);
3929 drbd_send_current_state(mdev);
3933 clear_bit(DISCARD_MY_DATA, &mdev->flags);
3935 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3940 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3942 struct drbd_conf *mdev;
3943 struct p_rs_uuid *p = pi->data;
3945 mdev = vnr_to_mdev(tconn, pi->vnr);
3949 wait_event(mdev->misc_wait,
3950 mdev->state.conn == C_WF_SYNC_UUID ||
3951 mdev->state.conn == C_BEHIND ||
3952 mdev->state.conn < C_CONNECTED ||
3953 mdev->state.disk < D_NEGOTIATING);
3955 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3957 /* Here the _drbd_uuid_ functions are right, current should
3958 _not_ be rotated into the history */
3959 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3960 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3961 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3963 drbd_print_uuids(mdev, "updated sync uuid");
3964 drbd_start_resync(mdev, C_SYNC_TARGET);
3968 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3974 * receive_bitmap_plain
3976 * Return 0 when done, 1 when another iteration is needed, and a negative error
3977 * code upon failure.
3980 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
3981 unsigned long *p, struct bm_xfer_ctx *c)
3983 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
3984 drbd_header_size(mdev->tconn);
3985 unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
3986 c->bm_words - c->word_offset);
3987 unsigned int want = num_words * sizeof(*p);
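/* A sizing sketch (illustrative numbers): with DRBD_SOCKET_BUFFER_SIZE =
 * 4096 and a 16 byte header, data_size is 4080 bytes, i.e. up to 510
 * 64 bit words per packet; "want" is the byte count the current packet
 * must carry, smaller only for the final packet of the bitmap. */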
3991 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
3996 err = drbd_recv_all(mdev->tconn, p, want);
4000 drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
4002 c->word_offset += num_words;
4003 c->bit_offset = c->word_offset * BITS_PER_LONG;
4004 if (c->bit_offset > c->bm_bits)
4005 c->bit_offset = c->bm_bits;
4010 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4012 return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4015 static int dcbp_get_start(struct p_compressed_bm *p)
4017 return (p->encoding & 0x80) != 0;
4020 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4022 return (p->encoding >> 4) & 0x7;
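/* Layout of the compressed-bitmap "encoding" byte, as implied by the
 * accessors above:
 *   bit 7     - value of the first run (start toggle)
 *   bits 6..4 - number of padding bits at the end of the bit stream
 *   bits 3..0 - encoding code, e.g. RLE_VLI_Bits
 */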
4028 * Return 0 when done, 1 when another iteration is needed, and a negative error
4029 * code upon failure.
4032 recv_bm_rle_bits(struct drbd_conf *mdev,
4033 struct p_compressed_bm *p,
4034 struct bm_xfer_ctx *c,
4037 struct bitstream bs;
4041 unsigned long s = c->bit_offset;
4043 int toggle = dcbp_get_start(p);
4047 bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4049 bits = bitstream_get_bits(&bs, &look_ahead, 64);
4053 for (have = bits; have > 0; s += rl, toggle = !toggle) {
4054 bits = vli_decode_bits(&rl, look_ahead);
4060 if (e >= c->bm_bits) {
4061 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4064 _drbd_bm_set_bits(mdev, s, e);
4068 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4069 have, bits, look_ahead,
4070 (unsigned int)(bs.cur.b - p->code),
4071 (unsigned int)bs.buf_len);
4074 look_ahead >>= bits;
4077 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4080 look_ahead |= tmp << have;
4085 bm_xfer_ctx_bit_to_word_offset(c);
4087 return (s != c->bm_bits);
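/* RLE example (a sketch): a bitmap consisting of 1000 clear bits followed
 * by 24 set bits arrives as the VLI-encoded run lengths 1000 and 24 with
 * the start toggle cleared; the decoder above flips "toggle" per run and
 * calls _drbd_bm_set_bits() only for the set runs. */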
4093 * Return 0 when done, 1 when another iteration is needed, and a negative error
4094 * code upon failure.
4097 decode_bitmap_c(struct drbd_conf *mdev,
4098 struct p_compressed_bm *p,
4099 struct bm_xfer_ctx *c,
4102 if (dcbp_get_code(p) == RLE_VLI_Bits)
4103 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
4105 /* other variants had been implemented for evaluation,
4106 * but have been dropped as this one turned out to be "best"
4107 * during all our tests. */
4109 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4110 conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4114 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
4115 const char *direction, struct bm_xfer_ctx *c)
4117 /* what would it take to transfer it "plaintext" */
4118 unsigned int header_size = drbd_header_size(mdev->tconn);
4119 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4120 unsigned int plain =
4121 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4122 c->bm_words * sizeof(unsigned long);
4123 unsigned int total = c->bytes[0] + c->bytes[1];
4126 /* total cannot be zero, but just in case: */
4130 /* don't report if not compressed */
4134 /* total < plain; still, guard against overflow */
4135 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4136 : (1000 * total / plain);
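/* Per-mille arithmetic, e.g. plain = 4096 bytes vs. total = 512 bytes
 * actually transferred gives r = 125; the "compression" value logged
 * below is the resulting saving, 87.5% in this example. */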
4142 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4143 "total %u; compression: %u.%u%%\n",
4145 c->bytes[1], c->packets[1],
4146 c->bytes[0], c->packets[0],
4147 total, r/10, r % 10);
4150 /* Since we are processing the bitfield from lower addresses to higher,
4151 it does not matter whether we process it in 32 bit or 64 bit chunks,
4152 as long as it is little endian. (Understand it as a byte stream,
4153 beginning with the lowest byte...) If we used big endian, we would
4154 need to process it from the highest address to the lowest, in order
4155 to be agnostic to the 32 vs 64 bit issue.
4157 Returns 0 on success, or a negative error code on failure. */
4158 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
4160 struct drbd_conf *mdev;
4161 struct bm_xfer_ctx c;
4164 mdev = vnr_to_mdev(tconn, pi->vnr);
4168 drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4169 /* you are supposed to send additional out-of-sync information
4170 * if you actually set bits during this phase */
4172 c = (struct bm_xfer_ctx) {
4173 .bm_bits = drbd_bm_bits(mdev),
4174 .bm_words = drbd_bm_words(mdev),
4178 if (pi->cmd == P_BITMAP)
4179 err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
4180 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4181 /* MAYBE: sanity check that we speak proto >= 90,
4182 * and the feature is enabled! */
4183 struct p_compressed_bm *p = pi->data;
4185 if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
4186 dev_err(DEV, "ReportCBitmap packet too large\n");
4190 if (pi->size <= sizeof(*p)) {
4191 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4195 err = drbd_recv_all(mdev->tconn, p, pi->size);
4198 err = decode_bitmap_c(mdev, p, &c, pi->size);
4200 dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4205 c.packets[pi->cmd == P_BITMAP]++;
4206 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
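/* (pi->cmd == P_BITMAP) doubles as the array index: slot 1 accumulates
 * plain bitmap packets, slot 0 the compressed ones, matching the
 * convention INFO_bm_xfer_stats() prints above. */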
4213 err = drbd_recv_header(mdev->tconn, pi);
4218 INFO_bm_xfer_stats(mdev, "receive", &c);
4220 if (mdev->state.conn == C_WF_BITMAP_T) {
4221 enum drbd_state_rv rv;
4223 err = drbd_send_bitmap(mdev);
4226 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4227 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4228 D_ASSERT(rv == SS_SUCCESS);
4229 } else if (mdev->state.conn != C_WF_BITMAP_S) {
4230 /* admin may have requested C_DISCONNECTING,
4231 * other threads may have noticed network errors */
4232 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
4233 drbd_conn_str(mdev->state.conn));
4238 drbd_bm_unlock(mdev);
4239 if (!err && mdev->state.conn == C_WF_BITMAP_S)
4240 drbd_start_resync(mdev, C_SYNC_SOURCE);
4244 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4246 conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
4249 return ignore_remaining_packet(tconn, pi);
4252 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
4254 /* Make sure we've acked all the TCP data associated
4255 * with the data requests being unplugged */
4256 drbd_tcp_quickack(tconn->data.socket);
4261 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
4263 struct drbd_conf *mdev;
4264 struct p_block_desc *p = pi->data;
4266 mdev = vnr_to_mdev(tconn, pi->vnr);
4270 switch (mdev->state.conn) {
4271 case C_WF_SYNC_UUID:
4276 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4277 drbd_conn_str(mdev->state.conn));
4280 drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4288 int (*fn)(struct drbd_tconn *, struct packet_info *);
4291 static struct data_cmd drbd_cmd_handler[] = {
4292 [P_DATA] = { 1, sizeof(struct p_data), receive_Data },
4293 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
4294 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4295 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4296 [P_BITMAP] = { 1, 0, receive_bitmap } ,
4297 [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4298 [P_UNPLUG_REMOTE] = { 0, 0, receive_UnplugRemote },
4299 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4300 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4301 [P_SYNC_PARAM] = { 1, 0, receive_SyncParam },
4302 [P_SYNC_PARAM89] = { 1, 0, receive_SyncParam },
4303 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
4304 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
4305 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
4306 [P_STATE] = { 0, sizeof(struct p_state), receive_state },
4307 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
4308 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4309 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4310 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4311 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4312 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
4313 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4314 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4315 [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4318 static void drbdd(struct drbd_tconn *tconn)
4320 struct packet_info pi;
4321 size_t shs; /* sub header size */
4324 while (get_t_state(&tconn->receiver) == RUNNING) {
4325 struct data_cmd *cmd;
4327 drbd_thread_current_set_cpu(&tconn->receiver);
4328 if (drbd_recv_header(tconn, &pi))
4331 cmd = &drbd_cmd_handler[pi.cmd];
4332 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4333 conn_err(tconn, "Unexpected data packet %s (0x%04x)",
4334 cmdname(pi.cmd), pi.cmd);
4338 shs = cmd->pkt_size;
4339 if (pi.size > shs && !cmd->expect_payload) {
4340 conn_err(tconn, "No payload expected %s l:%d\n",
4341 cmdname(pi.cmd), pi.size);
4346 err = drbd_recv_all_warn(tconn, pi.data, shs);
4352 err = cmd->fn(tconn, &pi);
4354 conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4355 cmdname(pi.cmd), err, pi.size);
4362 conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4365 void conn_flush_workqueue(struct drbd_tconn *tconn)
4367 struct drbd_wq_barrier barr;
4369 barr.w.cb = w_prev_work_done;
4370 barr.w.tconn = tconn;
4371 init_completion(&barr.done);
4372 drbd_queue_work(&tconn->data.work, &barr.w);
4373 wait_for_completion(&barr.done);
4376 static void conn_disconnect(struct drbd_tconn *tconn)
4378 struct drbd_conf *mdev;
4382 if (tconn->cstate == C_STANDALONE)
4385 /* We are about to start the cleanup after connection loss.
4386 * Make sure drbd_make_request knows about that.
4387 * Usually we should be in some network failure state already,
4388 * but just in case we are not, we fix it up here.
4390 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4392 /* asender does not clean up anything. it must not interfere, either */
4393 drbd_thread_stop(&tconn->asender);
4394 drbd_free_sock(tconn);
4397 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
4398 kref_get(&mdev->kref);
4400 drbd_disconnected(mdev);
4401 kref_put(&mdev->kref, &drbd_minor_destroy);
4406 if (!list_empty(&tconn->current_epoch->list))
4407 conn_err(tconn, "ASSERTION FAILED: tconn->current_epoch->list not empty\n");
4408 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4409 atomic_set(&tconn->current_epoch->epoch_size, 0);
4411 conn_info(tconn, "Connection closed\n");
4413 if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4414 conn_try_outdate_peer_async(tconn);
4416 spin_lock_irq(&tconn->req_lock);
4418 if (oc >= C_UNCONNECTED)
4419 _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4421 spin_unlock_irq(&tconn->req_lock);
4423 if (oc == C_DISCONNECTING)
4424 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4427 static int drbd_disconnected(struct drbd_conf *mdev)
4431 /* wait for current activity to cease. */
4432 spin_lock_irq(&mdev->tconn->req_lock);
4433 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4434 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4435 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4436 spin_unlock_irq(&mdev->tconn->req_lock);
4438 /* We do not have data structures that would allow us to
4439 * get the rs_pending_cnt down to 0 again.
4440 * * On C_SYNC_TARGET we do not have any data structures describing
4441 * the pending RSDataRequest's we have sent.
4442 * * On C_SYNC_SOURCE there is no data structure that tracks
4443 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4444 * And no, it is not the sum of the reference counts in the
4445 * resync_LRU. The resync_LRU tracks the whole operation including
4446 * the disk-IO, while the rs_pending_cnt only tracks the blocks on the fly. */
4448 drbd_rs_cancel_all(mdev);
4450 mdev->rs_failed = 0;
4451 atomic_set(&mdev->rs_pending_cnt, 0);
4452 wake_up(&mdev->misc_wait);
4454 del_timer_sync(&mdev->resync_timer);
4455 resync_timer_fn((unsigned long)mdev);
4457 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4458 * w_make_resync_request etc. which may still be on the worker queue
4459 * to be "canceled" */
4460 drbd_flush_workqueue(mdev);
4462 drbd_finish_peer_reqs(mdev);
4464 /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4465 might have queued more work. The flush before drbd_finish_peer_reqs() is
4466 necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
4467 drbd_flush_workqueue(mdev);
4469 kfree(mdev->p_uuid);
4470 mdev->p_uuid = NULL;
4472 if (!drbd_suspended(mdev))
4473 tl_clear(mdev->tconn);
4477 /* serialize with bitmap writeout triggered by the state change, if any. */
4479 wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4481 /* tcp_close and release of sendpage pages can be deferred. I don't
4482 * want to use SO_LINGER, because apparently it can be deferred for
4483 * more than 20 seconds (longest time I checked).
4485 * Actually we don't care for exactly when the network stack does its
4486 * put_page(), but release our reference on these pages right here.
4488 i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4489 if (i)
4490 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4491 i = atomic_read(&mdev->pp_in_use_by_net);
4492 if (i)
4493 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4494 i = atomic_read(&mdev->pp_in_use);
4495 if (i)
4496 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4498 D_ASSERT(list_empty(&mdev->read_ee));
4499 D_ASSERT(list_empty(&mdev->active_ee));
4500 D_ASSERT(list_empty(&mdev->sync_ee));
4501 D_ASSERT(list_empty(&mdev->done_ee));
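/*
 * Illustration (not part of the driver): the flush / finish / flush-again
 * sequence in drbd_disconnected() exists because finishing the completed
 * peer requests can itself queue new work.  A minimal sketch of that
 * ordering, assuming a hypothetical example_process_done() that may queue
 * work on wq:
 */
#if 0
static void example_process_done(void);	/* hypothetical */

static void example_drain(struct workqueue_struct *wq)
{
	flush_workqueue(wq);	/* run everything queued so far */
	example_process_done();	/* may queue additional work */
	flush_workqueue(wq);	/* run the work queued by the step above */
}
#endif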
4506 /*
4507 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4508 * we can agree on is stored in agreed_pro_version.
4509 *
4510 * feature flags and the reserved array should be enough room for future
4511 * enhancements of the handshake protocol, and possible plugins...
4512 *
4513 * for now, they are expected to be zero, but ignored.
4514 */
4515 static int drbd_send_features(struct drbd_tconn *tconn)
4517 struct drbd_socket *sock;
4518 struct p_connection_features *p;
4520 sock = &tconn->data;
4521 p = conn_prepare_command(tconn, sock);
4524 memset(p, 0, sizeof(*p));
4525 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4526 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4527 return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
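/*
 * Illustration (not part of the driver): wire fields are fixed to big
 * endian with cpu_to_be32() before sending, and decoded with be32_to_cpu()
 * on receive (as drbd_do_features() does below), so both peers agree on
 * the representation regardless of host byte order.  Minimal sketch with
 * a hypothetical packet type:
 */
#if 0
struct example_features {
	__be32 protocol_min;	/* big endian on the wire */
	__be32 protocol_max;
};

static void example_encode(struct example_features *p, u32 min, u32 max)
{
	p->protocol_min = cpu_to_be32(min);
	p->protocol_max = cpu_to_be32(max);
}

static u32 example_decode_max(const struct example_features *p)
{
	return be32_to_cpu(p->protocol_max);
}
#endif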
4530 /*
4531 * return values:
4532 * 1 yes, we have a valid connection
4533 * 0 oops, did not work out, please try again
4534 * -1 peer talks different language,
4535 * no point in trying again, please go standalone.
4536 */
4537 static int drbd_do_features(struct drbd_tconn *tconn)
4539 /* ASSERT current == tconn->receiver ... */
4540 struct p_connection_features *p;
4541 const int expect = sizeof(struct p_connection_features);
4542 struct packet_info pi;
4543 int err;
4545 err = drbd_send_features(tconn);
4546 if (err)
4547 return 0;
4549 err = drbd_recv_header(tconn, &pi);
4550 if (err)
4551 return 0;
4553 if (pi.cmd != P_CONNECTION_FEATURES) {
4554 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4555 cmdname(pi.cmd), pi.cmd);
4556 return -1;
4557 }
4559 if (pi.size != expect) {
4560 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4561 expect, pi.size);
4562 return -1;
4563 }
4565 p = pi.data;
4566 err = drbd_recv_all_warn(tconn, p, expect);
4567 if (err)
4568 return 0;
4570 p->protocol_min = be32_to_cpu(p->protocol_min);
4571 p->protocol_max = be32_to_cpu(p->protocol_max);
4572 if (p->protocol_max == 0)
4573 p->protocol_max = p->protocol_min;
4575 if (PRO_VERSION_MAX < p->protocol_min ||
4576 PRO_VERSION_MIN > p->protocol_max)
4577 goto incompat;
4579 tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4581 conn_info(tconn, "Handshake successful: "
4582 "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4584 return 1;
4586 incompat:
4587 conn_err(tconn, "incompatible DRBD dialects: "
4588 "I support %d-%d, peer supports %d-%d\n",
4589 PRO_VERSION_MIN, PRO_VERSION_MAX,
4590 p->protocol_min, p->protocol_max);
4591 return -1;
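/*
 * Illustration (not part of the driver): the negotiation rule used above,
 * in one place.  The handshake succeeds iff the two version ranges
 * overlap, and the agreed version is the highest one both sides support.
 * Hedged sketch with hypothetical names:
 */
#if 0
static int example_negotiate(int my_min, int my_max,
			     int peer_min, int peer_max)
{
	if (my_max < peer_min || my_min > peer_max)
		return -1;		/* incompatible dialects */
	return min(my_max, peer_max);	/* highest common version */
}
/* e.g. example_negotiate(86, 101, 90, 110) returns 101 */
#endif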
4594 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4595 static int drbd_do_auth(struct drbd_tconn *tconn)
4597 conn_err(tconn, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4598 conn_err(tconn, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4599 return -1;
4602 #define CHALLENGE_LEN 64
4604 /* Return value:
4605 1 - auth succeeded,
4606 0 - failed, try again (network error),
4607 -1 - auth failed, don't try again.
4608 */
4610 static int drbd_do_auth(struct drbd_tconn *tconn)
4612 struct drbd_socket *sock;
4613 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4614 struct scatterlist sg;
4615 char *response = NULL;
4616 char *right_response = NULL;
4617 char *peers_ch = NULL;
4618 unsigned int key_len;
4619 char secret[SHARED_SECRET_MAX]; /* 64 byte */
4620 unsigned int resp_size;
4621 struct hash_desc desc;
4622 struct packet_info pi;
4623 struct net_conf *nc;
4624 int err, rv;
4626 /* FIXME: Put the challenge/response into the preallocated socket buffer. */
4628 rcu_read_lock();
4629 nc = rcu_dereference(tconn->net_conf);
4630 key_len = strlen(nc->shared_secret);
4631 memcpy(secret, nc->shared_secret, key_len);
4632 rcu_read_unlock();
4634 desc.tfm = tconn->cram_hmac_tfm;
4637 rv = crypto_hash_setkey(tconn->cram_hmac_tfm, (u8 *)secret, key_len);
4639 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4644 get_random_bytes(my_challenge, CHALLENGE_LEN);
4646 sock = &tconn->data;
4647 if (!conn_prepare_command(tconn, sock)) {
4651 rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4652 my_challenge, CHALLENGE_LEN);
4656 err = drbd_recv_header(tconn, &pi);
4662 if (pi.cmd != P_AUTH_CHALLENGE) {
4663 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4664 cmdname(pi.cmd), pi.cmd);
4669 if (pi.size > CHALLENGE_LEN * 2) {
4670 conn_err(tconn, "AuthChallenge payload bigger than expected.\n");
4675 peers_ch = kmalloc(pi.size, GFP_NOIO);
4676 if (peers_ch == NULL) {
4677 conn_err(tconn, "kmalloc of peers_ch failed\n");
4682 err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4688 resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4689 response = kmalloc(resp_size, GFP_NOIO);
4690 if (response == NULL) {
4691 conn_err(tconn, "kmalloc of response failed\n");
4696 sg_init_table(&sg, 1);
4697 sg_set_buf(&sg, peers_ch, pi.size);
4699 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4701 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4706 if (!conn_prepare_command(tconn, sock)) {
4710 rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4711 response, resp_size);
4715 err = drbd_recv_header(tconn, &pi);
4721 if (pi.cmd != P_AUTH_RESPONSE) {
4722 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4723 cmdname(pi.cmd), pi.cmd);
4728 if (pi.size != resp_size) {
4729 conn_err(tconn, "AuthResponse payload has wrong size.\n");
4734 err = drbd_recv_all_warn(tconn, response, resp_size);
4740 right_response = kmalloc(resp_size, GFP_NOIO);
4741 if (right_response == NULL) {
4742 conn_err(tconn, "kmalloc of right_response failed\n");
4747 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4749 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4751 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4756 rv = !memcmp(response, right_response, resp_size);
4758 if (rv)
4759 conn_info(tconn, "Peer authenticated using %d bytes HMAC\n",
4760 resp_size);
4767 kfree(right_response);
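/*
 * Illustration (not part of the driver): drbd_do_auth() above is a
 * symmetric challenge/response exchange; each side proves knowledge of
 * the shared secret without ever sending it:
 *
 *   A -> B: random challenge Ca
 *   B -> A: HMAC(secret, Ca)   -- A recomputes and compares
 *   B -> A: random challenge Cb
 *   A -> B: HMAC(secret, Cb)   -- B recomputes and compares
 *
 * A hedged sketch of the verification step; example_hmac() is a
 * hypothetical stand-in for the crypto_hash_digest() call used above:
 */
#if 0
#define EXAMPLE_DIGEST_LEN 20	/* hypothetical digest size */

static void example_hmac(const u8 *key, unsigned int key_len,
			 const u8 *msg, unsigned int msg_len,
			 u8 *digest);	/* hypothetical */

static bool example_verify(const u8 *secret, unsigned int key_len,
			   const u8 *challenge, unsigned int chall_len,
			   const u8 *response)
{
	u8 expected[EXAMPLE_DIGEST_LEN];

	example_hmac(secret, key_len, challenge, chall_len, expected);
	return memcmp(expected, response, EXAMPLE_DIGEST_LEN) == 0;
}
#endif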
4773 int drbdd_init(struct drbd_thread *thi)
4775 struct drbd_tconn *tconn = thi->tconn;
4776 int h;
4778 conn_info(tconn, "receiver (re)started\n");
4781 h = conn_connect(tconn);
4782 if (h == 0) {
4783 conn_disconnect(tconn);
4784 schedule_timeout_interruptible(HZ);
4785 }
4786 if (h == -1) {
4787 conn_warn(tconn, "Discarding network configuration.\n");
4788 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4789 }
4795 conn_disconnect(tconn);
4797 conn_info(tconn, "receiver terminated\n");
4801 /* ********* acknowledge sender ******** */
4803 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4805 struct p_req_state_reply *p = pi->data;
4806 int retcode = be32_to_cpu(p->retcode);
4808 if (retcode >= SS_SUCCESS) {
4809 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4810 } else {
4811 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4812 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4813 drbd_set_st_err_str(retcode), retcode);
4815 wake_up(&tconn->ping_wait);
4820 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4822 struct drbd_conf *mdev;
4823 struct p_req_state_reply *p = pi->data;
4824 int retcode = be32_to_cpu(p->retcode);
4826 mdev = vnr_to_mdev(tconn, pi->vnr);
4830 if (retcode >= SS_SUCCESS) {
4831 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4832 } else {
4833 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4834 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4835 drbd_set_st_err_str(retcode), retcode);
4837 wake_up(&mdev->state_wait);
4842 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4844 return drbd_send_ping_ack(tconn);
4848 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4850 /* restore idle timeout */
4851 tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4852 if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4853 wake_up(&tconn->ping_wait);
4858 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4860 struct drbd_conf *mdev;
4861 struct p_block_ack *p = pi->data;
4862 sector_t sector = be64_to_cpu(p->sector);
4863 int blksize = be32_to_cpu(p->blksize);
4865 mdev = vnr_to_mdev(tconn, pi->vnr);
4869 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4871 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4873 if (get_ldev(mdev)) {
4874 drbd_rs_complete_io(mdev, sector);
4875 drbd_set_in_sync(mdev, sector, blksize);
4876 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4877 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4878 put_ldev(mdev);
4879 }
4880 dec_rs_pending(mdev);
4881 atomic_add(blksize >> 9, &mdev->rs_sect_in);
4887 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4888 struct rb_root *root, const char *func,
4889 enum drbd_req_event what, bool missing_ok)
4891 struct drbd_request *req;
4892 struct bio_and_error m;
4894 spin_lock_irq(&mdev->tconn->req_lock);
4895 req = find_request(mdev, root, id, sector, missing_ok, func);
4896 if (unlikely(!req)) {
4897 spin_unlock_irq(&mdev->tconn->req_lock);
4898 return -EIO;
4899 }
4900 __req_mod(req, what, &m);
4901 spin_unlock_irq(&mdev->tconn->req_lock);
4903 if (m.bio)
4904 complete_master_bio(mdev, &m);
4905 return 0;
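/*
 * Illustration (not part of the driver): note the ordering in
 * validate_req_change_req_state() above; the request state is modified
 * under req_lock, but the master bio is completed only after the lock is
 * dropped, so the potentially heavy completion path never runs with a
 * spinlock held.  Minimal sketch of the pattern with hypothetical types:
 */
#if 0
struct example_req {
	int pending;
	struct bio *master_bio;
};

static void example_ack(spinlock_t *lock, struct example_req *req)
{
	struct bio *done = NULL;

	spin_lock_irq(lock);
	if (--req->pending == 0)	/* state change under the lock */
		done = req->master_bio;
	spin_unlock_irq(lock);

	if (done)			/* completion outside the lock */
		bio_endio(done, 0);
}
#endif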
4908 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4910 struct drbd_conf *mdev;
4911 struct p_block_ack *p = pi->data;
4912 sector_t sector = be64_to_cpu(p->sector);
4913 int blksize = be32_to_cpu(p->blksize);
4914 enum drbd_req_event what;
4916 mdev = vnr_to_mdev(tconn, pi->vnr);
4920 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4922 if (p->block_id == ID_SYNCER) {
4923 drbd_set_in_sync(mdev, sector, blksize);
4924 dec_rs_pending(mdev);
4925 return 0;
4926 }
4927 switch (pi->cmd) {
4928 case P_RS_WRITE_ACK:
4929 what = WRITE_ACKED_BY_PEER_AND_SIS;
4930 break;
4931 case P_WRITE_ACK:
4932 what = WRITE_ACKED_BY_PEER;
4933 break;
4934 case P_RECV_ACK:
4935 what = RECV_ACKED_BY_PEER;
4936 break;
4937 case P_DISCARD_WRITE:
4938 what = DISCARD_WRITE;
4939 break;
4940 case P_RETRY_WRITE:
4941 what = POSTPONE_WRITE;
4942 break;
4943 default:
4944 BUG();
4945 }
4947 return validate_req_change_req_state(mdev, p->block_id, sector,
4948 &mdev->write_requests, __func__,
4949 what, false);
4952 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
4954 struct drbd_conf *mdev;
4955 struct p_block_ack *p = pi->data;
4956 sector_t sector = be64_to_cpu(p->sector);
4957 int size = be32_to_cpu(p->blksize);
4958 int err;
4960 mdev = vnr_to_mdev(tconn, pi->vnr);
4964 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4966 if (p->block_id == ID_SYNCER) {
4967 dec_rs_pending(mdev);
4968 drbd_rs_failed_io(mdev, sector, size);
4972 err = validate_req_change_req_state(mdev, p->block_id, sector,
4973 &mdev->write_requests, __func__,
4974 NEG_ACKED, true);
4975 if (err) {
4976 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4977 The master bio might already be completed, therefore the
4978 request is no longer in the collision hash. */
4979 /* In Protocol B we might already have got a P_RECV_ACK
4980 but then get a P_NEG_ACK afterwards. */
4981 drbd_set_out_of_sync(mdev, sector, size);
4986 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4988 struct drbd_conf *mdev;
4989 struct p_block_ack *p = pi->data;
4990 sector_t sector = be64_to_cpu(p->sector);
4992 mdev = vnr_to_mdev(tconn, pi->vnr);
4996 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4998 dev_err(DEV, "Got NegDReply; Sector %llus, len %u.\n",
4999 (unsigned long long)sector, be32_to_cpu(p->blksize));
5001 return validate_req_change_req_state(mdev, p->block_id, sector,
5002 &mdev->read_requests, __func__,
5003 NEG_ACKED, false);
5006 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
5008 struct drbd_conf *mdev;
5009 sector_t sector;
5010 int size;
5011 struct p_block_ack *p = pi->data;
5013 mdev = vnr_to_mdev(tconn, pi->vnr);
5017 sector = be64_to_cpu(p->sector);
5018 size = be32_to_cpu(p->blksize);
5020 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5022 dec_rs_pending(mdev);
5024 if (get_ldev_if_state(mdev, D_FAILED)) {
5025 drbd_rs_complete_io(mdev, sector);
5026 switch (pi->cmd) {
5027 case P_NEG_RS_DREPLY:
5028 drbd_rs_failed_io(mdev, sector, size);
5029 break;
5030 case P_RS_CANCEL:
5031 break;
5032 default:
5033 BUG();
5034 }
5035 put_ldev(mdev);
5040 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
5042 struct drbd_conf *mdev;
5043 struct p_barrier_ack *p = pi->data;
5045 mdev = vnr_to_mdev(tconn, pi->vnr);
5049 tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
5051 if (mdev->state.conn == C_AHEAD &&
5052 atomic_read(&mdev->ap_in_flight) == 0 &&
5053 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags)) {
5054 mdev->start_resync_timer.expires = jiffies + HZ;
5055 add_timer(&mdev->start_resync_timer);
5061 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
5063 struct drbd_conf *mdev;
5064 struct p_block_ack *p = pi->data;
5065 struct drbd_work *w;
5066 sector_t sector;
5067 int size;
5069 mdev = vnr_to_mdev(tconn, pi->vnr);
5073 sector = be64_to_cpu(p->sector);
5074 size = be32_to_cpu(p->blksize);
5076 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5078 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5079 drbd_ov_out_of_sync_found(mdev, sector, size);
5080 else
5081 ov_out_of_sync_print(mdev);
5083 if (!get_ldev(mdev))
5084 return 0;
5086 drbd_rs_complete_io(mdev, sector);
5087 dec_rs_pending(mdev);
5091 /* let's advance progress step marks only for every other megabyte */
5092 if ((mdev->ov_left & 0x200) == 0x200)
5093 drbd_advance_rs_marks(mdev, mdev->ov_left);
5095 if (mdev->ov_left == 0) {
5096 w = kmalloc(sizeof(*w), GFP_NOIO);
5097 if (w) {
5098 w->cb = w_ov_finished;
5099 w->mdev = mdev;
5100 drbd_queue_work_front(&mdev->tconn->data.work, w);
5101 } else {
5102 dev_err(DEV, "kmalloc(w) failed.");
5103 ov_out_of_sync_print(mdev);
5104 drbd_resync_finished(mdev);
5105 }
5106 }
5107 put_ldev(mdev);
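/*
 * Illustration (not part of the driver): the "(ov_left & 0x200) == 0x200"
 * test above cheaply samples the acks instead of touching the progress
 * marks on every one; bit 9 of the decreasing counter is set only in
 * alternating bands, and drbd_advance_rs_marks() rate-limits internally
 * anyway.  Hedged sketch:
 */
#if 0
static void example_update_marks(unsigned long left);	/* hypothetical */

static void example_progress(unsigned long left)
{
	/* only alternating 0x200-wide bands of `left' pass this test */
	if ((left & 0x200) == 0x200)
		example_update_marks(left);
}
#endif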
5111 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
5116 static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
5118 struct drbd_conf *mdev;
5119 int vnr, not_empty = 0;
5122 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5123 flush_signals(current);
5126 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5127 kref_get(&mdev->kref);
5129 if (drbd_finish_peer_reqs(mdev)) {
5130 kref_put(&mdev->kref, &drbd_minor_destroy);
5131 return 1;
5132 }
5133 kref_put(&mdev->kref, &drbd_minor_destroy);
5136 set_bit(SIGNAL_ASENDER, &tconn->flags);
5138 spin_lock_irq(&tconn->req_lock);
5139 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5140 not_empty = !list_empty(&mdev->done_ee);
5141 if (not_empty)
5142 break;
5143 }
5144 spin_unlock_irq(&tconn->req_lock);
5146 } while (not_empty);
5151 struct asender_cmd {
5152 int pkt_size;
5153 int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
5154 };
5156 static struct asender_cmd asender_tbl[] = {
5157 [P_PING] = { 0, got_Ping },
5158 [P_PING_ACK] = { 0, got_PingAck },
5159 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5160 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5161 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5162 [P_DISCARD_WRITE] = { sizeof(struct p_block_ack), got_BlockAck },
5163 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
5164 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
5165 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply },
5166 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
5167 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
5168 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5169 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
5170 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip },
5171 [P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply },
5172 [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5173 [P_RETRY_WRITE] = { sizeof(struct p_block_ack), got_BlockAck },
5174 };
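/*
 * Illustration (not part of the driver): asender_tbl above is a dispatch
 * table; the packet command is the array index, and each entry carries
 * the expected payload size plus a handler, so the receive loop can
 * validate the length generically before dispatching.  Minimal sketch:
 */
#if 0
struct example_cmd {
	unsigned int pkt_size;
	int (*fn)(void *ctx);
};

static int example_ping(void *ctx)
{
	return 0;
}

static const struct example_cmd example_tbl[] = {
	[1] = { 0, example_ping },	/* hypothetical command number */
};

static int example_dispatch(unsigned int cmd, unsigned int size, void *ctx)
{
	if (cmd >= ARRAY_SIZE(example_tbl) || !example_tbl[cmd].fn)
		return -EINVAL;		/* unknown command */
	if (size != example_tbl[cmd].pkt_size)
		return -EPROTO;		/* wrong payload length */
	return example_tbl[cmd].fn(ctx);
}
#endif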
5176 int drbd_asender(struct drbd_thread *thi)
5178 struct drbd_tconn *tconn = thi->tconn;
5179 struct asender_cmd *cmd = NULL;
5180 struct packet_info pi;
5181 int rv;
5182 void *buf = tconn->meta.rbuf;
5183 int received = 0;
5184 unsigned int header_size = drbd_header_size(tconn);
5185 int expect = header_size;
5186 bool ping_timeout_active = false;
5187 struct net_conf *nc;
5188 int ping_timeo, tcp_cork, ping_int;
5190 current->policy = SCHED_RR; /* Make this a realtime task! */
5191 current->rt_priority = 2; /* more important than all other tasks */
5193 while (get_t_state(thi) == RUNNING) {
5194 drbd_thread_current_set_cpu(thi);
5196 rcu_read_lock();
5197 nc = rcu_dereference(tconn->net_conf);
5198 ping_timeo = nc->ping_timeo;
5199 tcp_cork = nc->tcp_cork;
5200 ping_int = nc->ping_int;
5201 rcu_read_unlock();
5203 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
5204 if (drbd_send_ping(tconn)) {
5205 conn_err(tconn, "drbd_send_ping has failed\n");
5206 goto reconnect;
5207 }
5208 tconn->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5209 ping_timeout_active = true;
5212 /* TODO: conditionally cork; it may hurt latency if we cork without
5213 much to send */
5214 if (tcp_cork)
5215 drbd_tcp_cork(tconn->meta.socket);
5216 if (tconn_finish_peer_reqs(tconn)) {
5217 conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
5220 /* but unconditionally uncork unless disabled */
5221 if (tcp_cork)
5222 drbd_tcp_uncork(tconn->meta.socket);
5224 /* short circuit, recv_msg would return EINTR anyways. */
5225 if (signal_pending(current))
5226 continue;
5228 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
5229 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5231 flush_signals(current);
5234 * -EINTR (on meta) we got a signal
5235 * -EAGAIN (on meta) rcvtimeo expired
5236 * -ECONNRESET other side closed the connection
5237 * -ERESTARTSYS (on data) we got a signal
5238 * rv < 0 other than above: unexpected error!
5239 * rv == expected: full header or command
5240 * rv < expected: "woken" by signal during receive
5241 * rv == 0 : "connection shut down by peer"
5243 if (likely(rv > 0)) {
5244 received += rv;
5245 buf += rv;
5246 } else if (rv == 0) {
5247 conn_err(tconn, "meta connection shut down by peer.\n");
5249 } else if (rv == -EAGAIN) {
5250 /* If the data socket received something meanwhile,
5251 * that is good enough: peer is still alive. */
5252 if (time_after(tconn->last_received,
5253 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
5254 continue;
5255 if (ping_timeout_active) {
5256 conn_err(tconn, "PingAck did not arrive in time.\n");
5257 goto reconnect;
5258 }
5259 set_bit(SEND_PING, &tconn->flags);
5260 continue;
5261 } else if (rv == -EINTR) {
5262 continue;
5263 } else {
5264 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
5265 goto reconnect;
5266 }
5268 if (received == expect && cmd == NULL) {
5269 if (decode_header(tconn, tconn->meta.rbuf, &pi))
5270 goto reconnect;
5271 cmd = pi.cmd < ARRAY_SIZE(asender_tbl) ? &asender_tbl[pi.cmd] : NULL;
5272 if (!cmd || !cmd->fn) {
5273 conn_err(tconn, "Unexpected meta packet %s (0x%04x)\n",
5274 cmdname(pi.cmd), pi.cmd);
5275 goto disconnect;
5276 }
5277 expect = header_size + cmd->pkt_size;
5278 if (pi.size != expect - header_size) {
5279 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
5280 pi.cmd, pi.size);
5281 goto reconnect;
5282 }
5283 }
5284 if (received == expect) {
5285 int err;
5287 err = cmd->fn(tconn, &pi);
5288 if (err) {
5289 conn_err(tconn, "%pf failed\n", cmd->fn);
5290 goto reconnect;
5291 }
5293 tconn->last_received = jiffies;
5295 if (cmd == &asender_tbl[P_PING_ACK]) {
5296 /* restore idle timeout */
5297 tconn->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5298 ping_timeout_active = false;
5301 buf = tconn->meta.rbuf;
5302 received = 0;
5303 expect = header_size;
5304 cmd = NULL;
5308 if (0) {
5309 reconnect:
5310 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5311 }
5312 if (0) {
5313 disconnect:
5314 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
5315 }
5316 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5318 conn_info(tconn, "asender terminated\n");
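/*
 * Illustration (not part of the driver): the received/expect bookkeeping
 * in drbd_asender() above is the usual way to reassemble a fixed-size
 * header from a stream socket that may return short reads.  Hedged
 * sketch; example_recv() is a hypothetical stand-in for
 * drbd_recv_short():
 */
#if 0
static int example_recv(void *buf, unsigned int len);	/* hypothetical */

static int example_read_header(void *hdr, unsigned int header_size)
{
	unsigned int received = 0;
	int rv;

	while (received < header_size) {
		rv = example_recv((char *)hdr + received,
				  header_size - received);
		if (rv == 0)
			return -ECONNRESET;	/* peer closed the socket */
		if (rv < 0)
			return rv;		/* error or signal */
		received += rv;			/* accumulate short reads */
	}
	return 0;
}
#endif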