link_init_max_pkt(l_ptr);
l_ptr->priority = b_ptr->priority;
tipc_link_set_queue_limits(l_ptr, b_ptr->window);
-
l_ptr->next_out_no = 1;
__skb_queue_head_init(&l_ptr->transmq);
__skb_queue_head_init(&l_ptr->backlogq);
* Move a number of waiting users, as permitted by available space in
* the send queue, from link wait queue to node wait queue for wakeup
*/
-void link_prepare_wakeup(struct tipc_link *link)
+void link_prepare_wakeup(struct tipc_link *l)
{
- uint pend_qsz = skb_queue_len(&link->backlogq);
+ int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,};
+ int imp, lim;
struct sk_buff *skb, *tmp;
- skb_queue_walk_safe(&link->wakeupq, skb, tmp) {
- if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(skb)->chain_imp])
+ skb_queue_walk_safe(&l->wakeupq, skb, tmp) {
+ imp = TIPC_SKB_CB(skb)->chain_imp;
+ lim = l->window + l->backlog[imp].limit;
+ pnd[imp] += TIPC_SKB_CB(skb)->chain_sz;
+ if ((pnd[imp] + l->backlog[imp].len) >= lim)
break;
- pend_qsz += TIPC_SKB_CB(skb)->chain_sz;
- skb_unlink(skb, &link->wakeupq);
- skb_queue_tail(&link->inputq, skb);
- link->owner->inputq = &link->inputq;
- link->owner->action_flags |= TIPC_MSG_EVT;
+ skb_unlink(skb, &l->wakeupq);
+ skb_queue_tail(&l->inputq, skb);
+ l->owner->inputq = &l->inputq;
+ l->owner->action_flags |= TIPC_MSG_EVT;
}
}
l_ptr->reasm_buf = NULL;
}
+static void tipc_link_purge_backlog(struct tipc_link *l)
+{
+ __skb_queue_purge(&l->backlogq);
+ l->backlog[TIPC_LOW_IMPORTANCE].len = 0;
+ l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0;
+ l->backlog[TIPC_HIGH_IMPORTANCE].len = 0;
+ l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0;
+ l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0;
+}
+
/**
* tipc_link_purge_queues - purge all pkt queues associated with link
* @l_ptr: pointer to link
{
__skb_queue_purge(&l_ptr->deferdq);
__skb_queue_purge(&l_ptr->transmq);
- __skb_queue_purge(&l_ptr->backlogq);
+ tipc_link_purge_backlog(l_ptr);
tipc_link_reset_fragments(l_ptr);
}
/* Clean up all queues, except inputq: */
__skb_queue_purge(&l_ptr->transmq);
- __skb_queue_purge(&l_ptr->backlogq);
__skb_queue_purge(&l_ptr->deferdq);
if (!owner->inputq)
owner->inputq = &l_ptr->inputq;
skb_queue_splice_init(&l_ptr->wakeupq, owner->inputq);
if (!skb_queue_empty(owner->inputq))
owner->action_flags |= TIPC_MSG_EVT;
+ tipc_link_purge_backlog(l_ptr);
l_ptr->rcv_unacked = 0;
l_ptr->checkpoint = 1;
l_ptr->next_out_no = 1;
struct sk_buff_head *backlogq = &link->backlogq;
struct sk_buff *skb, *tmp;
- /* Match queue limit against msg importance: */
- if (unlikely(skb_queue_len(backlogq) >= link->queue_limit[imp]))
+ /* Match backlog limit against msg importance: */
+ if (unlikely(link->backlog[imp].len >= link->backlog[imp].limit))
return tipc_link_cong(link, list);
- /* Has valid packet limit been used ? */
if (unlikely(msg_size(msg) > mtu)) {
__skb_queue_purge(list);
return -EMSGSIZE;
}
-
/* Prepare each packet for sending, and add to relevant queue: */
skb_queue_walk_safe(list, skb, tmp) {
__skb_unlink(skb, list);
if (tipc_msg_make_bundle(&skb, mtu, link->addr)) {
link->stats.sent_bundled++;
link->stats.sent_bundles++;
+ imp = msg_importance(buf_msg(skb));
}
__skb_queue_tail(backlogq, skb);
+ link->backlog[imp].len++;
seqno++;
}
link->next_out_no = seqno;
if (!skb)
break;
msg = buf_msg(skb);
+ link->backlog[msg_importance(msg)].len--;
msg_set_ack(msg, ack);
msg_set_bcast_ack(msg, link->owner->bclink.last_in);
link->rcv_unacked = 0;
tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, CHANGEOVER_PROTOCOL,
ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
skb_queue_splice_tail_init(&l_ptr->backlogq, &l_ptr->transmq);
+ tipc_link_purge_backlog(l_ptr);
msgcount = skb_queue_len(&l_ptr->transmq);
msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
msg_set_msgcnt(&tunnel_hdr, msgcount);
int max_bulk = TIPC_MAX_PUBLICATIONS / (l->max_pkt / ITEM_SIZE);
l->window = win;
- l->queue_limit[TIPC_LOW_IMPORTANCE] = win / 2;
- l->queue_limit[TIPC_MEDIUM_IMPORTANCE] = win;
- l->queue_limit[TIPC_HIGH_IMPORTANCE] = win / 2 * 3;
- l->queue_limit[TIPC_CRITICAL_IMPORTANCE] = win * 2;
- l->queue_limit[TIPC_SYSTEM_IMPORTANCE] = max_bulk;
+ l->backlog[TIPC_LOW_IMPORTANCE].limit = win / 2;
+ l->backlog[TIPC_MEDIUM_IMPORTANCE].limit = win;
+ l->backlog[TIPC_HIGH_IMPORTANCE].limit = win / 2 * 3;
+ l->backlog[TIPC_CRITICAL_IMPORTANCE].limit = win * 2;
+ l->backlog[TIPC_SYSTEM_IMPORTANCE].limit = max_bulk;
}
/* tipc_link_find_owner - locate owner node of link by link's name
if (nla_put_u32(msg->skb, TIPC_NLA_PROP_TOL, link->tolerance))
goto prop_msg_full;
if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN,
- link->queue_limit[TIPC_LOW_IMPORTANCE]))
+ link->window))
goto prop_msg_full;
if (nla_put_u32(msg->skb, TIPC_NLA_PROP_PRIO, link->priority))
goto prop_msg_full;