Skip to content

Commit

Permalink
SUNRPC: Support for congestion control when queuing is enabled
Browse files Browse the repository at this point in the history
Both RDMA and UDP transports require the request to get a "congestion control"
credit before they can be transmitted. Right now, this is done when
the request locks the socket. We'd like it to happen when a request attempts
to be transmitted for the first time.
In order to support retransmission of requests that already hold such
credits, we also want to ensure that they get queued first, so that we
don't deadlock with requests that have yet to obtain a credit.

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
  • Loading branch information
Trond Myklebust committed Sep 30, 2018
1 parent 918f3c1 commit 75891f5
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 36 deletions.
2 changes: 2 additions & 0 deletions include/linux/sunrpc/xprt.h
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ void xprt_complete_rqst(struct rpc_task *task, int copied);
void xprt_pin_rqst(struct rpc_rqst *req);
void xprt_unpin_rqst(struct rpc_rqst *req);
void xprt_release_rqst_cong(struct rpc_task *task);
bool xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req);
void xprt_disconnect_done(struct rpc_xprt *xprt);
void xprt_force_disconnect(struct rpc_xprt *xprt);
void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie);
Expand All @@ -415,6 +416,7 @@ void xprt_unlock_connect(struct rpc_xprt *, void *);
#define XPRT_BINDING (5)
#define XPRT_CLOSING (6)
#define XPRT_CONGESTED (9)
#define XPRT_CWND_WAIT (10)

static inline void xprt_set_connected(struct rpc_xprt *xprt)
{
Expand Down
5 changes: 5 additions & 0 deletions net/sunrpc/clnt.c
Original file line number Diff line number Diff line change
Expand Up @@ -1996,6 +1996,11 @@ call_transmit_status(struct rpc_task *task)
dprint_status(task);
xprt_end_transmit(task);
break;
case -EBADSLT:
xprt_end_transmit(task);
task->tk_action = call_transmit;
task->tk_status = 0;
break;
case -EBADMSG:
xprt_end_transmit(task);
task->tk_status = 0;
Expand Down
128 changes: 92 additions & 36 deletions net/sunrpc/xprt.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@
static void xprt_init(struct rpc_xprt *xprt, struct net *net);
static __be32 xprt_alloc_xid(struct rpc_xprt *xprt);
static void xprt_connect_status(struct rpc_task *task);
static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
static void __xprt_put_cong(struct rpc_xprt *, struct rpc_rqst *);
static void xprt_destroy(struct rpc_xprt *xprt);

static DEFINE_SPINLOCK(xprt_list_lock);
Expand Down Expand Up @@ -221,13 +219,39 @@ static void xprt_clear_locked(struct rpc_xprt *xprt)
queue_work(xprtiod_workqueue, &xprt->task_cleanup);
}

static bool
xprt_need_congestion_window_wait(struct rpc_xprt *xprt)
{
return test_bit(XPRT_CWND_WAIT, &xprt->state);
}

static void
xprt_set_congestion_window_wait(struct rpc_xprt *xprt)
{
if (!list_empty(&xprt->xmit_queue)) {
/* Peek at head of queue to see if it can make progress */
if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst,
rq_xmit)->rq_cong)
return;
}
set_bit(XPRT_CWND_WAIT, &xprt->state);
}

static void
xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt)
{
if (!RPCXPRT_CONGESTED(xprt))
clear_bit(XPRT_CWND_WAIT, &xprt->state);
}

/*
* xprt_reserve_xprt_cong - serialize write access to transports
* @task: task that is requesting access to the transport
*
* Same as xprt_reserve_xprt, but Van Jacobson congestion control is
* integrated into the decision of whether a request is allowed to be
* woken up and given access to the transport.
* Note that the lock is only granted if we know there are free slots.
*/
int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
Expand All @@ -243,14 +267,12 @@ int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
xprt->snd_task = task;
return 1;
}
if (__xprt_get_cong(xprt, task)) {
if (!xprt_need_congestion_window_wait(xprt)) {
xprt->snd_task = task;
return 1;
}
xprt_clear_locked(xprt);
out_sleep:
if (req)
__xprt_put_cong(xprt, req);
dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
task->tk_timeout = 0;
task->tk_status = -EAGAIN;
Expand Down Expand Up @@ -294,32 +316,14 @@ static void __xprt_lock_write_next(struct rpc_xprt *xprt)
xprt_clear_locked(xprt);
}

static bool __xprt_lock_write_cong_func(struct rpc_task *task, void *data)
{
struct rpc_xprt *xprt = data;
struct rpc_rqst *req;

req = task->tk_rqstp;
if (req == NULL) {
xprt->snd_task = task;
return true;
}
if (__xprt_get_cong(xprt, task)) {
xprt->snd_task = task;
req->rq_ntrans++;
return true;
}
return false;
}

static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
{
if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
return;
if (RPCXPRT_CONGESTED(xprt))
if (xprt_need_congestion_window_wait(xprt))
goto out_unlock;
if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
__xprt_lock_write_cong_func, xprt))
__xprt_lock_write_func, xprt))
return;
out_unlock:
xprt_clear_locked(xprt);
Expand Down Expand Up @@ -370,16 +374,16 @@ static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *ta
* overflowed. Put the task to sleep if this is the case.
*/
static int
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
struct rpc_rqst *req = task->tk_rqstp;

if (req->rq_cong)
return 1;
dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n",
task->tk_pid, xprt->cong, xprt->cwnd);
if (RPCXPRT_CONGESTED(xprt))
req->rq_task->tk_pid, xprt->cong, xprt->cwnd);
if (RPCXPRT_CONGESTED(xprt)) {
xprt_set_congestion_window_wait(xprt);
return 0;
}
req->rq_cong = 1;
xprt->cong += RPC_CWNDSCALE;
return 1;
Expand All @@ -396,9 +400,31 @@ __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
return;
req->rq_cong = 0;
xprt->cong -= RPC_CWNDSCALE;
xprt_test_and_clear_congestion_window_wait(xprt);
__xprt_lock_write_next_cong(xprt);
}

/**
* xprt_request_get_cong - Request congestion control credits
* @xprt: pointer to transport
* @req: pointer to RPC request
*
* Useful for transports that require congestion control.
*/
bool
xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
bool ret = false;

if (req->rq_cong)
return true;
spin_lock_bh(&xprt->transport_lock);
ret = __xprt_get_cong(xprt, req) != 0;
spin_unlock_bh(&xprt->transport_lock);
return ret;
}
EXPORT_SYMBOL_GPL(xprt_request_get_cong);

/**
* xprt_release_rqst_cong - housekeeping when request is complete
* @task: RPC request that recently completed
Expand All @@ -413,6 +439,20 @@ void xprt_release_rqst_cong(struct rpc_task *task)
}
EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);

/*
* Clear the congestion window wait flag and wake up the next
* entry on xprt->sending
*/
static void
xprt_clear_congestion_window_wait(struct rpc_xprt *xprt)
{
if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) {
spin_lock_bh(&xprt->transport_lock);
__xprt_lock_write_next_cong(xprt);
spin_unlock_bh(&xprt->transport_lock);
}
}

/**
* xprt_adjust_cwnd - adjust transport congestion window
* @xprt: pointer to xprt
Expand Down Expand Up @@ -1058,12 +1098,28 @@ xprt_request_enqueue_transmit(struct rpc_task *task)

if (xprt_request_need_enqueue_transmit(task, req)) {
spin_lock(&xprt->queue_lock);
list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
if (pos->rq_task->tk_owner != task->tk_owner)
continue;
list_add_tail(&req->rq_xmit2, &pos->rq_xmit2);
INIT_LIST_HEAD(&req->rq_xmit);
goto out;
/*
* Requests that carry congestion control credits are added
* to the head of the list to avoid starvation issues.
*/
if (req->rq_cong) {
xprt_clear_congestion_window_wait(xprt);
list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
if (pos->rq_cong)
continue;
/* Note: req is added _before_ pos */
list_add_tail(&req->rq_xmit, &pos->rq_xmit);
INIT_LIST_HEAD(&req->rq_xmit2);
goto out;
}
} else {
list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
if (pos->rq_task->tk_owner != task->tk_owner)
continue;
list_add_tail(&req->rq_xmit2, &pos->rq_xmit2);
INIT_LIST_HEAD(&req->rq_xmit);
goto out;
}
}
list_add_tail(&req->rq_xmit, &xprt->xmit_queue);
INIT_LIST_HEAD(&req->rq_xmit2);
Expand Down
3 changes: 3 additions & 0 deletions net/sunrpc/xprtrdma/backchannel.c
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@ int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst)
if (!xprt_connected(rqst->rq_xprt))
goto drop_connection;

if (!xprt_request_get_cong(rqst->rq_xprt, rqst))
return -EBADSLT;

rc = rpcrdma_bc_marshal_reply(rqst);
if (rc < 0)
goto failed_marshal;
Expand Down
3 changes: 3 additions & 0 deletions net/sunrpc/xprtrdma/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,9 @@ xprt_rdma_send_request(struct rpc_rqst *rqst, struct rpc_task *task)
if (!xprt_connected(xprt))
goto drop_connection;

if (!xprt_request_get_cong(xprt, rqst))
return -EBADSLT;

rc = rpcrdma_marshal_req(r_xprt, rqst);
if (rc < 0)
goto failed_marshal;
Expand Down
4 changes: 4 additions & 0 deletions net/sunrpc/xprtsock.c
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,10 @@ static int xs_udp_send_request(struct rpc_rqst *req, struct rpc_task *task)

if (!xprt_bound(xprt))
return -ENOTCONN;

if (!xprt_request_get_cong(xprt, req))
return -EBADSLT;

req->rq_xtime = ktime_get();
status = xs_sendpages(transport->sock, xs_addr(xprt), xprt->addrlen,
xdr, 0, true, &sent);
Expand Down

0 comments on commit 75891f5

Please sign in to comment.