Skip to content

Commit

Permalink
Merge branch 'sunrpc'
Browse files Browse the repository at this point in the history
  • Loading branch information
Trond Myklebust committed Jul 24, 2016
2 parents 149a4fd + ce27230 commit 7f94ed2
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 74 deletions.
5 changes: 5 additions & 0 deletions include/linux/sunrpc/sched.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ void rpc_wake_up_queued_task(struct rpc_wait_queue *,
struct rpc_task *);
void rpc_wake_up(struct rpc_wait_queue *);
struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *);
struct rpc_task *rpc_wake_up_first_on_wq(struct workqueue_struct *wq,
struct rpc_wait_queue *,
bool (*)(struct rpc_task *, void *),
void *);
struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *,
bool (*)(struct rpc_task *, void *),
void *);
Expand All @@ -247,6 +251,7 @@ void rpc_show_tasks(struct net *);
int rpc_init_mempool(void);
void rpc_destroy_mempool(void);
extern struct workqueue_struct *rpciod_workqueue;
extern struct workqueue_struct *xprtiod_workqueue;
void rpc_prepare_task(struct rpc_task *task);

static inline int rpc_wait_for_completion_task(struct rpc_task *task)
Expand Down
1 change: 1 addition & 0 deletions include/linux/sunrpc/xprtsock.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ struct sock_xprt {
#define TCP_RPC_REPLY (1UL << 6)

#define XPRT_SOCK_CONNECTING 1U
#define XPRT_SOCK_DATA_READY (2)

#endif /* __KERNEL__ */

Expand Down
2 changes: 1 addition & 1 deletion net/sunrpc/clnt.c
Original file line number Diff line number Diff line change
Expand Up @@ -2577,7 +2577,7 @@ static void rpc_cb_add_xprt_release(void *calldata)
kfree(data);
}

const static struct rpc_call_ops rpc_cb_add_xprt_call_ops = {
static const struct rpc_call_ops rpc_cb_add_xprt_call_ops = {
.rpc_call_done = rpc_cb_add_xprt_done,
.rpc_release = rpc_cb_add_xprt_release,
};
Expand Down
67 changes: 53 additions & 14 deletions net/sunrpc/sched.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ static struct rpc_wait_queue delay_queue;
/*
* rpciod-related stuff
*/
struct workqueue_struct *rpciod_workqueue;
struct workqueue_struct *rpciod_workqueue __read_mostly;
struct workqueue_struct *xprtiod_workqueue __read_mostly;

/*
* Disable the timer for a given RPC task. Should be called with
Expand Down Expand Up @@ -329,7 +330,8 @@ EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
* lockless RPC_IS_QUEUED() test) before we've had a chance to test
* the RPC_TASK_RUNNING flag.
*/
static void rpc_make_runnable(struct rpc_task *task)
static void rpc_make_runnable(struct workqueue_struct *wq,
struct rpc_task *task)
{
bool need_wakeup = !rpc_test_and_set_running(task);

Expand All @@ -338,7 +340,7 @@ static void rpc_make_runnable(struct rpc_task *task)
return;
if (RPC_IS_ASYNC(task)) {
INIT_WORK(&task->u.tk_work, rpc_async_schedule);
queue_work(rpciod_workqueue, &task->u.tk_work);
queue_work(wq, &task->u.tk_work);
} else
wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
}
Expand Down Expand Up @@ -407,13 +409,16 @@ void rpc_sleep_on_priority(struct rpc_wait_queue *q, struct rpc_task *task,
EXPORT_SYMBOL_GPL(rpc_sleep_on_priority);

/**
* __rpc_do_wake_up_task - wake up a single rpc_task
* __rpc_do_wake_up_task_on_wq - wake up a single rpc_task
* @wq: workqueue on which to run task
* @queue: wait queue
* @task: task to be woken up
*
* Caller must hold queue->lock, and have cleared the task queued flag.
*/
static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task *task)
static void __rpc_do_wake_up_task_on_wq(struct workqueue_struct *wq,
struct rpc_wait_queue *queue,
struct rpc_task *task)
{
dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n",
task->tk_pid, jiffies);
Expand All @@ -428,23 +433,32 @@ static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task

__rpc_remove_wait_queue(queue, task);

rpc_make_runnable(task);
rpc_make_runnable(wq, task);

dprintk("RPC: __rpc_wake_up_task done\n");
}

/*
* Wake up a queued task while the queue lock is being held
*/
static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task)
static void rpc_wake_up_task_on_wq_queue_locked(struct workqueue_struct *wq,
struct rpc_wait_queue *queue, struct rpc_task *task)
{
if (RPC_IS_QUEUED(task)) {
smp_rmb();
if (task->tk_waitqueue == queue)
__rpc_do_wake_up_task(queue, task);
__rpc_do_wake_up_task_on_wq(wq, queue, task);
}
}

/*
* Wake up a queued task while the queue lock is being held
*/
static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task)
{
rpc_wake_up_task_on_wq_queue_locked(rpciod_workqueue, queue, task);
}

/*
* Wake up a task on a specific queue
*/
Expand Down Expand Up @@ -518,7 +532,8 @@ static struct rpc_task *__rpc_find_next_queued(struct rpc_wait_queue *queue)
/*
* Wake up the first task on the wait queue.
*/
struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
struct rpc_task *rpc_wake_up_first_on_wq(struct workqueue_struct *wq,
struct rpc_wait_queue *queue,
bool (*func)(struct rpc_task *, void *), void *data)
{
struct rpc_task *task = NULL;
Expand All @@ -529,14 +544,23 @@ struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
task = __rpc_find_next_queued(queue);
if (task != NULL) {
if (func(task, data))
rpc_wake_up_task_queue_locked(queue, task);
rpc_wake_up_task_on_wq_queue_locked(wq, queue, task);
else
task = NULL;
}
spin_unlock_bh(&queue->lock);

return task;
}

/*
* Wake up the first task on the wait queue.
*/
struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
bool (*func)(struct rpc_task *, void *), void *data)
{
return rpc_wake_up_first_on_wq(rpciod_workqueue, queue, func, data);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_first);

static bool rpc_wake_up_next_func(struct rpc_task *task, void *data)
Expand Down Expand Up @@ -814,7 +838,7 @@ void rpc_execute(struct rpc_task *task)
bool is_async = RPC_IS_ASYNC(task);

rpc_set_active(task);
rpc_make_runnable(task);
rpc_make_runnable(rpciod_workqueue, task);
if (!is_async)
__rpc_execute(task);
}
Expand Down Expand Up @@ -1071,10 +1095,22 @@ static int rpciod_start(void)
* Create the rpciod thread and wait for it to start.
*/
dprintk("RPC: creating workqueue rpciod\n");
/* Note: highpri because network receive is latency sensitive */
wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM | WQ_HIGHPRI, 0);
wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM, 0);
if (!wq)
goto out_failed;
rpciod_workqueue = wq;
return rpciod_workqueue != NULL;
/* Note: highpri because network receive is latency sensitive */
wq = alloc_workqueue("xprtiod", WQ_MEM_RECLAIM | WQ_HIGHPRI, 0);
if (!wq)
goto free_rpciod;
xprtiod_workqueue = wq;
return 1;
free_rpciod:
wq = rpciod_workqueue;
rpciod_workqueue = NULL;
destroy_workqueue(wq);
out_failed:
return 0;
}

static void rpciod_stop(void)
Expand All @@ -1088,6 +1124,9 @@ static void rpciod_stop(void)
wq = rpciod_workqueue;
rpciod_workqueue = NULL;
destroy_workqueue(wq);
wq = xprtiod_workqueue;
xprtiod_workqueue = NULL;
destroy_workqueue(wq);
}

void
Expand Down
14 changes: 8 additions & 6 deletions net/sunrpc/xprt.c
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ static void xprt_clear_locked(struct rpc_xprt *xprt)
clear_bit(XPRT_LOCKED, &xprt->state);
smp_mb__after_atomic();
} else
queue_work(rpciod_workqueue, &xprt->task_cleanup);
queue_work(xprtiod_workqueue, &xprt->task_cleanup);
}

/*
Expand Down Expand Up @@ -295,7 +295,8 @@ static void __xprt_lock_write_next(struct rpc_xprt *xprt)
if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
return;

if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_func, xprt))
if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
__xprt_lock_write_func, xprt))
return;
xprt_clear_locked(xprt);
}
Expand Down Expand Up @@ -324,7 +325,8 @@ static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
return;
if (RPCXPRT_CONGESTED(xprt))
goto out_unlock;
if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_cong_func, xprt))
if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
__xprt_lock_write_cong_func, xprt))
return;
out_unlock:
xprt_clear_locked(xprt);
Expand Down Expand Up @@ -645,7 +647,7 @@ void xprt_force_disconnect(struct rpc_xprt *xprt)
set_bit(XPRT_CLOSE_WAIT, &xprt->state);
/* Try to schedule an autoclose RPC call */
if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
queue_work(rpciod_workqueue, &xprt->task_cleanup);
queue_work(xprtiod_workqueue, &xprt->task_cleanup);
xprt_wake_pending_tasks(xprt, -EAGAIN);
spin_unlock_bh(&xprt->transport_lock);
}
Expand All @@ -672,7 +674,7 @@ void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
set_bit(XPRT_CLOSE_WAIT, &xprt->state);
/* Try to schedule an autoclose RPC call */
if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
queue_work(rpciod_workqueue, &xprt->task_cleanup);
queue_work(xprtiod_workqueue, &xprt->task_cleanup);
xprt_wake_pending_tasks(xprt, -EAGAIN);
out:
spin_unlock_bh(&xprt->transport_lock);
Expand All @@ -689,7 +691,7 @@ xprt_init_autodisconnect(unsigned long data)
if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
goto out_abort;
spin_unlock(&xprt->transport_lock);
queue_work(rpciod_workqueue, &xprt->task_cleanup);
queue_work(xprtiod_workqueue, &xprt->task_cleanup);
return;
out_abort:
spin_unlock(&xprt->transport_lock);
Expand Down
8 changes: 3 additions & 5 deletions net/sunrpc/xprtmultipath.c
Original file line number Diff line number Diff line change
Expand Up @@ -271,14 +271,12 @@ struct rpc_xprt *xprt_iter_next_entry_multiple(struct rpc_xprt_iter *xpi,
xprt_switch_find_xprt_t find_next)
{
struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch);
struct list_head *head;

if (xps == NULL)
return NULL;
head = &xps->xps_xprt_list;
if (xps->xps_nxprts < 2)
return xprt_switch_find_first_entry(head);
return xprt_switch_set_next_cursor(head, &xpi->xpi_cursor, find_next);
return xprt_switch_set_next_cursor(&xps->xps_xprt_list,
&xpi->xpi_cursor,
find_next);
}

static
Expand Down
Loading

0 comments on commit 7f94ed2

Please sign in to comment.