tracing: Consolidate protection of reader access to the ring buffer
Originally, access to the ring buffer was fully serialized
by trace_types_lock. Patch d7350c3 gave readers more freedom, and
patch b04cc6b added code to protect trace_pipe and cpu#/trace_pipe.

But this is not enough: ring buffer readers are not always
read-only; they may consume data.

This patch serializes access to trace, trace_pipe, trace_pipe_raw,
cpu#/trace, cpu#/trace_pipe and cpu#/trace_pipe_raw, and removes
tracing_reader_cpumask, which was previously used to protect trace_pipe.

Details:

The ring buffer serializes its readers, but that is only low-level
protection. The validity of the events returned by ring_buffer_peek()
etc. is not protected by the ring buffer.

The content of events may become garbage if we allow another process to
consume these events concurrently:
  A) The page holding the consumed events may become a normal page
     (rather than the reader page) in the ring buffer, and be
     rewritten by the event producer.
  B) The page holding the consumed events may be handed off for
     splice_read, and then be returned to the system.

This patch adds trace_access_lock() and trace_access_unlock() primitives.

These primitives allow multiple processes to access different cpu ring
buffers concurrently.

They do not distinguish read-only from read-consume access; multiple
read-only accesses to the same cpu buffer are also serialized.

The primitives are not taken when files are opened, only when they
are read.
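
To make the scheme concrete, here is a minimal userspace sketch of the
same two-level idea, using a pthread rwlock in place of the rwsem and an
array of mutexes in place of the per-cpu locks. NR_CPUS, PIPE_ALL_CPU,
the reader() body and the demo main() are all invented for illustration;
they are not part of this patch:

#include <pthread.h>
#include <stdio.h>

#define NR_CPUS       4
#define PIPE_ALL_CPU  (-1)   /* stand-in for TRACE_PIPE_ALL_CPU */

static pthread_rwlock_t all_cpu_access_lock = PTHREAD_RWLOCK_INITIALIZER;
static pthread_mutex_t cpu_access_lock[NR_CPUS];

static void trace_access_lock(int cpu)
{
        if (cpu == PIPE_ALL_CPU) {
                /* whole-buffer reader: exclude every per-cpu reader at once */
                pthread_rwlock_wrlock(&all_cpu_access_lock);
        } else {
                /* first block whole-buffer readers ... */
                pthread_rwlock_rdlock(&all_cpu_access_lock);
                /* ... then other readers of this one cpu buffer */
                pthread_mutex_lock(&cpu_access_lock[cpu]);
        }
}

static void trace_access_unlock(int cpu)
{
        if (cpu == PIPE_ALL_CPU) {
                pthread_rwlock_unlock(&all_cpu_access_lock);
        } else {
                /* release in reverse order of acquisition */
                pthread_mutex_unlock(&cpu_access_lock[cpu]);
                pthread_rwlock_unlock(&all_cpu_access_lock);
        }
}

static void *reader(void *arg)
{
        int cpu = (int)(long)arg;

        trace_access_lock(cpu);
        printf("consuming events of cpu %d\n", cpu);  /* consume here */
        trace_access_unlock(cpu);
        return NULL;
}

int main(void)
{
        pthread_t t[NR_CPUS];
        int cpu;

        for (cpu = 0; cpu < NR_CPUS; cpu++)
                pthread_mutex_init(&cpu_access_lock[cpu], NULL);

        /* readers of different cpu buffers may run concurrently ... */
        for (cpu = 0; cpu < NR_CPUS; cpu++)
                pthread_create(&t[cpu], NULL, reader, (void *)(long)cpu);
        for (cpu = 0; cpu < NR_CPUS; cpu++)
                pthread_join(t[cpu], NULL);

        /* ... while a whole-buffer reader excludes all of them */
        trace_access_lock(PIPE_ALL_CPU);
        printf("consuming events of all cpus\n");
        trace_access_unlock(PIPE_ALL_CPU);
        return 0;
}

Note the fixed ordering: per-cpu readers always take the rwsem-equivalent
before their cpu mutex and release in reverse order, so a whole-buffer
reader can never deadlock against them.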

Signed-off-by: Lai Jiangshan <laijs@cn.fujitsu.com>
LKML-Reference: <4B447D52.1050602@cn.fujitsu.com>
Signed-off-by: Steven Rostedt <rostedt@goodmis.org>
Lai Jiangshan authored and Steven Rostedt committed Jan 6, 2010
1 parent 0fa0eda commit 7e53bd4
Showing 1 changed file with 97 additions and 39 deletions: kernel/trace/trace.c
@@ -32,6 +32,7 @@
#include <linux/splice.h>
#include <linux/kdebug.h>
#include <linux/string.h>
#include <linux/rwsem.h>
#include <linux/ctype.h>
#include <linux/init.h>
#include <linux/poll.h>
@@ -102,9 +103,6 @@ static inline void ftrace_enable_cpu(void)

static cpumask_var_t __read_mostly tracing_buffer_mask;

/* Define which cpu buffers are currently read in trace_pipe */
static cpumask_var_t tracing_reader_cpumask;

#define for_each_tracing_cpu(cpu) \
        for_each_cpu(cpu, tracing_buffer_mask)

@@ -243,12 +241,91 @@ static struct tracer *current_trace __read_mostly;

/*
 * trace_types_lock is used to protect the trace_types list.
 * This lock is also used to keep user access serialized.
 * Accesses from userspace will grab this lock while userspace
 * activities happen inside the kernel.
 */
static DEFINE_MUTEX(trace_types_lock);

/*
 * serialize the access of the ring buffer
 *
 * ring buffer serializes readers, but it is only low-level protection.
 * The validity of the events (which are returned by ring_buffer_peek()
 * etc.) is not protected by the ring buffer.
 *
 * The content of events may become garbage if we allow another process
 * to consume these events concurrently:
 *   A) the page of the consumed events may become a normal page
 *      (not a reader page) in the ring buffer, and this page will be
 *      rewritten by the event producer.
 *   B) the page of the consumed events may become a page for
 *      splice_read, and this page will be returned to the system.
 *
 * These primitives allow multiple processes to access different per-cpu
 * ring buffers concurrently.
 *
 * These primitives don't distinguish read-only and read-consume access.
 * Multiple read-only accesses are also serialized.
 */

#ifdef CONFIG_SMP
static DECLARE_RWSEM(all_cpu_access_lock);
static DEFINE_PER_CPU(struct mutex, cpu_access_lock);

static inline void trace_access_lock(int cpu)
{
        if (cpu == TRACE_PIPE_ALL_CPU) {
                /* gain it for accessing the whole ring buffer. */
                down_write(&all_cpu_access_lock);
        } else {
                /* gain it for accessing a cpu ring buffer. */

                /* Firstly block other trace_access_lock(TRACE_PIPE_ALL_CPU). */
                down_read(&all_cpu_access_lock);

                /* Secondly block other access to this @cpu ring buffer. */
                mutex_lock(&per_cpu(cpu_access_lock, cpu));
        }
}

static inline void trace_access_unlock(int cpu)
{
        if (cpu == TRACE_PIPE_ALL_CPU) {
                up_write(&all_cpu_access_lock);
        } else {
                mutex_unlock(&per_cpu(cpu_access_lock, cpu));
                up_read(&all_cpu_access_lock);
        }
}

static inline void trace_access_lock_init(void)
{
        int cpu;

        for_each_possible_cpu(cpu)
                mutex_init(&per_cpu(cpu_access_lock, cpu));
}

#else

static DEFINE_MUTEX(access_lock);

static inline void trace_access_lock(int cpu)
{
        (void)cpu;
        mutex_lock(&access_lock);
}

static inline void trace_access_unlock(int cpu)
{
        (void)cpu;
        mutex_unlock(&access_lock);
}

static inline void trace_access_lock_init(void)
{
}

#endif

/* trace_wait is a waitqueue for tasks blocked on trace_poll */
static DECLARE_WAIT_QUEUE_HEAD(trace_wait);

@@ -1580,12 +1657,6 @@ static void tracing_iter_reset(struct trace_iterator *iter, int cpu)
}

/*
 * No locking is necessary here. The worst that can happen
 * is losing events consumed at the same time by a trace_pipe
 * reader. Other than that, we don't risk crashing the ring
 * buffer because it serializes the readers.
 *
 * The current tracer is copied to avoid global locking
 * all around.
 */
@@ -1640,12 +1711,16 @@ static void *s_start(struct seq_file *m, loff_t *pos)
        }

        trace_event_read_lock();
        trace_access_lock(cpu_file);
        return p;
}

static void s_stop(struct seq_file *m, void *p)
{
        struct trace_iterator *iter = m->private;

        atomic_dec(&trace_record_cmdline_disabled);
        trace_access_unlock(iter->cpu_file);
        trace_event_read_unlock();
}

@@ -2836,22 +2911,6 @@ static int tracing_open_pipe(struct inode *inode, struct file *filp)

        mutex_lock(&trace_types_lock);

        /* We only allow one reader per cpu */
        if (cpu_file == TRACE_PIPE_ALL_CPU) {
                if (!cpumask_empty(tracing_reader_cpumask)) {
                        ret = -EBUSY;
                        goto out;
                }
                cpumask_setall(tracing_reader_cpumask);
        } else {
                if (!cpumask_test_cpu(cpu_file, tracing_reader_cpumask))
                        cpumask_set_cpu(cpu_file, tracing_reader_cpumask);
                else {
                        ret = -EBUSY;
                        goto out;
                }
        }

        /* create a buffer to store the information to pass to userspace */
        iter = kzalloc(sizeof(*iter), GFP_KERNEL);
        if (!iter) {
@@ -2907,12 +2966,6 @@ static int tracing_release_pipe(struct inode *inode, struct file *file)

        mutex_lock(&trace_types_lock);

        if (iter->cpu_file == TRACE_PIPE_ALL_CPU)
                cpumask_clear(tracing_reader_cpumask);
        else
                cpumask_clear_cpu(iter->cpu_file, tracing_reader_cpumask);

        if (iter->trace->pipe_close)
                iter->trace->pipe_close(iter);

@@ -3074,6 +3127,7 @@ tracing_read_pipe(struct file *filp, char __user *ubuf,
        iter->pos = -1;

        trace_event_read_lock();
        trace_access_lock(iter->cpu_file);
        while (find_next_entry_inc(iter) != NULL) {
                enum print_line_t ret;
                int len = iter->seq.len;
@@ -3090,6 +3144,7 @@ tracing_read_pipe(struct file *filp, char __user *ubuf,
                if (iter->seq.len >= cnt)
                        break;
        }
        trace_access_unlock(iter->cpu_file);
        trace_event_read_unlock();

        /* Now copy what we have to the user */
@@ -3215,6 +3270,7 @@ static ssize_t tracing_splice_read_pipe(struct file *filp,
        }

        trace_event_read_lock();
        trace_access_lock(iter->cpu_file);

        /* Fill as many pages as possible. */
        for (i = 0, rem = len; i < PIPE_BUFFERS && rem; i++) {
@@ -3238,6 +3294,7 @@ static ssize_t tracing_splice_read_pipe(struct file *filp,
                trace_seq_init(&iter->seq);
        }

        trace_access_unlock(iter->cpu_file);
        trace_event_read_unlock();
        mutex_unlock(&iter->mutex);

@@ -3539,10 +3596,12 @@ tracing_buffers_read(struct file *filp, char __user *ubuf,

        info->read = 0;

        trace_access_lock(info->cpu);
        ret = ring_buffer_read_page(info->tr->buffer,
                                    &info->spare,
                                    count,
                                    info->cpu, 0);
        trace_access_unlock(info->cpu);
        if (ret < 0)
                return 0;

@@ -3670,6 +3729,7 @@ tracing_buffers_splice_read(struct file *file, loff_t *ppos,
                len &= PAGE_MASK;
        }

        trace_access_lock(info->cpu);
        entries = ring_buffer_entries_cpu(info->tr->buffer, info->cpu);

        for (i = 0; i < PIPE_BUFFERS && len && entries; i++, len -= PAGE_SIZE) {
@@ -3717,6 +3777,7 @@ tracing_buffers_splice_read(struct file *file, loff_t *ppos,
                entries = ring_buffer_entries_cpu(info->tr->buffer, info->cpu);
        }

        trace_access_unlock(info->cpu);
        spd.nr_pages = i;

        /* did we read anything? */
@@ -4153,6 +4214,8 @@ static __init int tracer_init_debugfs(void)
        struct dentry *d_tracer;
        int cpu;

        trace_access_lock_init();

        d_tracer = tracing_init_dentry();

        trace_create_file("tracing_enabled", 0644, d_tracer,
@@ -4387,9 +4450,6 @@ __init static int tracer_alloc_buffers(void)
        if (!alloc_cpumask_var(&tracing_cpumask, GFP_KERNEL))
                goto out_free_buffer_mask;

        if (!zalloc_cpumask_var(&tracing_reader_cpumask, GFP_KERNEL))
                goto out_free_tracing_cpumask;

        /* To save memory, keep the ring buffer size to its minimum */
        if (ring_buffer_expanded)
                ring_buf_size = trace_buf_size;
@@ -4447,8 +4507,6 @@ __init static int tracer_alloc_buffers(void)
        return 0;

out_free_cpumask:
        free_cpumask_var(tracing_reader_cpumask);
out_free_tracing_cpumask:
        free_cpumask_var(tracing_cpumask);
out_free_buffer_mask:
        free_cpumask_var(tracing_buffer_mask);
