Skip to content

Commit

Permalink
SUNRPC: Support dynamic slot allocation for TCP connections
Browse files Browse the repository at this point in the history
Allow the number of available slots to grow with the TCP window size.

Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
  • Loading branch information
Trond Myklebust authored and Trond Myklebust committed Jul 17, 2011
1 parent 21de0a9 commit d9ba131
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 20 deletions.
9 changes: 7 additions & 2 deletions include/linux/sunrpc/xprt.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#define RPC_MIN_SLOT_TABLE (2U)
#define RPC_DEF_SLOT_TABLE (16U)
#define RPC_MAX_SLOT_TABLE (128U)
#define RPC_MAX_SLOT_TABLE_LIMIT (65536U)

/*
* This describes a timeout strategy
Expand Down Expand Up @@ -168,7 +169,9 @@ struct rpc_xprt {
struct rpc_wait_queue pending; /* requests in flight */
struct rpc_wait_queue backlog; /* waiting for slot */
struct list_head free; /* free slots */
unsigned int max_reqs; /* total slots */
unsigned int max_reqs; /* max number of slots */
unsigned int min_reqs; /* min number of slots */
atomic_t num_reqs; /* total slots */
unsigned long state; /* transport state */
unsigned char shutdown : 1, /* being shut down */
resvport : 1; /* use a reserved port */
Expand Down Expand Up @@ -281,7 +284,9 @@ void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task);
void xprt_release(struct rpc_task *task);
struct rpc_xprt * xprt_get(struct rpc_xprt *xprt);
void xprt_put(struct rpc_xprt *xprt);
struct rpc_xprt * xprt_alloc(struct net *net, int size, int max_req);
struct rpc_xprt * xprt_alloc(struct net *net, size_t size,
unsigned int num_prealloc,
unsigned int max_req);
void xprt_free(struct rpc_xprt *);

static inline __be32 *xprt_skip_transport_header(struct rpc_xprt *xprt, __be32 *p)
Expand Down
70 changes: 59 additions & 11 deletions net/sunrpc/xprt.c
Original file line number Diff line number Diff line change
Expand Up @@ -935,25 +935,66 @@ void xprt_transmit(struct rpc_task *task)
spin_unlock_bh(&xprt->transport_lock);
}

static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt, gfp_t gfp_flags)
{
struct rpc_rqst *req = ERR_PTR(-EAGAIN);

if (!atomic_add_unless(&xprt->num_reqs, 1, xprt->max_reqs))
goto out;
req = kzalloc(sizeof(struct rpc_rqst), gfp_flags);
if (req != NULL)
goto out;
atomic_dec(&xprt->num_reqs);
req = ERR_PTR(-ENOMEM);
out:
return req;
}

static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
if (atomic_add_unless(&xprt->num_reqs, -1, xprt->min_reqs)) {
kfree(req);
return true;
}
return false;
}

static void xprt_alloc_slot(struct rpc_task *task)
{
struct rpc_xprt *xprt = task->tk_xprt;
struct rpc_rqst *req;

task->tk_status = 0;
if (!list_empty(&xprt->free)) {
struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
list_del_init(&req->rq_list);
task->tk_rqstp = req;
xprt_request_init(task, xprt);
return;
req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
list_del(&req->rq_list);
goto out_init_req;
}
req = xprt_dynamic_alloc_slot(xprt, GFP_NOWAIT);
if (!IS_ERR(req))
goto out_init_req;
switch (PTR_ERR(req)) {
case -ENOMEM:
rpc_delay(task, HZ >> 2);
dprintk("RPC: dynamic allocation of request slot "
"failed! Retrying\n");
break;
case -EAGAIN:
rpc_sleep_on(&xprt->backlog, task, NULL);
dprintk("RPC: waiting for request slot\n");
}
dprintk("RPC: waiting for request slot\n");
task->tk_status = -EAGAIN;
rpc_sleep_on(&xprt->backlog, task, NULL);
return;
out_init_req:
task->tk_status = 0;
task->tk_rqstp = req;
xprt_request_init(task, xprt);
}

static void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
if (xprt_dynamic_free_slot(xprt, req))
return;

memset(req, 0, sizeof(*req)); /* mark unused */

spin_lock(&xprt->reserve_lock);
Expand All @@ -972,7 +1013,9 @@ static void xprt_free_all_slots(struct rpc_xprt *xprt)
}
}

struct rpc_xprt *xprt_alloc(struct net *net, int size, int num_prealloc)
struct rpc_xprt *xprt_alloc(struct net *net, size_t size,
unsigned int num_prealloc,
unsigned int max_alloc)
{
struct rpc_xprt *xprt;
struct rpc_rqst *req;
Expand All @@ -992,7 +1035,12 @@ struct rpc_xprt *xprt_alloc(struct net *net, int size, int num_prealloc)
}
if (i < num_prealloc)
goto out_free;
xprt->max_reqs = num_prealloc;
if (max_alloc > num_prealloc)
xprt->max_reqs = max_alloc;
else
xprt->max_reqs = num_prealloc;
xprt->min_reqs = num_prealloc;
atomic_set(&xprt->num_reqs, num_prealloc);

return xprt;

Expand Down Expand Up @@ -1036,7 +1084,6 @@ void xprt_reserve(struct rpc_task *task)
if (!xprt_lock_write(xprt, task))
return;

task->tk_status = -EIO;
spin_lock(&xprt->reserve_lock);
xprt_alloc_slot(task);
spin_unlock(&xprt->reserve_lock);
Expand All @@ -1057,6 +1104,7 @@ static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
{
struct rpc_rqst *req = task->tk_rqstp;

INIT_LIST_HEAD(&req->rq_list);
req->rq_timeout = task->tk_client->cl_timeout->to_initval;
req->rq_task = task;
req->rq_xprt = xprt;
Expand Down
1 change: 1 addition & 0 deletions net/sunrpc/xprtrdma/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ xprt_setup_rdma(struct xprt_create *args)
}

xprt = xprt_alloc(args->net, sizeof(struct rpcrdma_xprt),
xprt_rdma_slot_table_entries,
xprt_rdma_slot_table_entries);
if (xprt == NULL) {
dprintk("RPC: %s: couldn't allocate rpcrdma_xprt\n",
Expand Down
49 changes: 42 additions & 7 deletions net/sunrpc/xprtsock.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ static void xs_close(struct rpc_xprt *xprt);
* xprtsock tunables
*/
unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;
unsigned int xprt_tcp_slot_table_entries = RPC_MIN_SLOT_TABLE;
unsigned int xprt_max_tcp_slot_table_entries = RPC_MAX_SLOT_TABLE;

unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;
Expand All @@ -75,6 +76,7 @@ static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO;

static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
static unsigned int max_tcp_slot_table_limit = RPC_MAX_SLOT_TABLE_LIMIT;
static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT;
static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT;

Expand Down Expand Up @@ -103,6 +105,15 @@ static ctl_table xs_tunables_table[] = {
.extra1 = &min_slot_table_size,
.extra2 = &max_slot_table_size
},
{
.procname = "tcp_max_slot_table_entries",
.data = &xprt_max_tcp_slot_table_entries,
.maxlen = sizeof(unsigned int),
.mode = 0644,
.proc_handler = proc_dointvec_minmax,
.extra1 = &min_slot_table_size,
.extra2 = &max_tcp_slot_table_limit
},
{
.procname = "min_resvport",
.data = &xprt_min_resvport,
Expand Down Expand Up @@ -2491,7 +2502,8 @@ static int xs_init_anyaddr(const int family, struct sockaddr *sap)
}

static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
unsigned int slot_table_size)
unsigned int slot_table_size,
unsigned int max_slot_table_size)
{
struct rpc_xprt *xprt;
struct sock_xprt *new;
Expand All @@ -2501,7 +2513,8 @@ static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
return ERR_PTR(-EBADF);
}

xprt = xprt_alloc(args->net, sizeof(*new), slot_table_size);
xprt = xprt_alloc(args->net, sizeof(*new), slot_table_size,
max_slot_table_size);
if (xprt == NULL) {
dprintk("RPC: xs_setup_xprt: couldn't allocate "
"rpc_xprt\n");
Expand Down Expand Up @@ -2543,7 +2556,8 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args)
struct rpc_xprt *xprt;
struct rpc_xprt *ret;

xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
xprt_max_tcp_slot_table_entries);
if (IS_ERR(xprt))
return xprt;
transport = container_of(xprt, struct sock_xprt, xprt);
Expand Down Expand Up @@ -2607,7 +2621,8 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
struct sock_xprt *transport;
struct rpc_xprt *ret;

xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries);
xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries,
xprt_udp_slot_table_entries);
if (IS_ERR(xprt))
return xprt;
transport = container_of(xprt, struct sock_xprt, xprt);
Expand Down Expand Up @@ -2683,7 +2698,8 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
struct sock_xprt *transport;
struct rpc_xprt *ret;

xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
xprt_max_tcp_slot_table_entries);
if (IS_ERR(xprt))
return xprt;
transport = container_of(xprt, struct sock_xprt, xprt);
Expand Down Expand Up @@ -2762,7 +2778,8 @@ static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args)
*/
return args->bc_xprt->xpt_bc_xprt;
}
xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
xprt_tcp_slot_table_entries);
if (IS_ERR(xprt))
return xprt;
transport = container_of(xprt, struct sock_xprt, xprt);
Expand Down Expand Up @@ -2949,8 +2966,26 @@ static struct kernel_param_ops param_ops_slot_table_size = {
#define param_check_slot_table_size(name, p) \
__param_check(name, p, unsigned int);

static int param_set_max_slot_table_size(const char *val,
const struct kernel_param *kp)
{
return param_set_uint_minmax(val, kp,
RPC_MIN_SLOT_TABLE,
RPC_MAX_SLOT_TABLE_LIMIT);
}

static struct kernel_param_ops param_ops_max_slot_table_size = {
.set = param_set_max_slot_table_size,
.get = param_get_uint,
};

#define param_check_max_slot_table_size(name, p) \
__param_check(name, p, unsigned int);

module_param_named(tcp_slot_table_entries, xprt_tcp_slot_table_entries,
slot_table_size, 0644);
module_param_named(tcp_max_slot_table_entries, xprt_max_tcp_slot_table_entries,
max_slot_table_size, 0644);
module_param_named(udp_slot_table_entries, xprt_udp_slot_table_entries,
slot_table_size, 0644);

0 comments on commit d9ba131

Please sign in to comment.