struct list_head list;
struct list_head congested;
struct sk_buff *event_msg;
+ struct sk_buff_head deferredq;
struct tipc_group *group;
u32 node;
u32 port;
return NULL;
INIT_LIST_HEAD(&m->list);
INIT_LIST_HEAD(&m->congested);
+ __skb_queue_head_init(&m->deferredq);
m->group = grp;
m->node = node;
m->port = port;
return tipc_group_cong(grp, m->node, m->port, len, &m);
}
+/* tipc_group_sort_msg() - sort msg into queue by bcast sequence number
+ */
+static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq)
+{
+ struct tipc_msg *_hdr, *hdr = buf_msg(skb);
+ u16 bc_seqno = msg_grp_bc_seqno(hdr);
+ struct sk_buff *_skb, *tmp;
+ int mtyp = msg_type(hdr);
+
+ /* Bcast may be bypassed by unicast, - sort it in */
+ if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) {
+ skb_queue_walk_safe(defq, _skb, tmp) {
+ _hdr = buf_msg(_skb);
+ if (!less(bc_seqno, msg_grp_bc_seqno(_hdr)))
+ continue;
+ __skb_queue_before(defq, _skb, skb);
+ return;
+ }
+ /* Bcast was not bypassed, - add to tail */
+ }
+ /* Unicasts are never bypassed, - always add to tail */
+ __skb_queue_tail(defq, skb);
+}
+
/* tipc_group_filter_msg() - determine if we should accept arriving message
*/
void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
struct sk_buff_head *xmitq)
{
struct sk_buff *skb = __skb_dequeue(inputq);
+ struct sk_buff_head *defq;
struct tipc_member *m;
struct tipc_msg *hdr;
+ bool deliver, update;
u32 node, port;
- int mtyp;
+ int mtyp, blks;
if (!skb)
return;
hdr = buf_msg(skb);
- mtyp = msg_type(hdr);
node = msg_orignode(hdr);
port = msg_origport(hdr);
if (!msg_in_group(hdr))
goto drop;
- if (mtyp == TIPC_GRP_MEMBER_EVT) {
+ if (msg_is_grp_evt(hdr)) {
if (!grp->events)
goto drop;
__skb_queue_tail(inputq, skb);
if (!tipc_group_is_receiver(m))
goto drop;
- m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1;
+ if (less(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt))
+ goto drop;
- /* Drop multicast here if not for this member */
- if (mtyp == TIPC_GRP_MCAST_MSG) {
- if (msg_nameinst(hdr) != grp->instance) {
- m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1;
- tipc_group_update_rcv_win(grp, msg_blocks(hdr),
- node, port, xmitq);
- kfree_skb(skb);
- return;
+ TIPC_SKB_CB(skb)->orig_member = m->instance;
+ defq = &m->deferredq;
+ tipc_group_sort_msg(skb, defq);
+
+ while ((skb = skb_peek(defq))) {
+ hdr = buf_msg(skb);
+ mtyp = msg_type(hdr);
+ deliver = true;
+ update = false;
+
+ if (more(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt))
+ break;
+
+ /* Decide what to do with message */
+ switch (mtyp) {
+ case TIPC_GRP_MCAST_MSG:
+ if (msg_nameinst(hdr) != grp->instance) {
+ update = true;
+ deliver = false;
+ }
+ /* Fall thru */
+ case TIPC_GRP_BCAST_MSG:
+ m->bc_rcv_nxt++;
+ break;
+ case TIPC_GRP_UCAST_MSG:
+ break;
+ default:
+ break;
}
- }
- TIPC_SKB_CB(skb)->orig_member = m->instance;
- __skb_queue_tail(inputq, skb);
+ /* Execute decisions */
+ __skb_dequeue(defq);
+ if (deliver)
+ __skb_queue_tail(inputq, skb);
+ else
+ kfree_skb(skb);
+
+ if (!update)
+ continue;
+ blks = msg_blocks(hdr);
+ tipc_group_update_rcv_win(grp, blks, node, port, xmitq);
+ }
return;
drop:
kfree_skb(skb);
struct msghdr *m, struct tipc_member *mb,
u32 dnode, u32 dport, int dlen)
{
+ u16 bc_snd_nxt = tipc_group_bc_snd_nxt(tsk->group);
int blks = tsk_blocks(GROUP_H_SIZE + dlen);
struct tipc_msg *hdr = &tsk->phdr;
struct sk_buff_head pkts;
msg_set_hdr_sz(hdr, GROUP_H_SIZE);
msg_set_destport(hdr, dport);
msg_set_destnode(hdr, dnode);
+ msg_set_grp_bc_seqno(hdr, bc_snd_nxt);
/* Build message as chain of buffers */
skb_queue_head_init(&pkts);