}
/*
- * tipc_link_names_xmit - send name table entries to new neighbor
- *
- * Send routine for bulk delivery of name table messages when contact
- * with a new neighbor occurs. No link congestion checking is performed
- * because name table messages *must* be delivered. The messages must be
- * small enough not to require fragmentation.
- * Called without any locks held.
- */
-void tipc_link_names_xmit(struct list_head *message_list, u32 dest)
-{
- struct tipc_node *n_ptr;
- struct tipc_link *l_ptr;
- struct sk_buff *buf;
- struct sk_buff *temp_buf;
-
- if (list_empty(message_list))
- return;
-
- n_ptr = tipc_node_find(dest);
- if (n_ptr) {
- tipc_node_lock(n_ptr);
- l_ptr = n_ptr->active_links[0];
- if (l_ptr) {
- /* convert circular list to linear list */
- ((struct sk_buff *)message_list->prev)->next = NULL;
- link_add_chain_to_outqueue(l_ptr,
- (struct sk_buff *)message_list->next, 0);
- tipc_link_push_queue(l_ptr);
- INIT_LIST_HEAD(message_list);
- }
- tipc_node_unlock(n_ptr);
- }
-
- /* discard the messages if they couldn't be sent */
- list_for_each_safe(buf, temp_buf, ((struct sk_buff *)message_list)) {
- list_del((struct list_head *)buf);
- kfree_skb(buf);
- }
-}
-
-/*
* tipc_link_push_packet: Push one unsent packet to the media
*/
static u32 tipc_link_push_packet(struct tipc_link *l_ptr)
void named_cluster_distribute(struct sk_buff *buf)
{
- struct sk_buff *buf_copy;
- struct tipc_node *n_ptr;
- struct tipc_link *l_ptr;
+ struct sk_buff *obuf;
+ struct tipc_node *node;
+ u32 dnode;
rcu_read_lock();
- list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
- tipc_node_lock(n_ptr);
- l_ptr = n_ptr->active_links[n_ptr->addr & 1];
- if (l_ptr) {
- buf_copy = skb_copy(buf, GFP_ATOMIC);
- if (!buf_copy) {
- tipc_node_unlock(n_ptr);
- break;
- }
- msg_set_destnode(buf_msg(buf_copy), n_ptr->addr);
- __tipc_link_xmit(l_ptr, buf_copy);
- }
- tipc_node_unlock(n_ptr);
+ list_for_each_entry_rcu(node, &tipc_node_list, list) {
+ dnode = node->addr;
+ if (in_own_node(dnode))
+ continue;
+ if (!tipc_node_active_links(node))
+ continue;
+ obuf = skb_copy(buf, GFP_ATOMIC);
+ if (!obuf)
+ break;
+ msg_set_destnode(buf_msg(obuf), dnode);
+ tipc_link_xmit2(obuf, dnode, dnode);
}
rcu_read_unlock();
return buf;
}
-/*
+/**
* named_distribute - prepare name info for bulk distribution to another node
+ * @msg_list: list of messages (buffers) to be returned from this function
+ * @dnode: node to be updated
+ * @pls: linked list of publication items to be packed into buffer chain
*/
-static void named_distribute(struct list_head *message_list, u32 node,
- struct publ_list *pls, u32 max_item_buf)
+static void named_distribute(struct list_head *msg_list, u32 dnode,
+ struct publ_list *pls)
{
struct publication *publ;
struct sk_buff *buf = NULL;
struct distr_item *item = NULL;
- u32 left = 0;
- u32 rest = pls->size * ITEM_SIZE;
+ uint dsz = pls->size * ITEM_SIZE;
+ uint msg_dsz = (tipc_node_get_mtu(dnode, 0) / ITEM_SIZE) * ITEM_SIZE;
+ uint rem = dsz;
+ uint msg_rem = 0;
list_for_each_entry(publ, &pls->list, local_list) {
+ /* Prepare next buffer: */
if (!buf) {
- left = (rest <= max_item_buf) ? rest : max_item_buf;
- rest -= left;
- buf = named_prepare_buf(PUBLICATION, left, node);
+ msg_rem = min_t(uint, rem, msg_dsz);
+ rem -= msg_rem;
+ buf = named_prepare_buf(PUBLICATION, msg_rem, dnode);
if (!buf) {
pr_warn("Bulk publication failure\n");
return;
}
item = (struct distr_item *)msg_data(buf_msg(buf));
}
+
+ /* Pack publication into message: */
publ_to_item(item, publ);
item++;
- left -= ITEM_SIZE;
- if (!left) {
- list_add_tail((struct list_head *)buf, message_list);
+ msg_rem -= ITEM_SIZE;
+
+ /* Append full buffer to list: */
+ if (!msg_rem) {
+ list_add_tail((struct list_head *)buf, msg_list);
buf = NULL;
}
}
/**
* tipc_named_node_up - tell specified node about all publications by this node
*/
-void tipc_named_node_up(u32 max_item_buf, u32 node)
+void tipc_named_node_up(u32 dnode)
{
- LIST_HEAD(message_list);
+ LIST_HEAD(msg_list);
+ struct sk_buff *buf_chain;
read_lock_bh(&tipc_nametbl_lock);
- named_distribute(&message_list, node, &publ_cluster, max_item_buf);
- named_distribute(&message_list, node, &publ_zone, max_item_buf);
+ named_distribute(&msg_list, dnode, &publ_cluster);
+ named_distribute(&msg_list, dnode, &publ_zone);
read_unlock_bh(&tipc_nametbl_lock);
- tipc_link_names_xmit(&message_list, node);
+ /* Convert circular list to linear list and send: */
+ buf_chain = (struct sk_buff *)msg_list.next;
+ ((struct sk_buff *)msg_list.prev)->next = NULL;
+ tipc_link_xmit2(buf_chain, dnode, dnode);
}
/**