Skip to content

Commit

Permalink
xprtrdma: Split the completion queue
Browse files Browse the repository at this point in the history
The current CQ handler uses the ib_wc.opcode field to distinguish
between event types. However, the contents of that field are not
reliable if the completion status is not IB_WC_SUCCESS.

When an error completion occurs on a send event, the CQ handler
schedules a tasklet with something that is not a struct rpcrdma_rep.
This is never correct behavior, and sometimes it results in a panic.

To resolve this issue, split the completion queue into a send CQ and
a receive CQ. The send CQ handler now handles only struct rpcrdma_mw
wr_id's, and the receive CQ handler now handles only struct
rpcrdma_rep wr_id's.

Fix suggested by Shirley Ma <shirley.ma@oracle.com>

Reported-by: Rafael Reiter <rafael.reiter@ims.co.at>
Fixes: 5c635e0
BugLink: https://bugzilla.kernel.org/show_bug.cgi?id=73211
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Tested-by: Klemens Senn <klemens.senn@ims.co.at>
Tested-by: Steve Wise <swise@opengridcomputing.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
  • Loading branch information
Chuck Lever authored and Anna Schumaker committed Jun 4, 2014
1 parent 7f1d541 commit fc66448
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 92 deletions.
228 changes: 137 additions & 91 deletions net/sunrpc/xprtrdma/verbs.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,96 +142,115 @@ rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
}
}

static inline
void rpcrdma_event_process(struct ib_wc *wc)
static void
rpcrdma_sendcq_process_wc(struct ib_wc *wc)
{
struct rpcrdma_mw *frmr;
struct rpcrdma_rep *rep =
(struct rpcrdma_rep *)(unsigned long) wc->wr_id;
struct rpcrdma_mw *frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;

dprintk("RPC: %s: event rep %p status %X opcode %X length %u\n",
__func__, rep, wc->status, wc->opcode, wc->byte_len);
dprintk("RPC: %s: frmr %p status %X opcode %d\n",
__func__, frmr, wc->status, wc->opcode);

if (!rep) /* send completion that we don't care about */
if (wc->wr_id == 0ULL)
return;

if (IB_WC_SUCCESS != wc->status) {
dprintk("RPC: %s: WC opcode %d status %X, connection lost\n",
__func__, wc->opcode, wc->status);
rep->rr_len = ~0U;
if (wc->opcode != IB_WC_FAST_REG_MR && wc->opcode != IB_WC_LOCAL_INV)
rpcrdma_schedule_tasklet(rep);
if (wc->status != IB_WC_SUCCESS)
return;
}

switch (wc->opcode) {
case IB_WC_FAST_REG_MR:
frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
if (wc->opcode == IB_WC_FAST_REG_MR)
frmr->r.frmr.state = FRMR_IS_VALID;
break;
case IB_WC_LOCAL_INV:
frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
else if (wc->opcode == IB_WC_LOCAL_INV)
frmr->r.frmr.state = FRMR_IS_INVALID;
break;
case IB_WC_RECV:
rep->rr_len = wc->byte_len;
ib_dma_sync_single_for_cpu(
rdmab_to_ia(rep->rr_buffer)->ri_id->device,
rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
/* Keep (only) the most recent credits, after check validity */
if (rep->rr_len >= 16) {
struct rpcrdma_msg *p =
(struct rpcrdma_msg *) rep->rr_base;
unsigned int credits = ntohl(p->rm_credit);
if (credits == 0) {
dprintk("RPC: %s: server"
" dropped credits to 0!\n", __func__);
/* don't deadlock */
credits = 1;
} else if (credits > rep->rr_buffer->rb_max_requests) {
dprintk("RPC: %s: server"
" over-crediting: %d (%d)\n",
__func__, credits,
rep->rr_buffer->rb_max_requests);
credits = rep->rr_buffer->rb_max_requests;
}
atomic_set(&rep->rr_buffer->rb_credits, credits);
}
rpcrdma_schedule_tasklet(rep);
break;
default:
dprintk("RPC: %s: unexpected WC event %X\n",
__func__, wc->opcode);
break;
}
}

static inline int
rpcrdma_cq_poll(struct ib_cq *cq)
static int
rpcrdma_sendcq_poll(struct ib_cq *cq)
{
struct ib_wc wc;
int rc;

for (;;) {
rc = ib_poll_cq(cq, 1, &wc);
if (rc < 0) {
dprintk("RPC: %s: ib_poll_cq failed %i\n",
__func__, rc);
return rc;
}
if (rc == 0)
break;
while ((rc = ib_poll_cq(cq, 1, &wc)) == 1)
rpcrdma_sendcq_process_wc(&wc);
return rc;
}

rpcrdma_event_process(&wc);
/*
* Handle send, fast_reg_mr, and local_inv completions.
*
* Send events are typically suppressed and thus do not result
* in an upcall. Occasionally one is signaled, however. This
* prevents the provider's completion queue from wrapping and
* losing a completion.
*/
static void
rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
{
int rc;

rc = rpcrdma_sendcq_poll(cq);
if (rc) {
dprintk("RPC: %s: ib_poll_cq failed: %i\n",
__func__, rc);
return;
}

return 0;
rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
if (rc) {
dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
__func__, rc);
return;
}

rpcrdma_sendcq_poll(cq);
}

static void
rpcrdma_recvcq_process_wc(struct ib_wc *wc)
{
struct rpcrdma_rep *rep =
(struct rpcrdma_rep *)(unsigned long)wc->wr_id;

dprintk("RPC: %s: rep %p status %X opcode %X length %u\n",
__func__, rep, wc->status, wc->opcode, wc->byte_len);

if (wc->status != IB_WC_SUCCESS) {
rep->rr_len = ~0U;
goto out_schedule;
}
if (wc->opcode != IB_WC_RECV)
return;

rep->rr_len = wc->byte_len;
ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);

if (rep->rr_len >= 16) {
struct rpcrdma_msg *p = (struct rpcrdma_msg *)rep->rr_base;
unsigned int credits = ntohl(p->rm_credit);

if (credits == 0)
credits = 1; /* don't deadlock */
else if (credits > rep->rr_buffer->rb_max_requests)
credits = rep->rr_buffer->rb_max_requests;
atomic_set(&rep->rr_buffer->rb_credits, credits);
}

out_schedule:
rpcrdma_schedule_tasklet(rep);
}

static int
rpcrdma_recvcq_poll(struct ib_cq *cq)
{
struct ib_wc wc;
int rc;

while ((rc = ib_poll_cq(cq, 1, &wc)) == 1)
rpcrdma_recvcq_process_wc(&wc);
return rc;
}

/*
* rpcrdma_cq_event_upcall
* Handle receive completions.
*
* This upcall handles recv and send events.
* It is reentrant but processes single events in order to maintain
* ordering of receives to keep server credits.
*
Expand All @@ -240,26 +259,27 @@ rpcrdma_cq_poll(struct ib_cq *cq)
* connection shutdown. That is, the structures required for
* the completion of the reply handler must remain intact until
* all memory has been reclaimed.
*
* Note that send events are suppressed and do not result in an upcall.
*/
static void
rpcrdma_cq_event_upcall(struct ib_cq *cq, void *context)
rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
{
int rc;

rc = rpcrdma_cq_poll(cq);
if (rc)
rc = rpcrdma_recvcq_poll(cq);
if (rc) {
dprintk("RPC: %s: ib_poll_cq failed: %i\n",
__func__, rc);
return;
}

rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
if (rc) {
dprintk("RPC: %s: ib_req_notify_cq failed %i\n",
dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
__func__, rc);
return;
}

rpcrdma_cq_poll(cq);
rpcrdma_recvcq_poll(cq);
}

#ifdef RPC_DEBUG
Expand Down Expand Up @@ -610,6 +630,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
struct rpcrdma_create_data_internal *cdata)
{
struct ib_device_attr devattr;
struct ib_cq *sendcq, *recvcq;
int rc, err;

rc = ib_query_device(ia->ri_id->device, &devattr);
Expand Down Expand Up @@ -685,34 +706,51 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
ep->rep_attr.cap.max_recv_sge);

/* set trigger for requesting send completion */
ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 /* - 1*/;
ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
if (ep->rep_cqinit <= 2)
ep->rep_cqinit = 0;
INIT_CQCOUNT(ep);
ep->rep_ia = ia;
init_waitqueue_head(&ep->rep_connect_wait);
INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);

ep->rep_cq = ib_create_cq(ia->ri_id->device, rpcrdma_cq_event_upcall,
sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
rpcrdma_cq_async_error_upcall, NULL,
ep->rep_attr.cap.max_recv_wr +
ep->rep_attr.cap.max_send_wr + 1, 0);
if (IS_ERR(ep->rep_cq)) {
rc = PTR_ERR(ep->rep_cq);
dprintk("RPC: %s: ib_create_cq failed: %i\n",
if (IS_ERR(sendcq)) {
rc = PTR_ERR(sendcq);
dprintk("RPC: %s: failed to create send CQ: %i\n",
__func__, rc);
goto out1;
}

rc = ib_req_notify_cq(ep->rep_cq, IB_CQ_NEXT_COMP);
rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
if (rc) {
dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
__func__, rc);
goto out2;
}

ep->rep_attr.send_cq = ep->rep_cq;
ep->rep_attr.recv_cq = ep->rep_cq;
recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
rpcrdma_cq_async_error_upcall, NULL,
ep->rep_attr.cap.max_recv_wr + 1, 0);
if (IS_ERR(recvcq)) {
rc = PTR_ERR(recvcq);
dprintk("RPC: %s: failed to create recv CQ: %i\n",
__func__, rc);
goto out2;
}

rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
if (rc) {
dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
__func__, rc);
ib_destroy_cq(recvcq);
goto out2;
}

ep->rep_attr.send_cq = sendcq;
ep->rep_attr.recv_cq = recvcq;

/* Initialize cma parameters */

Expand All @@ -734,7 +772,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
return 0;

out2:
err = ib_destroy_cq(ep->rep_cq);
err = ib_destroy_cq(sendcq);
if (err)
dprintk("RPC: %s: ib_destroy_cq returned %i\n",
__func__, err);
Expand Down Expand Up @@ -774,8 +812,14 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
ep->rep_pad_mr = NULL;
}

rpcrdma_clean_cq(ep->rep_cq);
rc = ib_destroy_cq(ep->rep_cq);
rpcrdma_clean_cq(ep->rep_attr.recv_cq);
rc = ib_destroy_cq(ep->rep_attr.recv_cq);
if (rc)
dprintk("RPC: %s: ib_destroy_cq returned %i\n",
__func__, rc);

rpcrdma_clean_cq(ep->rep_attr.send_cq);
rc = ib_destroy_cq(ep->rep_attr.send_cq);
if (rc)
dprintk("RPC: %s: ib_destroy_cq returned %i\n",
__func__, rc);
Expand All @@ -798,7 +842,9 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
if (rc && rc != -ENOTCONN)
dprintk("RPC: %s: rpcrdma_ep_disconnect"
" status %i\n", __func__, rc);
rpcrdma_clean_cq(ep->rep_cq);

rpcrdma_clean_cq(ep->rep_attr.recv_cq);
rpcrdma_clean_cq(ep->rep_attr.send_cq);

xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
id = rpcrdma_create_id(xprt, ia,
Expand Down Expand Up @@ -907,7 +953,8 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
int rc;

rpcrdma_clean_cq(ep->rep_cq);
rpcrdma_clean_cq(ep->rep_attr.recv_cq);
rpcrdma_clean_cq(ep->rep_attr.send_cq);
rc = rdma_disconnect(ia->ri_id);
if (!rc) {
/* returns without wait if not connected */
Expand Down Expand Up @@ -1727,7 +1774,6 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
ib_dma_sync_single_for_cpu(ia->ri_id->device,
rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);

DECR_CQCOUNT(ep);
rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);

if (rc)
Expand Down
1 change: 0 additions & 1 deletion net/sunrpc/xprtrdma/xprt_rdma.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ struct rpcrdma_ep {
int rep_cqinit;
int rep_connected;
struct rpcrdma_ia *rep_ia;
struct ib_cq *rep_cq;
struct ib_qp_init_attr rep_attr;
wait_queue_head_t rep_connect_wait;
struct ib_sge rep_pad; /* holds zeroed pad */
Expand Down

0 comments on commit fc66448

Please sign in to comment.