Skip to content

Commit

Permalink
Merge branch 'tipc-next'
Browse files Browse the repository at this point in the history
Jon Maloy says:

====================
tipc: some optimizations and impovements

The commits in this series contain some relatively simple changes that
lead to better throughput across TIPC connections. We also make changes
to the implementation of link transmission queueing and priority
handling, in order to make the code more comprehensible and maintainable.

v2: Commit #2: Redesigned tipc_msg_validate() to use pskb_may_pull(),
               as per feedback from David Miller.
    Commit #3: Some cosmetic changes to tipc_msg_extract(). I tried to
               replace the unconditional skb_linearize() with calls to
               pskb_may_pull() at selected locations, but I gave up.
               First, skb_trim() requires a fully linearized buffer.
               Second, it doesn't make much sense; the whole buffer
               will end up linearized, one way or another.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
  • Loading branch information
David S. Miller committed Mar 14, 2015
2 parents 5f1764d + e3eea1e commit a379520
Show file tree
Hide file tree
Showing 8 changed files with 300 additions and 341 deletions.
53 changes: 24 additions & 29 deletions net/tipc/bcast.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,10 @@ static void bclink_set_last_sent(struct net *net)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_link *bcl = tn->bcl;
struct sk_buff *skb = skb_peek(&bcl->backlogq);

if (bcl->next_out)
bcl->fsm_msg_cnt = mod(buf_seqno(bcl->next_out) - 1);
if (skb)
bcl->fsm_msg_cnt = mod(buf_seqno(skb) - 1);
else
bcl->fsm_msg_cnt = mod(bcl->next_out_no - 1);
}
Expand Down Expand Up @@ -180,7 +181,7 @@ static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to)
struct sk_buff *skb;
struct tipc_link *bcl = tn->bcl;

skb_queue_walk(&bcl->outqueue, skb) {
skb_queue_walk(&bcl->transmq, skb) {
if (more(buf_seqno(skb), after)) {
tipc_link_retransmit(bcl, skb, mod(to - after));
break;
Expand Down Expand Up @@ -210,14 +211,17 @@ void tipc_bclink_wakeup_users(struct net *net)
void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
{
struct sk_buff *skb, *tmp;
struct sk_buff *next;
unsigned int released = 0;
struct net *net = n_ptr->net;
struct tipc_net *tn = net_generic(net, tipc_net_id);

if (unlikely(!n_ptr->bclink.recv_permitted))
return;

tipc_bclink_lock(net);

/* Bail out if tx queue is empty (no clean up is required) */
skb = skb_peek(&tn->bcl->outqueue);
skb = skb_peek(&tn->bcl->transmq);
if (!skb)
goto exit;

Expand All @@ -244,35 +248,27 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
}

/* Skip over packets that node has previously acknowledged */
skb_queue_walk(&tn->bcl->outqueue, skb) {
skb_queue_walk(&tn->bcl->transmq, skb) {
if (more(buf_seqno(skb), n_ptr->bclink.acked))
break;
}

/* Update packets that node is now acknowledging */
skb_queue_walk_from_safe(&tn->bcl->outqueue, skb, tmp) {
skb_queue_walk_from_safe(&tn->bcl->transmq, skb, tmp) {
if (more(buf_seqno(skb), acked))
break;

next = tipc_skb_queue_next(&tn->bcl->outqueue, skb);
if (skb != tn->bcl->next_out) {
bcbuf_decr_acks(skb);
} else {
bcbuf_set_acks(skb, 0);
tn->bcl->next_out = next;
bclink_set_last_sent(net);
}

bcbuf_decr_acks(skb);
bclink_set_last_sent(net);
if (bcbuf_acks(skb) == 0) {
__skb_unlink(skb, &tn->bcl->outqueue);
__skb_unlink(skb, &tn->bcl->transmq);
kfree_skb(skb);
released = 1;
}
}
n_ptr->bclink.acked = acked;

/* Try resolving broadcast link congestion, if necessary */
if (unlikely(tn->bcl->next_out)) {
if (unlikely(skb_peek(&tn->bcl->backlogq))) {
tipc_link_push_packets(tn->bcl);
bclink_set_last_sent(net);
}
Expand Down Expand Up @@ -319,7 +315,7 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr,
buf = tipc_buf_acquire(INT_H_SIZE);
if (buf) {
struct tipc_msg *msg = buf_msg(buf);
struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferred_queue);
struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferdq);
u32 to = skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent;

tipc_msg_init(tn->own_addr, msg, BCAST_PROTOCOL, STATE_MSG,
Expand Down Expand Up @@ -387,14 +383,13 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
__skb_queue_purge(list);
return -EHOSTUNREACH;
}

/* Broadcast to all nodes */
if (likely(bclink)) {
tipc_bclink_lock(net);
if (likely(bclink->bcast_nodes.count)) {
rc = __tipc_link_xmit(net, bcl, list);
if (likely(!rc)) {
u32 len = skb_queue_len(&bcl->outqueue);
u32 len = skb_queue_len(&bcl->transmq);

bclink_set_last_sent(net);
bcl->stats.queue_sz_counts++;
Expand Down Expand Up @@ -559,25 +554,25 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
if (node->bclink.last_in == node->bclink.last_sent)
goto unlock;

if (skb_queue_empty(&node->bclink.deferred_queue)) {
if (skb_queue_empty(&node->bclink.deferdq)) {
node->bclink.oos_state = 1;
goto unlock;
}

msg = buf_msg(skb_peek(&node->bclink.deferred_queue));
msg = buf_msg(skb_peek(&node->bclink.deferdq));
seqno = msg_seqno(msg);
next_in = mod(next_in + 1);
if (seqno != next_in)
goto unlock;

/* Take in-sequence message from deferred queue & deliver it */
buf = __skb_dequeue(&node->bclink.deferred_queue);
buf = __skb_dequeue(&node->bclink.deferdq);
goto receive;
}

/* Handle out-of-sequence broadcast message */
if (less(next_in, seqno)) {
deferred = tipc_link_defer_pkt(&node->bclink.deferred_queue,
deferred = tipc_link_defer_pkt(&node->bclink.deferdq,
buf);
bclink_update_last_sent(node, seqno);
buf = NULL;
Expand Down Expand Up @@ -634,7 +629,6 @@ static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf,
msg_set_non_seq(msg, 1);
msg_set_mc_netid(msg, tn->net_id);
tn->bcl->stats.sent_info++;

if (WARN_ON(!bclink->bcast_nodes.count)) {
dump_stack();
return 0;
Expand Down Expand Up @@ -913,8 +907,9 @@ int tipc_bclink_init(struct net *net)
sprintf(bcbearer->media.name, "tipc-broadcast");

spin_lock_init(&bclink->lock);
__skb_queue_head_init(&bcl->outqueue);
__skb_queue_head_init(&bcl->deferred_queue);
__skb_queue_head_init(&bcl->transmq);
__skb_queue_head_init(&bcl->backlogq);
__skb_queue_head_init(&bcl->deferdq);
skb_queue_head_init(&bcl->wakeupq);
bcl->next_out_no = 1;
spin_lock_init(&bclink->node.lock);
Expand Down
3 changes: 3 additions & 0 deletions net/tipc/discover.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ static void tipc_disc_init_msg(struct net *net, struct sk_buff *buf, u32 type,
MAX_H_SIZE, dest_domain);
msg_set_non_seq(msg, 1);
msg_set_node_sig(msg, tn->random);
msg_set_node_capabilities(msg, 0);
msg_set_dest_domain(msg, dest_domain);
msg_set_bc_netid(msg, tn->net_id);
b_ptr->media->addr2msg(msg_media_addr(msg), &b_ptr->addr);
Expand Down Expand Up @@ -133,6 +134,7 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *buf,
u32 net_id = msg_bc_netid(msg);
u32 mtyp = msg_type(msg);
u32 signature = msg_node_sig(msg);
u16 caps = msg_node_capabilities(msg);
bool addr_match = false;
bool sign_match = false;
bool link_up = false;
Expand Down Expand Up @@ -167,6 +169,7 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *buf,
if (!node)
return;
tipc_node_lock(node);
node->capabilities = caps;
link = node->links[bearer->identity];

/* Prepare to validate requesting node's signature and media address */
Expand Down
Loading

0 comments on commit a379520

Please sign in to comment.