Skip to content

Commit

Permalink
NFS: rewrite directio read to use async coalesce code
Browse files Browse the repository at this point in the history
This also has the advantage that it allows directio to use pnfs.

Signed-off-by: Fred Isaman <iisaman@netapp.com>
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
  • Loading branch information
Fred Isaman authored and Trond Myklebust committed Apr 27, 2012
1 parent 1825a0d commit 584aa81
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 144 deletions.
255 changes: 123 additions & 132 deletions fs/nfs/direct.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,22 +124,6 @@ ssize_t nfs_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov, loff_
return -EINVAL;
}

/*
 * Mark dirty every non-compound page touched by a byte range.
 *
 * @pages:  page vector the range is expressed against
 * @pgbase: byte offset of the start of the range, relative to pages[0]
 * @count:  length of the range in bytes; nothing is done when it is zero
 *
 * Whole pages covered by @pgbase are skipped first; the remaining
 * in-page offset plus @count is rounded up to a page count.
 */
static void nfs_direct_dirty_pages(struct page **pages, unsigned int pgbase, size_t count)
{
	struct page **cursor;
	struct page **end;
	unsigned int span;

	if (!count)
		return;

	cursor = pages + (pgbase >> PAGE_SHIFT);
	span = (count + (pgbase & ~PAGE_MASK) + PAGE_SIZE - 1) >> PAGE_SHIFT;

	for (end = cursor + span; cursor != end; cursor++) {
		/* Compound pages are deliberately left alone. */
		if (PageCompound(*cursor))
			continue;
		set_page_dirty(*cursor);
	}
}

static void nfs_direct_release_pages(struct page **pages, unsigned int npages)
{
unsigned int i;
Expand Down Expand Up @@ -226,177 +210,178 @@ static void nfs_direct_complete(struct nfs_direct_req *dreq)
nfs_direct_req_release(dreq);
}

/*
* We must hold a reference to all the pages in this direct read request
* until the RPCs complete. This could be long *after* we are woken up in
* nfs_direct_wait (for instance, if someone hits ^C on a slow server).
*/
static void nfs_direct_read_result(struct rpc_task *task, void *calldata)
void nfs_direct_readpage_release(struct nfs_page *req)
{
struct nfs_read_data *data = calldata;

nfs_readpage_result(task, data);
dprintk("NFS: direct read done (%s/%lld %d@%lld)\n",
req->wb_context->dentry->d_inode->i_sb->s_id,
(long long)NFS_FILEID(req->wb_context->dentry->d_inode),
req->wb_bytes,
(long long)req_offset(req));
nfs_release_request(req);
}

static void nfs_direct_read_release(void *calldata)
static void nfs_direct_read_completion(struct nfs_pgio_header *hdr)
{
unsigned long bytes = 0;
struct nfs_direct_req *dreq = hdr->dreq;

struct nfs_read_data *data = calldata;
struct nfs_direct_req *dreq = (struct nfs_direct_req *)data->header->req;
int status = data->task.tk_status;
if (test_bit(NFS_IOHDR_REDO, &hdr->flags))
goto out_put;

spin_lock(&dreq->lock);
if (unlikely(status < 0)) {
dreq->error = status;
spin_unlock(&dreq->lock);
if (test_bit(NFS_IOHDR_ERROR, &hdr->flags) && (hdr->good_bytes == 0))
dreq->error = hdr->error;
else
dreq->count += hdr->good_bytes;
spin_unlock(&dreq->lock);

if (!test_bit(NFS_IOHDR_ERROR, &hdr->flags)) {
while (!list_empty(&hdr->pages)) {
struct nfs_page *req = nfs_list_entry(hdr->pages.next);
struct page *page = req->wb_page;

if (test_bit(NFS_IOHDR_EOF, &hdr->flags)) {
if (bytes > hdr->good_bytes)
zero_user(page, 0, PAGE_SIZE);
else if (hdr->good_bytes - bytes < PAGE_SIZE)
zero_user_segment(page,
hdr->good_bytes & ~PAGE_MASK,
PAGE_SIZE);
}
bytes += req->wb_bytes;
nfs_list_remove_request(req);
nfs_direct_readpage_release(req);
if (!PageCompound(page))
set_page_dirty(page);
page_cache_release(page);
}
} else {
dreq->count += data->res.count;
spin_unlock(&dreq->lock);
nfs_direct_dirty_pages(data->pages.pagevec,
data->args.pgbase,
data->res.count);
while (!list_empty(&hdr->pages)) {
struct nfs_page *req = nfs_list_entry(hdr->pages.next);

if (bytes < hdr->good_bytes)
if (!PageCompound(req->wb_page))
set_page_dirty(req->wb_page);
bytes += req->wb_bytes;
page_cache_release(req->wb_page);
nfs_list_remove_request(req);
nfs_direct_readpage_release(req);
}
}
nfs_direct_release_pages(data->pages.pagevec, data->pages.npages);

out_put:
if (put_dreq(dreq))
nfs_direct_complete(dreq);
nfs_readdata_release(data);
hdr->release(hdr);
}

static const struct rpc_call_ops nfs_read_direct_ops = {
.rpc_call_prepare = nfs_read_prepare,
.rpc_call_done = nfs_direct_read_result,
.rpc_release = nfs_direct_read_release,
};

static void nfs_direct_readhdr_release(struct nfs_read_header *rhdr)
static void nfs_sync_pgio_error(struct list_head *head)
{
struct nfs_read_data *data = &rhdr->rpc_data;
struct nfs_page *req;

if (data->pages.pagevec != data->pages.page_array)
kfree(data->pages.pagevec);
nfs_readhdr_free(&rhdr->header);
while (!list_empty(head)) {
req = nfs_list_entry(head->next);
nfs_list_remove_request(req);
nfs_release_request(req);
}
}

/*
 * Header-initialisation hook for direct I/O: take a reference on the
 * direct request attached to @hdr via get_dreq().
 */
static void nfs_direct_pgio_init(struct nfs_pgio_header *hdr)
{
	struct nfs_direct_req *dreq = hdr->dreq;

	get_dreq(dreq);
}

/*
 * Completion callbacks handed to nfs_pageio_init_read() for direct reads.
 * init_hdr is invoked from nfs_pgheader_init() whenever it is non-NULL.
 */
static const struct nfs_pgio_completion_ops nfs_direct_read_completion_ops = {
	.init_hdr	= nfs_direct_pgio_init,
	.completion	= nfs_direct_read_completion,
	.error_cleanup	= nfs_sync_pgio_error,
};

/*
* For each rsize'd chunk of the user's buffer, pin the user pages and
* queue NFS read requests for them. If allocating the page vector or
* get_user_pages() fails, bail and stop sending more reads. Read length
* accounting is handled automatically by nfs_direct_read_completion().
* Otherwise, if no requests have been sent, just return an error.
*/
static ssize_t nfs_direct_read_schedule_segment(struct nfs_direct_req *dreq,
static ssize_t nfs_direct_read_schedule_segment(struct nfs_pageio_descriptor *desc,
const struct iovec *iov,
loff_t pos)
{
struct nfs_direct_req *dreq = desc->pg_dreq;
struct nfs_open_context *ctx = dreq->ctx;
struct inode *inode = ctx->dentry->d_inode;
unsigned long user_addr = (unsigned long)iov->iov_base;
size_t count = iov->iov_len;
size_t rsize = NFS_SERVER(inode)->rsize;
struct rpc_task *task;
struct rpc_message msg = {
.rpc_cred = ctx->cred,
};
struct rpc_task_setup task_setup_data = {
.rpc_client = NFS_CLIENT(inode),
.rpc_message = &msg,
.callback_ops = &nfs_read_direct_ops,
.workqueue = nfsiod_workqueue,
.flags = RPC_TASK_ASYNC,
};
unsigned int pgbase;
int result;
ssize_t started = 0;
struct page **pagevec = NULL;
unsigned int npages;

do {
struct nfs_read_header *rhdr;
struct nfs_read_data *data;
struct nfs_page_array *pages;
size_t bytes;
int i;

pgbase = user_addr & ~PAGE_MASK;
bytes = min(rsize,count);
bytes = min(max(rsize, PAGE_SIZE), count);

result = -ENOMEM;
rhdr = nfs_readhdr_alloc();
if (unlikely(!rhdr))
break;
data = nfs_readdata_alloc(&rhdr->header, nfs_page_array_len(pgbase, bytes));
if (!data) {
nfs_readhdr_free(&rhdr->header);
npages = nfs_page_array_len(pgbase, bytes);
if (!pagevec)
pagevec = kmalloc(npages * sizeof(struct page *),
GFP_KERNEL);
if (!pagevec)
break;
}
data->header = &rhdr->header;
atomic_inc(&data->header->refcnt);
pages = &data->pages;

down_read(&current->mm->mmap_sem);
result = get_user_pages(current, current->mm, user_addr,
pages->npages, 1, 0, pages->pagevec, NULL);
npages, 1, 0, pagevec, NULL);
up_read(&current->mm->mmap_sem);
if (result < 0) {
nfs_direct_readhdr_release(rhdr);
if (result < 0)
break;
}
if ((unsigned)result < pages->npages) {
if ((unsigned)result < npages) {
bytes = result * PAGE_SIZE;
if (bytes <= pgbase) {
nfs_direct_release_pages(pages->pagevec, result);
nfs_direct_readhdr_release(rhdr);
nfs_direct_release_pages(pagevec, result);
break;
}
bytes -= pgbase;
pages->npages = result;
npages = result;
}

get_dreq(dreq);

rhdr->header.req = (struct nfs_page *) dreq;
rhdr->header.inode = inode;
rhdr->header.cred = msg.rpc_cred;
data->args.fh = NFS_FH(inode);
data->args.context = get_nfs_open_context(ctx);
data->args.lock_context = dreq->l_ctx;
data->args.offset = pos;
data->args.pgbase = pgbase;
data->args.pages = pages->pagevec;
data->args.count = bytes;
data->res.fattr = &data->fattr;
data->res.eof = 0;
data->res.count = bytes;
nfs_fattr_init(&data->fattr);
msg.rpc_argp = &data->args;
msg.rpc_resp = &data->res;

task_setup_data.task = &data->task;
task_setup_data.callback_data = data;
NFS_PROTO(inode)->read_setup(data, &msg);

task = rpc_run_task(&task_setup_data);
if (IS_ERR(task))
break;

dprintk("NFS: %5u initiated direct read call "
"(req %s/%Ld, %zu bytes @ offset %Lu)\n",
task->tk_pid,
inode->i_sb->s_id,
(long long)NFS_FILEID(inode),
bytes,
(unsigned long long)data->args.offset);
rpc_put_task(task);

started += bytes;
user_addr += bytes;
pos += bytes;
/* FIXME: Remove this unnecessary math from final patch */
pgbase += bytes;
pgbase &= ~PAGE_MASK;
BUG_ON(pgbase != (user_addr & ~PAGE_MASK));

count -= bytes;
for (i = 0; i < npages; i++) {
struct nfs_page *req;
unsigned int req_len = min(bytes, PAGE_SIZE - pgbase);
/* XXX do we need to do the eof zeroing found in async_filler? */
req = nfs_create_request(dreq->ctx, dreq->inode,
pagevec[i],
pgbase, req_len);
if (IS_ERR(req)) {
nfs_direct_release_pages(pagevec + i,
npages - i);
result = PTR_ERR(req);
break;
}
req->wb_index = pos >> PAGE_SHIFT;
req->wb_offset = pos & ~PAGE_MASK;
if (!nfs_pageio_add_request(desc, req)) {
result = desc->pg_error;
nfs_release_request(req);
nfs_direct_release_pages(pagevec + i,
npages - i);
break;
}
pgbase = 0;
bytes -= req_len;
started += req_len;
user_addr += req_len;
pos += req_len;
count -= req_len;
}
} while (count != 0);

kfree(pagevec);

if (started)
return started;
return result < 0 ? (ssize_t) result : -EFAULT;
Expand All @@ -407,15 +392,19 @@ static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
unsigned long nr_segs,
loff_t pos)
{
struct nfs_pageio_descriptor desc;
ssize_t result = -EINVAL;
size_t requested_bytes = 0;
unsigned long seg;

nfs_pageio_init_read(&desc, dreq->inode,
&nfs_direct_read_completion_ops);
get_dreq(dreq);
desc.pg_dreq = dreq;

for (seg = 0; seg < nr_segs; seg++) {
const struct iovec *vec = &iov[seg];
result = nfs_direct_read_schedule_segment(dreq, vec, pos);
result = nfs_direct_read_schedule_segment(&desc, vec, pos);
if (result < 0)
break;
requested_bytes += result;
Expand All @@ -424,6 +413,8 @@ static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
pos += vec->iov_len;
}

nfs_pageio_complete(&desc);

/*
* If no bytes were started, return the error, and let the
* generic layer handle the completion.
Expand Down
5 changes: 3 additions & 2 deletions fs/nfs/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,9 @@ struct nfs_pgio_completion_ops;
/* read.c */
extern struct nfs_read_header *nfs_readhdr_alloc(void);
extern void nfs_readhdr_free(struct nfs_pgio_header *hdr);
extern struct nfs_read_data *nfs_readdata_alloc(struct nfs_pgio_header *hdr,
unsigned int pagecount);
extern void nfs_pageio_init_read(struct nfs_pageio_descriptor *pgio,
struct inode *inode,
const struct nfs_pgio_completion_ops *compl_ops);
extern int nfs_initiate_read(struct rpc_clnt *clnt,
struct nfs_read_data *data,
const struct rpc_call_ops *call_ops);
Expand Down
7 changes: 4 additions & 3 deletions fs/nfs/pagelist.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,11 @@ void nfs_pgheader_init(struct nfs_pageio_descriptor *desc,
hdr->cred = hdr->req->wb_context->cred;
hdr->io_start = req_offset(hdr->req);
hdr->good_bytes = desc->pg_count;
hdr->dreq = desc->pg_dreq;
hdr->release = release;
hdr->completion_ops = desc->pg_completion_ops;
if (hdr->completion_ops->init_hdr)
hdr->completion_ops->init_hdr(hdr);
}

void nfs_set_pgio_error(struct nfs_pgio_header *hdr, int error, loff_t pos)
Expand Down Expand Up @@ -116,9 +119,6 @@ nfs_create_request(struct nfs_open_context *ctx, struct inode *inode,
req->wb_page = page;
req->wb_index = page->index;
page_cache_get(page);
BUG_ON(PagePrivate(page));
BUG_ON(!PageLocked(page));
BUG_ON(page->mapping->host != inode);
req->wb_offset = offset;
req->wb_pgbase = offset;
req->wb_bytes = count;
Expand Down Expand Up @@ -257,6 +257,7 @@ void nfs_pageio_init(struct nfs_pageio_descriptor *desc,
desc->pg_ioflags = io_flags;
desc->pg_error = 0;
desc->pg_lseg = NULL;
desc->pg_dreq = NULL;
}

/**
Expand Down
Loading

0 comments on commit 584aa81

Please sign in to comment.