From 7ae934bebe289362e5b1f5ea50e4c347863ae202 Mon Sep 17 00:00:00 2001 From: Erik Hugne Date: Tue, 1 Jul 2014 10:22:40 +0200 Subject: [PATCH 1/2] tipc: refactor message delivery out of tipc_rcv This is a cosmetic change, separating message delivery from the link state processing. Signed-off-by: Erik Hugne Reviewed-by: Ying Xue Reviewed-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/link.c | 124 +++++++++++++++++++++++++++++++----------------- 1 file changed, 80 insertions(+), 44 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index a081e7d08d22a..18b1524098c63 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -88,6 +88,8 @@ static void link_print(struct tipc_link *l_ptr, const char *str); static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf); static void tipc_link_sync_xmit(struct tipc_link *l); static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf); +static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf); +static int tipc_link_prepare_input(struct tipc_link *l, struct sk_buff **buf); /* * Simple link routines @@ -1458,54 +1460,14 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) if (unlikely(l_ptr->oldest_deferred_in)) head = link_insert_deferred_queue(l_ptr, head); - /* Deliver packet/message to correct user: */ - if (unlikely(msg_user(msg) == CHANGEOVER_PROTOCOL)) { - if (!tipc_link_tunnel_rcv(n_ptr, &buf)) { - tipc_node_unlock(n_ptr); - continue; - } - msg = buf_msg(buf); - } else if (msg_user(msg) == MSG_FRAGMENTER) { - l_ptr->stats.recv_fragments++; - if (tipc_buf_append(&l_ptr->reasm_buf, &buf)) { - l_ptr->stats.recv_fragmented++; - msg = buf_msg(buf); - } else { - if (!l_ptr->reasm_buf) - tipc_link_reset(l_ptr); - tipc_node_unlock(n_ptr); - continue; - } - } - - switch (msg_user(msg)) { - case TIPC_LOW_IMPORTANCE: - case TIPC_MEDIUM_IMPORTANCE: - case TIPC_HIGH_IMPORTANCE: - case TIPC_CRITICAL_IMPORTANCE: - case CONN_MANAGER: - tipc_node_unlock(n_ptr); - tipc_sk_rcv(buf); - continue; - case MSG_BUNDLER: - l_ptr->stats.recv_bundles++; - l_ptr->stats.recv_bundled += msg_msgcnt(msg); + if (tipc_link_prepare_input(l_ptr, &buf)) { tipc_node_unlock(n_ptr); - tipc_link_bundle_rcv(buf); continue; - case NAME_DISTRIBUTOR: - n_ptr->bclink.recv_permitted = true; - tipc_node_unlock(n_ptr); - tipc_named_rcv(buf); - continue; - case BCAST_PROTOCOL: - tipc_link_sync_rcv(n_ptr, buf); - break; - default: - kfree_skb(buf); - break; } tipc_node_unlock(n_ptr); + msg = buf_msg(buf); + if (tipc_link_input(l_ptr, buf) != 0) + goto discard; continue; unlock_discard: tipc_node_unlock(n_ptr); @@ -1514,6 +1476,80 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) } } +/** + * tipc_link_prepare_input - process TIPC link messages + * + * returns nonzero if the message was consumed + * + * Node lock must be held + */ +static int tipc_link_prepare_input(struct tipc_link *l, struct sk_buff **buf) +{ + struct tipc_node *n; + struct tipc_msg *msg; + int res = -EINVAL; + + n = l->owner; + msg = buf_msg(*buf); + switch (msg_user(msg)) { + case CHANGEOVER_PROTOCOL: + if (tipc_link_tunnel_rcv(n, buf)) + res = 0; + break; + case MSG_FRAGMENTER: + l->stats.recv_fragments++; + if (tipc_buf_append(&l->reasm_buf, buf)) { + l->stats.recv_fragmented++; + res = 0; + } else if (!l->reasm_buf) { + tipc_link_reset(l); + } + break; + case MSG_BUNDLER: + l->stats.recv_bundles++; + l->stats.recv_bundled += msg_msgcnt(msg); + res = 0; + break; + case NAME_DISTRIBUTOR: + n->bclink.recv_permitted = true; + res = 0; + break; + case BCAST_PROTOCOL: + tipc_link_sync_rcv(n, *buf); + break; + default: + res = 0; + } + return res; +} +/** + * tipc_link_input - Deliver message too higher layers + */ +static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf) +{ + struct tipc_msg *msg = buf_msg(buf); + int res = 0; + + switch (msg_user(msg)) { + case TIPC_LOW_IMPORTANCE: + case TIPC_MEDIUM_IMPORTANCE: + case TIPC_HIGH_IMPORTANCE: + case TIPC_CRITICAL_IMPORTANCE: + case CONN_MANAGER: + tipc_sk_rcv(buf); + break; + case NAME_DISTRIBUTOR: + tipc_named_rcv(buf); + break; + case MSG_BUNDLER: + tipc_link_bundle_rcv(buf); + break; + default: + res = -EINVAL; + } + return res; +} + /** * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue * From 3f53bd8f8b563cc6024d25c1665d8358745f48b2 Mon Sep 17 00:00:00 2001 From: Erik Hugne Date: Tue, 1 Jul 2014 10:22:41 +0200 Subject: [PATCH 2/2] tipc: fix link acknowledge logic in receive path Link state acks triggered from the receive path is done before the last received packet have been processed by the link layer. The effect of this is that the last received packet will not be included in the ack. This causes problems if the link window is set to TIPC_MIN_LINK_WIN, where the ack interval will be equal to the link tolerance, and the link enters a stop-and-go behavior. We move the ack logic to after link state processing, just before the packet is delivered to higher layers. Signed-off-by: Erik Hugne Signed-off-by: Carl Sigurjonsson Reviewed-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/link.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index 18b1524098c63..a235b245f682b 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1422,11 +1422,6 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) if (unlikely(!list_empty(&l_ptr->waiting_ports))) tipc_link_wakeup_ports(l_ptr, 0); - if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) { - l_ptr->stats.sent_acks++; - tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); - } - /* Process the incoming packet */ if (unlikely(!link_working_working(l_ptr))) { if (msg_user(msg) == LINK_PROTOCOL) { @@ -1460,6 +1455,11 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) if (unlikely(l_ptr->oldest_deferred_in)) head = link_insert_deferred_queue(l_ptr, head); + if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) { + l_ptr->stats.sent_acks++; + tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); + } + if (tipc_link_prepare_input(l_ptr, &buf)) { tipc_node_unlock(n_ptr); continue;