Merge branch 'tipc'
authorDavid S. Miller <davem@davemloft.net>
Tue, 7 Jan 2014 23:44:35 +0000 (18:44 -0500)
committerDavid S. Miller <davem@davemloft.net>
Tue, 7 Jan 2014 23:44:35 +0000 (18:44 -0500)
Jon Maloy says:

====================
tipc: link setup and failover improvements

This series consists of four unrelated commits with different purposes.

- Commit #1 is purely cosmetic and pedagogic, hopefully making the
  failover/tunneling logics slightly easier to understand.
- Commit #2 fixes a bug that has always been in the code, but was not
  discovered until very recently.
- Commit #3 fixes a non-fatal race issue in the neighbour discovery
  code.
- Commit #4 removes an unnecessary indirection step during link
  startup.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
net/tipc/bcast.c
net/tipc/bearer.c
net/tipc/bearer.h
net/tipc/discover.c
net/tipc/link.c
net/tipc/link.h
net/tipc/node.c
net/tipc/node.h

index 4c2a80b..bf860d9 100644 (file)
@@ -794,7 +794,7 @@ void tipc_bclink_init(void)
 void tipc_bclink_stop(void)
 {
        spin_lock_bh(&bc_lock);
-       tipc_link_stop(bcl);
+       tipc_link_purge_queues(bcl);
        spin_unlock_bh(&bc_lock);
 
        memset(bclink, 0, sizeof(*bclink));
index 07ed5cc..2d456ab 100644 (file)
@@ -541,7 +541,7 @@ static int tipc_l2_rcv_msg(struct sk_buff *buf, struct net_device *dev,
        if (likely(b_ptr)) {
                if (likely(buf->pkt_type <= PACKET_BROADCAST)) {
                        buf->next = NULL;
-                       tipc_recv_msg(buf, b_ptr);
+                       tipc_rcv(buf, b_ptr);
                        rcu_read_unlock();
                        return NET_RX_SUCCESS;
                }
index 410efb1..4f5db9a 100644 (file)
@@ -161,8 +161,7 @@ extern struct tipc_bearer tipc_bearers[];
  * TIPC routines available to supported media types
  */
 
-void tipc_recv_msg(struct sk_buff *buf, struct tipc_bearer *tb_ptr);
-
+void tipc_rcv(struct sk_buff *buf, struct tipc_bearer *tb_ptr);
 int tipc_enable_bearer(const char *bearer_name, u32 disc_domain, u32 priority);
 int tipc_disable_bearer(const char *name);
 
index bc849f1..412ff41 100644 (file)
@@ -50,6 +50,7 @@
  * @dest: destination address for request messages
  * @domain: network domain to which links can be established
  * @num_nodes: number of nodes currently discovered (i.e. with an active link)
+ * @lock: spinlock for controlling access to requests
  * @buf: request message to be (repeatedly) sent
  * @timer: timer governing period between requests
  * @timer_intv: current interval between requests (in ms)
@@ -59,6 +60,7 @@ struct tipc_link_req {
        struct tipc_media_addr dest;
        u32 domain;
        int num_nodes;
+       spinlock_t lock;
        struct sk_buff *buf;
        struct timer_list timer;
        unsigned int timer_intv;
@@ -274,7 +276,9 @@ static void disc_update(struct tipc_link_req *req)
  */
 void tipc_disc_add_dest(struct tipc_link_req *req)
 {
+       spin_lock_bh(&req->lock);
        req->num_nodes++;
+       spin_unlock_bh(&req->lock);
 }
 
 /**
@@ -283,8 +287,10 @@ void tipc_disc_add_dest(struct tipc_link_req *req)
  */
 void tipc_disc_remove_dest(struct tipc_link_req *req)
 {
+       spin_lock_bh(&req->lock);
        req->num_nodes--;
        disc_update(req);
+       spin_unlock_bh(&req->lock);
 }
 
 /**
@@ -297,7 +303,7 @@ static void disc_timeout(struct tipc_link_req *req)
 {
        int max_delay;
 
-       spin_lock_bh(&req->bearer->lock);
+       spin_lock_bh(&req->lock);
 
        /* Stop searching if only desired node has been found */
        if (tipc_node(req->domain) && req->num_nodes) {
@@ -325,7 +331,7 @@ static void disc_timeout(struct tipc_link_req *req)
 
        k_start_timer(&req->timer, req->timer_intv);
 exit:
-       spin_unlock_bh(&req->bearer->lock);
+       spin_unlock_bh(&req->lock);
 }
 
 /**
@@ -356,6 +362,7 @@ int tipc_disc_create(struct tipc_bearer *b_ptr, struct tipc_media_addr *dest,
        req->domain = dest_domain;
        req->num_nodes = 0;
        req->timer_intv = TIPC_LINK_REQ_INIT;
+       spin_lock_init(&req->lock);
        k_init_timer(&req->timer, (Handler)disc_timeout, (unsigned long)req);
        k_start_timer(&req->timer, req->timer_intv);
        b_ptr->link_req = req;
index 131a32a..471973f 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * net/tipc/link.c: TIPC link code
  *
- * Copyright (c) 1996-2007, 2012, Ericsson AB
+ * Copyright (c) 1996-2007, 2012-2014, Ericsson AB
  * Copyright (c) 2004-2007, 2010-2013, Wind River Systems
  * All rights reserved.
  *
@@ -78,8 +78,8 @@ static const char *link_unk_evt = "Unknown link event ";
 static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
                                       struct sk_buff *buf);
 static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf);
-static int  link_recv_changeover_msg(struct tipc_link **l_ptr,
-                                    struct sk_buff **buf);
+static int  tipc_link_tunnel_rcv(struct tipc_link **l_ptr,
+                                struct sk_buff **buf);
 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance);
 static int  link_send_sections_long(struct tipc_port *sender,
                                    struct iovec const *msg_sect,
@@ -87,7 +87,6 @@ static int  link_send_sections_long(struct tipc_port *sender,
 static void link_state_event(struct tipc_link *l_ptr, u32 event);
 static void link_reset_statistics(struct tipc_link *l_ptr);
 static void link_print(struct tipc_link *l_ptr, const char *str);
-static void link_start(struct tipc_link *l_ptr);
 static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf);
 static void tipc_link_send_sync(struct tipc_link *l);
 static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf);
@@ -278,9 +277,11 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
 
        tipc_node_attach_link(n_ptr, l_ptr);
 
-       k_init_timer(&l_ptr->timer, (Handler)link_timeout, (unsigned long)l_ptr);
+       k_init_timer(&l_ptr->timer, (Handler)link_timeout,
+                    (unsigned long)l_ptr);
        list_add_tail(&l_ptr->link_list, &b_ptr->links);
-       tipc_k_signal((Handler)link_start, (unsigned long)l_ptr);
+
+       link_state_event(l_ptr, STARTING_EVT);
 
        return l_ptr;
 }
@@ -305,19 +306,13 @@ void tipc_link_delete(struct tipc_link *l_ptr)
        tipc_node_lock(l_ptr->owner);
        tipc_link_reset(l_ptr);
        tipc_node_detach_link(l_ptr->owner, l_ptr);
-       tipc_link_stop(l_ptr);
+       tipc_link_purge_queues(l_ptr);
        list_del_init(&l_ptr->link_list);
        tipc_node_unlock(l_ptr->owner);
        k_term_timer(&l_ptr->timer);
        kfree(l_ptr);
 }
 
-static void link_start(struct tipc_link *l_ptr)
-{
-       tipc_node_lock(l_ptr->owner);
-       link_state_event(l_ptr, STARTING_EVT);
-       tipc_node_unlock(l_ptr->owner);
-}
 
 /**
  * link_schedule_port - schedule port for deferred sending
@@ -403,10 +398,10 @@ void tipc_link_reset_fragments(struct tipc_link *l_ptr)
 }
 
 /**
- * tipc_link_stop - purge all inbound and outbound messages associated with link
+ * tipc_link_purge_queues - purge all pkt queues associated with link
  * @l_ptr: pointer to link
  */
-void tipc_link_stop(struct tipc_link *l_ptr)
+void tipc_link_purge_queues(struct tipc_link *l_ptr)
 {
        kfree_skb_list(l_ptr->oldest_deferred_in);
        kfree_skb_list(l_ptr->first_out);
@@ -437,8 +432,7 @@ void tipc_link_reset(struct tipc_link *l_ptr)
        tipc_node_link_down(l_ptr->owner, l_ptr);
        tipc_bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr);
 
-       if (was_active_link && tipc_node_active_links(l_ptr->owner) &&
-           l_ptr->owner->permit_changeover) {
+       if (was_active_link && tipc_node_active_links(l_ptr->owner)) {
                l_ptr->reset_checkpoint = checkpoint;
                l_ptr->exp_msg_count = START_CHANGEOVER;
        }
@@ -1422,14 +1416,14 @@ static int link_recv_buf_validate(struct sk_buff *buf)
 }
 
 /**
- * tipc_recv_msg - process TIPC messages arriving from off-node
+ * tipc_rcv - process TIPC packets/messages arriving from off-node
  * @head: pointer to message buffer chain
  * @tb_ptr: pointer to bearer message arrived on
  *
  * Invoked with no locks held.  Bearer pointer must point to a valid bearer
  * structure (i.e. cannot be NULL), but bearer can be inactive.
  */
-void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
+void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
 {
        read_lock_bh(&tipc_net_lock);
        while (head) {
@@ -1603,7 +1597,7 @@ deliver:
                        continue;
                case CHANGEOVER_PROTOCOL:
                        type = msg_type(msg);
-                       if (link_recv_changeover_msg(&l_ptr, &buf)) {
+                       if (tipc_link_tunnel_rcv(&l_ptr, &buf)) {
                                msg = buf_msg(buf);
                                seq_no = msg_seqno(msg);
                                if (type == ORIGINAL_MSG)
@@ -1837,8 +1831,6 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
                if (tipc_own_addr > msg_prevnode(msg))
                        l_ptr->b_ptr->net_plane = msg_net_plane(msg);
 
-       l_ptr->owner->permit_changeover = msg_redundant_link(msg);
-
        switch (msg_type(msg)) {
 
        case RESET_MSG:
@@ -1954,13 +1946,13 @@ exit:
 }
 
 
-/*
- * tipc_link_tunnel(): Send one message via a link belonging to
- * another bearer. Owner node is locked.
+/* tipc_link_tunnel_xmit(): Tunnel one packet via a link belonging to
+ * a different bearer. Owner node is locked.
  */
-static void tipc_link_tunnel(struct tipc_link *l_ptr,
-                            struct tipc_msg *tunnel_hdr, struct tipc_msg *msg,
-                            u32 selector)
+static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
+                                 struct tipc_msg *tunnel_hdr,
+                                 struct tipc_msg *msg,
+                                 u32 selector)
 {
        struct tipc_link *tunnel;
        struct sk_buff *buf;
@@ -1983,12 +1975,13 @@ static void tipc_link_tunnel(struct tipc_link *l_ptr,
 }
 
 
-
-/*
- * changeover(): Send whole message queue via the remaining link
- *               Owner node is locked.
+/* tipc_link_failover_send_queue(): A link has gone down, but a second
+ * link is still active. We can do failover. Tunnel the failing link's
+ * whole send queue via the remaining link. This way, we don't lose
+ * any packets, and sequence order is preserved for subsequent traffic
+ * sent over the remaining link. Owner node is locked.
  */
-void tipc_link_changeover(struct tipc_link *l_ptr)
+void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 {
        u32 msgcount = l_ptr->out_queue_size;
        struct sk_buff *crs = l_ptr->first_out;
@@ -1999,11 +1992,6 @@ void tipc_link_changeover(struct tipc_link *l_ptr)
        if (!tunnel)
                return;
 
-       if (!l_ptr->owner->permit_changeover) {
-               pr_warn("%speer did not permit changeover\n", link_co_err);
-               return;
-       }
-
        tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
                 ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
        msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
@@ -2037,20 +2025,30 @@ void tipc_link_changeover(struct tipc_link *l_ptr)
                        msgcount = msg_msgcnt(msg);
                        while (msgcount--) {
                                msg_set_seqno(m, msg_seqno(msg));
-                               tipc_link_tunnel(l_ptr, &tunnel_hdr, m,
-                                                msg_link_selector(m));
+                               tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, m,
+                                                     msg_link_selector(m));
                                pos += align(msg_size(m));
                                m = (struct tipc_msg *)pos;
                        }
                } else {
-                       tipc_link_tunnel(l_ptr, &tunnel_hdr, msg,
-                                        msg_link_selector(msg));
+                       tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, msg,
+                                             msg_link_selector(msg));
                }
                crs = crs->next;
        }
 }
 
-void tipc_link_send_duplicate(struct tipc_link *l_ptr, struct tipc_link *tunnel)
+/* tipc_link_dup_send_queue(): A second link has become active. Tunnel a
+ * duplicate of the first link's send queue via the new link. This way, we
+ * are guaranteed that currently queued packets from a socket are delivered
+ * before future traffic from the same socket, even if this is using the
+ * new link. The last arriving copy of each duplicate packet is dropped at
+ * the receiving end by the regular protocol check, so packet cardinality
+ * and sequence order is preserved per sender/receiver socket pair.
+ * Owner node is locked.
+ */
+void tipc_link_dup_send_queue(struct tipc_link *l_ptr,
+                             struct tipc_link *tunnel)
 {
        struct sk_buff *iter;
        struct tipc_msg tunnel_hdr;
@@ -2106,12 +2104,14 @@ static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
        return eb;
 }
 
-/*
- *  link_recv_changeover_msg(): Receive tunneled packet sent
- *  via other link. Node is locked. Return extracted buffer.
+/*  tipc_link_tunnel_rcv(): Receive a tunneled packet, sent
+ *  via other link as result of a failover (ORIGINAL_MSG) or
+ *  a new active link (DUPLICATE_MSG). Failover packets are
+ *  returned to the active link for delivery upwards.
+ *  Owner node is locked.
  */
-static int link_recv_changeover_msg(struct tipc_link **l_ptr,
-                                   struct sk_buff **buf)
+static int tipc_link_tunnel_rcv(struct tipc_link **l_ptr,
+                               struct sk_buff **buf)
 {
        struct sk_buff *tunnel_buf = *buf;
        struct tipc_link *dest_link;
index 0636ca9..3b6aa65 100644 (file)
@@ -216,15 +216,20 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
                              struct tipc_bearer *b_ptr,
                              const struct tipc_media_addr *media_addr);
 void tipc_link_delete(struct tipc_link *l_ptr);
-void tipc_link_changeover(struct tipc_link *l_ptr);
-void tipc_link_send_duplicate(struct tipc_link *l_ptr, struct tipc_link *dest);
+void tipc_link_failover_send_queue(struct tipc_link *l_ptr);
+void tipc_link_dup_send_queue(struct tipc_link *l_ptr,
+                             struct tipc_link *dest);
 void tipc_link_reset_fragments(struct tipc_link *l_ptr);
 int tipc_link_is_up(struct tipc_link *l_ptr);
 int tipc_link_is_active(struct tipc_link *l_ptr);
-void tipc_link_stop(struct tipc_link *l_ptr);
-struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space, u16 cmd);
-struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, int req_tlv_space);
-struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_space);
+void tipc_link_purge_queues(struct tipc_link *l_ptr);
+struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area,
+                                    int req_tlv_space,
+                                    u16 cmd);
+struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area,
+                                        int req_tlv_space);
+struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area,
+                                         int req_tlv_space);
 void tipc_link_reset(struct tipc_link *l_ptr);
 int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector);
 void tipc_link_send_names(struct list_head *message_list, u32 dest);
index e167d26..efe4d41 100644 (file)
@@ -162,7 +162,7 @@ void tipc_node_link_up(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
                pr_info("New link <%s> becomes standby\n", l_ptr->name);
                return;
        }
-       tipc_link_send_duplicate(active[0], l_ptr);
+       tipc_link_dup_send_queue(active[0], l_ptr);
        if (l_ptr->priority == active[0]->priority) {
                active[0] = l_ptr;
                return;
@@ -225,7 +225,7 @@ void tipc_node_link_down(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
        if (active[0] == l_ptr)
                node_select_active_links(n_ptr);
        if (tipc_node_is_up(n_ptr))
-               tipc_link_changeover(l_ptr);
+               tipc_link_failover_send_queue(l_ptr);
        else
                node_lost_contact(n_ptr);
 }
index d4bb654..63e2e8e 100644 (file)
@@ -64,7 +64,6 @@
  * @working_links: number of working links to node (both active and standby)
  * @block_setup: bit mask of conditions preventing link establishment to node
  * @link_cnt: number of links to node
- * @permit_changeover: non-zero if node has redundant links to this system
  * @signature: node instance identifier
  * @bclink: broadcast-related info
  *    @acked: sequence # of last outbound b'cast message acknowledged by node
@@ -89,7 +88,6 @@ struct tipc_node {
        int link_cnt;
        int working_links;
        int block_setup;
-       int permit_changeover;
        u32 signature;
        struct {
                u32 acked;