u16 advertised;
u16 window;
u16 bc_rcv_nxt;
+ u16 bc_acked;
bool usr_pending;
};
u32 portid;
u16 member_cnt;
u16 bc_snd_nxt;
+ u16 bc_ackers;
bool loopback;
bool events;
};
m->group = grp;
m->node = node;
m->port = port;
+ m->bc_acked = grp->bc_snd_nxt - 1;
grp->member_cnt++;
tipc_group_add_to_tree(grp, m);
tipc_nlist_add(&grp->dests, m->node);
{
rb_erase(&m->tree_node, &grp->members);
grp->member_cnt--;
+
+ /* Check if we were waiting for replicast ack from this member */
+ if (grp->bc_ackers && less(m->bc_acked, grp->bc_snd_nxt - 1))
+ grp->bc_ackers--;
+
list_del_init(&m->list);
list_del_init(&m->congested);
list_add_tail(&m->congested, &grp->congested);
}
-void tipc_group_update_bc_members(struct tipc_group *grp, int len)
+void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack)
{
+ u16 prev = grp->bc_snd_nxt - 1;
struct tipc_member *m;
struct rb_node *n;
for (n = rb_first(&grp->members); n; n = rb_next(n)) {
m = container_of(n, struct tipc_member, tree_node);
- if (tipc_group_is_enabled(m))
+ if (tipc_group_is_enabled(m)) {
tipc_group_update_member(m, len);
+ m->bc_acked = prev;
+ }
}
+
+ /* Mark number of acknowledges to expect, if any */
+ if (ack)
+ grp->bc_ackers = grp->member_cnt;
grp->bc_snd_nxt++;
}
{
struct tipc_member *m = NULL;
+ /* If prev bcast was replicast, reject until all receivers have acked */
+ if (grp->bc_ackers)
+ return true;
+
if (list_empty(&grp->congested))
return false;
struct sk_buff *_skb, *tmp;
int mtyp = msg_type(hdr);
- /* Bcast may be bypassed by unicast, - sort it in */
+ /* Bcast may be bypassed by unicast or other bcast, - sort it in */
if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) {
skb_queue_walk_safe(defq, _skb, tmp) {
_hdr = buf_msg(_skb);
struct sk_buff_head *xmitq)
{
struct sk_buff *skb = __skb_dequeue(inputq);
+ bool ack, deliver, update;
struct sk_buff_head *defq;
struct tipc_member *m;
struct tipc_msg *hdr;
- bool deliver, update;
u32 node, port;
int mtyp, blks;
hdr = buf_msg(skb);
mtyp = msg_type(hdr);
deliver = true;
+ ack = false;
update = false;
if (more(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt))
/* Fall thru */
case TIPC_GRP_BCAST_MSG:
m->bc_rcv_nxt++;
+ ack = msg_grp_bc_ack_req(hdr);
break;
case TIPC_GRP_UCAST_MSG:
break;
else
kfree_skb(skb);
+ if (ack)
+ tipc_group_proto_xmit(grp, m, GRP_ACK_MSG, xmitq);
+
if (!update)
continue;
} else if (mtyp == GRP_ADV_MSG) {
msg_set_adv_win(hdr, adv);
m->advertised += adv;
+ } else if (mtyp == GRP_ACK_MSG) {
+ msg_set_grp_bc_acked(hdr, m->bc_rcv_nxt);
}
__skb_queue_tail(xmitq, skb);
}
}
/* Otherwise deliver already received WITHDRAW event */
__skb_queue_tail(inputq, m->event_msg);
- *usr_wakeup = m->usr_pending;
+ *usr_wakeup = true;
tipc_group_delete_member(grp, m);
list_del_init(&m->congested);
return;
m->usr_pending = false;
list_del_init(&m->congested);
return;
+ case GRP_ACK_MSG:
+ if (!m)
+ return;
+ m->bc_acked = msg_grp_bc_acked(hdr);
+ if (--grp->bc_ackers)
+ break;
+ *usr_wakeup = true;
+ m->usr_pending = false;
+ return;
default:
pr_warn("Received unknown GROUP_PROTO message\n");
}
TIPC_SKB_CB(skb)->orig_member = m->instance;
- *usr_wakeup = m->usr_pending;
+ *usr_wakeup = true;
m->usr_pending = false;
/* Hold back event if more messages might be expected */
struct tipc_msg *hdr,
struct sk_buff_head *inputq,
struct sk_buff_head *xmitq);
-void tipc_group_update_bc_members(struct tipc_group *grp, int len);
+void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack);
bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport,
int len, struct tipc_member **m);
bool tipc_group_bc_cong(struct tipc_group *grp, int len);
u32 port, struct sk_buff_head *xmitq);
u16 tipc_group_bc_snd_nxt(struct tipc_group *grp);
void tipc_group_update_member(struct tipc_member *m, int len);
-struct tipc_member *tipc_group_find_sender(struct tipc_group *grp,
- u32 node, u32 port);
int tipc_group_size(struct tipc_group *grp);
#endif
case TIPC_MEDIUM_IMPORTANCE:
case TIPC_HIGH_IMPORTANCE:
case TIPC_CRITICAL_IMPORTANCE:
- if (unlikely(msg_mcast(hdr))) {
+ if (unlikely(msg_in_group(hdr) || msg_mcast(hdr))) {
skb_queue_tail(l->bc_rcvlink->inputq, skb);
return true;
}
- case CONN_MANAGER:
case GROUP_PROTOCOL:
- skb_queue_tail(inputq, skb);
+ case CONN_MANAGER:
return true;
case NAME_DISTRIBUTOR:
l->bc_rcvlink->state = LINK_ESTABLISHED;
#define GRP_JOIN_MSG 0
#define GRP_LEAVE_MSG 1
#define GRP_ADV_MSG 2
+#define GRP_ACK_MSG 3
/*
* Word 1
msg_set_bits(m, 9, 16, 0xffff, n);
}
+static inline u16 msg_grp_bc_acked(struct tipc_msg *m)
+{
+ return msg_bits(m, 9, 16, 0xffff);
+}
+
+static inline void msg_set_grp_bc_acked(struct tipc_msg *m, u16 n)
+{
+ msg_set_bits(m, 9, 16, 0xffff, n);
+}
+
/* Word 10
*/
static inline u16 msg_grp_evt(struct tipc_msg *m)
msg_set_bits(m, 10, 0, 0x3, n);
}
+static inline u16 msg_grp_bc_ack_req(struct tipc_msg *m)
+{
+ return msg_bits(m, 10, 0, 0x1);
+}
+
+static inline void msg_set_grp_bc_ack_req(struct tipc_msg *m, bool n)
+{
+ msg_set_bits(m, 10, 0, 0x1, n);
+}
+
static inline u16 msg_grp_bc_seqno(struct tipc_msg *m)
{
return msg_bits(m, 10, 16, 0xffff);
u32 dnode, u32 dport, int dlen)
{
u16 bc_snd_nxt = tipc_group_bc_snd_nxt(tsk->group);
+ struct tipc_mc_method *method = &tsk->mc_method;
int blks = tsk_blocks(GROUP_H_SIZE + dlen);
struct tipc_msg *hdr = &tsk->phdr;
struct sk_buff_head pkts;
tsk->cong_link_cnt++;
}
- /* Update send window and sequence number */
+ /* Update send window */
tipc_group_update_member(mb, blks);
+ /* A broadcast sent within next EXPIRE period must follow same path */
+ method->rcast = true;
+ method->mandatory = true;
return dlen;
}
struct tipc_group *grp = tsk->group;
struct tipc_nlist *dsts = tipc_group_dests(grp);
struct tipc_mc_method *method = &tsk->mc_method;
+ bool ack = method->mandatory && method->rcast;
int blks = tsk_blocks(MCAST_H_SIZE + dlen);
struct tipc_msg *hdr = &tsk->phdr;
int mtu = tipc_bcast_get_mtu(net);
msg_set_destnode(hdr, 0);
msg_set_grp_bc_seqno(hdr, tipc_group_bc_snd_nxt(grp));
+ /* Avoid getting stuck with repeated forced replicasts */
+ msg_set_grp_bc_ack_req(hdr, ack);
+
/* Build message as chain of buffers */
skb_queue_head_init(&pkts);
rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
return rc;
/* Send message */
- rc = tipc_mcast_xmit(net, &pkts, method, dsts,
- &tsk->cong_link_cnt);
+ rc = tipc_mcast_xmit(net, &pkts, method, dsts, &tsk->cong_link_cnt);
if (unlikely(rc))
return rc;
/* Update broadcast sequence number and send windows */
- tipc_group_update_bc_members(tsk->group, blks);
+ tipc_group_update_bc_members(tsk->group, blks, ack);
+
+ /* Broadcast link is now free to choose method for next broadcast */
+ method->mandatory = false;
+ method->expires = jiffies;
+
return dlen;
}
u32 portid, oport, onode;
struct list_head dports;
struct tipc_msg *msg;
- int hsz;
+ int user, mtyp, hsz;
__skb_queue_head_init(&tmpq);
INIT_LIST_HEAD(&dports);
skb = tipc_skb_peek(arrvq, &inputq->lock);
for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
msg = buf_msg(skb);
+ user = msg_user(msg);
+ mtyp = msg_type(msg);
+ if (mtyp == TIPC_GRP_UCAST_MSG || user == GROUP_PROTOCOL) {
+ spin_lock_bh(&inputq->lock);
+ if (skb_peek(arrvq) == skb) {
+ __skb_dequeue(arrvq);
+ __skb_queue_tail(inputq, skb);
+ }
+ refcount_dec(&skb->users);
+ spin_unlock_bh(&inputq->lock);
+ continue;
+ }
hsz = skb_headroom(skb) + msg_hdr_sz(msg);
oport = msg_origport(msg);
onode = msg_orignode(msg);