Skip to content

Commit

Permalink
SUNRPC: Reduce latency when send queue is congested
Browse files Browse the repository at this point in the history
Use the low latency transport workqueue to process the task that is
next in line on the xprt->sending queue.

Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com>
  • Loading branch information
Trond Myklebust committed Jun 13, 2016
1 parent 40a5f1b commit f1dc237
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 12 deletions.
4 changes: 4 additions & 0 deletions include/linux/sunrpc/sched.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ void rpc_wake_up_queued_task(struct rpc_wait_queue *,
struct rpc_task *);
void rpc_wake_up(struct rpc_wait_queue *);
struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *);
struct rpc_task *rpc_wake_up_first_on_wq(struct workqueue_struct *wq,
struct rpc_wait_queue *,
bool (*)(struct rpc_task *, void *),
void *);
struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *,
bool (*)(struct rpc_task *, void *),
void *);
Expand Down
43 changes: 33 additions & 10 deletions net/sunrpc/sched.c
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,8 @@ EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
* lockless RPC_IS_QUEUED() test) before we've had a chance to test
* the RPC_TASK_RUNNING flag.
*/
static void rpc_make_runnable(struct rpc_task *task)
static void rpc_make_runnable(struct workqueue_struct *wq,
struct rpc_task *task)
{
bool need_wakeup = !rpc_test_and_set_running(task);

Expand All @@ -339,7 +340,7 @@ static void rpc_make_runnable(struct rpc_task *task)
return;
if (RPC_IS_ASYNC(task)) {
INIT_WORK(&task->u.tk_work, rpc_async_schedule);
queue_work(rpciod_workqueue, &task->u.tk_work);
queue_work(wq, &task->u.tk_work);
} else
wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
}
Expand Down Expand Up @@ -408,13 +409,16 @@ void rpc_sleep_on_priority(struct rpc_wait_queue *q, struct rpc_task *task,
EXPORT_SYMBOL_GPL(rpc_sleep_on_priority);

/**
* __rpc_do_wake_up_task - wake up a single rpc_task
* __rpc_do_wake_up_task_on_wq - wake up a single rpc_task
* @wq: workqueue on which to run task
* @queue: wait queue
* @task: task to be woken up
*
* Caller must hold queue->lock, and have cleared the task queued flag.
*/
static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task *task)
static void __rpc_do_wake_up_task_on_wq(struct workqueue_struct *wq,
struct rpc_wait_queue *queue,
struct rpc_task *task)
{
dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n",
task->tk_pid, jiffies);
Expand All @@ -429,23 +433,32 @@ static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task

__rpc_remove_wait_queue(queue, task);

rpc_make_runnable(task);
rpc_make_runnable(wq, task);

dprintk("RPC: __rpc_wake_up_task done\n");
}

/*
* Wake up a queued task while the queue lock is being held
*/
static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task)
static void rpc_wake_up_task_on_wq_queue_locked(struct workqueue_struct *wq,
struct rpc_wait_queue *queue, struct rpc_task *task)
{
if (RPC_IS_QUEUED(task)) {
smp_rmb();
if (task->tk_waitqueue == queue)
__rpc_do_wake_up_task(queue, task);
__rpc_do_wake_up_task_on_wq(wq, queue, task);
}
}

/*
* Wake up a queued task while the queue lock is being held
*/
static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task)
{
rpc_wake_up_task_on_wq_queue_locked(rpciod_workqueue, queue, task);
}

/*
* Wake up a task on a specific queue
*/
Expand Down Expand Up @@ -519,7 +532,8 @@ static struct rpc_task *__rpc_find_next_queued(struct rpc_wait_queue *queue)
/*
* Wake up the first task on the wait queue.
*/
struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
struct rpc_task *rpc_wake_up_first_on_wq(struct workqueue_struct *wq,
struct rpc_wait_queue *queue,
bool (*func)(struct rpc_task *, void *), void *data)
{
struct rpc_task *task = NULL;
Expand All @@ -530,14 +544,23 @@ struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
task = __rpc_find_next_queued(queue);
if (task != NULL) {
if (func(task, data))
rpc_wake_up_task_queue_locked(queue, task);
rpc_wake_up_task_on_wq_queue_locked(wq, queue, task);
else
task = NULL;
}
spin_unlock_bh(&queue->lock);

return task;
}

/*
* Wake up the first task on the wait queue.
*/
struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
bool (*func)(struct rpc_task *, void *), void *data)
{
return rpc_wake_up_first_on_wq(rpciod_workqueue, queue, func, data);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_first);

static bool rpc_wake_up_next_func(struct rpc_task *task, void *data)
Expand Down Expand Up @@ -815,7 +838,7 @@ void rpc_execute(struct rpc_task *task)
bool is_async = RPC_IS_ASYNC(task);

rpc_set_active(task);
rpc_make_runnable(task);
rpc_make_runnable(rpciod_workqueue, task);
if (!is_async)
__rpc_execute(task);
}
Expand Down
6 changes: 4 additions & 2 deletions net/sunrpc/xprt.c
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,8 @@ static void __xprt_lock_write_next(struct rpc_xprt *xprt)
if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
return;

if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_func, xprt))
if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
__xprt_lock_write_func, xprt))
return;
xprt_clear_locked(xprt);
}
Expand Down Expand Up @@ -324,7 +325,8 @@ static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
return;
if (RPCXPRT_CONGESTED(xprt))
goto out_unlock;
if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_cong_func, xprt))
if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
__xprt_lock_write_cong_func, xprt))
return;
out_unlock:
xprt_clear_locked(xprt);
Expand Down

0 comments on commit f1dc237

Please sign in to comment.