	kref_put(&l_ptr->ref, tipc_link_release);
}
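+/* tipc_parallel_link(): return the owning node's other active link, i.e.
+ * the link working in parallel with l, or l itself if it is the only
+ * active link
+ */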
+static struct tipc_link *tipc_parallel_link(struct tipc_link *l)
+{
+	if (l->owner->active_links[0] != l)
+		return l->owner->active_links[0];
+	return l->owner->active_links[1];
+}
+
static void link_init_max_pkt(struct tipc_link *l_ptr)
{
	struct tipc_node *node = l_ptr->owner;
	}
}
+/* link_synch(): check if all packets arrived before the synch
+ *               point have been consumed
+ * Returns true if the parallel links are synched, otherwise false
+ */
+static bool link_synch(struct tipc_link *l)
+{
+	unsigned int post_synch;
+	struct tipc_link *pl;
+
+	pl = tipc_parallel_link(l);
+	if (pl == l)
+		goto synched;
+
+	/* Was last pre-synch packet added to input queue ? */
+	if (less_eq(pl->next_in_no, l->synch_point))
+		return false;
+
+	/* Is it still in the input queue ? */
+	post_synch = mod(pl->next_in_no - l->synch_point) - 1;
+	if (skb_queue_len(&pl->inputq) > post_synch)
+		return false;
+synched:
+	l->flags &= ~LINK_SYNCHING;
+	return true;
+}
+
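Note: link_synch() above relies on TIPC's 16-bit sequence number helpers and on
the new LINK_SYNCHING link flag, which are defined outside this hunk. A minimal
sketch of what those definitions are assumed to look like (see net/tipc/link.h;
the flag value shown here is illustrative only):

#define LINK_SYNCHING	0x0004	/* assumed value, for illustration */

static inline u32 mod(u32 x)
{
	return x & 0xffffu;	/* link sequence numbers wrap at 16 bits */
}

static inline int less_eq(u32 left, u32 right)
{
	return mod(right - left) < 32768u;
}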
static void link_retrieve_defq(struct tipc_link *link,
			       struct sk_buff_head *list)
{
			skb = NULL;
			goto unlock;
		}
+		/* Synchronize with parallel link if applicable */
+		if (unlikely((l_ptr->flags & LINK_SYNCHING) && !msg_dup(msg))) {
+			link_handle_out_of_seq_msg(l_ptr, skb);
+			if (link_synch(l_ptr))
+				link_retrieve_defq(l_ptr, &head);
+			skb = NULL;
+			goto unlock;
+		}
		l_ptr->next_in_no++;
		if (unlikely(!skb_queue_empty(&l_ptr->deferdq)))
			link_retrieve_defq(l_ptr, &head);
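The checks above and below use two message accessors from net/tipc/msg.h:
msg_dup(), which tests whether a packet is a DUPLICATE_MSG tunneled under
CHANGEOVER_PROTOCOL, and msg_get_wrapped(), which returns the header of the
encapsulated original packet. A rough sketch of how they are assumed to be
defined:

static inline bool msg_dup(struct tipc_msg *m)
{
	if (msg_user(m) != CHANGEOVER_PROTOCOL)
		return false;
	return msg_type(m) == DUPLICATE_MSG;
}

static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
{
	return (struct tipc_msg *)msg_data(m);
}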
	switch (msg_user(msg)) {
	case CHANGEOVER_PROTOCOL:
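+		/* Duplicate tunnel packet: a parallel link is being set up,
+		 * so enter synch mode and record the wrapped packet's original
+		 * sequence number as the synch point
+		 */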
+		if (msg_dup(msg)) {
+			link->flags |= LINK_SYNCHING;
+			link->synch_point = msg_seqno(msg_get_wrapped(msg));
+		}
		if (!tipc_link_tunnel_rcv(node, &skb))
			break;
		if (msg_user(buf_msg(skb)) != MSG_BUNDLER) {