Btrfs: Allow worker threads to exit when idle
The Btrfs worker threads don't currently die off after they have
been idle for a while, leading to a lot of threads sitting around
doing nothing for each mount.

Also, they cannot be started atomically (from end_io handlers).

This commit reworks the worker threads so they can be started
from end_io handlers (by setting a flag that asks for a thread
to be added later) and so they can exit once they have been
idle for a long time.

Signed-off-by: Chris Mason <chris.mason@oracle.com>
Chris Mason committed Sep 11, 2009
1 parent ceab36e commit 9042846
Showing 3 changed files with 132 additions and 32 deletions.
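
For orientation before the diff: a minimal sketch of the deferred-start pattern this commit introduces, with hypothetical names (struct pool, request_worker, and create_pending_worker are illustrations, not the btrfs API). Code running in interrupt context must not sleep, so it only records that a worker is wanted; an existing worker, already in process context, later notices the flag and performs the blocking kthread creation.

    #include <linux/kthread.h>
    #include <linux/spinlock.h>

    struct pool {
            spinlock_t lock;
            int start_pending;      /* worker wanted, but caller could not sleep */
            int num_workers;
            int max_workers;
    };

    /* safe from an end_io handler: takes the lock, never sleeps */
    static void request_worker(struct pool *p)
    {
            unsigned long flags;

            spin_lock_irqsave(&p->lock, flags);
            if (p->num_workers < p->max_workers)
                    p->start_pending = 1;
            spin_unlock_irqrestore(&p->lock, flags);
    }

    /* run by an existing worker in process context: may sleep */
    static void create_pending_worker(struct pool *p, int (*fn)(void *))
    {
            unsigned long flags;
            int start = 0;

            spin_lock_irqsave(&p->lock, flags);
            if (p->start_pending && p->num_workers < p->max_workers) {
                    p->start_pending = 0;
                    start = 1;
            }
            spin_unlock_irqrestore(&p->lock, flags);

            /* the real code bumps num_workers when the thread is registered */
            if (start)
                    kthread_run(fn, p, "pool-worker");
    }

In the actual patch, find_worker() sets workers->atomic_start_pending when it cannot sleep, and worker_loop() calls check_pending_worker_creates() between jobs to do the blocking btrfs_start_workers() call.
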
133 changes: 111 additions & 22 deletions fs/btrfs/async-thread.c
@@ -48,6 +48,9 @@ struct btrfs_worker_thread {
/* number of things on the pending list */
atomic_t num_pending;

/* reference counter for this struct */
atomic_t refs;

unsigned long sequence;

/* protects the pending list. */
@@ -93,6 +96,31 @@ static void check_busy_worker(struct btrfs_worker_thread *worker)
}
}

static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
{
struct btrfs_workers *workers = worker->workers;
unsigned long flags;

rmb();
if (!workers->atomic_start_pending)
return;

spin_lock_irqsave(&workers->lock, flags);
if (!workers->atomic_start_pending)
goto out;

workers->atomic_start_pending = 0;
if (workers->num_workers >= workers->max_workers)
goto out;

spin_unlock_irqrestore(&workers->lock, flags);
btrfs_start_workers(workers, 1);
return;

out:
spin_unlock_irqrestore(&workers->lock, flags);
}

static noinline int run_ordered_completions(struct btrfs_workers *workers,
struct btrfs_work *work)
{
@@ -140,6 +168,36 @@ static noinline int run_ordered_completions(struct btrfs_workers *workers,
return 0;
}

static void put_worker(struct btrfs_worker_thread *worker)
{
if (atomic_dec_and_test(&worker->refs))
kfree(worker);
}

static int try_worker_shutdown(struct btrfs_worker_thread *worker)
{
int freeit = 0;

spin_lock_irq(&worker->lock);
spin_lock_irq(&worker->workers->lock);
if (worker->workers->num_workers > 1 &&
worker->idle &&
!worker->working &&
!list_empty(&worker->worker_list) &&
list_empty(&worker->prio_pending) &&
list_empty(&worker->pending)) {
freeit = 1;
list_del_init(&worker->worker_list);
worker->workers->num_workers--;
}
spin_unlock_irq(&worker->workers->lock);
spin_unlock_irq(&worker->lock);

if (freeit)
put_worker(worker);
return freeit;
}

/*
* main loop for servicing work items
*/
@@ -175,6 +233,8 @@ static int worker_loop(void *arg)
*/
run_ordered_completions(worker->workers, work);

check_pending_worker_creates(worker);

spin_lock_irq(&worker->lock);
check_idle_worker(worker);
}
@@ -226,8 +286,13 @@ static int worker_loop(void *arg)
worker->working = 0;
spin_unlock_irq(&worker->lock);

if (!kthread_should_stop())
schedule();
if (!kthread_should_stop()) {
schedule_timeout(HZ * 120);
if (!worker->working &&
try_worker_shutdown(worker)) {
return 0;
}
}
}
__set_current_state(TASK_RUNNING);
}
@@ -242,16 +307,30 @@ int btrfs_stop_workers(struct btrfs_workers *workers)
{
struct list_head *cur;
struct btrfs_worker_thread *worker;
int can_stop;

spin_lock_irq(&workers->lock);
list_splice_init(&workers->idle_list, &workers->worker_list);
while (!list_empty(&workers->worker_list)) {
cur = workers->worker_list.next;
worker = list_entry(cur, struct btrfs_worker_thread,
worker_list);
kthread_stop(worker->task);
list_del(&worker->worker_list);
kfree(worker);

atomic_inc(&worker->refs);
workers->num_workers -= 1;
if (!list_empty(&worker->worker_list)) {
list_del_init(&worker->worker_list);
put_worker(worker);
can_stop = 1;
} else
can_stop = 0;
spin_unlock_irq(&workers->lock);
if (can_stop)
kthread_stop(worker->task);
spin_lock_irq(&workers->lock);
put_worker(worker);
}
spin_unlock_irq(&workers->lock);
return 0;
}
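
The refcounting in the btrfs_stop_workers() hunk above deserves a note. Once workers can free themselves from the idle-timeout path, the stop path must pin each worker with a reference before dropping the list lock: kthread_stop() sleeps, and without the pin the worker struct could be freed underneath it. A minimal sketch of the pattern, again with hypothetical names (struct worker, stop_one):

    #include <linux/atomic.h>
    #include <linux/kthread.h>
    #include <linux/list.h>
    #include <linux/slab.h>
    #include <linux/spinlock.h>

    struct worker {
            struct list_head list;  /* empty once the worker removed itself */
            atomic_t refs;
            struct task_struct *task;
    };

    static void put_worker(struct worker *w)
    {
            if (atomic_dec_and_test(&w->refs))
                    kfree(w);
    }

    static void stop_one(spinlock_t *pool_lock, struct worker *w)
    {
            int can_stop;

            spin_lock_irq(pool_lock);
            atomic_inc(&w->refs);           /* pin w across the unlock */
            can_stop = !list_empty(&w->list);
            if (can_stop) {
                    list_del_init(&w->list);
                    put_worker(w);          /* drop the list's reference */
            }
            spin_unlock_irq(pool_lock);

            /* only stop threads that have not already shut themselves down */
            if (can_stop)
                    kthread_stop(w->task);  /* sleeps; safe, w is pinned */
            put_worker(w);                  /* drop our pin; may free w */
    }
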

@@ -270,6 +349,8 @@ void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max)
workers->idle_thresh = 32;
workers->name = name;
workers->ordered = 0;
workers->atomic_start_pending = 0;
workers->atomic_worker_start = 0;
}

/*
@@ -294,6 +375,7 @@ int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
INIT_LIST_HEAD(&worker->worker_list);
spin_lock_init(&worker->lock);
atomic_set(&worker->num_pending, 0);
atomic_set(&worker->refs, 1);
worker->workers = workers;
worker->task = kthread_run(worker_loop, worker,
"btrfs-%s-%d", workers->name,
@@ -303,7 +385,6 @@ int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
kfree(worker);
goto fail;
}

spin_lock_irq(&workers->lock);
list_add_tail(&worker->worker_list, &workers->idle_list);
worker->idle = 1;
@@ -367,6 +448,7 @@ static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
{
struct btrfs_worker_thread *worker;
unsigned long flags;
struct list_head *fallback;

again:
spin_lock_irqsave(&workers->lock, flags);
@@ -376,19 +458,10 @@ static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
if (!worker) {
spin_lock_irqsave(&workers->lock, flags);
if (workers->num_workers >= workers->max_workers) {
struct list_head *fallback = NULL;
/*
* we have failed to find any workers, just
* return the force one
*/
if (!list_empty(&workers->worker_list))
fallback = workers->worker_list.next;
if (!list_empty(&workers->idle_list))
fallback = workers->idle_list.next;
BUG_ON(!fallback);
worker = list_entry(fallback,
struct btrfs_worker_thread, worker_list);
spin_unlock_irqrestore(&workers->lock, flags);
goto fallback;
} else if (workers->atomic_worker_start) {
workers->atomic_start_pending = 1;
goto fallback;
} else {
spin_unlock_irqrestore(&workers->lock, flags);
/* we're below the limit, start another worker */
@@ -397,6 +470,22 @@ static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
}
}
return worker;

fallback:
fallback = NULL;
/*
* we have failed to find any workers, just
* return the first one we can find.
*/
if (!list_empty(&workers->worker_list))
fallback = workers->worker_list.next;
if (!list_empty(&workers->idle_list))
fallback = workers->idle_list.next;
BUG_ON(!fallback);
worker = list_entry(fallback,
struct btrfs_worker_thread, worker_list);
spin_unlock_irqrestore(&workers->lock, flags);
return worker;
}

/*
@@ -435,9 +524,9 @@ int btrfs_requeue_work(struct btrfs_work *work)
worker->working = 1;
}

spin_unlock_irqrestore(&worker->lock, flags);
if (wake)
wake_up_process(worker->task);
spin_unlock_irqrestore(&worker->lock, flags);
out:

return 0;
@@ -492,10 +581,10 @@ int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
wake = 1;
worker->working = 1;

spin_unlock_irqrestore(&worker->lock, flags);

if (wake)
wake_up_process(worker->task);
spin_unlock_irqrestore(&worker->lock, flags);

out:
return 0;
}
9 changes: 9 additions & 0 deletions fs/btrfs/async-thread.h
@@ -73,6 +73,15 @@ struct btrfs_workers {
/* force completions in the order they were queued */
int ordered;

/* more workers required, but in an interrupt handler */
int atomic_start_pending;

/*
* are we allowed to sleep while starting workers or are we required
* to start them at a later time?
*/
int atomic_worker_start;

/* list with all the work threads. The workers on the idle thread
* may be actively servicing jobs, but they haven't yet hit the
* idle thresh limit above.
22 changes: 12 additions & 10 deletions fs/btrfs/disk-io.c
@@ -1682,7 +1682,7 @@ struct btrfs_root *open_ctree(struct super_block *sb,
err = -EINVAL;
goto fail_iput;
}

printk("thread pool is %d\n", fs_info->thread_pool_size);
/*
* we need to start all the end_io workers up front because the
* queue work function gets called at interrupt time, and so it
@@ -1727,20 +1727,22 @@ struct btrfs_root *open_ctree(struct super_block *sb,
fs_info->endio_workers.idle_thresh = 4;
fs_info->endio_meta_workers.idle_thresh = 4;

fs_info->endio_write_workers.idle_thresh = 64;
fs_info->endio_meta_write_workers.idle_thresh = 64;
fs_info->endio_write_workers.idle_thresh = 2;
fs_info->endio_meta_write_workers.idle_thresh = 2;

fs_info->endio_workers.atomic_worker_start = 1;
fs_info->endio_meta_workers.atomic_worker_start = 1;
fs_info->endio_write_workers.atomic_worker_start = 1;
fs_info->endio_meta_write_workers.atomic_worker_start = 1;

btrfs_start_workers(&fs_info->workers, 1);
btrfs_start_workers(&fs_info->submit_workers, 1);
btrfs_start_workers(&fs_info->delalloc_workers, 1);
btrfs_start_workers(&fs_info->fixup_workers, 1);
btrfs_start_workers(&fs_info->endio_workers, fs_info->thread_pool_size);
btrfs_start_workers(&fs_info->endio_meta_workers,
fs_info->thread_pool_size);
btrfs_start_workers(&fs_info->endio_meta_write_workers,
fs_info->thread_pool_size);
btrfs_start_workers(&fs_info->endio_write_workers,
fs_info->thread_pool_size);
btrfs_start_workers(&fs_info->endio_workers, 1);
btrfs_start_workers(&fs_info->endio_meta_workers, 1);
btrfs_start_workers(&fs_info->endio_meta_write_workers, 1);
btrfs_start_workers(&fs_info->endio_write_workers, 1);

fs_info->bdi.ra_pages *= btrfs_super_num_devices(disk_super);
fs_info->bdi.ra_pages = max(fs_info->bdi.ra_pages,
