dlm: try other IPs when sctp init assoc fails
[platform/adaptation/renesas_rcar/renesas_kernel.git] / fs / dlm / lowcomms.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 /*
15  * lowcomms.c
16  *
17  * This is the "low-level" comms layer.
18  *
19  * It is responsible for sending/receiving messages
20  * from other nodes in the cluster.
21  *
22  * Cluster nodes are referred to by their nodeids. nodeids are
23  * simply 32 bit numbers to the locking module - if they need to
24  * be expanded for the cluster infrastructure then that is its
25  * responsibility. It is this layer's
26  * responsibility to resolve these into IP address or
27  * whatever it needs for inter-node communication.
28  *
29  * The comms level is two kernel threads that deal mainly with
30  * the receiving of messages from other nodes and passing them
31  * up to the mid-level comms layer (which understands the
32  * message format) for execution by the locking core, and
33  * a send thread which does all the setting up of connections
34  * to remote nodes and the sending of data. Threads are not allowed
35  * to send their own data because it may cause them to wait in times
36  * of high load. Also, this way, the sending thread can collect together
37  * messages bound for one node and send them in one block.
38  *
39  * lowcomms will choose to use either TCP or SCTP as its transport layer
40  * depending on the configuration variable 'protocol'. This should be set
41  * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
42  * cluster-wide mechanism as it must be the same on all nodes of the cluster
43  * for the DLM to function.
44  *
45  */
46
47 #include <asm/ioctls.h>
48 #include <net/sock.h>
49 #include <net/tcp.h>
50 #include <linux/pagemap.h>
51 #include <linux/file.h>
52 #include <linux/mutex.h>
53 #include <linux/sctp.h>
54 #include <linux/slab.h>
55 #include <linux/sctp.h>
56 #include <net/sctp/sctp.h>
57 #include <net/ipv6.h>
58
59 #include "dlm_internal.h"
60 #include "lowcomms.h"
61 #include "midcomms.h"
62 #include "config.h"
63
/* 4 MiB.  NOTE(review): presumably the wanted socket receive buffer size;
   the user of this constant is not visible in this part of the file. */
64 #define NEEDED_RMEM (4*1024*1024)
/* Number of buckets in connection_hash[].  nodeid_hash() masks with
   CONN_HASH_SIZE-1, so this has to stay a power of two. */
65 #define CONN_HASH_SIZE 32
66
67 /* Number of messages to send before rescheduling */
68 #define MAX_SEND_MSG_COUNT 25
69
/*
 * Bookkeeping for a circular buffer whose size is a power of two.
 *
 * base: offset of the first unconsumed byte
 * len:  number of unconsumed bytes
 * mask: size - 1, used to wrap offsets back into the buffer
 */
struct cbuf {
	unsigned int base;
	unsigned int len;
	unsigned int mask;
};

/* Account for n freshly written bytes. */
static void cbuf_add(struct cbuf *cb, int n)
{
	cb->len = cb->len + n;
}

/* Wrapped offset just past the unconsumed data, i.e. where new data lands. */
static int cbuf_data(struct cbuf *cb)
{
	unsigned int end = cb->base + cb->len;

	return end & cb->mask;
}

/* Reset to empty for a buffer of 'size' bytes (size must be a power of two). */
static void cbuf_init(struct cbuf *cb, int size)
{
	cb->base = 0;
	cb->len = 0;
	cb->mask = size - 1;
}

/* Consume n bytes from the front, wrapping the start offset. */
static void cbuf_eat(struct cbuf *cb, int n)
{
	cb->len -= n;
	cb->base = (cb->base + n) & cb->mask;
}

/* True when no unconsumed data is left. */
static bool cbuf_empty(struct cbuf *cb)
{
	return !cb->len;
}
103
104 struct connection {
105         struct socket *sock;    /* NULL if not connected */
106         uint32_t nodeid;        /* So we know who we are in the list */
107         struct mutex sock_mutex;
108         unsigned long flags;
109 #define CF_READ_PENDING 1
110 #define CF_WRITE_PENDING 2
111 #define CF_CONNECT_PENDING 3
112 #define CF_INIT_PENDING 4
113 #define CF_IS_OTHERCON 5
114 #define CF_CLOSE 6
115 #define CF_APP_LIMITED 7
116         struct list_head writequeue;  /* List of outgoing writequeue_entries */
117         spinlock_t writequeue_lock;
118         int (*rx_action) (struct connection *); /* What to do when active */
119         void (*connect_action) (struct connection *);   /* What to do to connect */
120         struct page *rx_page;
121         struct cbuf cb;
122         int retries;
123 #define MAX_CONNECT_RETRIES 3
124         int sctp_assoc;
125         struct hlist_node list;
126         struct connection *othercon;
127         struct work_struct rwork; /* Receive workqueue */
128         struct work_struct swork; /* Send workqueue */
129         bool try_new_addr;
130 };
131 #define sock2con(x) ((struct connection *)(x)->sk_user_data)
132
133 /* An entry waiting to be sent */
134 struct writequeue_entry {
135         struct list_head list;
136         struct page *page;
137         int offset;
138         int len;
139         int end;
140         int users;
141         struct connection *con;
142 };
143
144 struct dlm_node_addr {
145         struct list_head list;
146         int nodeid;
147         int addr_count;
148         int curr_addr_index;
149         struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
150 };
151
/* List of struct dlm_node_addr, one per node with configured addresses.
   Walked and modified under dlm_node_addrs_spin. */
152 static LIST_HEAD(dlm_node_addrs);
153 static DEFINE_SPINLOCK(dlm_node_addrs_spin);
154
/* Local addresses; entry 0 supplies the address family used throughout.
   dlm_local_count == 0 makes nodeid_to_addr() fail. */
155 static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
156 static int dlm_local_count;
/* Checked under connections_lock before a TCP connection is accepted */
157 static int dlm_allow_conn;
158
159 /* Work queues */
160 static struct workqueue_struct *recv_workqueue;
161 static struct workqueue_struct *send_workqueue;
162
/* All connections, bucketed by nodeid_hash(); walked and extended under
   connections_lock.  New connections are allocated from con_cache. */
163 static struct hlist_head connection_hash[CONN_HASH_SIZE];
164 static DEFINE_MUTEX(connections_lock);
165 static struct kmem_cache *con_cache;
166
/* Work handlers installed on connection.rwork / connection.swork */
167 static void process_recv_sockets(struct work_struct *work);
168 static void process_send_sockets(struct work_struct *work);
169
170
171 /* This is deliberately very simple because most clusters have simple
172    sequential nodeids, so we should be able to go straight to a connection
173    struct in the array */
174 static inline int nodeid_hash(int nodeid)
175 {
176         return nodeid & (CONN_HASH_SIZE-1);
177 }
178
179 static struct connection *__find_con(int nodeid)
180 {
181         int r;
182         struct connection *con;
183
184         r = nodeid_hash(nodeid);
185
186         hlist_for_each_entry(con, &connection_hash[r], list) {
187                 if (con->nodeid == nodeid)
188                         return con;
189         }
190         return NULL;
191 }
192
193 /*
194  * If 'allocation' is zero then we don't attempt to create a new
195  * connection structure for this node.
196  */
197 static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
198 {
199         struct connection *con = NULL;
200         int r;
201
202         con = __find_con(nodeid);
203         if (con || !alloc)
204                 return con;
205
206         con = kmem_cache_zalloc(con_cache, alloc);
207         if (!con)
208                 return NULL;
209
210         r = nodeid_hash(nodeid);
211         hlist_add_head(&con->list, &connection_hash[r]);
212
213         con->nodeid = nodeid;
214         mutex_init(&con->sock_mutex);
215         INIT_LIST_HEAD(&con->writequeue);
216         spin_lock_init(&con->writequeue_lock);
217         INIT_WORK(&con->swork, process_send_sockets);
218         INIT_WORK(&con->rwork, process_recv_sockets);
219
220         /* Setup action pointers for child sockets */
221         if (con->nodeid) {
222                 struct connection *zerocon = __find_con(0);
223
224                 con->connect_action = zerocon->connect_action;
225                 if (!con->rx_action)
226                         con->rx_action = zerocon->rx_action;
227         }
228
229         return con;
230 }
231
232 /* Loop round all connections */
233 static void foreach_conn(void (*conn_func)(struct connection *c))
234 {
235         int i;
236         struct hlist_node *n;
237         struct connection *con;
238
239         for (i = 0; i < CONN_HASH_SIZE; i++) {
240                 hlist_for_each_entry_safe(con, n, &connection_hash[i], list)
241                         conn_func(con);
242         }
243 }
244
245 static struct connection *nodeid2con(int nodeid, gfp_t allocation)
246 {
247         struct connection *con;
248
249         mutex_lock(&connections_lock);
250         con = __nodeid2con(nodeid, allocation);
251         mutex_unlock(&connections_lock);
252
253         return con;
254 }
255
256 /* This is a bit drastic, but only called when things go wrong */
257 static struct connection *assoc2con(int assoc_id)
258 {
259         int i;
260         struct connection *con;
261
262         mutex_lock(&connections_lock);
263
264         for (i = 0 ; i < CONN_HASH_SIZE; i++) {
265                 hlist_for_each_entry(con, &connection_hash[i], list) {
266                         if (con->sctp_assoc == assoc_id) {
267                                 mutex_unlock(&connections_lock);
268                                 return con;
269                         }
270                 }
271         }
272         mutex_unlock(&connections_lock);
273         return NULL;
274 }
275
276 static struct dlm_node_addr *find_node_addr(int nodeid)
277 {
278         struct dlm_node_addr *na;
279
280         list_for_each_entry(na, &dlm_node_addrs, list) {
281                 if (na->nodeid == nodeid)
282                         return na;
283         }
284         return NULL;
285 }
286
287 static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y)
288 {
289         switch (x->ss_family) {
290         case AF_INET: {
291                 struct sockaddr_in *sinx = (struct sockaddr_in *)x;
292                 struct sockaddr_in *siny = (struct sockaddr_in *)y;
293                 if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
294                         return 0;
295                 if (sinx->sin_port != siny->sin_port)
296                         return 0;
297                 break;
298         }
299         case AF_INET6: {
300                 struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
301                 struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
302                 if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
303                         return 0;
304                 if (sinx->sin6_port != siny->sin6_port)
305                         return 0;
306                 break;
307         }
308         default:
309                 return 0;
310         }
311         return 1;
312 }
313
314 static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
315                           struct sockaddr *sa_out, bool try_new_addr)
316 {
317         struct sockaddr_storage sas;
318         struct dlm_node_addr *na;
319
320         if (!dlm_local_count)
321                 return -1;
322
323         spin_lock(&dlm_node_addrs_spin);
324         na = find_node_addr(nodeid);
325         if (na && na->addr_count) {
326                 if (try_new_addr) {
327                         na->curr_addr_index++;
328                         if (na->curr_addr_index == na->addr_count)
329                                 na->curr_addr_index = 0;
330                 }
331
332                 memcpy(&sas, na->addr[na->curr_addr_index ],
333                         sizeof(struct sockaddr_storage));
334         }
335         spin_unlock(&dlm_node_addrs_spin);
336
337         if (!na)
338                 return -EEXIST;
339
340         if (!na->addr_count)
341                 return -ENOENT;
342
343         if (sas_out)
344                 memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));
345
346         if (!sa_out)
347                 return 0;
348
349         if (dlm_local_addr[0]->ss_family == AF_INET) {
350                 struct sockaddr_in *in4  = (struct sockaddr_in *) &sas;
351                 struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
352                 ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
353         } else {
354                 struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &sas;
355                 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
356                 ret6->sin6_addr = in6->sin6_addr;
357         }
358
359         return 0;
360 }
361
362 static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid)
363 {
364         struct dlm_node_addr *na;
365         int rv = -EEXIST;
366         int addr_i;
367
368         spin_lock(&dlm_node_addrs_spin);
369         list_for_each_entry(na, &dlm_node_addrs, list) {
370                 if (!na->addr_count)
371                         continue;
372
373                 for (addr_i = 0; addr_i < na->addr_count; addr_i++) {
374                         if (addr_compare(na->addr[addr_i], addr)) {
375                                 *nodeid = na->nodeid;
376                                 rv = 0;
377                                 goto unlock;
378                         }
379                 }
380         }
381 unlock:
382         spin_unlock(&dlm_node_addrs_spin);
383         return rv;
384 }
385
386 int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
387 {
388         struct sockaddr_storage *new_addr;
389         struct dlm_node_addr *new_node, *na;
390
391         new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
392         if (!new_node)
393                 return -ENOMEM;
394
395         new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
396         if (!new_addr) {
397                 kfree(new_node);
398                 return -ENOMEM;
399         }
400
401         memcpy(new_addr, addr, len);
402
403         spin_lock(&dlm_node_addrs_spin);
404         na = find_node_addr(nodeid);
405         if (!na) {
406                 new_node->nodeid = nodeid;
407                 new_node->addr[0] = new_addr;
408                 new_node->addr_count = 1;
409                 list_add(&new_node->list, &dlm_node_addrs);
410                 spin_unlock(&dlm_node_addrs_spin);
411                 return 0;
412         }
413
414         if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
415                 spin_unlock(&dlm_node_addrs_spin);
416                 kfree(new_addr);
417                 kfree(new_node);
418                 return -ENOSPC;
419         }
420
421         na->addr[na->addr_count++] = new_addr;
422         spin_unlock(&dlm_node_addrs_spin);
423         kfree(new_node);
424         return 0;
425 }
426
427 /* Data available on socket or listen socket received a connect */
428 static void lowcomms_data_ready(struct sock *sk, int count_unused)
429 {
430         struct connection *con = sock2con(sk);
431         if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
432                 queue_work(recv_workqueue, &con->rwork);
433 }
434
435 static void lowcomms_write_space(struct sock *sk)
436 {
437         struct connection *con = sock2con(sk);
438
439         if (!con)
440                 return;
441
442         clear_bit(SOCK_NOSPACE, &con->sock->flags);
443
444         if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
445                 con->sock->sk->sk_write_pending--;
446                 clear_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags);
447         }
448
449         if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
450                 queue_work(send_workqueue, &con->swork);
451 }
452
453 static inline void lowcomms_connect_sock(struct connection *con)
454 {
455         if (test_bit(CF_CLOSE, &con->flags))
456                 return;
457         if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
458                 queue_work(send_workqueue, &con->swork);
459 }
460
461 static void lowcomms_state_change(struct sock *sk)
462 {
463         if (sk->sk_state == TCP_ESTABLISHED)
464                 lowcomms_write_space(sk);
465 }
466
467 int dlm_lowcomms_connect_node(int nodeid)
468 {
469         struct connection *con;
470
471         /* with sctp there's no connecting without sending */
472         if (dlm_config.ci_protocol != 0)
473                 return 0;
474
475         if (nodeid == dlm_our_nodeid())
476                 return 0;
477
478         con = nodeid2con(nodeid, GFP_NOFS);
479         if (!con)
480                 return -ENOMEM;
481         lowcomms_connect_sock(con);
482         return 0;
483 }
484
485 /* Make a socket active */
486 static void add_sock(struct socket *sock, struct connection *con)
487 {
488         con->sock = sock;
489
490         /* Install a data_ready callback */
491         con->sock->sk->sk_data_ready = lowcomms_data_ready;
492         con->sock->sk->sk_write_space = lowcomms_write_space;
493         con->sock->sk->sk_state_change = lowcomms_state_change;
494         con->sock->sk->sk_user_data = con;
495         con->sock->sk->sk_allocation = GFP_NOFS;
496 }
497
498 /* Add the port number to an IPv6 or 4 sockaddr and return the address
499    length */
500 static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
501                           int *addr_len)
502 {
503         saddr->ss_family =  dlm_local_addr[0]->ss_family;
504         if (saddr->ss_family == AF_INET) {
505                 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
506                 in4_addr->sin_port = cpu_to_be16(port);
507                 *addr_len = sizeof(struct sockaddr_in);
508                 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
509         } else {
510                 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
511                 in6_addr->sin6_port = cpu_to_be16(port);
512                 *addr_len = sizeof(struct sockaddr_in6);
513         }
514         memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
515 }
516
517 /* Close a remote connection and tidy up */
518 static void close_connection(struct connection *con, bool and_other)
519 {
520         mutex_lock(&con->sock_mutex);
521
522         if (con->sock) {
523                 sock_release(con->sock);
524                 con->sock = NULL;
525         }
526         if (con->othercon && and_other) {
527                 /* Will only re-enter once. */
528                 close_connection(con->othercon, false);
529         }
530         if (con->rx_page) {
531                 __free_page(con->rx_page);
532                 con->rx_page = NULL;
533         }
534
535         con->retries = 0;
536         mutex_unlock(&con->sock_mutex);
537 }
538
539 /* We only send shutdown messages to nodes that are not part of the cluster */
540 static void sctp_send_shutdown(sctp_assoc_t associd)
541 {
542         static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
543         struct msghdr outmessage;
544         struct cmsghdr *cmsg;
545         struct sctp_sndrcvinfo *sinfo;
546         int ret;
547         struct connection *con;
548
549         con = nodeid2con(0,0);
550         BUG_ON(con == NULL);
551
552         outmessage.msg_name = NULL;
553         outmessage.msg_namelen = 0;
554         outmessage.msg_control = outcmsg;
555         outmessage.msg_controllen = sizeof(outcmsg);
556         outmessage.msg_flags = MSG_EOR;
557
558         cmsg = CMSG_FIRSTHDR(&outmessage);
559         cmsg->cmsg_level = IPPROTO_SCTP;
560         cmsg->cmsg_type = SCTP_SNDRCV;
561         cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
562         outmessage.msg_controllen = cmsg->cmsg_len;
563         sinfo = CMSG_DATA(cmsg);
564         memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
565
566         sinfo->sinfo_flags |= MSG_EOF;
567         sinfo->sinfo_assoc_id = associd;
568
569         ret = kernel_sendmsg(con->sock, &outmessage, NULL, 0, 0);
570
571         if (ret != 0)
572                 log_print("send EOF to node failed: %d", ret);
573 }
574
575 static void sctp_init_failed_foreach(struct connection *con)
576 {
577
578         /*
579          * Don't try to recover base con and handle race where the
580          * other node's assoc init creates a assoc and we get that
581          * notification, then we get a notification that our attempt
582          * failed due. This happens when we are still trying the primary
583          * address, but the other node has already tried secondary addrs
584          * and found one that worked.
585          */
586         if (!con->nodeid || con->sctp_assoc)
587                 return;
588
589         log_print("Retrying SCTP association init for node %d\n", con->nodeid);
590
591         con->try_new_addr = true;
592         con->sctp_assoc = 0;
593         if (test_and_clear_bit(CF_INIT_PENDING, &con->flags)) {
594                 if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
595                         queue_work(send_workqueue, &con->swork);
596         }
597 }
598
599 /* INIT failed but we don't know which node...
600    restart INIT on all pending nodes */
601 static void sctp_init_failed(void)
602 {
603         mutex_lock(&connections_lock);
604
605         foreach_conn(sctp_init_failed_foreach);
606
607         mutex_unlock(&connections_lock);
608 }
609
610 /*
 * Something happened to an association: handle an SCTP notification that
 * was received on con's socket.
 *
 * con: connection whose socket delivered the notification.
 * msg: message header of the receive; not referenced in this function.
 * buf: start of the notification, read as a union sctp_notification.
 *
 * Only SCTP_ASSOC_CHANGE notifications are acted on; any other notification
 * type is ignored without comment.
 */
611 static void process_sctp_notification(struct connection *con,
612                                       struct msghdr *msg, char *buf)
613 {
614         union sctp_notification *sn = (union sctp_notification *)buf;
615
616         if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) {
617                 switch (sn->sn_assoc_change.sac_state) {
618
                /* A new (or restarted) association is up */
619                 case SCTP_COMM_UP:
620                 case SCTP_RESTART:
621                 {
622                         /* Check that the new node is in the lockspace */
623                         struct sctp_prim prim;
624                         int nodeid;
625                         int prim_len, ret;
626                         int addr_len;
627                         struct connection *new_con;
628
629                         /*
630                          * We get this before any data for an association.
631                          * We verify that the node is in the cluster and
632                          * then peel off a socket for it.
633                          */
                        /* A non-positive id is handled like a failed INIT */
634                         if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
635                                 log_print("COMM_UP for invalid assoc ID %d",
636                                          (int)sn->sn_assoc_change.sac_assoc_id);
637                                 sctp_init_failed();
638                                 return;
639                         }
                        /* Ask the stack for the peer's primary address */
640                         memset(&prim, 0, sizeof(struct sctp_prim));
641                         prim_len = sizeof(struct sctp_prim);
642                         prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id;
643
644                         ret = kernel_getsockopt(con->sock,
645                                                 IPPROTO_SCTP,
646                                                 SCTP_PRIMARY_ADDR,
647                                                 (char*)&prim,
648                                                 &prim_len);
649                         if (ret < 0) {
650                                 log_print("getsockopt/sctp_primary_addr on "
651                                           "new assoc %d failed : %d",
652                                           (int)sn->sn_assoc_change.sac_assoc_id,
653                                           ret);
654
655                                 /* Retry INIT later */
                                /*
                                 * NOTE(review): new_con is looked up, but the
                                 * flag is then cleared on 'con' (the socket
                                 * that delivered the notification) rather than
                                 * on new_con.  This looks unintended - confirm
                                 * which connection and which flag are meant.
                                 */
656                                 new_con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
657                                 if (new_con)
658                                         clear_bit(CF_CONNECT_PENDING, &con->flags);
659                                 return;
660                         }
                        /*
                         * Normalise the address (local family, port 0,
                         * padding zeroed) before it is compared against the
                         * configured node addresses; addr_compare() also
                         * compares ports.
                         */
661                         make_sockaddr(&prim.ssp_addr, 0, &addr_len);
662                         if (addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
663                                 unsigned char *b=(unsigned char *)&prim.ssp_addr;
664                                 log_print("reject connect from unknown addr");
665                                 print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, 
666                                                      b, sizeof(struct sockaddr_storage));
667                                 sctp_send_shutdown(prim.ssp_assoc_id);
668                                 return;
669                         }
670
                        /* Find or create the connection for that node */
671                         new_con = nodeid2con(nodeid, GFP_NOFS);
672                         if (!new_con)
673                                 return;
674
675                         /* Peel off a new sock */
                        /*
                         * The new socket is stored straight into
                         * new_con->sock.  NOTE(review): no release of a
                         * socket already held there is visible here -
                         * confirm it cannot be set at this point.
                         */
676                         sctp_lock_sock(con->sock->sk);
677                         ret = sctp_do_peeloff(con->sock->sk,
678                                 sn->sn_assoc_change.sac_assoc_id,
679                                 &new_con->sock);
680                         sctp_release_sock(con->sock->sk);
681                         if (ret < 0) {
682                                 log_print("Can't peel off a socket for "
683                                           "connection %d to node %d: err=%d",
684                                           (int)sn->sn_assoc_change.sac_assoc_id,
685                                           nodeid, ret);
686                                 return;
687                         }
                        /* Install the lowcomms callbacks on the new socket */
688                         add_sock(new_con->sock, new_con);
689
690                         log_print("connecting to %d sctp association %d",
691                                  nodeid, (int)sn->sn_assoc_change.sac_assoc_id);
692
                        /* The current address worked; stop rotating */
693                         new_con->sctp_assoc = sn->sn_assoc_change.sac_assoc_id;
694                         new_con->try_new_addr = false;
695                         /* Send any pending writes */
696                         clear_bit(CF_CONNECT_PENDING, &new_con->flags);
697                         clear_bit(CF_INIT_PENDING, &new_con->flags);
698                         if (!test_and_set_bit(CF_WRITE_PENDING, &new_con->flags)) {
699                                 queue_work(send_workqueue, &new_con->swork);
700                         }
                        /* ... and start reading from the new socket */
701                         if (!test_and_set_bit(CF_READ_PENDING, &new_con->flags))
702                                 queue_work(recv_workqueue, &new_con->rwork);
703                 }
704                 break;
705
                /* Association gone: forget its id on the owning connection */
706                 case SCTP_COMM_LOST:
707                 case SCTP_SHUTDOWN_COMP:
708                 {
709                         con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
710                         if (con) {
711                                 con->sctp_assoc = 0;
712                         }
713                 }
714                 break;
715
716                 /* We don't know which INIT failed, so clear the PENDING flags
717                  * on them all.  if assoc_id is zero then it will then try
718                  * again */
719
720                 case SCTP_CANT_STR_ASSOC:
721                 {
722                         log_print("Can't start SCTP association - retrying");
723                         sctp_init_failed();
724                 }
725                 break;
726
727                 default:
728                         log_print("unexpected SCTP assoc change id=%d state=%d",
729                                   (int)sn->sn_assoc_change.sac_assoc_id,
730                                   sn->sn_assoc_change.sac_state);
731                 }
732         }
733 }
734
735 /*
 * Data received from remote end: read what is available on con->sock into
 * the connection's one-page circular buffer and pass it to
 * dlm_process_incoming_buffer().  SCTP notifications are diverted to
 * process_sctp_notification() instead.
 *
 * Returns 0 when data or a notification was handled and no immediate
 * re-read is wanted.  Returns -EAGAIN when there is no socket, when the
 * page allocation failed or the read filled all offered space (receive work
 * is requeued in those two cases), or when the peer closed the socket.
 * Otherwise returns the negative result of kernel_recvmsg() or
 * dlm_process_incoming_buffer().  The connection is closed on EOF and on
 * every error other than -EAGAIN.
 */
736 static int receive_from_sock(struct connection *con)
737 {
738         int ret = 0;
739         struct msghdr msg = {};
740         struct kvec iov[2];
741         unsigned len;
742         int r;
743         int call_again_soon = 0;
744         int nvec;
745         char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
746
747         mutex_lock(&con->sock_mutex);
748
        /* -EAGAIN makes out_close skip the close_connection() call */
749         if (con->sock == NULL) {
750                 ret = -EAGAIN;
751                 goto out_close;
752         }
753
754         if (con->rx_page == NULL) {
755                 /*
756                  * This doesn't need to be atomic, but I think it should
757                  * improve performance if it is.
758                  */
759                 con->rx_page = alloc_page(GFP_ATOMIC);
760                 if (con->rx_page == NULL)
761                         goto out_resched;
762                 cbuf_init(&con->cb, PAGE_CACHE_SIZE);
763         }
764
765         /* Only SCTP needs these really */
766         memset(&incmsg, 0, sizeof(incmsg));
767         msg.msg_control = incmsg;
768         msg.msg_controllen = sizeof(incmsg);
769
770         /*
771          * iov[0] is the bit of the circular buffer between the current end
772          * point (cb.base + cb.len) and the end of the buffer.
773          */
774         iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
775         iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
776         iov[1].iov_len = 0;
777         nvec = 1;
778
779         /*
780          * iov[1] is the bit of the circular buffer between the start of the
781          * buffer and the start of the currently used section (cb.base)
782          */
        /* Free space wraps past the end of the page: use both vectors and
           recompute iov[0] to run only up to the end of the page */
783         if (cbuf_data(&con->cb) >= con->cb.base) {
784                 iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb);
785                 iov[1].iov_len = con->cb.base;
786                 iov[1].iov_base = page_address(con->rx_page);
787                 nvec = 2;
788         }
789         len = iov[0].iov_len + iov[1].iov_len;
790
        /* Non-blocking read; r keeps the byte count for the log below */
791         r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len,
792                                MSG_DONTWAIT | MSG_NOSIGNAL);
793         if (ret <= 0)
794                 goto out_close;
795
796         /* Process SCTP notifications */
        /* The circular buffer is not advanced for a notification, so its
           bytes are not passed on as message data.
           NOTE(review): the data was read at cbuf_data() but is handed over
           from cb.base; the two only coincide while cb.len is 0 - confirm. */
797         if (msg.msg_flags & MSG_NOTIFICATION) {
798                 msg.msg_control = incmsg;
799                 msg.msg_controllen = sizeof(incmsg);
800
801                 process_sctp_notification(con, &msg,
802                                 page_address(con->rx_page) + con->cb.base);
803                 mutex_unlock(&con->sock_mutex);
804                 return 0;
805         }
        /* Ordinary data is never expected on the nodeid 0 connection */
806         BUG_ON(con->nodeid == 0);
807
        /* All offered space was filled, so more data may be waiting */
808         if (ret == len)
809                 call_again_soon = 1;
810         cbuf_add(&con->cb, ret);
        /* A non-negative result is the number of bytes consumed; it is
           removed from the buffer by cbuf_eat() below */
811         ret = dlm_process_incoming_buffer(con->nodeid,
812                                           page_address(con->rx_page),
813                                           con->cb.base, con->cb.len,
814                                           PAGE_CACHE_SIZE);
815         if (ret == -EBADMSG) {
816                 log_print("lowcomms: addr=%p, base=%u, len=%u, "
817                           "iov_len=%u, iov_base[0]=%p, read=%d",
818                           page_address(con->rx_page), con->cb.base, con->cb.len,
819                           len, iov[0].iov_base, r);
820         }
821         if (ret < 0)
822                 goto out_close;
823         cbuf_eat(&con->cb, ret);
824
        /* Nothing buffered and no immediate re-read: give the page back */
825         if (cbuf_empty(&con->cb) && !call_again_soon) {
826                 __free_page(con->rx_page);
827                 con->rx_page = NULL;
828         }
829
830         if (call_again_soon)
831                 goto out_resched;
832         mutex_unlock(&con->sock_mutex);
833         return 0;
834
        /* Requeue the receive work (unless already pending) and report
           -EAGAIN */
835 out_resched:
836         if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
837                 queue_work(recv_workqueue, &con->rwork);
838         mutex_unlock(&con->sock_mutex);
839         return -EAGAIN;
840
        /* The mutex is dropped first because close_connection() takes it */
841 out_close:
842         mutex_unlock(&con->sock_mutex);
843         if (ret != -EAGAIN) {
844                 close_connection(con, false);
845                 /* Reconnect when there is something to send */
846         }
847         /* Don't return success if we really got EOF */
848         if (ret == 0)
849                 ret = -EAGAIN;
850
851         return ret;
852 }
853
854 /* Listening socket is busy, accept a connection */
855 static int tcp_accept_from_sock(struct connection *con)
856 {
857         int result;
858         struct sockaddr_storage peeraddr;
859         struct socket *newsock;
860         int len;
861         int nodeid;
862         struct connection *newcon;
863         struct connection *addcon;
864
865         mutex_lock(&connections_lock);
866         if (!dlm_allow_conn) {
867                 mutex_unlock(&connections_lock);
868                 return -1;
869         }
870         mutex_unlock(&connections_lock);
871
872         memset(&peeraddr, 0, sizeof(peeraddr));
873         result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
874                                   IPPROTO_TCP, &newsock);
875         if (result < 0)
876                 return -ENOMEM;
877
878         mutex_lock_nested(&con->sock_mutex, 0);
879
880         result = -ENOTCONN;
881         if (con->sock == NULL)
882                 goto accept_err;
883
884         newsock->type = con->sock->type;
885         newsock->ops = con->sock->ops;
886
887         result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK);
888         if (result < 0)
889                 goto accept_err;
890
891         /* Get the connected socket's peer */
892         memset(&peeraddr, 0, sizeof(peeraddr));
893         if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr,
894                                   &len, 2)) {
895                 result = -ECONNABORTED;
896                 goto accept_err;
897         }
898
899         /* Get the new node's NODEID */
900         make_sockaddr(&peeraddr, 0, &len);
901         if (addr_to_nodeid(&peeraddr, &nodeid)) {
902                 unsigned char *b=(unsigned char *)&peeraddr;
903                 log_print("connect from non cluster node");
904                 print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, 
905                                      b, sizeof(struct sockaddr_storage));
906                 sock_release(newsock);
907                 mutex_unlock(&con->sock_mutex);
908                 return -1;
909         }
910
911         log_print("got connection from %d", nodeid);
912
913         /*  Check to see if we already have a connection to this node. This
914          *  could happen if the two nodes initiate a connection at roughly
915          *  the same time and the connections cross on the wire.
916          *  In this case we store the incoming one in "othercon"
917          */
918         newcon = nodeid2con(nodeid, GFP_NOFS);
919         if (!newcon) {
920                 result = -ENOMEM;
921                 goto accept_err;
922         }
923         mutex_lock_nested(&newcon->sock_mutex, 1);
924         if (newcon->sock) {
925                 struct connection *othercon = newcon->othercon;
926
927                 if (!othercon) {
928                         othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
929                         if (!othercon) {
930                                 log_print("failed to allocate incoming socket");
931                                 mutex_unlock(&newcon->sock_mutex);
932                                 result = -ENOMEM;
933                                 goto accept_err;
934                         }
935                         othercon->nodeid = nodeid;
936                         othercon->rx_action = receive_from_sock;
937                         mutex_init(&othercon->sock_mutex);
938                         INIT_WORK(&othercon->swork, process_send_sockets);
939                         INIT_WORK(&othercon->rwork, process_recv_sockets);
940                         set_bit(CF_IS_OTHERCON, &othercon->flags);
941                 }
942                 if (!othercon->sock) {
943                         newcon->othercon = othercon;
944                         othercon->sock = newsock;
945                         newsock->sk->sk_user_data = othercon;
946                         add_sock(newsock, othercon);
947                         addcon = othercon;
948                 }
949                 else {
950                         printk("Extra connection from node %d attempted\n", nodeid);
951                         result = -EAGAIN;
952                         mutex_unlock(&newcon->sock_mutex);
953                         goto accept_err;
954                 }
955         }
956         else {
957                 newsock->sk->sk_user_data = newcon;
958                 newcon->rx_action = receive_from_sock;
959                 add_sock(newsock, newcon);
960                 addcon = newcon;
961         }
962
963         mutex_unlock(&newcon->sock_mutex);
964
965         /*
966          * Add it to the active queue in case we got data
967          * between processing the accept adding the socket
968          * to the read_sockets list
969          */
970         if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
971                 queue_work(recv_workqueue, &addcon->rwork);
972         mutex_unlock(&con->sock_mutex);
973
974         return 0;
975
976 accept_err:
977         mutex_unlock(&con->sock_mutex);
978         sock_release(newsock);
979
980         if (result != -EAGAIN)
981                 log_print("error accepting connection from node: %d", result);
982         return result;
983 }
984
985 static void free_entry(struct writequeue_entry *e)
986 {
987         __free_page(e->page);
988         kfree(e);
989 }
990
991 /* Initiate an SCTP association.
992    This is a special case of send_to_sock() in that we don't yet have a
993    peeled-off socket for this association, so we use the listening socket
994    and add the primary IP address of the remote node.
995  */
996 static void sctp_init_assoc(struct connection *con)
997 {
998         struct sockaddr_storage rem_addr;
999         char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
1000         struct msghdr outmessage;
1001         struct cmsghdr *cmsg;
1002         struct sctp_sndrcvinfo *sinfo;
1003         struct connection *base_con;
1004         struct writequeue_entry *e;
1005         int len, offset;
1006         int ret;
1007         int addrlen;
1008         struct kvec iov[1];
1009
1010         if (test_and_set_bit(CF_INIT_PENDING, &con->flags))
1011                 return;
1012
1013         if (con->retries++ > MAX_CONNECT_RETRIES)
1014                 return;
1015
1016         if (nodeid_to_addr(con->nodeid, NULL, (struct sockaddr *)&rem_addr,
1017                            con->try_new_addr)) {
1018                 log_print("no address for nodeid %d", con->nodeid);
1019                 return;
1020         }
1021         base_con = nodeid2con(0, 0);
1022         BUG_ON(base_con == NULL);
1023
1024         make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen);
1025
1026         outmessage.msg_name = &rem_addr;
1027         outmessage.msg_namelen = addrlen;
1028         outmessage.msg_control = outcmsg;
1029         outmessage.msg_controllen = sizeof(outcmsg);
1030         outmessage.msg_flags = MSG_EOR;
1031
1032         spin_lock(&con->writequeue_lock);
1033
1034         if (list_empty(&con->writequeue)) {
1035                 spin_unlock(&con->writequeue_lock);
1036                 log_print("writequeue empty for nodeid %d", con->nodeid);
1037                 return;
1038         }
1039
1040         e = list_first_entry(&con->writequeue, struct writequeue_entry, list);
1041         len = e->len;
1042         offset = e->offset;
1043         spin_unlock(&con->writequeue_lock);
1044
1045         /* Send the first block off the write queue */
1046         iov[0].iov_base = page_address(e->page)+offset;
1047         iov[0].iov_len = len;
1048
1049         if (rem_addr.ss_family == AF_INET) {
1050                 struct sockaddr_in *sin = (struct sockaddr_in *)&rem_addr;
1051                 log_print("Trying to connect to %pI4", &sin->sin_addr.s_addr);
1052         } else {
1053                 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&rem_addr;
1054                 log_print("Trying to connect to %pI6", &sin6->sin6_addr);
1055         }
1056
1057         cmsg = CMSG_FIRSTHDR(&outmessage);
1058         cmsg->cmsg_level = IPPROTO_SCTP;
1059         cmsg->cmsg_type = SCTP_SNDRCV;
1060         cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
1061         sinfo = CMSG_DATA(cmsg);
1062         memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
1063         sinfo->sinfo_ppid = cpu_to_le32(dlm_our_nodeid());
1064         outmessage.msg_controllen = cmsg->cmsg_len;
1065         sinfo->sinfo_flags |= SCTP_ADDR_OVER;
1066
1067         ret = kernel_sendmsg(base_con->sock, &outmessage, iov, 1, len);
1068         if (ret < 0) {
1069                 log_print("Send first packet to node %d failed: %d",
1070                           con->nodeid, ret);
1071
1072                 /* Try again later */
1073                 clear_bit(CF_CONNECT_PENDING, &con->flags);
1074                 clear_bit(CF_INIT_PENDING, &con->flags);
1075         }
1076         else {
1077                 spin_lock(&con->writequeue_lock);
1078                 e->offset += ret;
1079                 e->len -= ret;
1080
1081                 if (e->len == 0 && e->users == 0) {
1082                         list_del(&e->list);
1083                         free_entry(e);
1084                 }
1085                 spin_unlock(&con->writequeue_lock);
1086         }
1087 }
1088
1089 /* Connect a new socket to its peer */
1090 static void tcp_connect_to_sock(struct connection *con)
1091 {
1092         struct sockaddr_storage saddr, src_addr;
1093         int addr_len;
1094         struct socket *sock = NULL;
1095         int one = 1;
1096         int result;
1097
1098         if (con->nodeid == 0) {
1099                 log_print("attempt to connect sock 0 foiled");
1100                 return;
1101         }
1102
1103         mutex_lock(&con->sock_mutex);
1104         if (con->retries++ > MAX_CONNECT_RETRIES)
1105                 goto out;
1106
1107         /* Some odd races can cause double-connects, ignore them */
1108         if (con->sock)
1109                 goto out;
1110
1111         /* Create a socket to communicate with */
1112         result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
1113                                   IPPROTO_TCP, &sock);
1114         if (result < 0)
1115                 goto out_err;
1116
1117         memset(&saddr, 0, sizeof(saddr));
1118         result = nodeid_to_addr(con->nodeid, &saddr, NULL, false);
1119         if (result < 0) {
1120                 log_print("no address for nodeid %d", con->nodeid);
1121                 goto out_err;
1122         }
1123
1124         sock->sk->sk_user_data = con;
1125         con->rx_action = receive_from_sock;
1126         con->connect_action = tcp_connect_to_sock;
1127         add_sock(sock, con);
1128
1129         /* Bind to our cluster-known address connecting to avoid
1130            routing problems */
1131         memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
1132         make_sockaddr(&src_addr, 0, &addr_len);
1133         result = sock->ops->bind(sock, (struct sockaddr *) &src_addr,
1134                                  addr_len);
1135         if (result < 0) {
1136                 log_print("could not bind for connect: %d", result);
1137                 /* This *may* not indicate a critical error */
1138         }
1139
1140         make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);
1141
1142         log_print("connecting to %d", con->nodeid);
1143
1144         /* Turn off Nagle's algorithm */
1145         kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
1146                           sizeof(one));
1147
1148         result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
1149                                    O_NONBLOCK);
1150         if (result == -EINPROGRESS)
1151                 result = 0;
1152         if (result == 0)
1153                 goto out;
1154
1155 out_err:
1156         if (con->sock) {
1157                 sock_release(con->sock);
1158                 con->sock = NULL;
1159         } else if (sock) {
1160                 sock_release(sock);
1161         }
1162         /*
1163          * Some errors are fatal and this list might need adjusting. For other
1164          * errors we try again until the max number of retries is reached.
1165          */
1166         if (result != -EHOSTUNREACH &&
1167             result != -ENETUNREACH &&
1168             result != -ENETDOWN && 
1169             result != -EINVAL &&
1170             result != -EPROTONOSUPPORT) {
1171                 log_print("connect %d try %d error %d", con->nodeid,
1172                           con->retries, result);
1173                 mutex_unlock(&con->sock_mutex);
1174                 msleep(1000);
1175                 lowcomms_connect_sock(con);
1176                 return;
1177         }
1178 out:
1179         mutex_unlock(&con->sock_mutex);
1180         return;
1181 }
1182
1183 static struct socket *tcp_create_listen_sock(struct connection *con,
1184                                              struct sockaddr_storage *saddr)
1185 {
1186         struct socket *sock = NULL;
1187         int result = 0;
1188         int one = 1;
1189         int addr_len;
1190
1191         if (dlm_local_addr[0]->ss_family == AF_INET)
1192                 addr_len = sizeof(struct sockaddr_in);
1193         else
1194                 addr_len = sizeof(struct sockaddr_in6);
1195
1196         /* Create a socket to communicate with */
1197         result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
1198                                   IPPROTO_TCP, &sock);
1199         if (result < 0) {
1200                 log_print("Can't create listening comms socket");
1201                 goto create_out;
1202         }
1203
1204         /* Turn off Nagle's algorithm */
1205         kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
1206                           sizeof(one));
1207
1208         result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
1209                                    (char *)&one, sizeof(one));
1210
1211         if (result < 0) {
1212                 log_print("Failed to set SO_REUSEADDR on socket: %d", result);
1213         }
1214         con->rx_action = tcp_accept_from_sock;
1215         con->connect_action = tcp_connect_to_sock;
1216
1217         /* Bind to our port */
1218         make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
1219         result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
1220         if (result < 0) {
1221                 log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
1222                 sock_release(sock);
1223                 sock = NULL;
1224                 con->sock = NULL;
1225                 goto create_out;
1226         }
1227         result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
1228                                  (char *)&one, sizeof(one));
1229         if (result < 0) {
1230                 log_print("Set keepalive failed: %d", result);
1231         }
1232
1233         result = sock->ops->listen(sock, 5);
1234         if (result < 0) {
1235                 log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
1236                 sock_release(sock);
1237                 sock = NULL;
1238                 goto create_out;
1239         }
1240
1241 create_out:
1242         return sock;
1243 }
1244
1245 /* Get local addresses */
1246 static void init_local(void)
1247 {
1248         struct sockaddr_storage sas, *addr;
1249         int i;
1250
1251         dlm_local_count = 0;
1252         for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
1253                 if (dlm_our_addr(&sas, i))
1254                         break;
1255
1256                 addr = kmalloc(sizeof(*addr), GFP_NOFS);
1257                 if (!addr)
1258                         break;
1259                 memcpy(addr, &sas, sizeof(*addr));
1260                 dlm_local_addr[dlm_local_count++] = addr;
1261         }
1262 }
1263
1264 /* Bind to an IP address. SCTP allows multiple address so it can do
1265    multi-homing */
1266 static int add_sctp_bind_addr(struct connection *sctp_con,
1267                               struct sockaddr_storage *addr,
1268                               int addr_len, int num)
1269 {
1270         int result = 0;
1271
1272         if (num == 1)
1273                 result = kernel_bind(sctp_con->sock,
1274                                      (struct sockaddr *) addr,
1275                                      addr_len);
1276         else
1277                 result = kernel_setsockopt(sctp_con->sock, SOL_SCTP,
1278                                            SCTP_SOCKOPT_BINDX_ADD,
1279                                            (char *)addr, addr_len);
1280
1281         if (result < 0)
1282                 log_print("Can't bind to port %d addr number %d",
1283                           dlm_config.ci_tcp_port, num);
1284
1285         return result;
1286 }
1287
1288 /* Initialise SCTP socket and bind to all interfaces */
1289 static int sctp_listen_for_all(void)
1290 {
1291         struct socket *sock = NULL;
1292         struct sockaddr_storage localaddr;
1293         struct sctp_event_subscribe subscribe;
1294         int result = -EINVAL, num = 1, i, addr_len;
1295         struct connection *con = nodeid2con(0, GFP_NOFS);
1296         int bufsize = NEEDED_RMEM;
1297
1298         if (!con)
1299                 return -ENOMEM;
1300
1301         log_print("Using SCTP for communications");
1302
1303         result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET,
1304                                   IPPROTO_SCTP, &sock);
1305         if (result < 0) {
1306                 log_print("Can't create comms socket, check SCTP is loaded");
1307                 goto out;
1308         }
1309
1310         /* Listen for events */
1311         memset(&subscribe, 0, sizeof(subscribe));
1312         subscribe.sctp_data_io_event = 1;
1313         subscribe.sctp_association_event = 1;
1314         subscribe.sctp_send_failure_event = 1;
1315         subscribe.sctp_shutdown_event = 1;
1316         subscribe.sctp_partial_delivery_event = 1;
1317
1318         result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE,
1319                                  (char *)&bufsize, sizeof(bufsize));
1320         if (result)
1321                 log_print("Error increasing buffer space on socket %d", result);
1322
1323         result = kernel_setsockopt(sock, SOL_SCTP, SCTP_EVENTS,
1324                                    (char *)&subscribe, sizeof(subscribe));
1325         if (result < 0) {
1326                 log_print("Failed to set SCTP_EVENTS on socket: result=%d",
1327                           result);
1328                 goto create_delsock;
1329         }
1330
1331         /* Init con struct */
1332         sock->sk->sk_user_data = con;
1333         con->sock = sock;
1334         con->sock->sk->sk_data_ready = lowcomms_data_ready;
1335         con->rx_action = receive_from_sock;
1336         con->connect_action = sctp_init_assoc;
1337
1338         /* Bind to all interfaces. */
1339         for (i = 0; i < dlm_local_count; i++) {
1340                 memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
1341                 make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len);
1342
1343                 result = add_sctp_bind_addr(con, &localaddr, addr_len, num);
1344                 if (result)
1345                         goto create_delsock;
1346                 ++num;
1347         }
1348
1349         result = sock->ops->listen(sock, 5);
1350         if (result < 0) {
1351                 log_print("Can't set socket listening");
1352                 goto create_delsock;
1353         }
1354
1355         return 0;
1356
1357 create_delsock:
1358         sock_release(sock);
1359         con->sock = NULL;
1360 out:
1361         return result;
1362 }
1363
1364 static int tcp_listen_for_all(void)
1365 {
1366         struct socket *sock = NULL;
1367         struct connection *con = nodeid2con(0, GFP_NOFS);
1368         int result = -EINVAL;
1369
1370         if (!con)
1371                 return -ENOMEM;
1372
1373         /* We don't support multi-homed hosts */
1374         if (dlm_local_addr[1] != NULL) {
1375                 log_print("TCP protocol can't handle multi-homed hosts, "
1376                           "try SCTP");
1377                 return -EINVAL;
1378         }
1379
1380         log_print("Using TCP for communications");
1381
1382         sock = tcp_create_listen_sock(con, dlm_local_addr[0]);
1383         if (sock) {
1384                 add_sock(sock, con);
1385                 result = 0;
1386         }
1387         else {
1388                 result = -EADDRINUSE;
1389         }
1390
1391         return result;
1392 }
1393
1394
1395
1396 static struct writequeue_entry *new_writequeue_entry(struct connection *con,
1397                                                      gfp_t allocation)
1398 {
1399         struct writequeue_entry *entry;
1400
1401         entry = kmalloc(sizeof(struct writequeue_entry), allocation);
1402         if (!entry)
1403                 return NULL;
1404
1405         entry->page = alloc_page(allocation);
1406         if (!entry->page) {
1407                 kfree(entry);
1408                 return NULL;
1409         }
1410
1411         entry->offset = 0;
1412         entry->len = 0;
1413         entry->end = 0;
1414         entry->users = 0;
1415         entry->con = con;
1416
1417         return entry;
1418 }
1419
1420 void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
1421 {
1422         struct connection *con;
1423         struct writequeue_entry *e;
1424         int offset = 0;
1425
1426         con = nodeid2con(nodeid, allocation);
1427         if (!con)
1428                 return NULL;
1429
1430         spin_lock(&con->writequeue_lock);
1431         e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
1432         if ((&e->list == &con->writequeue) ||
1433             (PAGE_CACHE_SIZE - e->end < len)) {
1434                 e = NULL;
1435         } else {
1436                 offset = e->end;
1437                 e->end += len;
1438                 e->users++;
1439         }
1440         spin_unlock(&con->writequeue_lock);
1441
1442         if (e) {
1443         got_one:
1444                 *ppc = page_address(e->page) + offset;
1445                 return e;
1446         }
1447
1448         e = new_writequeue_entry(con, allocation);
1449         if (e) {
1450                 spin_lock(&con->writequeue_lock);
1451                 offset = e->end;
1452                 e->end += len;
1453                 e->users++;
1454                 list_add_tail(&e->list, &con->writequeue);
1455                 spin_unlock(&con->writequeue_lock);
1456                 goto got_one;
1457         }
1458         return NULL;
1459 }
1460
1461 void dlm_lowcomms_commit_buffer(void *mh)
1462 {
1463         struct writequeue_entry *e = (struct writequeue_entry *)mh;
1464         struct connection *con = e->con;
1465         int users;
1466
1467         spin_lock(&con->writequeue_lock);
1468         users = --e->users;
1469         if (users)
1470                 goto out;
1471         e->len = e->end - e->offset;
1472         spin_unlock(&con->writequeue_lock);
1473
1474         if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
1475                 queue_work(send_workqueue, &con->swork);
1476         }
1477         return;
1478
1479 out:
1480         spin_unlock(&con->writequeue_lock);
1481         return;
1482 }
1483
1484 /* Send a message */
1485 static void send_to_sock(struct connection *con)
1486 {
1487         int ret = 0;
1488         const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
1489         struct writequeue_entry *e;
1490         int len, offset;
1491         int count = 0;
1492
1493         mutex_lock(&con->sock_mutex);
1494         if (con->sock == NULL)
1495                 goto out_connect;
1496
1497         spin_lock(&con->writequeue_lock);
1498         for (;;) {
1499                 e = list_entry(con->writequeue.next, struct writequeue_entry,
1500                                list);
1501                 if ((struct list_head *) e == &con->writequeue)
1502                         break;
1503
1504                 len = e->len;
1505                 offset = e->offset;
1506                 BUG_ON(len == 0 && e->users == 0);
1507                 spin_unlock(&con->writequeue_lock);
1508
1509                 ret = 0;
1510                 if (len) {
1511                         ret = kernel_sendpage(con->sock, e->page, offset, len,
1512                                               msg_flags);
1513                         if (ret == -EAGAIN || ret == 0) {
1514                                 if (ret == -EAGAIN &&
1515                                     test_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags) &&
1516                                     !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
1517                                         /* Notify TCP that we're limited by the
1518                                          * application window size.
1519                                          */
1520                                         set_bit(SOCK_NOSPACE, &con->sock->flags);
1521                                         con->sock->sk->sk_write_pending++;
1522                                 }
1523                                 cond_resched();
1524                                 goto out;
1525                         } else if (ret < 0)
1526                                 goto send_error;
1527                 }
1528
1529                 /* Don't starve people filling buffers */
1530                 if (++count >= MAX_SEND_MSG_COUNT) {
1531                         cond_resched();
1532                         count = 0;
1533                 }
1534
1535                 spin_lock(&con->writequeue_lock);
1536                 e->offset += ret;
1537                 e->len -= ret;
1538
1539                 if (e->len == 0 && e->users == 0) {
1540                         list_del(&e->list);
1541                         free_entry(e);
1542                 }
1543         }
1544         spin_unlock(&con->writequeue_lock);
1545 out:
1546         mutex_unlock(&con->sock_mutex);
1547         return;
1548
1549 send_error:
1550         mutex_unlock(&con->sock_mutex);
1551         close_connection(con, false);
1552         lowcomms_connect_sock(con);
1553         return;
1554
1555 out_connect:
1556         mutex_unlock(&con->sock_mutex);
1557         if (!test_bit(CF_INIT_PENDING, &con->flags))
1558                 lowcomms_connect_sock(con);
1559 }
1560
1561 static void clean_one_writequeue(struct connection *con)
1562 {
1563         struct writequeue_entry *e, *safe;
1564
1565         spin_lock(&con->writequeue_lock);
1566         list_for_each_entry_safe(e, safe, &con->writequeue, list) {
1567                 list_del(&e->list);
1568                 free_entry(e);
1569         }
1570         spin_unlock(&con->writequeue_lock);
1571 }
1572
1573 /* Called from recovery when it knows that a node has
1574    left the cluster */
1575 int dlm_lowcomms_close(int nodeid)
1576 {
1577         struct connection *con;
1578         struct dlm_node_addr *na;
1579
1580         log_print("closing connection to node %d", nodeid);
1581         con = nodeid2con(nodeid, 0);
1582         if (con) {
1583                 clear_bit(CF_CONNECT_PENDING, &con->flags);
1584                 clear_bit(CF_WRITE_PENDING, &con->flags);
1585                 set_bit(CF_CLOSE, &con->flags);
1586                 if (cancel_work_sync(&con->swork))
1587                         log_print("canceled swork for node %d", nodeid);
1588                 if (cancel_work_sync(&con->rwork))
1589                         log_print("canceled rwork for node %d", nodeid);
1590                 clean_one_writequeue(con);
1591                 close_connection(con, true);
1592         }
1593
1594         spin_lock(&dlm_node_addrs_spin);
1595         na = find_node_addr(nodeid);
1596         if (na) {
1597                 list_del(&na->list);
1598                 while (na->addr_count--)
1599                         kfree(na->addr[na->addr_count]);
1600                 kfree(na);
1601         }
1602         spin_unlock(&dlm_node_addrs_spin);
1603
1604         return 0;
1605 }
1606
1607 /* Receive workqueue function */
1608 static void process_recv_sockets(struct work_struct *work)
1609 {
1610         struct connection *con = container_of(work, struct connection, rwork);
1611         int err;
1612
1613         clear_bit(CF_READ_PENDING, &con->flags);
1614         do {
1615                 err = con->rx_action(con);
1616         } while (!err);
1617 }
1618
1619 /* Send workqueue function */
1620 static void process_send_sockets(struct work_struct *work)
1621 {
1622         struct connection *con = container_of(work, struct connection, swork);
1623
1624         if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
1625                 con->connect_action(con);
1626                 set_bit(CF_WRITE_PENDING, &con->flags);
1627         }
1628         if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags))
1629                 send_to_sock(con);
1630 }
1631
1632
1633 /* Discard all entries on the write queues */
1634 static void clean_writequeues(void)
1635 {
1636         foreach_conn(clean_one_writequeue);
1637 }
1638
1639 static void work_stop(void)
1640 {
1641         destroy_workqueue(recv_workqueue);
1642         destroy_workqueue(send_workqueue);
1643 }
1644
1645 static int work_start(void)
1646 {
1647         recv_workqueue = alloc_workqueue("dlm_recv",
1648                                          WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
1649         if (!recv_workqueue) {
1650                 log_print("can't start dlm_recv");
1651                 return -ENOMEM;
1652         }
1653
1654         send_workqueue = alloc_workqueue("dlm_send",
1655                                          WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
1656         if (!send_workqueue) {
1657                 log_print("can't start dlm_send");
1658                 destroy_workqueue(recv_workqueue);
1659                 return -ENOMEM;
1660         }
1661
1662         return 0;
1663 }
1664
1665 static void stop_conn(struct connection *con)
1666 {
1667         con->flags |= 0x0F;
1668         if (con->sock && con->sock->sk)
1669                 con->sock->sk->sk_user_data = NULL;
1670 }
1671
1672 static void free_conn(struct connection *con)
1673 {
1674         close_connection(con, true);
1675         if (con->othercon)
1676                 kmem_cache_free(con_cache, con->othercon);
1677         hlist_del(&con->list);
1678         kmem_cache_free(con_cache, con);
1679 }
1680
1681 void dlm_lowcomms_stop(void)
1682 {
1683         /* Set all the flags to prevent any
1684            socket activity.
1685         */
1686         mutex_lock(&connections_lock);
1687         dlm_allow_conn = 0;
1688         foreach_conn(stop_conn);
1689         mutex_unlock(&connections_lock);
1690
1691         work_stop();
1692
1693         mutex_lock(&connections_lock);
1694         clean_writequeues();
1695
1696         foreach_conn(free_conn);
1697
1698         mutex_unlock(&connections_lock);
1699         kmem_cache_destroy(con_cache);
1700 }
1701
1702 int dlm_lowcomms_start(void)
1703 {
1704         int error = -EINVAL;
1705         struct connection *con;
1706         int i;
1707
1708         for (i = 0; i < CONN_HASH_SIZE; i++)
1709                 INIT_HLIST_HEAD(&connection_hash[i]);
1710
1711         init_local();
1712         if (!dlm_local_count) {
1713                 error = -ENOTCONN;
1714                 log_print("no local IP address has been set");
1715                 goto fail;
1716         }
1717
1718         error = -ENOMEM;
1719         con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
1720                                       __alignof__(struct connection), 0,
1721                                       NULL);
1722         if (!con_cache)
1723                 goto fail;
1724
1725         error = work_start();
1726         if (error)
1727                 goto fail_destroy;
1728
1729         dlm_allow_conn = 1;
1730
1731         /* Start listening */
1732         if (dlm_config.ci_protocol == 0)
1733                 error = tcp_listen_for_all();
1734         else
1735                 error = sctp_listen_for_all();
1736         if (error)
1737                 goto fail_unlisten;
1738
1739         return 0;
1740
1741 fail_unlisten:
1742         dlm_allow_conn = 0;
1743         con = nodeid2con(0,0);
1744         if (con) {
1745                 close_connection(con, false);
1746                 kmem_cache_free(con_cache, con);
1747         }
1748 fail_destroy:
1749         kmem_cache_destroy(con_cache);
1750 fail:
1751         return error;
1752 }
1753
1754 void dlm_lowcomms_exit(void)
1755 {
1756         struct dlm_node_addr *na, *safe;
1757
1758         spin_lock(&dlm_node_addrs_spin);
1759         list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) {
1760                 list_del(&na->list);
1761                 while (na->addr_count--)
1762                         kfree(na->addr[na->addr_count]);
1763                 kfree(na);
1764         }
1765         spin_unlock(&dlm_node_addrs_spin);
1766 }