Skip to content

Commit

Permalink
tipc: update a binding service via broadcast
Browse files Browse the repository at this point in the history
Currently, updating binding table (add service binding to
name table/withdraw a service binding) is being sent over replicast.
However, if we are scaling up clusters to > 100 nodes/containers this
method is less affection because of looping through nodes in a cluster one
by one.

It is worth to use broadcast to update a binding service. This way, the
binding table can be updated on all peer nodes in one shot.

Broadcast is used when all peer nodes, as indicated by a new capability
flag TIPC_NAMED_BCAST, support reception of this message type.

Four problems need to be considered when introducing this feature.
1) When establishing a link to a new peer node we still update this by a
unicast 'bulk' update. This may lead to race conditions, where a later
broadcast publication/withdrawal bypass the 'bulk', resulting in
disordered publications, or even that a withdrawal may arrive before the
corresponding publication. We solve this by adding an 'is_last_bulk' bit
in the last bulk messages so that it can be distinguished from all other
messages. Only when this message has arrived do we open up for reception
of broadcast publications/withdrawals.

2) When a first legacy node is added to the cluster all distribution
will switch over to use the legacy 'replicast' method, while the
opposite happens when the last legacy node leaves the cluster. This
entails another risk of message disordering that has to be handled. We
solve this by adding a sequence number to the broadcast/replicast
messages, so that disordering can be discovered and corrected. Note
however that we don't need to consider potential message loss or
duplication at this protocol level.

3) Bulk messages don't contain any sequence numbers, and will always
arrive in order. Hence we must exempt those from the sequence number
control and deliver them unconditionally. We solve this by adding a new
'is_bulk' bit in those messages so that they can be recognized.

4) Legacy messages, which don't contain any new bits or sequence
numbers, but neither can arrive out of order, also need to be exempt
from the initial synchronization and sequence number check, and
delivered unconditionally. Therefore, we add another 'is_not_legacy' bit
to all new messages so that those can be distinguished from legacy
messages and the latter delivered directly.

v1->v2:
 - fix warning issue reported by kbuild test robot <lkp@intel.com>
 - add santiy check to drop the publication message with a sequence
number that is lower than the agreed synch point

Signed-off-by: kernel test robot <lkp@intel.com>
Signed-off-by: Hoang Huu Le <hoang.h.le@dektech.com.au>
Acked-by: Jon Maloy <jmaloy@redhat.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
  • Loading branch information
Hoang Huu Le authored and David S. Miller committed Jun 17, 2020
1 parent 6911967 commit cad2929
Show file tree
Hide file tree
Showing 10 changed files with 177 additions and 48 deletions.
6 changes: 3 additions & 3 deletions net/tipc/bcast.c
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,8 @@ static void tipc_bcast_select_xmit_method(struct net *net, int dests,
* Consumes the buffer chain.
* Returns 0 if success, otherwise errno: -EHOSTUNREACH,-EMSGSIZE
*/
static int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
u16 *cong_link_cnt)
int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
u16 *cong_link_cnt)
{
struct tipc_link *l = tipc_bc_sndlink(net);
struct sk_buff_head xmitq;
Expand Down Expand Up @@ -752,7 +752,7 @@ void tipc_nlist_purge(struct tipc_nlist *nl)
nl->local = false;
}

u32 tipc_bcast_get_broadcast_mode(struct net *net)
u32 tipc_bcast_get_mode(struct net *net)
{
struct tipc_bc_base *bb = tipc_bc_base(net);

Expand Down
4 changes: 3 additions & 1 deletion net/tipc/bcast.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ void tipc_bcast_toggle_rcast(struct net *net, bool supp);
int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
struct tipc_mc_method *method, struct tipc_nlist *dests,
u16 *cong_link_cnt);
int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
u16 *cong_link_cnt);
int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
struct tipc_msg *hdr);
Expand All @@ -101,7 +103,7 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg,
int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
int tipc_bclink_reset_stats(struct net *net, struct tipc_link *l);

u32 tipc_bcast_get_broadcast_mode(struct net *net);
u32 tipc_bcast_get_mode(struct net *net);
u32 tipc_bcast_get_broadcast_ratio(struct net *net);

void tipc_mcast_filter_msg(struct net *net, struct sk_buff_head *defq,
Expand Down
2 changes: 1 addition & 1 deletion net/tipc/link.c
Original file line number Diff line number Diff line change
Expand Up @@ -2745,7 +2745,7 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg,
void *hdr;
struct nlattr *attrs;
struct nlattr *prop;
u32 bc_mode = tipc_bcast_get_broadcast_mode(net);
u32 bc_mode = tipc_bcast_get_mode(net);
u32 bc_ratio = tipc_bcast_get_broadcast_ratio(net);

if (!bcl)
Expand Down
40 changes: 40 additions & 0 deletions net/tipc/msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,36 @@ static inline void msg_set_errcode(struct tipc_msg *m, u32 err)
msg_set_bits(m, 1, 25, 0xf, err);
}

static inline void msg_set_bulk(struct tipc_msg *m)
{
msg_set_bits(m, 1, 28, 0x1, 1);
}

static inline u32 msg_is_bulk(struct tipc_msg *m)
{
return msg_bits(m, 1, 28, 0x1);
}

static inline void msg_set_last_bulk(struct tipc_msg *m)
{
msg_set_bits(m, 1, 27, 0x1, 1);
}

static inline u32 msg_is_last_bulk(struct tipc_msg *m)
{
return msg_bits(m, 1, 27, 0x1);
}

static inline void msg_set_non_legacy(struct tipc_msg *m)
{
msg_set_bits(m, 1, 26, 0x1, 1);
}

static inline u32 msg_is_legacy(struct tipc_msg *m)
{
return !msg_bits(m, 1, 26, 0x1);
}

static inline u32 msg_reroute_cnt(struct tipc_msg *m)
{
return msg_bits(m, 1, 21, 0xf);
Expand Down Expand Up @@ -567,6 +597,16 @@ static inline void msg_set_origport(struct tipc_msg *m, u32 p)
msg_set_word(m, 4, p);
}

static inline u16 msg_named_seqno(struct tipc_msg *m)
{
return msg_bits(m, 4, 0, 0xffff);
}

static inline void msg_set_named_seqno(struct tipc_msg *m, u16 n)
{
msg_set_bits(m, 4, 0, 0xffff, n);
}

static inline u32 msg_destport(struct tipc_msg *m)
{
return msg_word(m, 5);
Expand Down
116 changes: 87 additions & 29 deletions net/tipc/name_distr.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ struct sk_buff *tipc_named_publish(struct net *net, struct publication *publ)
pr_warn("Publication distribution failure\n");
return NULL;
}

msg_set_named_seqno(buf_msg(skb), nt->snd_nxt++);
msg_set_non_legacy(buf_msg(skb));
item = (struct distr_item *)msg_data(buf_msg(skb));
publ_to_item(item, publ);
return skb;
Expand All @@ -114,24 +115,25 @@ struct sk_buff *tipc_named_publish(struct net *net, struct publication *publ)
struct sk_buff *tipc_named_withdraw(struct net *net, struct publication *publ)
{
struct name_table *nt = tipc_name_table(net);
struct sk_buff *buf;
struct distr_item *item;
struct sk_buff *skb;

write_lock_bh(&nt->cluster_scope_lock);
list_del(&publ->binding_node);
write_unlock_bh(&nt->cluster_scope_lock);
if (publ->scope == TIPC_NODE_SCOPE)
return NULL;

buf = named_prepare_buf(net, WITHDRAWAL, ITEM_SIZE, 0);
if (!buf) {
skb = named_prepare_buf(net, WITHDRAWAL, ITEM_SIZE, 0);
if (!skb) {
pr_warn("Withdrawal distribution failure\n");
return NULL;
}

item = (struct distr_item *)msg_data(buf_msg(buf));
msg_set_named_seqno(buf_msg(skb), nt->snd_nxt++);
msg_set_non_legacy(buf_msg(skb));
item = (struct distr_item *)msg_data(buf_msg(skb));
publ_to_item(item, publ);
return buf;
return skb;
}

/**
Expand All @@ -141,14 +143,15 @@ struct sk_buff *tipc_named_withdraw(struct net *net, struct publication *publ)
* @pls: linked list of publication items to be packed into buffer chain
*/
static void named_distribute(struct net *net, struct sk_buff_head *list,
u32 dnode, struct list_head *pls)
u32 dnode, struct list_head *pls, u16 seqno)
{
struct publication *publ;
struct sk_buff *skb = NULL;
struct distr_item *item = NULL;
u32 msg_dsz = ((tipc_node_get_mtu(net, dnode, 0, false) - INT_H_SIZE) /
ITEM_SIZE) * ITEM_SIZE;
u32 msg_rem = msg_dsz;
struct tipc_msg *hdr;

list_for_each_entry(publ, pls, binding_node) {
/* Prepare next buffer: */
Expand All @@ -159,8 +162,11 @@ static void named_distribute(struct net *net, struct sk_buff_head *list,
pr_warn("Bulk publication failure\n");
return;
}
msg_set_bc_ack_invalid(buf_msg(skb), true);
item = (struct distr_item *)msg_data(buf_msg(skb));
hdr = buf_msg(skb);
msg_set_bc_ack_invalid(hdr, true);
msg_set_bulk(hdr);
msg_set_non_legacy(hdr);
item = (struct distr_item *)msg_data(hdr);
}

/* Pack publication into message: */
Expand All @@ -176,24 +182,35 @@ static void named_distribute(struct net *net, struct sk_buff_head *list,
}
}
if (skb) {
msg_set_size(buf_msg(skb), INT_H_SIZE + (msg_dsz - msg_rem));
hdr = buf_msg(skb);
msg_set_size(hdr, INT_H_SIZE + (msg_dsz - msg_rem));
skb_trim(skb, INT_H_SIZE + (msg_dsz - msg_rem));
__skb_queue_tail(list, skb);
}
hdr = buf_msg(skb_peek_tail(list));
msg_set_last_bulk(hdr);
msg_set_named_seqno(hdr, seqno);
}

/**
* tipc_named_node_up - tell specified node about all publications by this node
*/
void tipc_named_node_up(struct net *net, u32 dnode)
void tipc_named_node_up(struct net *net, u32 dnode, u16 capabilities)
{
struct name_table *nt = tipc_name_table(net);
struct tipc_net *tn = tipc_net(net);
struct sk_buff_head head;
u16 seqno;

__skb_queue_head_init(&head);
spin_lock_bh(&tn->nametbl_lock);
if (!(capabilities & TIPC_NAMED_BCAST))
nt->rc_dests++;
seqno = nt->snd_nxt;
spin_unlock_bh(&tn->nametbl_lock);

read_lock_bh(&nt->cluster_scope_lock);
named_distribute(net, &head, dnode, &nt->cluster_scope);
named_distribute(net, &head, dnode, &nt->cluster_scope, seqno);
tipc_node_xmit(net, &head, dnode, 0);
read_unlock_bh(&nt->cluster_scope_lock);
}
Expand Down Expand Up @@ -245,13 +262,21 @@ static void tipc_dist_queue_purge(struct net *net, u32 addr)
spin_unlock_bh(&tn->nametbl_lock);
}

void tipc_publ_notify(struct net *net, struct list_head *nsub_list, u32 addr)
void tipc_publ_notify(struct net *net, struct list_head *nsub_list,
u32 addr, u16 capabilities)
{
struct name_table *nt = tipc_name_table(net);
struct tipc_net *tn = tipc_net(net);

struct publication *publ, *tmp;

list_for_each_entry_safe(publ, tmp, nsub_list, binding_node)
tipc_publ_purge(net, publ, addr);
tipc_dist_queue_purge(net, addr);
spin_lock_bh(&tn->nametbl_lock);
if (!(capabilities & TIPC_NAMED_BCAST))
nt->rc_dests--;
spin_unlock_bh(&tn->nametbl_lock);
}

/**
Expand Down Expand Up @@ -295,29 +320,62 @@ static bool tipc_update_nametbl(struct net *net, struct distr_item *i,
return false;
}

static struct sk_buff *tipc_named_dequeue(struct sk_buff_head *namedq,
u16 *rcv_nxt, bool *open)
{
struct sk_buff *skb, *tmp;
struct tipc_msg *hdr;
u16 seqno;

skb_queue_walk_safe(namedq, skb, tmp) {
skb_linearize(skb);
hdr = buf_msg(skb);
seqno = msg_named_seqno(hdr);
if (msg_is_last_bulk(hdr)) {
*rcv_nxt = seqno;
*open = true;
}

if (msg_is_bulk(hdr) || msg_is_legacy(hdr)) {
__skb_unlink(skb, namedq);
return skb;
}

if (*open && (*rcv_nxt == seqno)) {
(*rcv_nxt)++;
__skb_unlink(skb, namedq);
return skb;
}

if (less(seqno, *rcv_nxt)) {
__skb_unlink(skb, namedq);
kfree_skb(skb);
continue;
}
}
return NULL;
}

/**
* tipc_named_rcv - process name table update messages sent by another node
*/
void tipc_named_rcv(struct net *net, struct sk_buff_head *inputq)
void tipc_named_rcv(struct net *net, struct sk_buff_head *namedq,
u16 *rcv_nxt, bool *open)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_msg *msg;
struct tipc_net *tn = tipc_net(net);
struct distr_item *item;
uint count;
u32 node;
struct tipc_msg *hdr;
struct sk_buff *skb;
int mtype;
u32 count, node;

spin_lock_bh(&tn->nametbl_lock);
for (skb = skb_dequeue(inputq); skb; skb = skb_dequeue(inputq)) {
skb_linearize(skb);
msg = buf_msg(skb);
mtype = msg_type(msg);
item = (struct distr_item *)msg_data(msg);
count = msg_data_sz(msg) / ITEM_SIZE;
node = msg_orignode(msg);
while ((skb = tipc_named_dequeue(namedq, rcv_nxt, open))) {
hdr = buf_msg(skb);
node = msg_orignode(hdr);
item = (struct distr_item *)msg_data(hdr);
count = msg_data_sz(hdr) / ITEM_SIZE;
while (count--) {
tipc_update_nametbl(net, item, node, mtype);
tipc_update_nametbl(net, item, node, msg_type(hdr));
item++;
}
kfree_skb(skb);
Expand Down Expand Up @@ -345,6 +403,6 @@ void tipc_named_reinit(struct net *net)
publ->node = self;
list_for_each_entry_rcu(publ, &nt->cluster_scope, binding_node)
publ->node = self;

nt->rc_dests = 0;
spin_unlock_bh(&tn->nametbl_lock);
}
9 changes: 6 additions & 3 deletions net/tipc/name_distr.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,14 @@ struct distr_item {
__be32 key;
};

void tipc_named_bcast(struct net *net, struct sk_buff *skb);
struct sk_buff *tipc_named_publish(struct net *net, struct publication *publ);
struct sk_buff *tipc_named_withdraw(struct net *net, struct publication *publ);
void tipc_named_node_up(struct net *net, u32 dnode);
void tipc_named_rcv(struct net *net, struct sk_buff_head *msg_queue);
void tipc_named_node_up(struct net *net, u32 dnode, u16 capabilities);
void tipc_named_rcv(struct net *net, struct sk_buff_head *namedq,
u16 *rcv_nxt, bool *open);
void tipc_named_reinit(struct net *net);
void tipc_publ_notify(struct net *net, struct list_head *nsub_list, u32 addr);
void tipc_publ_notify(struct net *net, struct list_head *nsub_list,
u32 addr, u16 capabilities);

#endif
9 changes: 7 additions & 2 deletions net/tipc/name_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,7 @@ struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower,
struct tipc_net *tn = tipc_net(net);
struct publication *p = NULL;
struct sk_buff *skb = NULL;
u32 rc_dests;

spin_lock_bh(&tn->nametbl_lock);

Expand All @@ -743,12 +744,14 @@ struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower,
nt->local_publ_count++;
skb = tipc_named_publish(net, p);
}
rc_dests = nt->rc_dests;
exit:
spin_unlock_bh(&tn->nametbl_lock);

if (skb)
tipc_node_broadcast(net, skb);
tipc_node_broadcast(net, skb, rc_dests);
return p;

}

/**
Expand All @@ -762,6 +765,7 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower,
u32 self = tipc_own_addr(net);
struct sk_buff *skb = NULL;
struct publication *p;
u32 rc_dests;

spin_lock_bh(&tn->nametbl_lock);

Expand All @@ -775,10 +779,11 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower,
pr_err("Failed to remove local publication {%u,%u,%u}/%u\n",
type, lower, upper, key);
}
rc_dests = nt->rc_dests;
spin_unlock_bh(&tn->nametbl_lock);

if (skb) {
tipc_node_broadcast(net, skb);
tipc_node_broadcast(net, skb, rc_dests);
return 1;
}
return 0;
Expand Down
2 changes: 2 additions & 0 deletions net/tipc/name_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ struct name_table {
struct list_head cluster_scope;
rwlock_t cluster_scope_lock;
u32 local_publ_count;
u32 rc_dests;
u32 snd_nxt;
};

int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb);
Expand Down
Loading

0 comments on commit cad2929

Please sign in to comment.