Skip to content

Commit

Permalink
[PATCH] RPC: expose API for serializing access to RPC transports
Browse files Browse the repository at this point in the history
 The next method we abstract is the one that releases a transport,
 allowing another task to have access to the transport.

 Again, one generic version of this is provided for transports that
 don't need the RPC client to perform congestion control, and one
 version is for transports that can use the original Van Jacobson
 implementation in xprt.c.

 Test-plan:
 Use WAN simulation to cause sporadic bursty packet loss.  Look for
 significant regression in performance or client stability.

 Signed-off-by: Chuck Lever <cel@netapp.com>
 Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
  • Loading branch information
Chuck Lever authored and Trond Myklebust committed Sep 23, 2005
1 parent 12a8046 commit 49e9a89
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 14 deletions.
3 changes: 3 additions & 0 deletions include/linux/sunrpc/xprt.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ struct rpc_xprt;
struct rpc_xprt_ops {
void (*set_buffer_size)(struct rpc_xprt *xprt);
int (*reserve_xprt)(struct rpc_task *task);
void (*release_xprt)(struct rpc_xprt *xprt, struct rpc_task *task);
void (*connect)(struct rpc_task *task);
int (*send_request)(struct rpc_task *task);
void (*set_retrans_timeout)(struct rpc_task *task);
Expand Down Expand Up @@ -238,6 +239,8 @@ int xprt_reserve_xprt_cong(struct rpc_task *task);
int xprt_prepare_transmit(struct rpc_task *task);
void xprt_transmit(struct rpc_task *task);
int xprt_adjust_timeout(struct rpc_rqst *req);
void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task);
void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task);
void xprt_release(struct rpc_task *task);
int xprt_destroy(struct rpc_xprt *xprt);

Expand Down
77 changes: 63 additions & 14 deletions net/sunrpc/xprt.c
Original file line number Diff line number Diff line change
Expand Up @@ -153,22 +153,50 @@ static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
return retval;
}


static void __xprt_lock_write_next(struct rpc_xprt *xprt)
{
struct rpc_task *task;
struct rpc_rqst *req;

if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
return;

task = rpc_wake_up_next(&xprt->resend);
if (!task) {
task = rpc_wake_up_next(&xprt->sending);
if (!task)
goto out_unlock;
}

req = task->tk_rqstp;
xprt->snd_task = task;
if (req) {
req->rq_bytes_sent = 0;
req->rq_ntrans++;
}
return;

out_unlock:
smp_mb__before_clear_bit();
clear_bit(XPRT_LOCKED, &xprt->state);
smp_mb__after_clear_bit();
}

static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
{
struct rpc_task *task;

if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
return;
if (!xprt->nocong && RPCXPRT_CONGESTED(xprt))
if (RPCXPRT_CONGESTED(xprt))
goto out_unlock;
task = rpc_wake_up_next(&xprt->resend);
if (!task) {
task = rpc_wake_up_next(&xprt->sending);
if (!task)
goto out_unlock;
}
if (xprt->nocong || __xprt_get_cong(xprt, task)) {
if (__xprt_get_cong(xprt, task)) {
struct rpc_rqst *req = task->tk_rqstp;
xprt->snd_task = task;
if (req) {
Expand All @@ -183,11 +211,14 @@ static void __xprt_lock_write_next(struct rpc_xprt *xprt)
smp_mb__after_clear_bit();
}

/*
* Releases the transport for use by other requests.
/**
* xprt_release_xprt - allow other requests to use a transport
* @xprt: transport with other tasks potentially waiting
* @task: task that is releasing access to the transport
*
* Note that "task" can be NULL. No congestion control is provided.
*/
static void
__xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
if (xprt->snd_task == task) {
xprt->snd_task = NULL;
Expand All @@ -198,11 +229,29 @@ __xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
}
}

static inline void
xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
/**
* xprt_release_xprt_cong - allow other requests to use a transport
* @xprt: transport with other tasks potentially waiting
* @task: task that is releasing access to the transport
*
* Note that "task" can be NULL. Another task is awoken to use the
* transport if the transport's congestion window allows it.
*/
void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
if (xprt->snd_task == task) {
xprt->snd_task = NULL;
smp_mb__before_clear_bit();
clear_bit(XPRT_LOCKED, &xprt->state);
smp_mb__after_clear_bit();
__xprt_lock_write_next_cong(xprt);
}
}

static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
spin_lock_bh(&xprt->transport_lock);
__xprt_release_write(xprt, task);
xprt->ops->release_xprt(xprt, task);
spin_unlock_bh(&xprt->transport_lock);
}

Expand Down Expand Up @@ -237,7 +286,7 @@ __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
return;
req->rq_cong = 0;
xprt->cong -= RPC_CWNDSCALE;
__xprt_lock_write_next(xprt);
__xprt_lock_write_next_cong(xprt);
}

/*
Expand All @@ -256,7 +305,7 @@ xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
if (cwnd > RPC_MAXCWND(xprt))
cwnd = RPC_MAXCWND(xprt);
__xprt_lock_write_next(xprt);
__xprt_lock_write_next_cong(xprt);
} else if (result == -ETIMEDOUT) {
cwnd >>= 1;
if (cwnd < RPC_CWNDSCALE)
Expand Down Expand Up @@ -693,7 +742,7 @@ void xprt_transmit(struct rpc_task *task)
task->tk_status = -ENOTCONN;
else if (!req->rq_received)
rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer);
__xprt_release_write(xprt, task);
xprt->ops->release_xprt(xprt, task);
spin_unlock_bh(&xprt->transport_lock);
return;
}
Expand Down Expand Up @@ -792,7 +841,7 @@ void xprt_release(struct rpc_task *task)
if (!(req = task->tk_rqstp))
return;
spin_lock_bh(&xprt->transport_lock);
__xprt_release_write(xprt, task);
xprt->ops->release_xprt(xprt, task);
__xprt_put_cong(xprt, req);
if (!list_empty(&req->rq_list))
list_del(&req->rq_list);
Expand Down
2 changes: 2 additions & 0 deletions net/sunrpc/xprtsock.c
Original file line number Diff line number Diff line change
Expand Up @@ -1046,6 +1046,7 @@ static void xs_connect(struct rpc_task *task)
static struct rpc_xprt_ops xs_udp_ops = {
.set_buffer_size = xs_udp_set_buffer_size,
.reserve_xprt = xprt_reserve_xprt_cong,
.release_xprt = xprt_release_xprt_cong,
.connect = xs_connect,
.send_request = xs_udp_send_request,
.set_retrans_timeout = xprt_set_retrans_timeout_rtt,
Expand All @@ -1056,6 +1057,7 @@ static struct rpc_xprt_ops xs_udp_ops = {
static struct rpc_xprt_ops xs_tcp_ops = {
.set_buffer_size = xs_tcp_set_buffer_size,
.reserve_xprt = xprt_reserve_xprt,
.release_xprt = xprt_release_xprt,
.connect = xs_connect,
.send_request = xs_tcp_send_request,
.set_retrans_timeout = xprt_set_retrans_timeout_def,
Expand Down

0 comments on commit 49e9a89

Please sign in to comment.