Skip to content

Commit

Permalink
tipc: simplify handling of lookup scope during multicast message rece…
Browse files Browse the repository at this point in the history
…ption

We introduce a new macro TIPC_ANY_SCOPE to make the handling of the
lookup scope value more comprehensible during multicast reception.

The (unchanged) rules go as follows:

1) Multicast messages sent from own node are delivered to all matching
   sockets on the own node, irrespective of their binding scope.

2) Multicast messages sent from other nodes arrive here because they
   have found TIPC_CLUSTER_SCOPE bindings emanating from this node.
   Those messages should be delivered to exactly those sockets, but not
   to local sockets bound with TIPC_NODE_SCOPE, since the latter
   obviously were not meant to be visible for those senders.

3) Group multicast/broadcast messages are delivered to the sockets with
   a binding scope matching exactly the lookup scope indicated in the
   message header, and nobody else.

Reviewed-by: Xin Long <lucien.xin@gmail.com>
Tested-by: Hoang Le <hoang.h.le@dektech.com.au>
Signed-off-by: Jon Maloy <jmaloy@redhat.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
  • Loading branch information
Jon Maloy authored and David S. Miller committed Jun 3, 2021
1 parent 62633c2 commit 5ef2132
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 20 deletions.
6 changes: 3 additions & 3 deletions net/tipc/name_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -673,12 +673,12 @@ bool tipc_nametbl_lookup_group(struct net *net, struct tipc_uaddr *ua,
* Returns a list of local sockets
*/
void tipc_nametbl_lookup_mcast_sockets(struct net *net, struct tipc_uaddr *ua,
bool exact, struct list_head *dports)
struct list_head *dports)
{
struct service_range *sr;
struct tipc_service *sc;
struct publication *p;
u32 scope = ua->scope;
u8 scope = ua->scope;

rcu_read_lock();
sc = tipc_service_find(net, ua);
Expand All @@ -688,7 +688,7 @@ void tipc_nametbl_lookup_mcast_sockets(struct net *net, struct tipc_uaddr *ua,
spin_lock_bh(&sc->lock);
service_range_foreach_match(sr, sc, ua->sr.lower, ua->sr.upper) {
list_for_each_entry(p, &sr->local_publ, local_publ) {
if (p->scope == scope || (!exact && p->scope < scope))
if (scope == p->scope || scope == TIPC_ANY_SCOPE)
tipc_dest_push(dports, 0, p->sk.ref);
}
}
Expand Down
4 changes: 3 additions & 1 deletion net/tipc/name_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ struct tipc_uaddr;
#define TIPC_PUBL_SCOPE_NUM (TIPC_NODE_SCOPE + 1)
#define TIPC_NAMETBL_SIZE 1024 /* must be a power of 2 */

#define TIPC_ANY_SCOPE 10 /* Both node and cluster scope will match */

/**
* struct publication - info about a published service address or range
* @sr: service range represented by this publication
Expand Down Expand Up @@ -113,7 +115,7 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb);
bool tipc_nametbl_lookup_anycast(struct net *net, struct tipc_uaddr *ua,
struct tipc_socket_addr *sk);
void tipc_nametbl_lookup_mcast_sockets(struct net *net, struct tipc_uaddr *ua,
bool exact, struct list_head *dports);
struct list_head *dports);
void tipc_nametbl_lookup_mcast_nodes(struct net *net, struct tipc_uaddr *ua,
struct tipc_nlist *nodes);
bool tipc_nametbl_lookup_group(struct net *net, struct tipc_uaddr *ua,
Expand Down
26 changes: 10 additions & 16 deletions net/tipc/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -1200,12 +1200,12 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
struct tipc_msg *hdr;
struct tipc_uaddr ua;
int user, mtyp, hlen;
bool exact;

__skb_queue_head_init(&tmpq);
INIT_LIST_HEAD(&dports);
ua.addrtype = TIPC_SERVICE_RANGE;

/* tipc_skb_peek() increments the head skb's reference counter */
skb = tipc_skb_peek(arrvq, &inputq->lock);
for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
hdr = buf_msg(skb);
Expand All @@ -1214,6 +1214,12 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
hlen = skb_headroom(skb) + msg_hdr_sz(hdr);
onode = msg_orignode(hdr);
ua.sr.type = msg_nametype(hdr);
ua.sr.lower = msg_namelower(hdr);
ua.sr.upper = msg_nameupper(hdr);
if (onode == self)
ua.scope = TIPC_ANY_SCOPE;
else
ua.scope = TIPC_CLUSTER_SCOPE;

if (mtyp == TIPC_GRP_UCAST_MSG || user == GROUP_PROTOCOL) {
spin_lock_bh(&inputq->lock);
Expand All @@ -1231,20 +1237,10 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
ua.sr.lower = 0;
ua.sr.upper = ~0;
ua.scope = msg_lookup_scope(hdr);
exact = true;
} else {
/* TIPC_NODE_SCOPE means "any scope" in this context */
if (onode == self)
ua.scope = TIPC_NODE_SCOPE;
else
ua.scope = TIPC_CLUSTER_SCOPE;
exact = false;
ua.sr.lower = msg_namelower(hdr);
ua.sr.upper = msg_nameupper(hdr);
}

/* Create destination port list: */
tipc_nametbl_lookup_mcast_sockets(net, &ua, exact, &dports);
tipc_nametbl_lookup_mcast_sockets(net, &ua, &dports);

/* Clone message per destination */
while (tipc_dest_pop(&dports, NULL, &portid)) {
Expand All @@ -1256,13 +1252,11 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
}
pr_warn("Failed to clone mcast rcv buffer\n");
}
/* Append to inputq if not already done by other thread */
/* Append clones to inputq only if skb is still head of arrvq */
spin_lock_bh(&inputq->lock);
if (skb_peek(arrvq) == skb) {
skb_queue_splice_tail_init(&tmpq, inputq);
/* Decrease the skb's refcnt as increasing in the
* function tipc_skb_peek
*/
/* Decrement the skb's refcnt */
kfree_skb(__skb_dequeue(arrvq));
}
spin_unlock_bh(&inputq->lock);
Expand Down

0 comments on commit 5ef2132

Please sign in to comment.