xprtrdma: Pre-allocate backward rpc_rqst and send/receive buffers
xprtrdma's backward direction send and receive buffers are the same
size as the forechannel's inline threshold, and must be pre-
registered.

The consumer has no control over which receive buffer the adapter
chooses to catch an incoming backward-direction call. Any receive
buffer can be used for either a forward reply or a backward call,
so both types of RPC message must be the same size.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Reviewed-by: Sagi Grimberg <sagig@mellanox.com>
Tested-By: Devesh Sharma <devesh.sharma@avagotech.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
Chuck Lever authored and Anna Schumaker committed Nov 2, 2015
1 parent 42e5c3e commit f531a5d
Showing 5 changed files with 309 additions and 12 deletions.
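
A note on the sizing rule in the commit message: rpcrdma_bc_setup_rqst() in the new backchannel.c below sizes the RPC/RDMA header buffer (rl_rdmabuf) to the inline write threshold, then sizes the send buffer backing rq_snd_buf (rl_sendbuf) to the write threshold plus the read threshold. The standalone sketch below models only that arithmetic; it is not part of the patch, and the 1024-byte thresholds are hypothetical example values.

/* bc_bufsize.c - standalone model of the buffer sizing performed by
 * rpcrdma_bc_setup_rqst(); the threshold values below are hypothetical.
 */
#include <stdio.h>

#define INLINE_WRITE_THRESHOLD 1024	/* stand-in for RPCRDMA_INLINE_WRITE_THRESHOLD(rqst) */
#define INLINE_READ_THRESHOLD  1024	/* stand-in for RPCRDMA_INLINE_READ_THRESHOLD(rqst) */

int main(void)
{
	size_t size;

	/* rl_rdmabuf: holds the RPC/RDMA transport header */
	size = INLINE_WRITE_THRESHOLD;
	printf("rl_rdmabuf: %zu bytes\n", size);

	/* rl_sendbuf: backs rq_snd_buf; rq_snd_buf.buflen is set to this */
	size += INLINE_READ_THRESHOLD;
	printf("rl_sendbuf: %zu bytes\n", size);

	return 0;
}

The receive side reuses the same rpcrdma_rep constructor as the forward channel (rpcrdma_create_rep()), so those buffers are already inline-sized; xprt_rdma_bc_setup() only has to create more of them and post extra receives.
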
1 change: 1 addition & 0 deletions net/sunrpc/xprtrdma/Makefile
@@ -5,3 +5,4 @@ rpcrdma-y := transport.o rpc_rdma.o verbs.o \
svc_rdma.o svc_rdma_transport.o \
svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \
module.o
rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o
206 changes: 206 additions & 0 deletions net/sunrpc/xprtrdma/backchannel.c
@@ -0,0 +1,206 @@
/*
* Copyright (c) 2015 Oracle. All rights reserved.
*
* Support for backward direction RPCs on RPC/RDMA.
*/

#include <linux/module.h>

#include "xprt_rdma.h"

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY RPCDBG_TRANS
#endif

static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
struct rpc_rqst *rqst)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_req *req = rpcr_to_rdmar(rqst);

spin_lock(&buf->rb_reqslock);
list_del(&req->rl_all);
spin_unlock(&buf->rb_reqslock);

rpcrdma_destroy_req(&r_xprt->rx_ia, req);

kfree(rqst);
}

static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
struct rpc_rqst *rqst)
{
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rpcrdma_regbuf *rb;
struct rpcrdma_req *req;
struct xdr_buf *buf;
size_t size;

req = rpcrdma_create_req(r_xprt);
if (!req)
return -ENOMEM;
req->rl_backchannel = true;

size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
if (IS_ERR(rb))
goto out_fail;
req->rl_rdmabuf = rb;

size += RPCRDMA_INLINE_READ_THRESHOLD(rqst);
rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
if (IS_ERR(rb))
goto out_fail;
rb->rg_owner = req;
req->rl_sendbuf = rb;
/* so that rpcr_to_rdmar works when receiving a request */
rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base;

buf = &rqst->rq_snd_buf;
buf->head[0].iov_base = rqst->rq_buffer;
buf->head[0].iov_len = 0;
buf->tail[0].iov_base = NULL;
buf->tail[0].iov_len = 0;
buf->page_len = 0;
buf->len = 0;
buf->buflen = size;

return 0;

out_fail:
rpcrdma_bc_free_rqst(r_xprt, rqst);
return -ENOMEM;
}

/* Allocate and add receive buffers to the rpcrdma_buffer's
* existing list of rep's. These are released when the
* transport is destroyed.
*/
static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt,
unsigned int count)
{
struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
struct rpcrdma_rep *rep;
unsigned long flags;
int rc = 0;

while (count--) {
rep = rpcrdma_create_rep(r_xprt);
if (IS_ERR(rep)) {
pr_err("RPC: %s: reply buffer alloc failed\n",
__func__);
rc = PTR_ERR(rep);
break;
}

spin_lock_irqsave(&buffers->rb_lock, flags);
list_add(&rep->rr_list, &buffers->rb_recv_bufs);
spin_unlock_irqrestore(&buffers->rb_lock, flags);
}

return rc;
}

/**
* xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
* @xprt: transport associated with these backchannel resources
* @reqs: number of concurrent incoming requests to expect
*
* Returns 0 on success; otherwise a negative errno
*/
int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
{
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
struct rpc_rqst *rqst;
unsigned int i;
int rc;

/* The backchannel reply path returns each rpc_rqst to the
* bc_pa_list _after_ the reply is sent. If the server is
* faster than the client, it can send another backward
* direction request before the rpc_rqst is returned to the
* list. The client rejects the request in this case.
*
* Twice as many rpc_rqsts are prepared to ensure there is
* always an rpc_rqst available as soon as a reply is sent.
*/
for (i = 0; i < (reqs << 1); i++) {
rqst = kzalloc(sizeof(*rqst), GFP_KERNEL);
if (!rqst) {
pr_err("RPC: %s: Failed to create bc rpc_rqst\n",
__func__);
goto out_free;
}

rqst->rq_xprt = &r_xprt->rx_xprt;
INIT_LIST_HEAD(&rqst->rq_list);
INIT_LIST_HEAD(&rqst->rq_bc_list);

if (rpcrdma_bc_setup_rqst(r_xprt, rqst))
goto out_free;

spin_lock_bh(&xprt->bc_pa_lock);
list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
spin_unlock_bh(&xprt->bc_pa_lock);
}

rc = rpcrdma_bc_setup_reps(r_xprt, reqs);
if (rc)
goto out_free;

rc = rpcrdma_ep_post_extra_recv(r_xprt, reqs);
if (rc)
goto out_free;

buffer->rb_bc_srv_max_requests = reqs;
request_module("svcrdma");

return 0;

out_free:
xprt_rdma_bc_destroy(xprt, reqs);

pr_err("RPC: %s: setup backchannel transport failed\n", __func__);
return -ENOMEM;
}

/**
* xprt_rdma_bc_destroy - Release resources for handling backchannel requests
* @xprt: transport associated with these backchannel resources
* @reqs: number of incoming requests to destroy; ignored
*/
void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
{
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
struct rpc_rqst *rqst, *tmp;

spin_lock_bh(&xprt->bc_pa_lock);
list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
list_del(&rqst->rq_bc_pa_list);
spin_unlock_bh(&xprt->bc_pa_lock);

rpcrdma_bc_free_rqst(r_xprt, rqst);

spin_lock_bh(&xprt->bc_pa_lock);
}
spin_unlock_bh(&xprt->bc_pa_lock);
}

/**
* xprt_rdma_bc_free_rqst - Release a backchannel rqst
* @rqst: request to release
*/
void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
{
struct rpc_xprt *xprt = rqst->rq_xprt;

smp_mb__before_atomic();
WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state));
clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
smp_mb__after_atomic();

spin_lock_bh(&xprt->bc_pa_lock);
list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
spin_unlock_bh(&xprt->bc_pa_lock);
}
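
To summarize the provisioning that backchannel.c performs (see the comment in xprt_rdma_bc_setup() above): for reqs expected concurrent backchannel calls, the client creates twice that many rpc_rqsts, each with its own pre-registered rl_rdmabuf and rl_sendbuf, adds reqs extra rpcrdma_rep structures to rb_recv_bufs, and posts reqs additional receives. A standalone sketch of that accounting, not part of the patch:

/* bc_provision.c - standalone model (not kernel code) of the resource
 * counts pre-allocated by xprt_rdma_bc_setup() for a given number of
 * expected concurrent backchannel calls.
 */
#include <stdio.h>

struct bc_resources {
	unsigned int rqsts;	/* rpc_rqst structures, each with send buffers */
	unsigned int reps;	/* extra receive buffers on rb_recv_bufs */
	unsigned int posted;	/* extra receives posted on the QP */
};

static struct bc_resources bc_provision(unsigned int reqs)
{
	/* Twice as many rpc_rqsts as expected calls, so a fresh rqst is
	 * available even while a previous reply is still being returned
	 * to bc_pa_list.
	 */
	struct bc_resources res = {
		.rqsts	= reqs << 1,
		.reps	= reqs,
		.posted	= reqs,
	};
	return res;
}

int main(void)
{
	struct bc_resources res = bc_provision(8);

	printf("rqsts=%u reps=%u posted=%u\n", res.rqsts, res.reps, res.posted);
	return 0;
}
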
7 changes: 6 additions & 1 deletion net/sunrpc/xprtrdma/transport.c
@@ -705,7 +705,12 @@ static struct rpc_xprt_ops xprt_rdma_procs = {
.print_stats = xprt_rdma_print_stats,
.enable_swap = xprt_rdma_enable_swap,
.disable_swap = xprt_rdma_disable_swap,
.inject_disconnect = xprt_rdma_inject_disconnect
.inject_disconnect = xprt_rdma_inject_disconnect,
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
.bc_setup = xprt_rdma_bc_setup,
.bc_free_rqst = xprt_rdma_bc_free_rqst,
.bc_destroy = xprt_rdma_bc_destroy,
#endif
};

static struct xprt_class xprt_rdma = {
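
The three new entries in xprt_rdma_procs are reached through the rpc_xprt_ops function-pointer table, so the generic SUNRPC backchannel code can drive any transport that provides them. Below is a minimal standalone model of that dispatch pattern; the names are illustrative and this is not the actual net/sunrpc code.

/* ops_dispatch.c - simplified, standalone model of how the generic RPC
 * client reaches a transport's backchannel methods through a table of
 * function pointers. Names are illustrative; not the real interfaces.
 */
#include <stdio.h>

struct xprt;

struct xprt_ops {
	int  (*bc_setup)(struct xprt *xprt, unsigned int reqs);
	void (*bc_destroy)(struct xprt *xprt, unsigned int reqs);
};

struct xprt {
	const struct xprt_ops *ops;
};

/* Generic layer: only a transport that fills in the backchannel
 * methods (as this commit does for RPC/RDMA) can host a backchannel.
 */
static int setup_backchannel(struct xprt *xprt, unsigned int reqs)
{
	if (!xprt->ops->bc_setup)
		return -1;
	return xprt->ops->bc_setup(xprt, reqs);
}

static int rdma_bc_setup(struct xprt *xprt, unsigned int reqs)
{
	(void)xprt;
	printf("pre-allocating resources for %u backchannel calls\n", reqs);
	return 0;
}

static void rdma_bc_destroy(struct xprt *xprt, unsigned int reqs)
{
	(void)xprt;
	(void)reqs;
	printf("releasing backchannel resources\n");
}

static const struct xprt_ops rdma_ops = {
	.bc_setup   = rdma_bc_setup,
	.bc_destroy = rdma_bc_destroy,
};

int main(void)
{
	struct xprt xprt = { .ops = &rdma_ops };

	if (setup_backchannel(&xprt, 8) == 0)
		xprt.ops->bc_destroy(&xprt, 8);
	return 0;
}
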
87 changes: 76 additions & 11 deletions net/sunrpc/xprtrdma/verbs.c
@@ -831,7 +831,21 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
}
rc = ep->rep_connected;
} else {
struct rpcrdma_xprt *r_xprt;
unsigned int extras;

dprintk("RPC: %s: connected\n", __func__);

r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
extras = r_xprt->rx_buf.rb_bc_srv_max_requests;

if (extras) {
rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
if (rc)
pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n",
__func__, rc);
rc = 0;
}
}

out:
@@ -868,20 +882,25 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
}
}

static struct rpcrdma_req *
struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
struct rpcrdma_req *req;

req = kzalloc(sizeof(*req), GFP_KERNEL);
if (req == NULL)
return ERR_PTR(-ENOMEM);

INIT_LIST_HEAD(&req->rl_free);
spin_lock(&buffer->rb_reqslock);
list_add(&req->rl_all, &buffer->rb_allreqs);
spin_unlock(&buffer->rb_reqslock);
req->rl_buffer = &r_xprt->rx_buf;
return req;
}

static struct rpcrdma_rep *
struct rpcrdma_rep *
rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
@@ -920,13 +939,16 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
int i, rc;

buf->rb_max_requests = r_xprt->rx_data.max_requests;
buf->rb_bc_srv_max_requests = 0;
spin_lock_init(&buf->rb_lock);

rc = ia->ri_ops->ro_init(r_xprt);
if (rc)
goto out;

INIT_LIST_HEAD(&buf->rb_send_bufs);
INIT_LIST_HEAD(&buf->rb_allreqs);
spin_lock_init(&buf->rb_reqslock);
for (i = 0; i < buf->rb_max_requests; i++) {
struct rpcrdma_req *req;

@@ -937,6 +959,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
rc = PTR_ERR(req);
goto out;
}
req->rl_backchannel = false;
list_add(&req->rl_free, &buf->rb_send_bufs);
}

@@ -985,19 +1008,13 @@ rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
static void
rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
{
if (!rep)
return;

rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
kfree(rep);
}

static void
void
rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
{
if (!req)
return;

rpcrdma_free_regbuf(ia, req->rl_sendbuf);
rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
kfree(req);
@@ -1015,12 +1032,19 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
rpcrdma_destroy_rep(ia, rep);
}

while (!list_empty(&buf->rb_send_bufs)) {
spin_lock(&buf->rb_reqslock);
while (!list_empty(&buf->rb_allreqs)) {
struct rpcrdma_req *req;

req = rpcrdma_buffer_get_req_locked(buf);
req = list_first_entry(&buf->rb_allreqs,
struct rpcrdma_req, rl_all);
list_del(&req->rl_all);

spin_unlock(&buf->rb_reqslock);
rpcrdma_destroy_req(ia, req);
spin_lock(&buf->rb_reqslock);
}
spin_unlock(&buf->rb_reqslock);

ia->ri_ops->ro_destroy(buf);
}
@@ -1288,6 +1312,47 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
return rc;
}

/**
* rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
* @r_xprt: transport associated with these backchannel resources
* @count: minimum number of incoming requests expected
*
* Returns zero if all requested buffers were posted, or a negative errno.
*/
int
rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
{
struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rpcrdma_ep *ep = &r_xprt->rx_ep;
struct rpcrdma_rep *rep;
unsigned long flags;
int rc;

while (count--) {
spin_lock_irqsave(&buffers->rb_lock, flags);
if (list_empty(&buffers->rb_recv_bufs))
goto out_reqbuf;
rep = rpcrdma_buffer_get_rep_locked(buffers);
spin_unlock_irqrestore(&buffers->rb_lock, flags);

rc = rpcrdma_ep_post_recv(ia, ep, rep);
if (rc)
goto out_rc;
}

return 0;

out_reqbuf:
spin_unlock_irqrestore(&buffers->rb_lock, flags);
pr_warn("%s: no extra receive buffers\n", __func__);
return -ENOMEM;

out_rc:
rpcrdma_recv_buffer_put(rep);
return rc;
}

/* How many chunk list items fit within our inline buffers?
*/
unsigned int