Skip to content

Commit

Permalink
tipc: eliminate aggregate sk_receive_queue limit
Browse files Browse the repository at this point in the history
As a complement to the per-socket sk_recv_queue limit, TIPC keeps a
global atomic counter for the sum of sk_recv_queue sizes across all
tipc sockets. When incremented, the counter is compared to an upper
threshold value, and if this is reached, the message is rejected
with error code TIPC_OVERLOAD.

This check was originally meant to protect the node against
buffer exhaustion and general CPU overload. However, all experience
indicates that the feature not only is redundant on Linux, but even
harmful. Users run into the limit very often, causing disturbances
for their applications, while removing it seems to have no negative
effects at all. We have also seen that overall performance is
boosted significantly when this bottleneck is removed.

Furthermore, we don't see any other network protocols maintaining
such a mechanism, something strengthening our conviction that this
control can be eliminated.

As a result, the atomic variable tipc_queue_size is now unused
and so it can be deleted.  There is a getsockopt call that used
to allow reading it; we retain that but just return zero for
maximum compatibility.

Signed-off-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Cc: Neil Horman <nhorman@tuxdriver.com>
[PG: phase out tipc_queue_size as pointed out by Neil Horman]
Signed-off-by: Paul Gortmaker <paul.gortmaker@windriver.com>
  • Loading branch information
Ying Xue authored and Paul Gortmaker committed Dec 7, 2012
1 parent c008413 commit 9da3d47
Showing 1 changed file with 4 additions and 19 deletions.
23 changes: 4 additions & 19 deletions net/tipc/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* net/tipc/socket.c: TIPC socket API
*
* Copyright (c) 2001-2007, Ericsson AB
* Copyright (c) 2004-2008, 2010-2011, Wind River Systems
* Copyright (c) 2004-2008, 2010-2012, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -73,8 +73,6 @@ static struct proto tipc_proto;

static int sockets_enabled;

static atomic_t tipc_queue_size = ATOMIC_INIT(0);

/*
* Revised TIPC socket locking policy:
*
Expand Down Expand Up @@ -128,7 +126,6 @@ static atomic_t tipc_queue_size = ATOMIC_INIT(0);
static void advance_rx_queue(struct sock *sk)
{
kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
atomic_dec(&tipc_queue_size);
}

/**
Expand All @@ -140,10 +137,8 @@ static void discard_rx_queue(struct sock *sk)
{
struct sk_buff *buf;

while ((buf = __skb_dequeue(&sk->sk_receive_queue))) {
atomic_dec(&tipc_queue_size);
while ((buf = __skb_dequeue(&sk->sk_receive_queue)))
kfree_skb(buf);
}
}

/**
Expand All @@ -155,10 +150,8 @@ static void reject_rx_queue(struct sock *sk)
{
struct sk_buff *buf;

while ((buf = __skb_dequeue(&sk->sk_receive_queue))) {
while ((buf = __skb_dequeue(&sk->sk_receive_queue)))
tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
atomic_dec(&tipc_queue_size);
}
}

/**
Expand Down Expand Up @@ -280,7 +273,6 @@ static int release(struct socket *sock)
buf = __skb_dequeue(&sk->sk_receive_queue);
if (buf == NULL)
break;
atomic_dec(&tipc_queue_size);
if (TIPC_SKB_CB(buf)->handle != 0)
kfree_skb(buf);
else {
Expand Down Expand Up @@ -1241,11 +1233,6 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
}

/* Reject message if there isn't room to queue it */
recv_q_len = (u32)atomic_read(&tipc_queue_size);
if (unlikely(recv_q_len >= OVERLOAD_LIMIT_BASE)) {
if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE))
return TIPC_ERR_OVERLOAD;
}
recv_q_len = skb_queue_len(&sk->sk_receive_queue);
if (unlikely(recv_q_len >= (OVERLOAD_LIMIT_BASE / 2))) {
if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE / 2))
Expand All @@ -1254,7 +1241,6 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)

/* Enqueue message (finally!) */
TIPC_SKB_CB(buf)->handle = 0;
atomic_inc(&tipc_queue_size);
__skb_queue_tail(&sk->sk_receive_queue, buf);

/* Initiate connection termination for an incoming 'FIN' */
Expand Down Expand Up @@ -1578,7 +1564,6 @@ static int shutdown(struct socket *sock, int how)
/* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
buf = __skb_dequeue(&sk->sk_receive_queue);
if (buf) {
atomic_dec(&tipc_queue_size);
if (TIPC_SKB_CB(buf)->handle != 0) {
kfree_skb(buf);
goto restart;
Expand Down Expand Up @@ -1717,7 +1702,7 @@ static int getsockopt(struct socket *sock,
/* no need to set "res", since already 0 at this point */
break;
case TIPC_NODE_RECVQ_DEPTH:
value = (u32)atomic_read(&tipc_queue_size);
value = 0; /* was tipc_queue_size, now obsolete */
break;
case TIPC_SOCK_RECVQ_DEPTH:
value = skb_queue_len(&sk->sk_receive_queue);
Expand Down

0 comments on commit 9da3d47

Please sign in to comment.