Skip to content

Commit

Permalink
---
Browse files Browse the repository at this point in the history
yaml
---
r: 58526
b: refs/heads/master
c: ebf9909
h: refs/heads/master
v: v3
  • Loading branch information
Tom Zanussi authored and Jens Axboe committed Jul 10, 2007
1 parent c67fd1b commit 05cfa71
Show file tree
Hide file tree
Showing 2 changed files with 192 additions and 51 deletions.
2 changes: 1 addition & 1 deletion [refs]
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
---
refs/heads/master: cf8208d0eabd1d5d2625ec02a175a294c3f30d36
refs/heads/master: ebf9909343392c929d9943c04f421cd42e03b530
241 changes: 191 additions & 50 deletions trunk/kernel/relay.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <linux/vmalloc.h>
#include <linux/mm.h>
#include <linux/cpu.h>
#include <linux/pipe_fs_i.h>

/* list of open channels, for cpu hotplug */
static DEFINE_MUTEX(relay_channels_mutex);
Expand Down Expand Up @@ -121,6 +122,7 @@ static void *relay_alloc_buf(struct rchan_buf *buf, size_t *size)
buf->page_array[i] = alloc_page(GFP_KERNEL);
if (unlikely(!buf->page_array[i]))
goto depopulate;
set_page_private(buf->page_array[i], (unsigned long)buf);
}
mem = vmap(buf->page_array, n_pages, VM_MAP, PAGE_KERNEL);
if (!mem)
Expand Down Expand Up @@ -970,43 +972,6 @@ static int subbuf_read_actor(size_t read_start,
return ret;
}

/*
* subbuf_send_actor - send up to one subbuf's worth of data
*/
static int subbuf_send_actor(size_t read_start,
struct rchan_buf *buf,
size_t avail,
read_descriptor_t *desc,
read_actor_t actor)
{
unsigned long pidx, poff;
unsigned int subbuf_pages;
int ret = 0;

subbuf_pages = buf->chan->alloc_size >> PAGE_SHIFT;
pidx = (read_start / PAGE_SIZE) % subbuf_pages;
poff = read_start & ~PAGE_MASK;
while (avail) {
struct page *p = buf->page_array[pidx];
unsigned int len;

len = PAGE_SIZE - poff;
if (len > avail)
len = avail;

len = actor(desc, p, poff, len);
if (desc->error)
break;

avail -= len;
ret += len;
poff = 0;
pidx = (pidx + 1) % subbuf_pages;
}

return ret;
}

typedef int (*subbuf_actor_t) (size_t read_start,
struct rchan_buf *buf,
size_t avail,
Expand Down Expand Up @@ -1067,19 +1032,195 @@ static ssize_t relay_file_read(struct file *filp,
NULL, &desc);
}

static ssize_t relay_file_sendfile(struct file *filp,
loff_t *ppos,
size_t count,
read_actor_t actor,
void *target)
static void relay_pipe_buf_release(struct pipe_inode_info *pipe,
struct pipe_buffer *buf)
{
read_descriptor_t desc;
desc.written = 0;
desc.count = count;
desc.arg.data = target;
desc.error = 0;
return relay_file_read_subbufs(filp, ppos, subbuf_send_actor,
actor, &desc);
struct rchan_buf *rbuf;

rbuf = (struct rchan_buf *)page_private(buf->page);

rbuf->bytes_consumed += PAGE_SIZE;

if (rbuf->bytes_consumed == rbuf->chan->subbuf_size) {
relay_subbufs_consumed(rbuf->chan, rbuf->cpu, 1);
rbuf->bytes_consumed = 0;
}
}

static struct pipe_buf_operations relay_pipe_buf_ops = {
.can_merge = 0,
.map = generic_pipe_buf_map,
.unmap = generic_pipe_buf_unmap,
.pin = generic_pipe_buf_pin,
.release = relay_pipe_buf_release,
.steal = generic_pipe_buf_steal,
.get = generic_pipe_buf_get,
};

/**
* subbuf_splice_actor - splice up to one subbuf's worth of data
*/
static int subbuf_splice_actor(struct file *in,
loff_t *ppos,
struct pipe_inode_info *pipe,
size_t len,
unsigned int flags,
int *nonpad_ret)
{
unsigned int pidx, poff;
unsigned int subbuf_pages;
int ret = 0;
int do_wakeup = 0;
struct rchan_buf *rbuf = in->private_data;
unsigned int subbuf_size = rbuf->chan->subbuf_size;
size_t read_start = ((size_t)*ppos) % rbuf->chan->alloc_size;
size_t avail = subbuf_size - read_start % subbuf_size;
size_t read_subbuf = read_start / subbuf_size;
size_t padding = rbuf->padding[read_subbuf];
size_t nonpad_end = read_subbuf * subbuf_size + subbuf_size - padding;

if (rbuf->subbufs_produced == rbuf->subbufs_consumed)
return 0;

if (len > avail)
len = avail;

if (pipe->inode)
mutex_lock(&pipe->inode->i_mutex);

subbuf_pages = rbuf->chan->alloc_size >> PAGE_SHIFT;
pidx = (read_start / PAGE_SIZE) % subbuf_pages;
poff = read_start & ~PAGE_MASK;

for (;;) {
unsigned int this_len;
unsigned int this_end;
int newbuf = (pipe->curbuf + pipe->nrbufs) & (PIPE_BUFFERS - 1);
struct pipe_buffer *buf = pipe->bufs + newbuf;

if (!pipe->readers) {
send_sig(SIGPIPE, current, 0);
if (!ret)
ret = -EPIPE;
break;
}

if (pipe->nrbufs < PIPE_BUFFERS) {
this_len = PAGE_SIZE - poff;
if (this_len > avail)
this_len = avail;

buf->page = rbuf->page_array[pidx];
buf->offset = poff;
this_end = read_start + ret + this_len;
if (this_end > nonpad_end) {
if (read_start + ret >= nonpad_end)
buf->len = 0;
else
buf->len = nonpad_end - (read_start + ret);
} else
buf->len = this_len;

*nonpad_ret += buf->len;

buf->ops = &relay_pipe_buf_ops;
pipe->nrbufs++;

avail -= this_len;
ret += this_len;
poff = 0;
pidx = (pidx + 1) % subbuf_pages;

if (pipe->inode)
do_wakeup = 1;

if (!avail)
break;

if (pipe->nrbufs < PIPE_BUFFERS)
continue;

break;
}

if (flags & SPLICE_F_NONBLOCK) {
if (!ret)
ret = -EAGAIN;
break;
}

if (signal_pending(current)) {
if (!ret)
ret = -ERESTARTSYS;
break;
}

if (do_wakeup) {
smp_mb();
if (waitqueue_active(&pipe->wait))
wake_up_interruptible_sync(&pipe->wait);
kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
do_wakeup = 0;
}

pipe->waiting_writers++;
pipe_wait(pipe);
pipe->waiting_writers--;
}

if (pipe->inode)
mutex_unlock(&pipe->inode->i_mutex);

if (do_wakeup) {
smp_mb();
if (waitqueue_active(&pipe->wait))
wake_up_interruptible(&pipe->wait);
kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
}

return ret;
}

static ssize_t relay_file_splice_read(struct file *in,
loff_t *ppos,
struct pipe_inode_info *pipe,
size_t len,
unsigned int flags)
{
ssize_t spliced;
int ret;
int nonpad_ret = 0;

ret = 0;
spliced = 0;

while (len) {
ret = subbuf_splice_actor(in, ppos, pipe, len, flags, &nonpad_ret);
if (ret < 0)
break;
else if (!ret) {
break;
if (spliced)
break;
if (flags & SPLICE_F_NONBLOCK) {
ret = -EAGAIN;
break;
}
}

*ppos += ret;
if (ret > len)
len = 0;
else
len -= ret;
spliced += nonpad_ret;
nonpad_ret = 0;
}

if (spliced)
return spliced;

return ret;
}

const struct file_operations relay_file_operations = {
Expand All @@ -1089,7 +1230,7 @@ const struct file_operations relay_file_operations = {
.read = relay_file_read,
.llseek = no_llseek,
.release = relay_file_release,
.sendfile = relay_file_sendfile,
.splice_read = relay_file_splice_read,
};
EXPORT_SYMBOL_GPL(relay_file_operations);

Expand Down

0 comments on commit 05cfa71

Please sign in to comment.