---
r: 206221
b: refs/heads/master
c: 502ca9d
h: refs/heads/master
i:
  206219: 10aa972
v: v3
Tejun Heo committed Jun 29, 2010
1 parent e8ac394 commit 1a79673
Showing 3 changed files with 104 additions and 39 deletions.
2 changes: 1 addition & 1 deletion [refs]
@@ -1,2 +1,2 @@
---
refs/heads/master: db7bccf45cb87522096b8f43144e31ca605a9f24
refs/heads/master: 502ca9d819792e7d79b6e002afe9094c641fe410
6 changes: 3 additions & 3 deletions trunk/include/linux/workqueue.h
@@ -221,7 +221,7 @@ static inline unsigned int work_static(struct work_struct *work) { return 0; }

enum {
WQ_FREEZEABLE = 1 << 0, /* freeze during suspend */
WQ_SINGLE_THREAD = 1 << 1, /* no per-cpu worker */
WQ_SINGLE_CPU = 1 << 1, /* only single cpu at a time */
};

extern struct workqueue_struct *
@@ -250,9 +250,9 @@ __create_workqueue_key(const char *name, unsigned int flags, int max_active,
#define create_workqueue(name) \
__create_workqueue((name), 0, 1)
#define create_freezeable_workqueue(name) \
__create_workqueue((name), WQ_FREEZEABLE | WQ_SINGLE_THREAD, 1)
__create_workqueue((name), WQ_FREEZEABLE | WQ_SINGLE_CPU, 1)
#define create_singlethread_workqueue(name) \
__create_workqueue((name), WQ_SINGLE_THREAD, 1)
__create_workqueue((name), WQ_SINGLE_CPU, 1)

extern void destroy_workqueue(struct workqueue_struct *wq);

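Note: the create_*_workqueue() macros keep their call signatures across the WQ_SINGLE_THREAD to WQ_SINGLE_CPU rename, so existing callers build unchanged. As a rough, hypothetical sketch (not part of this commit), a driver using the unchanged creation interface might look like the following; every name here (my_wq, my_work_fn, and so on) is invented for illustration.

	/* Hypothetical driver snippet: the creation macros keep their
	 * signatures after the rename, so callers are unaffected.
	 * Sketch only; error handling trimmed. */
	#include <linux/init.h>
	#include <linux/kernel.h>
	#include <linux/module.h>
	#include <linux/workqueue.h>

	static struct workqueue_struct *my_wq;	/* hypothetical example queue */

	static void my_work_fn(struct work_struct *work)
	{
		/* with WQ_SINGLE_CPU, at most one CPU services this queue at a time */
		pr_info("my_work_fn ran\n");
	}
	static DECLARE_WORK(my_work, my_work_fn);

	static int __init my_init(void)
	{
		my_wq = create_singlethread_workqueue("my_wq");
		if (!my_wq)
			return -ENOMEM;
		queue_work(my_wq, &my_work);
		return 0;
	}

	static void __exit my_exit(void)
	{
		flush_workqueue(my_wq);
		destroy_workqueue(my_wq);
	}

	module_init(my_init);
	module_exit(my_exit);
	MODULE_LICENSE("GPL");
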
135 changes: 100 additions & 35 deletions trunk/kernel/workqueue.c
@@ -114,8 +114,7 @@ struct global_cwq {
} ____cacheline_aligned_in_smp;

/*
* The per-CPU workqueue (if single thread, we always use the first
* possible cpu). The lower WORK_STRUCT_FLAG_BITS of
* The per-CPU workqueue. The lower WORK_STRUCT_FLAG_BITS of
* work_struct->data are used for flags and thus cwqs need to be
* aligned at two's power of the number of flag bits.
*/
@@ -159,6 +158,8 @@ struct workqueue_struct {
struct list_head flusher_queue; /* F: flush waiters */
struct list_head flusher_overflow; /* F: flush overflow list */

unsigned long single_cpu; /* cpu for single cpu wq */

int saved_max_active; /* I: saved cwq max_active */
const char *name; /* I: workqueue name */
#ifdef CONFIG_LOCKDEP
@@ -289,8 +290,6 @@ static DEFINE_PER_CPU(struct global_cwq, global_cwq);

static int worker_thread(void *__worker);

static int singlethread_cpu __read_mostly;

static struct global_cwq *get_gcwq(unsigned int cpu)
{
return &per_cpu(global_cwq, cpu);
@@ -302,14 +301,6 @@ static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
return per_cpu_ptr(wq->cpu_wq, cpu);
}

static struct cpu_workqueue_struct *target_cwq(unsigned int cpu,
struct workqueue_struct *wq)
{
if (unlikely(wq->flags & WQ_SINGLE_THREAD))
cpu = singlethread_cpu;
return get_cwq(cpu, wq);
}

static unsigned int work_color_to_flags(int color)
{
return color << WORK_STRUCT_COLOR_SHIFT;
@@ -410,17 +401,87 @@ static void insert_work(struct cpu_workqueue_struct *cwq,
wake_up_process(cwq->worker->task);
}

/**
* cwq_unbind_single_cpu - unbind cwq from single cpu workqueue processing
* @cwq: cwq to unbind
*
* Try to unbind @cwq from single cpu workqueue processing. If
* @cwq->wq is frozen, unbind is delayed till the workqueue is thawed.
*
* CONTEXT:
* spin_lock_irq(gcwq->lock).
*/
static void cwq_unbind_single_cpu(struct cpu_workqueue_struct *cwq)
{
struct workqueue_struct *wq = cwq->wq;
struct global_cwq *gcwq = cwq->gcwq;

BUG_ON(wq->single_cpu != gcwq->cpu);
/*
* Unbind from workqueue if @cwq is not frozen. If frozen,
* thaw_workqueues() will either restart processing on this
* cpu or unbind if empty. This keeps works queued while
* frozen fully ordered and flushable.
*/
if (likely(!(gcwq->flags & GCWQ_FREEZING))) {
smp_wmb(); /* paired with cmpxchg() in __queue_work() */
wq->single_cpu = NR_CPUS;
}
}

static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
struct work_struct *work)
{
struct cpu_workqueue_struct *cwq = target_cwq(cpu, wq);
struct global_cwq *gcwq = cwq->gcwq;
struct global_cwq *gcwq;
struct cpu_workqueue_struct *cwq;
struct list_head *worklist;
unsigned long flags;
bool arbitrate;

debug_work_activate(work);

spin_lock_irqsave(&gcwq->lock, flags);
/* determine gcwq to use */
if (!(wq->flags & WQ_SINGLE_CPU)) {
/* just use the requested cpu for multicpu workqueues */
gcwq = get_gcwq(cpu);
spin_lock_irqsave(&gcwq->lock, flags);
} else {
unsigned int req_cpu = cpu;

/*
* It's a bit more complex for single cpu workqueues.
* We first need to determine which cpu is going to be
* used. If no cpu is currently serving this
* workqueue, arbitrate using atomic accesses to
* wq->single_cpu; otherwise, use the current one.
*/
retry:
cpu = wq->single_cpu;
arbitrate = cpu == NR_CPUS;
if (arbitrate)
cpu = req_cpu;

gcwq = get_gcwq(cpu);
spin_lock_irqsave(&gcwq->lock, flags);

/*
* The following cmpxchg() is a full barrier paired
* with smp_wmb() in cwq_unbind_single_cpu() and
* guarantees that all changes to wq->st_* fields are
* visible on the new cpu after this point.
*/
if (arbitrate)
cmpxchg(&wq->single_cpu, NR_CPUS, cpu);

if (unlikely(wq->single_cpu != cpu)) {
spin_unlock_irqrestore(&gcwq->lock, flags);
goto retry;
}
}

/* gcwq determined, get cwq and queue */
cwq = get_cwq(gcwq->cpu, wq);

BUG_ON(!list_empty(&work->entry));

cwq->nr_in_flight[cwq->work_color]++;
@@ -530,7 +591,7 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
timer_stats_timer_set_start_info(&dwork->timer);

/* This stores cwq for the moment, for the timer_fn */
set_wq_data(work, target_cwq(raw_smp_processor_id(), wq), 0);
set_wq_data(work, get_cwq(raw_smp_processor_id(), wq), 0);
timer->expires = jiffies + delay;
timer->data = (unsigned long)dwork;
timer->function = delayed_work_timer_fn;
@@ -790,10 +851,14 @@ static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
cwq->nr_in_flight[color]--;
cwq->nr_active--;

/* one down, submit a delayed one */
if (!list_empty(&cwq->delayed_works) &&
cwq->nr_active < cwq->max_active)
cwq_activate_first_delayed(cwq);
if (!list_empty(&cwq->delayed_works)) {
/* one down, submit a delayed one */
if (cwq->nr_active < cwq->max_active)
cwq_activate_first_delayed(cwq);
} else if (!cwq->nr_active && cwq->wq->flags & WQ_SINGLE_CPU) {
/* this was the last work, unbind from single cpu */
cwq_unbind_single_cpu(cwq);
}

/* is flush in progress and are we at the flushing tip? */
if (likely(cwq->flush_color != color))
@@ -1727,7 +1792,6 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
struct lock_class_key *key,
const char *lock_name)
{
bool singlethread = flags & WQ_SINGLE_THREAD;
struct workqueue_struct *wq;
bool failed = false;
unsigned int cpu;
@@ -1748,6 +1812,8 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
atomic_set(&wq->nr_cwqs_to_flush, 0);
INIT_LIST_HEAD(&wq->flusher_queue);
INIT_LIST_HEAD(&wq->flusher_overflow);
wq->single_cpu = NR_CPUS;

wq->name = name;
lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
INIT_LIST_HEAD(&wq->list);
@@ -1773,8 +1839,7 @@ struct workqueue_struct *__create_workqueue_key(const char *name,

if (failed)
continue;
cwq->worker = create_worker(cwq,
cpu_online(cpu) && !singlethread);
cwq->worker = create_worker(cwq, cpu_online(cpu));
if (cwq->worker)
start_worker(cwq->worker);
else
@@ -1958,18 +2023,16 @@ static int __cpuinit trustee_thread(void *__gcwq)

spin_lock_irq(&gcwq->lock);
/*
* Make all multithread workers rogue. Trustee must be bound
* to the target cpu and can't be cancelled.
* Make all workers rogue. Trustee must be bound to the
* target cpu and can't be cancelled.
*/
BUG_ON(gcwq->cpu != smp_processor_id());

list_for_each_entry(worker, &gcwq->idle_list, entry)
if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD))
worker->flags |= WORKER_ROGUE;
worker->flags |= WORKER_ROGUE;

for_each_busy_worker(worker, i, pos, gcwq)
if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD))
worker->flags |= WORKER_ROGUE;
worker->flags |= WORKER_ROGUE;

/*
* We're now in charge. Notify and proceed to drain. We need
@@ -2074,14 +2137,12 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
wait_trustee_state(gcwq, TRUSTEE_DONE);
}

/* clear ROGUE from all multithread workers */
/* clear ROGUE from all workers */
list_for_each_entry(worker, &gcwq->idle_list, entry)
if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD))
worker->flags &= ~WORKER_ROGUE;
worker->flags &= ~WORKER_ROGUE;

for_each_busy_worker(worker, i, pos, gcwq)
if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD))
worker->flags &= ~WORKER_ROGUE;
worker->flags &= ~WORKER_ROGUE;
break;
}

@@ -2266,6 +2327,11 @@ void thaw_workqueues(void)
cwq->nr_active < cwq->max_active)
cwq_activate_first_delayed(cwq);

/* perform delayed unbind from single cpu if empty */
if (wq->single_cpu == gcwq->cpu &&
!cwq->nr_active && list_empty(&cwq->delayed_works))
cwq_unbind_single_cpu(cwq);

wake_up_process(cwq->worker->task);
}

@@ -2283,7 +2349,6 @@ void __init init_workqueues(void)
unsigned int cpu;
int i;

singlethread_cpu = cpumask_first(cpu_possible_mask);
hotcpu_notifier(workqueue_cpu_callback, CPU_PRI_WORKQUEUE);

/* initialize gcwqs */
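Note: the core of the change is the arbitration protocol split between __queue_work() and cwq_unbind_single_cpu(): the first CPU to queue work on an unbound WQ_SINGLE_CPU workqueue claims it with cmpxchg(), later submissions follow the recorded CPU, and the claim is released (or deferred to thaw_workqueues() while frozen) once the last queued work completes. The following is a rough userspace analogue of that claim/release idea, not kernel code from this commit; the UNBOUND sentinel, the names, and the use of C11 atomics are stand-ins for illustration only.

	/* Userspace sketch (illustrative only) of the single-CPU claim/release
	 * scheme: the first submitter claims the queue for its CPU id with a
	 * compare-and-swap, later submitters funnel to the recorded CPU, and
	 * the claim is dropped when the queue drains. */
	#include <stdatomic.h>
	#include <stdio.h>

	#define UNBOUND  (-1)		/* stand-in for the NR_CPUS sentinel */

	static atomic_int single_cpu = UNBOUND;

	/* Returns the CPU that should service the queue for this submission. */
	static int claim_single_cpu(int this_cpu)
	{
		for (;;) {
			int cur = atomic_load(&single_cpu);

			if (cur != UNBOUND)
				return cur;	/* already bound, follow it */

			/* try to bind the queue to this CPU */
			if (atomic_compare_exchange_strong(&single_cpu, &cur, this_cpu))
				return this_cpu;
			/* lost the race; retry and follow the winner */
		}
	}

	/* Called when the last queued work finishes (and the queue isn't frozen). */
	static void release_single_cpu(void)
	{
		atomic_store(&single_cpu, UNBOUND);
	}

	int main(void)
	{
		printf("bound to cpu %d\n", claim_single_cpu(3));
		printf("follow-up goes to cpu %d\n", claim_single_cpu(5));
		release_single_cpu();
		printf("rebound to cpu %d\n", claim_single_cpu(5));
		return 0;
	}

In the kernel version the claim is checked and retried under the per-CPU gcwq lock, and the release is skipped while the gcwq is freezing so that frozen work stays ordered and flushable.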
