*/
static void tipc_publ_purge(struct net *net, struct publication *publ, u32 addr)
{
- struct tipc_net *tn = net_generic(net, tipc_net_id);
+ struct tipc_net *tn = tipc_net(net);
struct publication *p;
spin_lock_bh(&tn->nametbl_lock);
- p = tipc_nametbl_remove_publ(net, publ->type, publ->lower,
- publ->node, publ->port, publ->key);
+ p = tipc_nametbl_remove_publ(net, publ->type, publ->lower, publ->upper,
+ publ->node, publ->key);
if (p)
tipc_node_unsubscribe(net, &p->binding_node, addr);
spin_unlock_bh(&tn->nametbl_lock);
static bool tipc_update_nametbl(struct net *net, struct distr_item *i,
u32 node, u32 dtype)
{
- struct publication *publ = NULL;
+ struct publication *p = NULL;
+ u32 lower = ntohl(i->lower);
+ u32 upper = ntohl(i->upper);
+ u32 type = ntohl(i->type);
+ u32 port = ntohl(i->port);
+ u32 key = ntohl(i->key);
if (dtype == PUBLICATION) {
- publ = tipc_nametbl_insert_publ(net, ntohl(i->type),
- ntohl(i->lower),
- ntohl(i->upper),
- TIPC_CLUSTER_SCOPE, node,
- ntohl(i->port), ntohl(i->key));
- if (publ) {
- tipc_node_subscribe(net, &publ->binding_node, node);
+ p = tipc_nametbl_insert_publ(net, type, lower, upper,
+ TIPC_CLUSTER_SCOPE, node,
+ port, key);
+ if (p) {
+ tipc_node_subscribe(net, &p->binding_node, node);
return true;
}
} else if (dtype == WITHDRAWAL) {
- publ = tipc_nametbl_remove_publ(net, ntohl(i->type),
- ntohl(i->lower),
- node, ntohl(i->port),
- ntohl(i->key));
- if (publ) {
- tipc_node_unsubscribe(net, &publ->binding_node, node);
- kfree_rcu(publ, rcu);
+ p = tipc_nametbl_remove_publ(net, type, lower,
+ upper, node, key);
+ if (p) {
+ tipc_node_unsubscribe(net, &p->binding_node, node);
+ kfree_rcu(p, rcu);
return true;
}
+ pr_warn_ratelimited("Failed to remove binding %u,%u from %x\n",
+ type, lower, node);
} else {
pr_warn("Unrecognized name table message received\n");
}
return false;
}
-/**
- * tipc_named_add_backlog - add a failed name table update to the backlog
- *
- */
-static void tipc_named_add_backlog(struct net *net, struct distr_item *i,
- u32 type, u32 node)
-{
- struct distr_queue_item *e;
- struct tipc_net *tn = net_generic(net, tipc_net_id);
- unsigned long now = get_jiffies_64();
-
- e = kzalloc(sizeof(*e), GFP_ATOMIC);
- if (!e)
- return;
- e->dtype = type;
- e->node = node;
- e->expires = now + msecs_to_jiffies(sysctl_tipc_named_timeout);
- memcpy(e, i, sizeof(*i));
- list_add_tail(&e->next, &tn->dist_queue);
-}
-
-/**
- * tipc_named_process_backlog - try to process any pending name table updates
- * from the network.
- */
-void tipc_named_process_backlog(struct net *net)
-{
- struct distr_queue_item *e, *tmp;
- struct tipc_net *tn = net_generic(net, tipc_net_id);
- unsigned long now = get_jiffies_64();
-
- list_for_each_entry_safe(e, tmp, &tn->dist_queue, next) {
- if (time_after(e->expires, now)) {
- if (!tipc_update_nametbl(net, &e->i, e->node, e->dtype))
- continue;
- } else {
- pr_warn_ratelimited("Dropping name table update (%d) of {%u, %u, %u} from %x key=%u\n",
- e->dtype, ntohl(e->i.type),
- ntohl(e->i.lower),
- ntohl(e->i.upper),
- e->node, ntohl(e->i.key));
- }
- list_del(&e->next);
- kfree(e);
- }
-}
-
/**
* tipc_named_rcv - process name table update messages sent by another node
*/
count = msg_data_sz(msg) / ITEM_SIZE;
node = msg_orignode(msg);
while (count--) {
- if (!tipc_update_nametbl(net, item, node, mtype))
- tipc_named_add_backlog(net, item, mtype, node);
+ tipc_update_nametbl(net, item, node, mtype);
item++;
}
kfree_skb(skb);
- tipc_named_process_backlog(net);
}
spin_unlock_bh(&tn->nametbl_lock);
}
void tipc_named_node_up(struct net *net, u32 dnode);
void tipc_named_rcv(struct net *net, struct sk_buff_head *msg_queue);
void tipc_named_reinit(struct net *net);
-void tipc_named_process_backlog(struct net *net);
void tipc_publ_notify(struct net *net, struct list_head *nsub_list, u32 addr);
#endif
tmp = container_of(parent, struct service_range, tree_node);
if (lower < tmp->lower)
n = &(*n)->rb_left;
+ else if (lower > tmp->lower)
+ n = &(*n)->rb_right;
+ else if (upper < tmp->upper)
+ n = &(*n)->rb_left;
else if (upper > tmp->upper)
n = &(*n)->rb_right;
else
- return NULL;
+ return tmp;
}
sr = kzalloc(sizeof(*sr), GFP_ATOMIC);
if (!sr)
struct publication *p;
bool first = false;
- sr = tipc_service_find_range(sc, lower);
- if (!sr) {
- sr = tipc_service_create_range(sc, lower, upper);
- if (!sr)
- goto err;
- first = true;
- }
+ sr = tipc_service_create_range(sc, lower, upper);
+ if (!sr)
+ goto err;
- /* Lower end overlaps existing entry, but we need an exact match */
- if (sr->lower != lower || sr->upper != upper)
- return NULL;
+ first = list_empty(&sr->all_publ);
/* Return if the publication already exists */
list_for_each_entry(p, &sr->all_publ, all_publ) {
/**
* tipc_service_remove_publ - remove a publication from a service
- *
- * NOTE: There may be cases where TIPC is asked to remove a publication
- * that is not in the name table. For example, if another node issues a
- * publication for a name range that overlaps an existing name range
- * the publication will not be recorded, which means the publication won't
- * be found when the name range is later withdrawn by that node.
- * A failed withdraw request simply returns a failure indication and lets the
- * caller issue any error or warning messages associated with such a problem.
*/
static struct publication *tipc_service_remove_publ(struct net *net,
struct tipc_service *sc,
- u32 inst, u32 node,
- u32 port, u32 key)
+ u32 lower, u32 upper,
+ u32 node, u32 key)
{
struct tipc_subscription *sub, *tmp;
struct service_range *sr;
struct publication *p;
bool found = false;
bool last = false;
+ struct rb_node *n;
- sr = tipc_service_find_range(sc, inst);
+ sr = tipc_service_find_range(sc, lower);
if (!sr)
return NULL;
+ /* Find exact matching service range */
+ for (n = &sr->tree_node; n; n = rb_next(n)) {
+ sr = container_of(n, struct service_range, tree_node);
+ if (sr->upper == upper)
+ break;
+ }
+ if (!n || sr->lower != lower || sr->upper != upper)
+ return NULL;
+
/* Find publication, if it exists */
list_for_each_entry(p, &sr->all_publ, all_publ) {
if (p->key != key || (node && node != p->node))
}
struct publication *tipc_nametbl_remove_publ(struct net *net, u32 type,
- u32 lower, u32 node, u32 port,
- u32 key)
+ u32 lower, u32 upper,
+ u32 node, u32 key)
{
struct tipc_service *sc = tipc_service_find(net, type);
struct publication *p = NULL;
return NULL;
spin_lock_bh(&sc->lock);
- p = tipc_service_remove_publ(net, sc, lower, node, port, key);
+ p = tipc_service_remove_publ(net, sc, lower, upper, node, key);
/* Delete service item if this no more publications and subscriptions */
if (RB_EMPTY_ROOT(&sc->ranges) && list_empty(&sc->subscriptions)) {
if (p) {
nt->local_publ_count++;
skb = tipc_named_publish(net, p);
- /* Any pending external events? */
- tipc_named_process_backlog(net);
}
exit:
spin_unlock_bh(&tn->nametbl_lock);
* tipc_nametbl_withdraw - withdraw a service binding
*/
int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower,
- u32 port, u32 key)
+ u32 upper, u32 key)
{
struct name_table *nt = tipc_name_table(net);
struct tipc_net *tn = tipc_net(net);
spin_lock_bh(&tn->nametbl_lock);
- p = tipc_nametbl_remove_publ(net, type, lower, self, port, key);
+ p = tipc_nametbl_remove_publ(net, type, lower, upper, self, key);
if (p) {
nt->local_publ_count--;
skb = tipc_named_withdraw(net, p);
- /* Any pending external events? */
- tipc_named_process_backlog(net);
list_del_init(&p->binding_sock);
kfree_rcu(p, rcu);
} else {
pr_err("Failed to remove local publication {%u,%u,%u}/%u\n",
- type, lower, port, key);
+ type, lower, upper, key);
}
spin_unlock_bh(&tn->nametbl_lock);
rbtree_postorder_for_each_entry_safe(sr, tmpr, &sc->ranges, tree_node) {
list_for_each_entry_safe(p, tmpb,
&sr->all_publ, all_publ) {
- tipc_service_remove_publ(net, sc, p->lower, p->node,
- p->port, p->key);
+ tipc_service_remove_publ(net, sc, p->lower, p->upper,
+ p->node, p->key);
kfree_rcu(p, rcu);
}
}
struct list_head *dsts, int *dstcnt, u32 exclude,
bool all);
struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower,
- u32 upper, u32 scope, u32 port_ref,
+ u32 upper, u32 scope, u32 port,
u32 key);
-int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 ref,
+int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 upper,
u32 key);
struct publication *tipc_nametbl_insert_publ(struct net *net, u32 type,
u32 lower, u32 upper, u32 scope,
u32 node, u32 ref, u32 key);
struct publication *tipc_nametbl_remove_publ(struct net *net, u32 type,
- u32 lower, u32 node, u32 ref,
- u32 key);
+ u32 lower, u32 upper,
+ u32 node, u32 key);
void tipc_nametbl_subscribe(struct tipc_subscription *s);
void tipc_nametbl_unsubscribe(struct tipc_subscription *s);
int tipc_nametbl_init(struct net *net);
if (!self)
return;
- tipc_nametbl_withdraw(net, TIPC_CFG_SRV, self, 0, self);
+ tipc_nametbl_withdraw(net, TIPC_CFG_SRV, self, self, self);
rtnl_lock();
tipc_bearer_stop(net);
tipc_node_stop(net);
if (flags & TIPC_NOTIFY_LINK_DOWN) {
tipc_mon_peer_down(net, addr, bearer_id);
tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
- link_id, link_id);
+ addr, link_id);
}
}
if (publ->upper != seq->upper)
break;
tipc_nametbl_withdraw(net, publ->type, publ->lower,
- publ->port, publ->key);
+ publ->upper, publ->key);
rc = 0;
break;
}
tipc_nametbl_withdraw(net, publ->type, publ->lower,
- publ->port, publ->key);
+ publ->upper, publ->key);
rc = 0;
}
if (list_empty(&tsk->publications))