Commit bea4476
r: 206223
b: refs/heads/master
c: 7a22ad7
h: refs/heads/master
i:
  206221: 1a79673
  206219: 10aa972
  206215: 158b892
  206207: 935629f
v: v3
Tejun Heo committed Jun 29, 2010
1 parent fb73bdd · commit bea4476
Showing 3 changed files with 110 additions and 62 deletions.
2 changes: 1 addition & 1 deletion [refs]
@@ -1,2 +1,2 @@
---
refs/heads/master: 8cca0eea3964b72b14e8c3f88e3a40bef7b9113e
refs/heads/master: 7a22ad757ec75186ad43a5b4670fa7423ee8f480
7 changes: 5 additions & 2 deletions trunk/include/linux/workqueue.h
@@ -9,6 +9,7 @@
#include <linux/linkage.h>
#include <linux/bitops.h>
#include <linux/lockdep.h>
#include <linux/threads.h>
#include <asm/atomic.h>

struct workqueue_struct;
@@ -59,6 +60,7 @@ enum {

WORK_STRUCT_FLAG_MASK = (1UL << WORK_STRUCT_FLAG_BITS) - 1,
WORK_STRUCT_WQ_DATA_MASK = ~WORK_STRUCT_FLAG_MASK,
WORK_STRUCT_NO_CPU = NR_CPUS << WORK_STRUCT_FLAG_BITS,
};

struct work_struct {
@@ -70,8 +72,9 @@ struct work_struct {
#endif
};

#define WORK_DATA_INIT() ATOMIC_LONG_INIT(0)
#define WORK_DATA_STATIC_INIT() ATOMIC_LONG_INIT(WORK_STRUCT_STATIC)
#define WORK_DATA_INIT() ATOMIC_LONG_INIT(WORK_STRUCT_NO_CPU)
#define WORK_DATA_STATIC_INIT() \
ATOMIC_LONG_INIT(WORK_STRUCT_NO_CPU | WORK_STRUCT_STATIC)

struct delayed_work {
struct work_struct work;
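
The header change above defines the new encoding of work->data: the low WORK_STRUCT_FLAG_BITS carry status flags, and the remaining bits hold either a cpu_workqueue_struct pointer (while the work is queued) or a cpu number shifted past the flag bits (once execution has started), with WORK_STRUCT_NO_CPU (NR_CPUS << WORK_STRUCT_FLAG_BITS) meaning "no cpu recorded". The stand-alone sketch below is not part of the patch; it only illustrates the tagging scheme with made-up constants (FLAG_BITS, FAKE_NR_CPUS, FAKE_PAGE_OFFSET, struct fake_cwq are illustrative stand-ins, not the kernel's definitions). The real decode logic lives in get_work_cwq()/get_work_gcwq() in the kernel/workqueue.c diff that follows.

/*
 * Stand-alone illustration of the work->data tagging scheme above.
 * All constants and types here are simplified stand-ins chosen for the
 * example; they are not the kernel's definitions.
 */
#include <assert.h>
#include <stdio.h>

#define FLAG_BITS	 2UL			/* stand-in for WORK_STRUCT_FLAG_BITS */
#define FLAG_MASK	 ((1UL << FLAG_BITS) - 1)
#define FLAG_PENDING	 (1UL << 0)		/* stand-in for WORK_STRUCT_PENDING */
#define FAKE_NR_CPUS	 8UL
#define NO_CPU		 (FAKE_NR_CPUS << FLAG_BITS)	/* like WORK_STRUCT_NO_CPU */
#define FAKE_PAGE_OFFSET 0xC0000000UL		/* "kernel pointers" live above this */

struct fake_cwq { int dummy; };

/* While queued, data holds the cwq pointer plus flags. */
static unsigned long encode_cwq(struct fake_cwq *cwq, unsigned long flags)
{
	return (unsigned long)cwq | flags;
}

/* Once execution starts, data holds the last cpu, shifted past the flags. */
static unsigned long encode_cpu(unsigned long cpu, unsigned long flags)
{
	return (cpu << FLAG_BITS) | flags;
}

/* Values at or above PAGE_OFFSET are pointers; anything below is a cpu. */
static struct fake_cwq *decode_cwq(unsigned long data)
{
	data &= ~FLAG_MASK;
	return data >= FAKE_PAGE_OFFSET ? (struct fake_cwq *)data : NULL;
}

static long decode_cpu(unsigned long data)
{
	data &= ~FLAG_MASK;
	if (data >= FAKE_PAGE_OFFSET || (data >> FLAG_BITS) == FAKE_NR_CPUS)
		return -1;			/* still a cwq, or no cpu recorded */
	return (long)(data >> FLAG_BITS);
}

int main(void)
{
	/* Pretend this cwq sits in the fake kernel address range. */
	struct fake_cwq *cwq = (struct fake_cwq *)(FAKE_PAGE_OFFSET + 0x1000);
	unsigned long data;

	data = NO_CPU;				/* freshly initialized work */
	assert(!decode_cwq(data) && decode_cpu(data) == -1);

	data = encode_cwq(cwq, FLAG_PENDING);	/* queued on a cwq */
	assert(decode_cwq(data) == cwq && decode_cpu(data) == -1);

	data = encode_cpu(3, FLAG_PENDING);	/* execution started on cpu 3 */
	assert(!decode_cwq(data) && decode_cpu(data) == 3);

	printf("work->data round-trips as expected\n");
	return 0;
}

With this split, a work item that has finished executing still remembers which gcwq it last ran on, which is what the reworked flush_work() and the reentrance detection for delayed works in the diff below rely on.
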
163 changes: 104 additions & 59 deletions trunk/kernel/workqueue.c
@@ -319,31 +319,71 @@ static int work_next_color(int color)
}

/*
* Set the workqueue on which a work item is to be run
* - Must *only* be called if the pending flag is set
* Work data points to the cwq while a work is on queue. Once
* execution starts, it points to the cpu the work was last on. This
* can be distinguished by comparing the data value against
* PAGE_OFFSET.
*
* set_work_{cwq|cpu}() and clear_work_data() can be used to set the
* cwq, cpu or clear work->data. These functions should only be
* called while the work is owned - ie. while the PENDING bit is set.
*
* get_work_[g]cwq() can be used to obtain the gcwq or cwq
* corresponding to a work. gcwq is available once the work has been
* queued anywhere after initialization. cwq is available only from
* queueing until execution starts.
*/
static inline void set_wq_data(struct work_struct *work,
struct cpu_workqueue_struct *cwq,
unsigned long extra_flags)
static inline void set_work_data(struct work_struct *work, unsigned long data,
unsigned long flags)
{
BUG_ON(!work_pending(work));
atomic_long_set(&work->data, data | flags | work_static(work));
}

atomic_long_set(&work->data, (unsigned long)cwq | work_static(work) |
WORK_STRUCT_PENDING | extra_flags);
static void set_work_cwq(struct work_struct *work,
struct cpu_workqueue_struct *cwq,
unsigned long extra_flags)
{
set_work_data(work, (unsigned long)cwq,
WORK_STRUCT_PENDING | extra_flags);
}

/*
* Clear WORK_STRUCT_PENDING and the workqueue on which it was queued.
*/
static inline void clear_wq_data(struct work_struct *work)
static void set_work_cpu(struct work_struct *work, unsigned int cpu)
{
set_work_data(work, cpu << WORK_STRUCT_FLAG_BITS, WORK_STRUCT_PENDING);
}

static void clear_work_data(struct work_struct *work)
{
set_work_data(work, WORK_STRUCT_NO_CPU, 0);
}

static inline unsigned long get_work_data(struct work_struct *work)
{
return atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK;
}

static struct cpu_workqueue_struct *get_work_cwq(struct work_struct *work)
{
atomic_long_set(&work->data, work_static(work));
unsigned long data = get_work_data(work);

return data >= PAGE_OFFSET ? (void *)data : NULL;
}

static inline struct cpu_workqueue_struct *get_wq_data(struct work_struct *work)
static struct global_cwq *get_work_gcwq(struct work_struct *work)
{
return (void *)(atomic_long_read(&work->data) &
WORK_STRUCT_WQ_DATA_MASK);
unsigned long data = get_work_data(work);
unsigned int cpu;

if (data >= PAGE_OFFSET)
return ((struct cpu_workqueue_struct *)data)->gcwq;

cpu = data >> WORK_STRUCT_FLAG_BITS;
if (cpu == NR_CPUS)
return NULL;

BUG_ON(cpu >= num_possible_cpus());
return get_gcwq(cpu);
}

/**
@@ -443,7 +483,7 @@ static void insert_work(struct cpu_workqueue_struct *cwq,
unsigned int extra_flags)
{
/* we own @work, set data and link */
set_wq_data(work, cwq, extra_flags);
set_work_cwq(work, cwq, extra_flags);

/*
* Ensure that we get the right work->data if we see the
@@ -599,7 +639,7 @@ EXPORT_SYMBOL_GPL(queue_work_on);
static void delayed_work_timer_fn(unsigned long __data)
{
struct delayed_work *dwork = (struct delayed_work *)__data;
struct cpu_workqueue_struct *cwq = get_wq_data(&dwork->work);
struct cpu_workqueue_struct *cwq = get_work_cwq(&dwork->work);

__queue_work(smp_processor_id(), cwq->wq, &dwork->work);
}
@@ -639,13 +679,19 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
struct work_struct *work = &dwork->work;

if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
struct global_cwq *gcwq = get_work_gcwq(work);
unsigned int lcpu = gcwq ? gcwq->cpu : raw_smp_processor_id();

BUG_ON(timer_pending(timer));
BUG_ON(!list_empty(&work->entry));

timer_stats_timer_set_start_info(&dwork->timer);

/* This stores cwq for the moment, for the timer_fn */
set_wq_data(work, get_cwq(raw_smp_processor_id(), wq), 0);
/*
* This stores cwq for the moment, for the timer_fn.
* Note that the work's gcwq is preserved to allow
* reentrance detection for delayed works.
*/
set_work_cwq(work, get_cwq(lcpu, wq), 0);
timer->expires = jiffies + delay;
timer->data = (unsigned long)dwork;
timer->function = delayed_work_timer_fn;
@@ -970,11 +1016,14 @@ static void process_one_work(struct worker *worker, struct work_struct *work)
worker->current_work = work;
worker->current_cwq = cwq;
work_color = get_work_color(work);

BUG_ON(get_work_cwq(work) != cwq);
/* record the current cpu number in the work data and dequeue */
set_work_cpu(work, gcwq->cpu);
list_del_init(&work->entry);

spin_unlock_irq(&gcwq->lock);

BUG_ON(get_wq_data(work) != cwq);
work_clear_pending(work);
lock_map_acquire(&cwq->wq->lockdep_map);
lock_map_acquire(&lockdep_map);
@@ -1406,37 +1455,39 @@ EXPORT_SYMBOL_GPL(flush_workqueue);
int flush_work(struct work_struct *work)
{
struct worker *worker = NULL;
struct cpu_workqueue_struct *cwq;
struct global_cwq *gcwq;
struct cpu_workqueue_struct *cwq;
struct wq_barrier barr;

might_sleep();
cwq = get_wq_data(work);
if (!cwq)
gcwq = get_work_gcwq(work);
if (!gcwq)
return 0;
gcwq = cwq->gcwq;

lock_map_acquire(&cwq->wq->lockdep_map);
lock_map_release(&cwq->wq->lockdep_map);

spin_lock_irq(&gcwq->lock);
if (!list_empty(&work->entry)) {
/*
* See the comment near try_to_grab_pending()->smp_rmb().
* If it was re-queued under us we are not going to wait.
* If it was re-queued to a different gcwq under us, we
* are not going to wait.
*/
smp_rmb();
if (unlikely(cwq != get_wq_data(work)))
cwq = get_work_cwq(work);
if (unlikely(!cwq || gcwq != cwq->gcwq))
goto already_gone;
} else {
if (cwq->worker && cwq->worker->current_work == work)
worker = cwq->worker;
worker = find_worker_executing_work(gcwq, work);
if (!worker)
goto already_gone;
cwq = worker->current_cwq;
}

insert_wq_barrier(cwq, &barr, work, worker);
spin_unlock_irq(&gcwq->lock);

lock_map_acquire(&cwq->wq->lockdep_map);
lock_map_release(&cwq->wq->lockdep_map);

wait_for_completion(&barr.done);
destroy_work_on_stack(&barr.work);
return 1;
@@ -1453,7 +1504,6 @@ EXPORT_SYMBOL_GPL(flush_work);
static int try_to_grab_pending(struct work_struct *work)
{
struct global_cwq *gcwq;
struct cpu_workqueue_struct *cwq;
int ret = -1;

if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)))
@@ -1463,24 +1513,23 @@ static int try_to_grab_pending(struct work_struct *work)
* The queueing is in progress, or it is already queued. Try to
* steal it from ->worklist without clearing WORK_STRUCT_PENDING.
*/

cwq = get_wq_data(work);
if (!cwq)
gcwq = get_work_gcwq(work);
if (!gcwq)
return ret;
gcwq = cwq->gcwq;

spin_lock_irq(&gcwq->lock);
if (!list_empty(&work->entry)) {
/*
* This work is queued, but perhaps we locked the wrong cwq.
* This work is queued, but perhaps we locked the wrong gcwq.
* In that case we must see the new value after rmb(), see
* insert_work()->wmb().
*/
smp_rmb();
if (cwq == get_wq_data(work)) {
if (gcwq == get_work_gcwq(work)) {
debug_work_deactivate(work);
list_del_init(&work->entry);
cwq_dec_nr_in_flight(cwq, get_work_color(work));
cwq_dec_nr_in_flight(get_work_cwq(work),
get_work_color(work));
ret = 1;
}
}
@@ -1489,20 +1538,16 @@ static int try_to_grab_pending(struct work_struct *work)
return ret;
}

static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
struct work_struct *work)
static void wait_on_cpu_work(struct global_cwq *gcwq, struct work_struct *work)
{
struct global_cwq *gcwq = cwq->gcwq;
struct wq_barrier barr;
struct worker *worker;

spin_lock_irq(&gcwq->lock);

worker = NULL;
if (unlikely(cwq->worker && cwq->worker->current_work == work)) {
worker = cwq->worker;
insert_wq_barrier(cwq, &barr, work, worker);
}
worker = find_worker_executing_work(gcwq, work);
if (unlikely(worker))
insert_wq_barrier(worker->current_cwq, &barr, work, worker);

spin_unlock_irq(&gcwq->lock);

@@ -1514,23 +1559,15 @@ static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,

static void wait_on_work(struct work_struct *work)
{
struct cpu_workqueue_struct *cwq;
struct workqueue_struct *wq;
int cpu;

might_sleep();

lock_map_acquire(&work->lockdep_map);
lock_map_release(&work->lockdep_map);

cwq = get_wq_data(work);
if (!cwq)
return;

wq = cwq->wq;

for_each_possible_cpu(cpu)
wait_on_cpu_work(get_cwq(cpu, wq), work);
wait_on_cpu_work(get_gcwq(cpu), work);
}

static int __cancel_work_timer(struct work_struct *work,
@@ -1545,7 +1582,7 @@ static int __cancel_work_timer(struct work_struct *work,
wait_on_work(work);
} while (unlikely(ret < 0));

clear_wq_data(work);
clear_work_data(work);
return ret;
}

@@ -1647,7 +1684,7 @@ EXPORT_SYMBOL(schedule_delayed_work);
void flush_delayed_work(struct delayed_work *dwork)
{
if (del_timer_sync(&dwork->timer)) {
__queue_work(get_cpu(), get_wq_data(&dwork->work)->wq,
__queue_work(get_cpu(), get_work_cwq(&dwork->work)->wq,
&dwork->work);
put_cpu();
}
@@ -2405,6 +2442,14 @@ void __init init_workqueues(void)
unsigned int cpu;
int i;

/*
* The pointer part of work->data is either pointing to the
* cwq or contains the cpu number the work ran last on. Make
* sure cpu number won't overflow into kernel pointer area so
* that they can be distinguished.
*/
BUILD_BUG_ON(NR_CPUS << WORK_STRUCT_FLAG_BITS >= PAGE_OFFSET);

hotcpu_notifier(workqueue_cpu_callback, CPU_PRI_WORKQUEUE);

/* initialize gcwqs */
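
The BUILD_BUG_ON added to init_workqueues() above is what keeps the pointer-versus-cpu trick honest: the largest value the cpu path can ever store is NR_CPUS << WORK_STRUCT_FLAG_BITS (the WORK_STRUCT_NO_CPU sentinel itself), and it must stay below PAGE_OFFSET so it can never be mistaken for a cwq pointer. As a rough worked example with illustrative numbers (not taken from the patch or any particular config): with 2 flag bits and NR_CPUS = 4096, the largest encoding is 4096 << 2 = 16384 (0x4000), comfortably below a 32-bit PAGE_OFFSET of 0xC0000000. A minimal compile-time restatement of the same constraint, using stand-in values:

/* Compile-time restatement of the constraint enforced by the new
 * BUILD_BUG_ON, with illustrative stand-in values (C11 _Static_assert). */
#define FAKE_FLAG_BITS	 2UL
#define FAKE_NR_CPUS	 4096UL
#define FAKE_PAGE_OFFSET 0xC0000000UL

_Static_assert((FAKE_NR_CPUS << FAKE_FLAG_BITS) < FAKE_PAGE_OFFSET,
	       "cpu encoding must stay below the kernel pointer range");
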
