Skip to content

Commit

Permalink
Merge branch 'tipc-next'
Browse files Browse the repository at this point in the history
Jon Maloy says:

====================
tipc: improve broadcast implementation

The TIPC broadcast link implementation is currently complex and hard to
follow. It also incurs some amount of code and structure duplication,
something that can be reduced significantly with a little effort.

This commit series introduces a number of improvements which address
both the locking structure, the code/structure duplication issue, and
the overall readbility of the code.

The series consists of three main parts:

1-7: Adaptation to the new link structure, and preparation for the next
     step. In particular, we want the broadcast transmission link to
     have a life cycle that is longer than any of its potential (unicast
     and broadcast receive links) users. This eliminates the need to
     always test for the presence of this link before accessing it.

8-10: This is what is really new in this series. Commit #9 is by far
      the largest and most important one, because it moves most of
      the broadcast functionality into link.c, partially reusing the
      fields and functionality of the unicast link. The removal of
      the "node_map" infrastructure in commit #10 is also an important
      achievement.

11-16: Some improvements leveraging the changes made in the previous
       commits.

The series needs commit 53387c4 ("tipc: extend broadcast link window size")
and commit e535679 ("tipc: conditionally expand buffer headroom over udp tunnel")
which are both present in 'net' but not yet in 'net-next', to apply cleanly.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
  • Loading branch information
David S. Miller committed Oct 24, 2015
2 parents ba3e208 + 2af5ae3 commit 687f079
Show file tree
Hide file tree
Showing 17 changed files with 1,005 additions and 1,387 deletions.
988 changes: 237 additions & 751 deletions net/tipc/bcast.c

Large diffs are not rendered by default.

122 changes: 32 additions & 90 deletions net/tipc/bcast.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,102 +37,44 @@
#ifndef _TIPC_BCAST_H
#define _TIPC_BCAST_H

#include <linux/tipc_config.h>
#include "link.h"
#include "node.h"
#include "core.h"

/**
* struct tipc_bcbearer_pair - a pair of bearers used by broadcast link
* @primary: pointer to primary bearer
* @secondary: pointer to secondary bearer
*
* Bearers must have same priority and same set of reachable destinations
* to be paired.
*/

struct tipc_bcbearer_pair {
struct tipc_bearer *primary;
struct tipc_bearer *secondary;
};

#define BCBEARER MAX_BEARERS

/**
* struct tipc_bcbearer - bearer used by broadcast link
* @bearer: (non-standard) broadcast bearer structure
* @media: (non-standard) broadcast media structure
* @bpairs: array of bearer pairs
* @bpairs_temp: temporary array of bearer pairs used by tipc_bcbearer_sort()
* @remains: temporary node map used by tipc_bcbearer_send()
* @remains_new: temporary node map used tipc_bcbearer_send()
*
* Note: The fields labelled "temporary" are incorporated into the bearer
* to avoid consuming potentially limited stack space through the use of
* large local variables within multicast routines. Concurrent access is
* prevented through use of the spinlock "bclink_lock".
*/
struct tipc_bcbearer {
struct tipc_bearer bearer;
struct tipc_media media;
struct tipc_bcbearer_pair bpairs[MAX_BEARERS];
struct tipc_bcbearer_pair bpairs_temp[TIPC_MAX_LINK_PRI + 1];
struct tipc_node_map remains;
struct tipc_node_map remains_new;
};
struct tipc_node;
struct tipc_msg;
struct tipc_nl_msg;
struct tipc_node_map;

/**
* struct tipc_bclink - link used for broadcast messages
* @lock: spinlock governing access to structure
* @link: (non-standard) broadcast link structure
* @node: (non-standard) node structure representing b'cast link's peer node
* @bcast_nodes: map of broadcast-capable nodes
* @retransmit_to: node that most recently requested a retransmit
*
* Handles sequence numbering, fragmentation, bundling, etc.
*/
struct tipc_bclink {
spinlock_t lock;
struct tipc_link link;
struct tipc_node node;
struct sk_buff_head arrvq;
struct sk_buff_head inputq;
struct tipc_node_map bcast_nodes;
struct tipc_node *retransmit_to;
};
int tipc_bcast_init(struct net *net);
void tipc_bcast_reinit(struct net *net);
void tipc_bcast_stop(struct net *net);
void tipc_bcast_add_peer(struct net *net, struct tipc_link *l,
struct sk_buff_head *xmitq);
void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_bcl);
void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id);
void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id);
int tipc_bcast_get_mtu(struct net *net);
int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list);
int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked);
void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
struct tipc_msg *hdr);
int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
int tipc_bclink_reset_stats(struct net *net);

struct tipc_node;
extern const char tipc_bclink_name[];
static inline void tipc_bcast_lock(struct net *net)
{
spin_lock_bh(&tipc_net(net)->bclock);
}

/**
* tipc_nmap_equal - test for equality of node maps
*/
static inline int tipc_nmap_equal(struct tipc_node_map *nm_a,
struct tipc_node_map *nm_b)
static inline void tipc_bcast_unlock(struct net *net)
{
return !memcmp(nm_a, nm_b, sizeof(*nm_a));
spin_unlock_bh(&tipc_net(net)->bclock);
}

int tipc_bclink_init(struct net *net);
void tipc_bclink_stop(struct net *net);
void tipc_bclink_add_node(struct net *net, u32 addr);
void tipc_bclink_remove_node(struct net *net, u32 addr);
struct tipc_node *tipc_bclink_retransmit_to(struct net *tn);
void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked);
void tipc_bclink_rcv(struct net *net, struct sk_buff *buf);
u32 tipc_bclink_get_last_sent(struct net *net);
u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr);
void tipc_bclink_update_link_state(struct tipc_node *node,
u32 last_sent);
int tipc_bclink_reset_stats(struct net *net);
int tipc_bclink_set_queue_limits(struct net *net, u32 limit);
void tipc_bcbearer_sort(struct net *net, struct tipc_node_map *nm_ptr,
u32 node, bool action);
uint tipc_bclink_get_mtu(void);
int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list);
void tipc_bclink_wakeup_users(struct net *net);
int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
void tipc_bclink_input(struct net *net);
void tipc_bclink_sync_state(struct tipc_node *n, struct tipc_msg *msg);
static inline struct tipc_link *tipc_bc_sndlink(struct net *net)
{
return tipc_net(net)->bcl;
}

#endif
94 changes: 59 additions & 35 deletions net/tipc/bearer.c
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,8 @@ void tipc_bearer_add_dest(struct net *net, u32 bearer_id, u32 dest)

rcu_read_lock();
b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
if (b_ptr) {
tipc_bcbearer_sort(net, &b_ptr->nodes, dest, true);
if (b_ptr)
tipc_disc_add_dest(b_ptr->link_req);
}
rcu_read_unlock();
}

Expand All @@ -207,10 +205,8 @@ void tipc_bearer_remove_dest(struct net *net, u32 bearer_id, u32 dest)

rcu_read_lock();
b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
if (b_ptr) {
tipc_bcbearer_sort(net, &b_ptr->nodes, dest, false);
if (b_ptr)
tipc_disc_remove_dest(b_ptr->link_req);
}
rcu_read_unlock();
}

Expand Down Expand Up @@ -418,53 +414,58 @@ void tipc_disable_l2_media(struct tipc_bearer *b)
* @b_ptr: the bearer through which the packet is to be sent
* @dest: peer destination address
*/
int tipc_l2_send_msg(struct net *net, struct sk_buff *buf,
int tipc_l2_send_msg(struct net *net, struct sk_buff *skb,
struct tipc_bearer *b, struct tipc_media_addr *dest)
{
struct sk_buff *clone;
struct net_device *dev;
int delta;

dev = (struct net_device *)rcu_dereference_rtnl(b->media_ptr);
if (!dev)
return 0;

clone = skb_clone(buf, GFP_ATOMIC);
if (!clone)
return 0;

delta = dev->hard_header_len - skb_headroom(buf);
delta = dev->hard_header_len - skb_headroom(skb);
if ((delta > 0) &&
pskb_expand_head(clone, SKB_DATA_ALIGN(delta), 0, GFP_ATOMIC)) {
kfree_skb(clone);
pskb_expand_head(skb, SKB_DATA_ALIGN(delta), 0, GFP_ATOMIC)) {
kfree_skb(skb);
return 0;
}

skb_reset_network_header(clone);
clone->dev = dev;
clone->protocol = htons(ETH_P_TIPC);
dev_hard_header(clone, dev, ETH_P_TIPC, dest->value,
dev->dev_addr, clone->len);
dev_queue_xmit(clone);
skb_reset_network_header(skb);
skb->dev = dev;
skb->protocol = htons(ETH_P_TIPC);
dev_hard_header(skb, dev, ETH_P_TIPC, dest->value,
dev->dev_addr, skb->len);
dev_queue_xmit(skb);
return 0;
}

/* tipc_bearer_send- sends buffer to destination over bearer
*
* IMPORTANT:
* The media send routine must not alter the buffer being passed in
* as it may be needed for later retransmission!
int tipc_bearer_mtu(struct net *net, u32 bearer_id)
{
int mtu = 0;
struct tipc_bearer *b;

rcu_read_lock();
b = rcu_dereference_rtnl(tipc_net(net)->bearer_list[bearer_id]);
if (b)
mtu = b->mtu;
rcu_read_unlock();
return mtu;
}

/* tipc_bearer_xmit_skb - sends buffer to destination over bearer
*/
void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,
struct tipc_media_addr *dest)
void tipc_bearer_xmit_skb(struct net *net, u32 bearer_id,
struct sk_buff *skb,
struct tipc_media_addr *dest)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_bearer *b_ptr;
struct tipc_net *tn = tipc_net(net);
struct tipc_bearer *b;

rcu_read_lock();
b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
if (likely(b_ptr))
b_ptr->media->send_msg(net, buf, b_ptr, dest);
b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
if (likely(b))
b->media->send_msg(net, skb, b, dest);
rcu_read_unlock();
}

Expand All @@ -487,8 +488,31 @@ void tipc_bearer_xmit(struct net *net, u32 bearer_id,
skb_queue_walk_safe(xmitq, skb, tmp) {
__skb_dequeue(xmitq);
b->media->send_msg(net, skb, b, dst);
/* Until we remove cloning in tipc_l2_send_msg(): */
kfree_skb(skb);
}
}
rcu_read_unlock();
}

/* tipc_bearer_bc_xmit() - broadcast buffers to all destinations
*/
void tipc_bearer_bc_xmit(struct net *net, u32 bearer_id,
struct sk_buff_head *xmitq)
{
struct tipc_net *tn = tipc_net(net);
int net_id = tn->net_id;
struct tipc_bearer *b;
struct sk_buff *skb, *tmp;
struct tipc_msg *hdr;

rcu_read_lock();
b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
if (likely(b)) {
skb_queue_walk_safe(xmitq, skb, tmp) {
hdr = buf_msg(skb);
msg_set_non_seq(hdr, 1);
msg_set_mc_netid(hdr, net_id);
__skb_dequeue(xmitq);
b->media->send_msg(net, skb, b, &b->bcast_addr);
}
}
rcu_read_unlock();
Expand Down
9 changes: 7 additions & 2 deletions net/tipc/bearer.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ struct tipc_bearer {
u32 identity;
struct tipc_link_req *link_req;
char net_plane;
int node_cnt;
struct tipc_node_map nodes;
};

Expand Down Expand Up @@ -215,10 +216,14 @@ struct tipc_media *tipc_media_find(const char *name);
int tipc_bearer_setup(void);
void tipc_bearer_cleanup(void);
void tipc_bearer_stop(struct net *net);
void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,
struct tipc_media_addr *dest);
int tipc_bearer_mtu(struct net *net, u32 bearer_id);
void tipc_bearer_xmit_skb(struct net *net, u32 bearer_id,
struct sk_buff *skb,
struct tipc_media_addr *dest);
void tipc_bearer_xmit(struct net *net, u32 bearer_id,
struct sk_buff_head *xmitq,
struct tipc_media_addr *dst);
void tipc_bearer_bc_xmit(struct net *net, u32 bearer_id,
struct sk_buff_head *xmitq);

#endif /* _TIPC_BEARER_H */
9 changes: 9 additions & 0 deletions net/tipc/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "bearer.h"
#include "net.h"
#include "socket.h"
#include "bcast.h"

#include <linux/module.h>

Expand Down Expand Up @@ -71,8 +72,15 @@ static int __net_init tipc_init_net(struct net *net)
err = tipc_topsrv_start(net);
if (err)
goto out_subscr;

err = tipc_bcast_init(net);
if (err)
goto out_bclink;

return 0;

out_bclink:
tipc_bcast_stop(net);
out_subscr:
tipc_nametbl_stop(net);
out_nametbl:
Expand All @@ -85,6 +93,7 @@ static void __net_exit tipc_exit_net(struct net *net)
{
tipc_topsrv_stop(net);
tipc_net_stop(net);
tipc_bcast_stop(net);
tipc_nametbl_stop(net);
tipc_sk_rht_destroy(net);
}
Expand Down
12 changes: 8 additions & 4 deletions net/tipc/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@

struct tipc_node;
struct tipc_bearer;
struct tipc_bcbearer;
struct tipc_bclink;
struct tipc_bc_base;
struct tipc_link;
struct tipc_name_table;
struct tipc_server;
Expand Down Expand Up @@ -93,8 +92,8 @@ struct tipc_net {
struct tipc_bearer __rcu *bearer_list[MAX_BEARERS + 1];

/* Broadcast link */
struct tipc_bcbearer *bcbearer;
struct tipc_bclink *bclink;
spinlock_t bclock;
struct tipc_bc_base *bcbase;
struct tipc_link *bcl;

/* Socket hash table */
Expand All @@ -114,6 +113,11 @@ static inline struct tipc_net *tipc_net(struct net *net)
return net_generic(net, tipc_net_id);
}

static inline int tipc_netid(struct net *net)
{
return tipc_net(net)->net_id;
}

static inline u16 mod(u16 x)
{
return x & 0xffffu;
Expand Down
Loading

0 comments on commit 687f079

Please sign in to comment.