Skip to content

Commit

Permalink
fuse: support splice() reading from fuse device
Browse files Browse the repository at this point in the history
Allow userspace filesystem implementation to use splice() to read from
the fuse device.

The userspace filesystem can now transfer data coming from a WRITE
request to an arbitrary file descriptor (regular file, block device or
socket) without having to go through a userspace buffer.

The semantics of using splice() to read messages are:

 1)  with a single splice() call move the whole message from the fuse
     device to a temporary pipe
 2)  read the header from the pipe and determine the message type
 3a) if message is a WRITE then splice data from pipe to destination
 3b) else read rest of message to userspace buffer

Signed-off-by: Miklos Szeredi <mszeredi@suse.cz>
  • Loading branch information
Miklos Szeredi committed May 25, 2010
1 parent ce534fb commit c302162
Showing 1 changed file with 187 additions and 41 deletions.
228 changes: 187 additions & 41 deletions fs/fuse/dev.c
Original file line number Diff line number Diff line change
Expand Up @@ -515,13 +515,12 @@ struct fuse_copy_state {
};

static void fuse_copy_init(struct fuse_copy_state *cs, struct fuse_conn *fc,
int write, struct fuse_req *req,
int write,
const struct iovec *iov, unsigned long nr_segs)
{
memset(cs, 0, sizeof(*cs));
cs->fc = fc;
cs->write = write;
cs->req = req;
cs->iov = iov;
cs->nr_segs = nr_segs;
}
Expand All @@ -532,8 +531,12 @@ static void fuse_copy_finish(struct fuse_copy_state *cs)
if (cs->currbuf) {
struct pipe_buffer *buf = cs->currbuf;

buf->ops->unmap(cs->pipe, buf, cs->mapaddr);

if (!cs->write) {
buf->ops->unmap(cs->pipe, buf, cs->mapaddr);
} else {
kunmap_atomic(cs->mapaddr, KM_USER0);
buf->len = PAGE_SIZE - cs->len;
}
cs->currbuf = NULL;
cs->mapaddr = NULL;
} else if (cs->mapaddr) {
Expand Down Expand Up @@ -561,17 +564,39 @@ static int fuse_copy_fill(struct fuse_copy_state *cs)
if (cs->pipebufs) {
struct pipe_buffer *buf = cs->pipebufs;

err = buf->ops->confirm(cs->pipe, buf);
if (err)
return err;
if (!cs->write) {
err = buf->ops->confirm(cs->pipe, buf);
if (err)
return err;

BUG_ON(!cs->nr_segs);
cs->currbuf = buf;
cs->mapaddr = buf->ops->map(cs->pipe, buf, 1);
cs->len = buf->len;
cs->buf = cs->mapaddr + buf->offset;
cs->pipebufs++;
cs->nr_segs--;
} else {
struct page *page;

BUG_ON(!cs->nr_segs);
cs->currbuf = buf;
cs->mapaddr = buf->ops->map(cs->pipe, buf, 1);
cs->len = buf->len;
cs->buf = cs->mapaddr + buf->offset;
cs->pipebufs++;
cs->nr_segs--;
if (cs->nr_segs == cs->pipe->buffers)
return -EIO;

page = alloc_page(GFP_HIGHUSER);
if (!page)
return -ENOMEM;

buf->page = page;
buf->offset = 0;
buf->len = 0;

cs->currbuf = buf;
cs->mapaddr = kmap_atomic(page, KM_USER0);
cs->buf = cs->mapaddr;
cs->len = PAGE_SIZE;
cs->pipebufs++;
cs->nr_segs++;
}
} else {
if (!cs->seglen) {
BUG_ON(!cs->nr_segs);
Expand Down Expand Up @@ -731,6 +756,30 @@ static int fuse_try_move_page(struct fuse_copy_state *cs, struct page **pagep)
return 1;
}

static int fuse_ref_page(struct fuse_copy_state *cs, struct page *page,
unsigned offset, unsigned count)
{
struct pipe_buffer *buf;

if (cs->nr_segs == cs->pipe->buffers)
return -EIO;

unlock_request(cs->fc, cs->req);
fuse_copy_finish(cs);

buf = cs->pipebufs;
page_cache_get(page);
buf->page = page;
buf->offset = offset;
buf->len = count;

cs->pipebufs++;
cs->nr_segs++;
cs->len = 0;

return 0;
}

/*
* Copy a page in the request to/from the userspace buffer. Must be
* done atomically
Expand All @@ -747,7 +796,9 @@ static int fuse_copy_page(struct fuse_copy_state *cs, struct page **pagep,
kunmap_atomic(mapaddr, KM_USER1);
}
while (count) {
if (!cs->len) {
if (cs->write && cs->pipebufs && page) {
return fuse_ref_page(cs, page, offset, count);
} else if (!cs->len) {
if (cs->move_pages && page &&
offset == 0 && count == PAGE_SIZE) {
err = fuse_try_move_page(cs, pagep);
Expand Down Expand Up @@ -862,11 +913,10 @@ __acquires(&fc->lock)
*
* Called with fc->lock held, releases it
*/
static int fuse_read_interrupt(struct fuse_conn *fc, struct fuse_req *req,
const struct iovec *iov, unsigned long nr_segs)
static int fuse_read_interrupt(struct fuse_conn *fc, struct fuse_copy_state *cs,
size_t nbytes, struct fuse_req *req)
__releases(&fc->lock)
{
struct fuse_copy_state cs;
struct fuse_in_header ih;
struct fuse_interrupt_in arg;
unsigned reqsize = sizeof(ih) + sizeof(arg);
Expand All @@ -882,14 +932,13 @@ __releases(&fc->lock)
arg.unique = req->in.h.unique;

spin_unlock(&fc->lock);
if (iov_length(iov, nr_segs) < reqsize)
if (nbytes < reqsize)
return -EINVAL;

fuse_copy_init(&cs, fc, 1, NULL, iov, nr_segs);
err = fuse_copy_one(&cs, &ih, sizeof(ih));
err = fuse_copy_one(cs, &ih, sizeof(ih));
if (!err)
err = fuse_copy_one(&cs, &arg, sizeof(arg));
fuse_copy_finish(&cs);
err = fuse_copy_one(cs, &arg, sizeof(arg));
fuse_copy_finish(cs);

return err ? err : reqsize;
}
Expand All @@ -903,18 +952,13 @@ __releases(&fc->lock)
* request_end(). Otherwise add it to the processing list, and set
* the 'sent' flag.
*/
static ssize_t fuse_dev_read(struct kiocb *iocb, const struct iovec *iov,
unsigned long nr_segs, loff_t pos)
static ssize_t fuse_dev_do_read(struct fuse_conn *fc, struct file *file,
struct fuse_copy_state *cs, size_t nbytes)
{
int err;
struct fuse_req *req;
struct fuse_in *in;
struct fuse_copy_state cs;
unsigned reqsize;
struct file *file = iocb->ki_filp;
struct fuse_conn *fc = fuse_get_conn(file);
if (!fc)
return -EPERM;

restart:
spin_lock(&fc->lock);
Expand All @@ -934,7 +978,7 @@ static ssize_t fuse_dev_read(struct kiocb *iocb, const struct iovec *iov,
if (!list_empty(&fc->interrupts)) {
req = list_entry(fc->interrupts.next, struct fuse_req,
intr_entry);
return fuse_read_interrupt(fc, req, iov, nr_segs);
return fuse_read_interrupt(fc, cs, nbytes, req);
}

req = list_entry(fc->pending.next, struct fuse_req, list);
Expand All @@ -944,7 +988,7 @@ static ssize_t fuse_dev_read(struct kiocb *iocb, const struct iovec *iov,
in = &req->in;
reqsize = in->h.len;
/* If request is too large, reply with an error and restart the read */
if (iov_length(iov, nr_segs) < reqsize) {
if (nbytes < reqsize) {
req->out.h.error = -EIO;
/* SETXATTR is special, since it may contain too large data */
if (in->h.opcode == FUSE_SETXATTR)
Expand All @@ -953,12 +997,12 @@ static ssize_t fuse_dev_read(struct kiocb *iocb, const struct iovec *iov,
goto restart;
}
spin_unlock(&fc->lock);
fuse_copy_init(&cs, fc, 1, req, iov, nr_segs);
err = fuse_copy_one(&cs, &in->h, sizeof(in->h));
cs->req = req;
err = fuse_copy_one(cs, &in->h, sizeof(in->h));
if (!err)
err = fuse_copy_args(&cs, in->numargs, in->argpages,
err = fuse_copy_args(cs, in->numargs, in->argpages,
(struct fuse_arg *) in->args, 0);
fuse_copy_finish(&cs);
fuse_copy_finish(cs);
spin_lock(&fc->lock);
req->locked = 0;
if (req->aborted) {
Expand Down Expand Up @@ -986,6 +1030,110 @@ static ssize_t fuse_dev_read(struct kiocb *iocb, const struct iovec *iov,
return err;
}

static ssize_t fuse_dev_read(struct kiocb *iocb, const struct iovec *iov,
unsigned long nr_segs, loff_t pos)
{
struct fuse_copy_state cs;
struct file *file = iocb->ki_filp;
struct fuse_conn *fc = fuse_get_conn(file);
if (!fc)
return -EPERM;

fuse_copy_init(&cs, fc, 1, iov, nr_segs);

return fuse_dev_do_read(fc, file, &cs, iov_length(iov, nr_segs));
}

static int fuse_dev_pipe_buf_steal(struct pipe_inode_info *pipe,
struct pipe_buffer *buf)
{
return 1;
}

static const struct pipe_buf_operations fuse_dev_pipe_buf_ops = {
.can_merge = 0,
.map = generic_pipe_buf_map,
.unmap = generic_pipe_buf_unmap,
.confirm = generic_pipe_buf_confirm,
.release = generic_pipe_buf_release,
.steal = fuse_dev_pipe_buf_steal,
.get = generic_pipe_buf_get,
};

static ssize_t fuse_dev_splice_read(struct file *in, loff_t *ppos,
struct pipe_inode_info *pipe,
size_t len, unsigned int flags)
{
int ret;
int page_nr = 0;
int do_wakeup = 0;
struct pipe_buffer *bufs;
struct fuse_copy_state cs;
struct fuse_conn *fc = fuse_get_conn(in);
if (!fc)
return -EPERM;

bufs = kmalloc(pipe->buffers * sizeof (struct pipe_buffer), GFP_KERNEL);
if (!bufs)
return -ENOMEM;

fuse_copy_init(&cs, fc, 1, NULL, 0);
cs.pipebufs = bufs;
cs.pipe = pipe;
ret = fuse_dev_do_read(fc, in, &cs, len);
if (ret < 0)
goto out;

ret = 0;
pipe_lock(pipe);

if (!pipe->readers) {
send_sig(SIGPIPE, current, 0);
if (!ret)
ret = -EPIPE;
goto out_unlock;
}

if (pipe->nrbufs + cs.nr_segs > pipe->buffers) {
ret = -EIO;
goto out_unlock;
}

while (page_nr < cs.nr_segs) {
int newbuf = (pipe->curbuf + pipe->nrbufs) & (pipe->buffers - 1);
struct pipe_buffer *buf = pipe->bufs + newbuf;

buf->page = bufs[page_nr].page;
buf->offset = bufs[page_nr].offset;
buf->len = bufs[page_nr].len;
buf->ops = &fuse_dev_pipe_buf_ops;

pipe->nrbufs++;
page_nr++;
ret += buf->len;

if (pipe->inode)
do_wakeup = 1;
}

out_unlock:
pipe_unlock(pipe);

if (do_wakeup) {
smp_mb();
if (waitqueue_active(&pipe->wait))
wake_up_interruptible(&pipe->wait);
kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
}

out:
for (; page_nr < cs.nr_segs; page_nr++)
page_cache_release(bufs[page_nr].page);

kfree(bufs);
return ret;
}

static int fuse_notify_poll(struct fuse_conn *fc, unsigned int size,
struct fuse_copy_state *cs)
{
Expand Down Expand Up @@ -1246,7 +1394,7 @@ static ssize_t fuse_dev_write(struct kiocb *iocb, const struct iovec *iov,
if (!fc)
return -EPERM;

fuse_copy_init(&cs, fc, 0, NULL, iov, nr_segs);
fuse_copy_init(&cs, fc, 0, iov, nr_segs);

return fuse_dev_do_write(fc, &cs, iov_length(iov, nr_segs));
}
Expand Down Expand Up @@ -1311,11 +1459,8 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
}
pipe_unlock(pipe);

memset(&cs, 0, sizeof(struct fuse_copy_state));
cs.fc = fc;
cs.write = 0;
fuse_copy_init(&cs, fc, 0, NULL, nbuf);
cs.pipebufs = bufs;
cs.nr_segs = nbuf;
cs.pipe = pipe;

if (flags & SPLICE_F_MOVE)
Expand Down Expand Up @@ -1473,6 +1618,7 @@ const struct file_operations fuse_dev_operations = {
.llseek = no_llseek,
.read = do_sync_read,
.aio_read = fuse_dev_read,
.splice_read = fuse_dev_splice_read,
.write = do_sync_write,
.aio_write = fuse_dev_write,
.splice_write = fuse_dev_splice_write,
Expand Down

0 comments on commit c302162

Please sign in to comment.