Skip to content

Commit

Permalink
SUNRPC: Fix a livelock problem in the xprt->backlog queue
Browse files Browse the repository at this point in the history
This patch ensures that we throttle new RPC requests if there are
requests already waiting in the xprt->backlog queue. The reason for
doing this is to fix livelock issues that can occur when an existing
(high priority) task is waiting in the backlog queue, gets woken up
by xprt_free_slot(), but a new task then steals the slot.

Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
  • Loading branch information
Trond Myklebust authored and Trond Myklebust committed Apr 14, 2013
1 parent b570a97 commit ba60eb2
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 4 deletions.
2 changes: 2 additions & 0 deletions include/linux/sunrpc/xprt.h
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ struct xprt_class {
struct rpc_xprt *xprt_create_transport(struct xprt_create *args);
void xprt_connect(struct rpc_task *task);
void xprt_reserve(struct rpc_task *task);
void xprt_retry_reserve(struct rpc_task *task);
int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task);
int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task);
void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task);
Expand Down Expand Up @@ -334,6 +335,7 @@ int xs_swapper(struct rpc_xprt *xprt, int enable);
#define XPRT_CLOSING (6)
#define XPRT_CONNECTION_ABORT (7)
#define XPRT_CONNECTION_CLOSE (8)
#define XPRT_CONGESTED (9)

static inline void xprt_set_connected(struct rpc_xprt *xprt)
{
Expand Down
17 changes: 16 additions & 1 deletion net/sunrpc/clnt.c
Original file line number Diff line number Diff line change
Expand Up @@ -1306,6 +1306,8 @@ call_reserve(struct rpc_task *task)
xprt_reserve(task);
}

static void call_retry_reserve(struct rpc_task *task);

/*
* 1b. Grok the result of xprt_reserve()
*/
Expand Down Expand Up @@ -1347,7 +1349,7 @@ call_reserveresult(struct rpc_task *task)
case -ENOMEM:
rpc_delay(task, HZ >> 2);
case -EAGAIN: /* woken up; retry */
task->tk_action = call_reserve;
task->tk_action = call_retry_reserve;
return;
case -EIO: /* probably a shutdown */
break;
Expand All @@ -1359,6 +1361,19 @@ call_reserveresult(struct rpc_task *task)
rpc_exit(task, status);
}

/*
* 1c. Retry reserving an RPC call slot
*/
static void
call_retry_reserve(struct rpc_task *task)
{
dprint_status(task);

task->tk_status = 0;
task->tk_action = call_reserveresult;
xprt_retry_reserve(task);
}

/*
* 2. Bind and/or refresh the credentials
*/
Expand Down
61 changes: 58 additions & 3 deletions net/sunrpc/xprt.c
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,34 @@ void xprt_transmit(struct rpc_task *task)
spin_unlock_bh(&xprt->transport_lock);
}

static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
{
set_bit(XPRT_CONGESTED, &xprt->state);
rpc_sleep_on(&xprt->backlog, task, NULL);
}

static void xprt_wake_up_backlog(struct rpc_xprt *xprt)
{
if (rpc_wake_up_next(&xprt->backlog) == NULL)
clear_bit(XPRT_CONGESTED, &xprt->state);
}

static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task)
{
bool ret = false;

if (!test_bit(XPRT_CONGESTED, &xprt->state))
goto out;
spin_lock(&xprt->reserve_lock);
if (test_bit(XPRT_CONGESTED, &xprt->state)) {
rpc_sleep_on(&xprt->backlog, task, NULL);
ret = true;
}
spin_unlock(&xprt->reserve_lock);
out:
return ret;
}

static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt, gfp_t gfp_flags)
{
struct rpc_rqst *req = ERR_PTR(-EAGAIN);
Expand Down Expand Up @@ -992,7 +1020,7 @@ void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
task->tk_status = -ENOMEM;
break;
case -EAGAIN:
rpc_sleep_on(&xprt->backlog, task, NULL);
xprt_add_backlog(xprt, task);
dprintk("RPC: waiting for request slot\n");
default:
task->tk_status = -EAGAIN;
Expand Down Expand Up @@ -1028,7 +1056,7 @@ static void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
memset(req, 0, sizeof(*req)); /* mark unused */
list_add(&req->rq_list, &xprt->free);
}
rpc_wake_up_next(&xprt->backlog);
xprt_wake_up_backlog(xprt);
spin_unlock(&xprt->reserve_lock);
}

Expand Down Expand Up @@ -1092,13 +1120,40 @@ EXPORT_SYMBOL_GPL(xprt_free);
* xprt_reserve - allocate an RPC request slot
* @task: RPC task requesting a slot allocation
*
* If no more slots are available, place the task on the transport's
* If the transport is marked as being congested, or if no more
* slots are available, place the task on the transport's
* backlog queue.
*/
void xprt_reserve(struct rpc_task *task)
{
struct rpc_xprt *xprt;

task->tk_status = 0;
if (task->tk_rqstp != NULL)
return;

task->tk_timeout = 0;
task->tk_status = -EAGAIN;
rcu_read_lock();
xprt = rcu_dereference(task->tk_client->cl_xprt);
if (!xprt_throttle_congested(xprt, task))
xprt->ops->alloc_slot(xprt, task);
rcu_read_unlock();
}

/**
* xprt_retry_reserve - allocate an RPC request slot
* @task: RPC task requesting a slot allocation
*
* If no more slots are available, place the task on the transport's
* backlog queue.
* Note that the only difference with xprt_reserve is that we now
* ignore the value of the XPRT_CONGESTED flag.
*/
void xprt_retry_reserve(struct rpc_task *task)
{
struct rpc_xprt *xprt;

task->tk_status = 0;
if (task->tk_rqstp != NULL)
return;
Expand Down

0 comments on commit ba60eb2

Please sign in to comment.