Skip to content

Commit

Permalink
svcrdma: Modify the RPC reply path to use FRMR when available
Browse files Browse the repository at this point in the history
Use FRMR to map local RPC reply data. This allows RDMA_WRITE to send reply
data using a single WR. The FRMR is invalidated by linking the LOCAL_INV WR
to the RDMA_SEND message used to complete the reply.

Signed-off-by: Tom Tucker <tom@opengridcomputing.com>
  • Loading branch information
Tom Tucker committed Oct 6, 2008
1 parent 146b6df commit afd566e
Showing 2 changed files with 217 additions and 40 deletions.
255 changes: 215 additions & 40 deletions net/sunrpc/xprtrdma/svc_rdma_sendto.c
Original file line number Diff line number Diff line change
@@ -69,9 +69,127 @@
* array is only concerned with the reply we are assured that we have
* on extra page for the RPCRMDA header.
*/
static void xdr_to_sge(struct svcxprt_rdma *xprt,
struct xdr_buf *xdr,
struct svc_rdma_req_map *vec)
int fast_reg_xdr(struct svcxprt_rdma *xprt,
struct xdr_buf *xdr,
struct svc_rdma_req_map *vec)
{
int sge_no;
u32 sge_bytes;
u32 page_bytes;
u32 page_off;
int page_no = 0;
u8 *frva;
struct svc_rdma_fastreg_mr *frmr;

frmr = svc_rdma_get_frmr(xprt);
if (IS_ERR(frmr))
return -ENOMEM;
vec->frmr = frmr;

/* Skip the RPCRDMA header */
sge_no = 1;

/* Map the head. */
frva = (void *)((unsigned long)(xdr->head[0].iov_base) & PAGE_MASK);
vec->sge[sge_no].iov_base = xdr->head[0].iov_base;
vec->sge[sge_no].iov_len = xdr->head[0].iov_len;
vec->count = 2;
sge_no++;

/* Build the FRMR */
frmr->kva = frva;
frmr->direction = DMA_TO_DEVICE;
frmr->access_flags = 0;
frmr->map_len = PAGE_SIZE;
frmr->page_list_len = 1;
frmr->page_list->page_list[page_no] =
ib_dma_map_single(xprt->sc_cm_id->device,
(void *)xdr->head[0].iov_base,
PAGE_SIZE, DMA_TO_DEVICE);
if (ib_dma_mapping_error(xprt->sc_cm_id->device,
frmr->page_list->page_list[page_no]))
goto fatal_err;
atomic_inc(&xprt->sc_dma_used);

page_off = xdr->page_base;
page_bytes = xdr->page_len + page_off;
if (!page_bytes)
goto encode_tail;

/* Map the pages */
vec->sge[sge_no].iov_base = frva + frmr->map_len + page_off;
vec->sge[sge_no].iov_len = page_bytes;
sge_no++;
while (page_bytes) {
struct page *page;

page = xdr->pages[page_no++];
sge_bytes = min_t(u32, page_bytes, (PAGE_SIZE - page_off));
page_bytes -= sge_bytes;

frmr->page_list->page_list[page_no] =
ib_dma_map_page(xprt->sc_cm_id->device, page, 0,
PAGE_SIZE, DMA_TO_DEVICE);
if (ib_dma_mapping_error(xprt->sc_cm_id->device,
frmr->page_list->page_list[page_no]))
goto fatal_err;

atomic_inc(&xprt->sc_dma_used);
page_off = 0; /* reset for next time through loop */
frmr->map_len += PAGE_SIZE;
frmr->page_list_len++;
}
vec->count++;

encode_tail:
/* Map tail */
if (0 == xdr->tail[0].iov_len)
goto done;

vec->count++;
vec->sge[sge_no].iov_len = xdr->tail[0].iov_len;

if (((unsigned long)xdr->tail[0].iov_base & PAGE_MASK) ==
((unsigned long)xdr->head[0].iov_base & PAGE_MASK)) {
/*
* If head and tail use the same page, we don't need
* to map it again.
*/
vec->sge[sge_no].iov_base = xdr->tail[0].iov_base;
} else {
void *va;

/* Map another page for the tail */
page_off = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;
va = (void *)((unsigned long)xdr->tail[0].iov_base & PAGE_MASK);
vec->sge[sge_no].iov_base = frva + frmr->map_len + page_off;

frmr->page_list->page_list[page_no] =
ib_dma_map_single(xprt->sc_cm_id->device, va, PAGE_SIZE,
DMA_TO_DEVICE);
if (ib_dma_mapping_error(xprt->sc_cm_id->device,
frmr->page_list->page_list[page_no]))
goto fatal_err;
atomic_inc(&xprt->sc_dma_used);
frmr->map_len += PAGE_SIZE;
frmr->page_list_len++;
}

done:
if (svc_rdma_fastreg(xprt, frmr))
goto fatal_err;

return 0;

fatal_err:
printk("svcrdma: Error fast registering memory for xprt %p\n", xprt);
svc_rdma_put_frmr(xprt, frmr);
return -EIO;
}

static int map_xdr(struct svcxprt_rdma *xprt,
struct xdr_buf *xdr,
struct svc_rdma_req_map *vec)
{
int sge_max = (xdr->len+PAGE_SIZE-1) / PAGE_SIZE + 3;
int sge_no;
@@ -83,6 +201,9 @@ static void xdr_to_sge(struct svcxprt_rdma *xprt,
BUG_ON(xdr->len !=
(xdr->head[0].iov_len + xdr->page_len + xdr->tail[0].iov_len));

if (xprt->sc_frmr_pg_list_len)
return fast_reg_xdr(xprt, xdr, vec);

/* Skip the first sge, this is for the RPCRDMA header */
sge_no = 1;

@@ -116,9 +237,12 @@ static void xdr_to_sge(struct svcxprt_rdma *xprt,

BUG_ON(sge_no > sge_max);
vec->count = sge_no;
return 0;
}

/* Assumptions:
* - We are using FRMR
* - or -
* - The specified write_len can be represented in sc_max_sge * PAGE_SIZE
*/
static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
@@ -158,30 +282,35 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
sge_no = 0;

/* Copy the remaining SGE */
while (bc != 0 && xdr_sge_no < vec->count) {
sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
sge_bytes = min((size_t)bc,
(size_t)(vec->sge[xdr_sge_no].iov_len-sge_off));
while (bc != 0) {
sge_bytes = min_t(size_t,
bc, vec->sge[xdr_sge_no].iov_len-sge_off);
sge[sge_no].length = sge_bytes;
atomic_inc(&xprt->sc_dma_used);
sge[sge_no].addr =
ib_dma_map_single(xprt->sc_cm_id->device,
(void *)
vec->sge[xdr_sge_no].iov_base + sge_off,
sge_bytes, DMA_TO_DEVICE);
if (dma_mapping_error(xprt->sc_cm_id->device->dma_device,
sge[sge_no].addr))
goto err;
if (!vec->frmr) {
sge[sge_no].addr =
ib_dma_map_single(xprt->sc_cm_id->device,
(void *)
vec->sge[xdr_sge_no].iov_base + sge_off,
sge_bytes, DMA_TO_DEVICE);
if (ib_dma_mapping_error(xprt->sc_cm_id->device,
sge[sge_no].addr))
goto err;
atomic_inc(&xprt->sc_dma_used);
sge[sge_no].lkey = xprt->sc_dma_lkey;
} else {
sge[sge_no].addr = (unsigned long)
vec->sge[xdr_sge_no].iov_base + sge_off;
sge[sge_no].lkey = vec->frmr->mr->lkey;
}
ctxt->count++;
ctxt->frmr = vec->frmr;
sge_off = 0;
sge_no++;
ctxt->count++;
xdr_sge_no++;
BUG_ON(xdr_sge_no > vec->count);
bc -= sge_bytes;
}

BUG_ON(bc != 0);
BUG_ON(xdr_sge_no > vec->count);

/* Prepare WRITE WR */
memset(&write_wr, 0, sizeof write_wr);
ctxt->wr_op = IB_WR_RDMA_WRITE;
@@ -226,7 +355,10 @@ static int send_write_chunks(struct svcxprt_rdma *xprt,
res_ary = (struct rpcrdma_write_array *)
&rdma_resp->rm_body.rm_chunks[1];

max_write = xprt->sc_max_sge * PAGE_SIZE;
if (vec->frmr)
max_write = vec->frmr->map_len;
else
max_write = xprt->sc_max_sge * PAGE_SIZE;

/* Write chunks start at the pagelist */
for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
@@ -297,7 +429,10 @@ static int send_reply_chunks(struct svcxprt_rdma *xprt,
res_ary = (struct rpcrdma_write_array *)
&rdma_resp->rm_body.rm_chunks[2];

max_write = xprt->sc_max_sge * PAGE_SIZE;
if (vec->frmr)
max_write = vec->frmr->map_len;
else
max_write = xprt->sc_max_sge * PAGE_SIZE;

/* xdr offset starts at RPC message */
for (xdr_off = 0, chunk_no = 0;
@@ -307,7 +442,6 @@ static int send_reply_chunks(struct svcxprt_rdma *xprt,
ch = &arg_ary->wc_array[chunk_no].wc_target;
write_len = min(xfer_len, ch->rs_length);


/* Prepare the reply chunk given the length actually
* written */
rs_offset = get_unaligned(&(ch->rs_offset));
@@ -366,6 +500,7 @@ static int send_reply(struct svcxprt_rdma *rdma,
int byte_count)
{
struct ib_send_wr send_wr;
struct ib_send_wr inv_wr;
int sge_no;
int sge_bytes;
int page_no;
@@ -385,27 +520,45 @@ static int send_reply(struct svcxprt_rdma *rdma,
/* Prepare the context */
ctxt->pages[0] = page;
ctxt->count = 1;
ctxt->frmr = vec->frmr;
if (vec->frmr)
set_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
else
clear_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);

/* Prepare the SGE for the RPCRDMA Header */
atomic_inc(&rdma->sc_dma_used);
ctxt->sge[0].addr =
ib_dma_map_page(rdma->sc_cm_id->device,
page, 0, PAGE_SIZE, DMA_TO_DEVICE);
if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr))
goto err;
atomic_inc(&rdma->sc_dma_used);

ctxt->direction = DMA_TO_DEVICE;

ctxt->sge[0].length = svc_rdma_xdr_get_reply_hdr_len(rdma_resp);
ctxt->sge[0].lkey = rdma->sc_phys_mr->lkey;
ctxt->sge[0].lkey = rdma->sc_dma_lkey;

/* Determine how many of our SGE are to be transmitted */
for (sge_no = 1; byte_count && sge_no < vec->count; sge_no++) {
sge_bytes = min_t(size_t, vec->sge[sge_no].iov_len, byte_count);
byte_count -= sge_bytes;
atomic_inc(&rdma->sc_dma_used);
ctxt->sge[sge_no].addr =
ib_dma_map_single(rdma->sc_cm_id->device,
vec->sge[sge_no].iov_base,
sge_bytes, DMA_TO_DEVICE);
if (!vec->frmr) {
ctxt->sge[sge_no].addr =
ib_dma_map_single(rdma->sc_cm_id->device,
vec->sge[sge_no].iov_base,
sge_bytes, DMA_TO_DEVICE);
if (ib_dma_mapping_error(rdma->sc_cm_id->device,
ctxt->sge[sge_no].addr))
goto err;
atomic_inc(&rdma->sc_dma_used);
ctxt->sge[sge_no].lkey = rdma->sc_dma_lkey;
} else {
ctxt->sge[sge_no].addr = (unsigned long)
vec->sge[sge_no].iov_base;
ctxt->sge[sge_no].lkey = vec->frmr->mr->lkey;
}
ctxt->sge[sge_no].length = sge_bytes;
ctxt->sge[sge_no].lkey = rdma->sc_phys_mr->lkey;
}
BUG_ON(byte_count != 0);

@@ -417,24 +570,43 @@ static int send_reply(struct svcxprt_rdma *rdma,
ctxt->pages[page_no+1] = rqstp->rq_respages[page_no];
ctxt->count++;
rqstp->rq_respages[page_no] = NULL;
/* If there are more pages than SGE, terminate SGE list */
/*
* If there are more pages than SGE, terminate SGE
* list so that svc_rdma_unmap_dma doesn't attempt to
* unmap garbage.
*/
if (page_no+1 >= sge_no)
ctxt->sge[page_no+1].length = 0;
}
BUG_ON(sge_no > rdma->sc_max_sge);
BUG_ON(sge_no > ctxt->count);
memset(&send_wr, 0, sizeof send_wr);
ctxt->wr_op = IB_WR_SEND;
send_wr.wr_id = (unsigned long)ctxt;
send_wr.sg_list = ctxt->sge;
send_wr.num_sge = sge_no;
send_wr.opcode = IB_WR_SEND;
send_wr.send_flags = IB_SEND_SIGNALED;
if (vec->frmr) {
/* Prepare INVALIDATE WR */
memset(&inv_wr, 0, sizeof inv_wr);
inv_wr.opcode = IB_WR_LOCAL_INV;
inv_wr.send_flags = IB_SEND_SIGNALED;
inv_wr.ex.invalidate_rkey =
vec->frmr->mr->lkey;
send_wr.next = &inv_wr;
}

ret = svc_rdma_send(rdma, &send_wr);
if (ret)
svc_rdma_put_context(ctxt, 1);
goto err;

return ret;
return 0;

err:
svc_rdma_put_frmr(rdma, vec->frmr);
svc_rdma_put_context(ctxt, 1);
return -EIO;
}

void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
@@ -477,8 +649,9 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
ctxt = svc_rdma_get_context(rdma);
ctxt->direction = DMA_TO_DEVICE;
vec = svc_rdma_get_req_map();
xdr_to_sge(rdma, &rqstp->rq_res, vec);

ret = map_xdr(rdma, &rqstp->rq_res, vec);
if (ret)
goto err0;
inline_bytes = rqstp->rq_res.len;

/* Create the RDMA response header */
@@ -498,7 +671,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
if (ret < 0) {
printk(KERN_ERR "svcrdma: failed to send write chunks, rc=%d\n",
ret);
goto error;
goto err1;
}
inline_bytes -= ret;

@@ -508,7 +681,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
if (ret < 0) {
printk(KERN_ERR "svcrdma: failed to send reply chunks, rc=%d\n",
ret);
goto error;
goto err1;
}
inline_bytes -= ret;

@@ -517,9 +690,11 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
svc_rdma_put_req_map(vec);
dprintk("svcrdma: send_reply returns %d\n", ret);
return ret;
error:

err1:
put_page(res_page);
err0:
svc_rdma_put_req_map(vec);
svc_rdma_put_context(ctxt, 0);
put_page(res_page);
return ret;
}
Loading

0 comments on commit afd566e

Please sign in to comment.