Skip to content

Commit

Permalink
Merge branch 'tipc-separate-link-and-aggregation'
Browse files Browse the repository at this point in the history
Jon Maloy says:

====================
tipc: separate link and link aggregation layer

This is the first batch of a longer series that has two main objectives:

o Finer lock granularity during message sending and reception,
  especially regarding usage of the node spinlock.

o Better separation between the link layer implementation and the link
  aggregation layer, represented by node.c::struct tipc_node.

Hopefully these changes also make this part of code somewhat easier
to comprehend and maintain.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
  • Loading branch information
David S. Miller committed Jul 21, 2015
2 parents 6acc232 + d999297 commit 7781e5d
Show file tree
Hide file tree
Showing 13 changed files with 1,431 additions and 1,018 deletions.
31 changes: 26 additions & 5 deletions net/tipc/bcast.c
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,29 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr,
}
}

void tipc_bclink_sync_state(struct tipc_node *n, struct tipc_msg *hdr)
{
u16 last = msg_last_bcast(hdr);
int mtyp = msg_type(hdr);

if (unlikely(msg_user(hdr) != LINK_PROTOCOL))
return;
if (mtyp == STATE_MSG) {
tipc_bclink_update_link_state(n, last);
return;
}
/* Compatibility: older nodes don't know BCAST_PROTOCOL synchronization,
* and transfer synch info in LINK_PROTOCOL messages.
*/
if (tipc_node_is_up(n))
return;
if ((mtyp != RESET_MSG) && (mtyp != ACTIVATE_MSG))
return;
n->bclink.last_sent = last;
n->bclink.last_in = last;
n->bclink.oos_state = 0;
}

/**
* bclink_peek_nack - monitor retransmission requests sent by other nodes
*
Expand Down Expand Up @@ -358,10 +381,9 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)

/* Prepare clone of message for local node */
skb = tipc_msg_reassemble(list);
if (unlikely(!skb)) {
__skb_queue_purge(list);
if (unlikely(!skb))
return -EHOSTUNREACH;
}

/* Broadcast to all nodes */
if (likely(bclink)) {
tipc_bclink_lock(net);
Expand Down Expand Up @@ -413,7 +435,7 @@ static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
* all nodes in the cluster don't ACK at the same time
*/
if (((seqno - tn->own_addr) % TIPC_MIN_LINK_WIN) == 0) {
tipc_link_proto_xmit(node->active_links[node->addr & 1],
tipc_link_proto_xmit(node_active_link(node, node->addr),
STATE_MSG, 0, 0, 0, 0);
tn->bcl->stats.sent_acks++;
}
Expand Down Expand Up @@ -925,7 +947,6 @@ int tipc_bclink_init(struct net *net)
tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT);
bcl->bearer_id = MAX_BEARERS;
rcu_assign_pointer(tn->bearer_list[MAX_BEARERS], &bcbearer->bearer);
bcl->state = WORKING_WORKING;
bcl->pmsg = (struct tipc_msg *)&bcl->proto_msg;
msg_set_prevnode(bcl->pmsg, tn->own_addr);
strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME);
Expand Down
1 change: 1 addition & 0 deletions net/tipc/bcast.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,5 +133,6 @@ void tipc_bclink_wakeup_users(struct net *net);
int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
void tipc_bclink_input(struct net *net);
void tipc_bclink_sync_state(struct tipc_node *n, struct tipc_msg *msg);

#endif
26 changes: 26 additions & 0 deletions net/tipc/bearer.c
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,32 @@ void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,
rcu_read_unlock();
}

/* tipc_bearer_xmit() -send buffer to destination over bearer
*/
void tipc_bearer_xmit(struct net *net, u32 bearer_id,
struct sk_buff_head *xmitq,
struct tipc_media_addr *dst)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_bearer *b;
struct sk_buff *skb, *tmp;

if (skb_queue_empty(xmitq))
return;

rcu_read_lock();
b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
if (likely(b)) {
skb_queue_walk_safe(xmitq, skb, tmp) {
__skb_dequeue(xmitq);
b->media->send_msg(net, skb, b, dst);
/* Until we remove cloning in tipc_l2_send_msg(): */
kfree_skb(skb);
}
}
rcu_read_unlock();
}

/**
* tipc_l2_rcv_msg - handle incoming TIPC message from an interface
* @buf: the received packet
Expand Down
3 changes: 3 additions & 0 deletions net/tipc/bearer.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,5 +217,8 @@ void tipc_bearer_cleanup(void);
void tipc_bearer_stop(struct net *net);
void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,
struct tipc_media_addr *dest);
void tipc_bearer_xmit(struct net *net, u32 bearer_id,
struct sk_buff_head *xmitq,
struct tipc_media_addr *dst);

#endif /* _TIPC_BEARER_H */
5 changes: 5 additions & 0 deletions net/tipc/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ static inline int less(u16 left, u16 right)
return less_eq(left, right) && (mod(right) != mod(left));
}

static inline int in_range(u16 val, u16 min, u16 max)
{
return !less(val, min) && !more(val, max);
}

#ifdef CONFIG_SYSCTL
int tipc_register_sysctl(void);
void tipc_unregister_sysctl(void);
Expand Down
20 changes: 4 additions & 16 deletions net/tipc/discover.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
*/

#include "core.h"
#include "link.h"
#include "node.h"
#include "discover.h"

/* min delay during bearer start up */
Expand Down Expand Up @@ -125,7 +125,6 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *buf,
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_node *node;
struct tipc_link *link;
struct tipc_media_addr maddr;
struct sk_buff *rbuf;
struct tipc_msg *msg = buf_msg(buf);
Expand Down Expand Up @@ -170,13 +169,10 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *buf,
return;
tipc_node_lock(node);
node->capabilities = caps;
link = node->links[bearer->identity];

/* Prepare to validate requesting node's signature and media address */
sign_match = (signature == node->signature);
addr_match = link && !memcmp(&link->media_addr, &maddr, sizeof(maddr));
link_up = link && tipc_link_is_up(link);

tipc_node_check_dest(node, bearer, &link_up, &addr_match, &maddr);

/* These three flags give us eight permutations: */

Expand Down Expand Up @@ -239,16 +235,8 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *buf,
if (accept_sign)
node->signature = signature;

if (accept_addr) {
if (!link)
link = tipc_link_create(node, bearer, &maddr);
if (link) {
memcpy(&link->media_addr, &maddr, sizeof(maddr));
tipc_link_reset(link);
} else {
respond = false;
}
}
if (accept_addr && !tipc_node_update_dest(node, bearer, &maddr))
respond = false;

/* Send response, if necessary */
if (respond && (mtyp == DSC_REQ_MSG)) {
Expand Down
Loading

0 comments on commit 7781e5d

Please sign in to comment.