Skip to content

Commit

Permalink
workqueue: make hotplug processing per-pool
Browse files Browse the repository at this point in the history
Instead of holding locks from both pools and then processing the pools
together, make hotplug processing per-pool - grab locks of one pool,
process it, release it and then proceed to the next pool.

rebind_workers() is updated to take and process @pool instead of @gcwq
which results in a lot of de-indentation.  gcwq_claim_assoc_and_lock()
and its counterpart are replaced with in-line per-pool locking.

While this patch changes processing order across pools, order within
each pool remains the same.  As each pool is independent, this
shouldn't break anything.

This is part of an effort to remove global_cwq and make worker_pool
the top level abstraction, which in turn will help implementing worker
pools with user-specified attributes.

Signed-off-by: Tejun Heo <tj@kernel.org>
Reviewed-by: Lai Jiangshan <laijs@cn.fujitsu.com>
  • Loading branch information
Tejun Heo committed Jan 24, 2013
1 parent d565ed6 commit 94cf58b
Showing 1 changed file with 62 additions and 87 deletions.
149 changes: 62 additions & 87 deletions kernel/workqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -1670,10 +1670,10 @@ static void busy_worker_rebind_fn(struct work_struct *work)
}

/**
* rebind_workers - rebind all workers of a gcwq to the associated CPU
* @gcwq: gcwq of interest
* rebind_workers - rebind all workers of a pool to the associated CPU
* @pool: pool of interest
*
* @gcwq->cpu is coming online. Rebind all workers to the CPU. Rebinding
* @pool->cpu is coming online. Rebind all workers to the CPU. Rebinding
* is different for idle and busy ones.
*
* Idle ones will be removed from the idle_list and woken up. They will
Expand All @@ -1691,60 +1691,53 @@ static void busy_worker_rebind_fn(struct work_struct *work)
* including the manager will not appear on @idle_list until rebind is
* complete, making local wake-ups safe.
*/
static void rebind_workers(struct global_cwq *gcwq)
static void rebind_workers(struct worker_pool *pool)
{
struct worker_pool *pool;
struct worker *worker, *n;
struct hlist_node *pos;
int i;

for_each_worker_pool(pool, gcwq) {
lockdep_assert_held(&pool->assoc_mutex);
lockdep_assert_held(&pool->lock);
}
lockdep_assert_held(&pool->assoc_mutex);
lockdep_assert_held(&pool->lock);

/* dequeue and kick idle ones */
for_each_worker_pool(pool, gcwq) {
list_for_each_entry_safe(worker, n, &pool->idle_list, entry) {
/*
* idle workers should be off @pool->idle_list
* until rebind is complete to avoid receiving
* premature local wake-ups.
*/
list_del_init(&worker->entry);
list_for_each_entry_safe(worker, n, &pool->idle_list, entry) {
/*
* idle workers should be off @pool->idle_list until rebind
* is complete to avoid receiving premature local wake-ups.
*/
list_del_init(&worker->entry);

/*
* worker_thread() will see the above dequeuing
* and call idle_worker_rebind().
*/
wake_up_process(worker->task);
}
/*
* worker_thread() will see the above dequeuing and call
* idle_worker_rebind().
*/
wake_up_process(worker->task);
}

/* rebind busy workers */
for_each_busy_worker(worker, i, pos, pool) {
struct work_struct *rebind_work = &worker->rebind_work;
struct workqueue_struct *wq;
/* rebind busy workers */
for_each_busy_worker(worker, i, pos, pool) {
struct work_struct *rebind_work = &worker->rebind_work;
struct workqueue_struct *wq;

if (test_and_set_bit(WORK_STRUCT_PENDING_BIT,
work_data_bits(rebind_work)))
continue;
if (test_and_set_bit(WORK_STRUCT_PENDING_BIT,
work_data_bits(rebind_work)))
continue;

debug_work_activate(rebind_work);
debug_work_activate(rebind_work);

/*
* wq doesn't really matter but let's keep
* @worker->pool and @cwq->pool consistent for
* sanity.
*/
if (std_worker_pool_pri(worker->pool))
wq = system_highpri_wq;
else
wq = system_wq;

insert_work(get_cwq(pool->cpu, wq), rebind_work,
worker->scheduled.next,
work_color_to_flags(WORK_NO_COLOR));
}
/*
* wq doesn't really matter but let's keep @worker->pool
* and @cwq->pool consistent for sanity.
*/
if (std_worker_pool_pri(worker->pool))
wq = system_highpri_wq;
else
wq = system_wq;

insert_work(get_cwq(pool->cpu, wq), rebind_work,
worker->scheduled.next,
work_color_to_flags(WORK_NO_COLOR));
}
}

Expand Down Expand Up @@ -3497,40 +3490,14 @@ EXPORT_SYMBOL_GPL(work_busy);
* are a lot of assumptions on strong associations among work, cwq and
* gcwq which make migrating pending and scheduled works very
* difficult to implement without impacting hot paths. Secondly,
* gcwqs serve mix of short, long and very long running works making
* worker pools serve mix of short, long and very long running works making
* blocked draining impractical.
*
* This is solved by allowing the pools to be disassociated from the CPU
* running as an unbound one and allowing it to be reattached later if the
* cpu comes back online.
*/

/* claim manager positions of all pools */
static void gcwq_claim_assoc_and_lock(struct global_cwq *gcwq)
{
struct worker_pool *pool;

for_each_worker_pool(pool, gcwq)
mutex_lock_nested(&pool->assoc_mutex, pool - gcwq->pools);

local_irq_disable();
for_each_worker_pool(pool, gcwq)
spin_lock_nested(&pool->lock, pool - gcwq->pools);
}

/* release manager positions */
static void gcwq_release_assoc_and_unlock(struct global_cwq *gcwq)
{
struct worker_pool *pool;

for_each_worker_pool(pool, gcwq)
spin_unlock(&pool->lock);
local_irq_enable();

for_each_worker_pool(pool, gcwq)
mutex_unlock(&pool->assoc_mutex);
}

static void gcwq_unbind_fn(struct work_struct *work)
{
struct global_cwq *gcwq = get_gcwq(smp_processor_id());
Expand All @@ -3539,27 +3506,30 @@ static void gcwq_unbind_fn(struct work_struct *work)
struct hlist_node *pos;
int i;

BUG_ON(gcwq->pools[0].cpu != smp_processor_id());
for_each_worker_pool(pool, gcwq) {
BUG_ON(pool->cpu != smp_processor_id());

gcwq_claim_assoc_and_lock(gcwq);
mutex_lock(&pool->assoc_mutex);
spin_lock_irq(&pool->lock);

/*
* We've claimed all manager positions. Make all workers unbound
* and set DISASSOCIATED. Before this, all workers except for the
* ones which are still executing works from before the last CPU
* down must be on the cpu. After this, they may become diasporas.
*/
for_each_worker_pool(pool, gcwq) {
/*
* We've claimed all manager positions. Make all workers
* unbound and set DISASSOCIATED. Before this, all workers
* except for the ones which are still executing works from
* before the last CPU down must be on the cpu. After
* this, they may become diasporas.
*/
list_for_each_entry(worker, &pool->idle_list, entry)
worker->flags |= WORKER_UNBOUND;

for_each_busy_worker(worker, i, pos, pool)
worker->flags |= WORKER_UNBOUND;

pool->flags |= POOL_DISASSOCIATED;
}

gcwq_release_assoc_and_unlock(gcwq);
spin_unlock_irq(&pool->lock);
mutex_unlock(&pool->assoc_mutex);
}

/*
* Call schedule() so that we cross rq->lock and thus can guarantee
Expand Down Expand Up @@ -3615,11 +3585,16 @@ static int __cpuinit workqueue_cpu_up_callback(struct notifier_block *nfb,

case CPU_DOWN_FAILED:
case CPU_ONLINE:
gcwq_claim_assoc_and_lock(gcwq);
for_each_worker_pool(pool, gcwq)
for_each_worker_pool(pool, gcwq) {
mutex_lock(&pool->assoc_mutex);
spin_lock_irq(&pool->lock);

pool->flags &= ~POOL_DISASSOCIATED;
rebind_workers(gcwq);
gcwq_release_assoc_and_unlock(gcwq);
rebind_workers(pool);

spin_unlock_irq(&pool->lock);
mutex_unlock(&pool->assoc_mutex);
}
break;
}
return NOTIFY_OK;
Expand Down

0 comments on commit 94cf58b

Please sign in to comment.