Skip to content

Commit

Permalink
Merge branch 'master' of git://git.kernel.org/pub/scm/linux/kernel/gi…
Browse files Browse the repository at this point in the history
…t/mason/btrfs-unstable
  • Loading branch information
Chris Mason committed Sep 11, 2009
2 parents 74fca6a + 93c82d5 commit 83ebade
Show file tree
Hide file tree
Showing 18 changed files with 580 additions and 317 deletions.
230 changes: 184 additions & 46 deletions fs/btrfs/async-thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ struct btrfs_worker_thread {
/* number of things on the pending list */
atomic_t num_pending;

/* reference counter for this struct */
atomic_t refs;

unsigned long sequence;

/* protects the pending list. */
Expand Down Expand Up @@ -93,17 +96,40 @@ static void check_busy_worker(struct btrfs_worker_thread *worker)
}
}

static noinline int run_ordered_completions(struct btrfs_workers *workers,
struct btrfs_work *work)
static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
{
struct btrfs_workers *workers = worker->workers;
unsigned long flags;

rmb();
if (!workers->atomic_start_pending)
return;

spin_lock_irqsave(&workers->lock, flags);
if (!workers->atomic_start_pending)
goto out;

workers->atomic_start_pending = 0;
if (workers->num_workers >= workers->max_workers)
goto out;

spin_unlock_irqrestore(&workers->lock, flags);
btrfs_start_workers(workers, 1);
return;

out:
spin_unlock_irqrestore(&workers->lock, flags);
}

static noinline int run_ordered_completions(struct btrfs_workers *workers,
struct btrfs_work *work)
{
if (!workers->ordered)
return 0;

set_bit(WORK_DONE_BIT, &work->flags);

spin_lock_irqsave(&workers->lock, flags);
spin_lock(&workers->order_lock);

while (1) {
if (!list_empty(&workers->prio_order_list)) {
Expand All @@ -126,45 +152,117 @@ static noinline int run_ordered_completions(struct btrfs_workers *workers,
if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
break;

spin_unlock_irqrestore(&workers->lock, flags);
spin_unlock(&workers->order_lock);

work->ordered_func(work);

/* now take the lock again and call the freeing code */
spin_lock_irqsave(&workers->lock, flags);
spin_lock(&workers->order_lock);
list_del(&work->order_list);
work->ordered_free(work);
}

spin_unlock_irqrestore(&workers->lock, flags);
spin_unlock(&workers->order_lock);
return 0;
}

static void put_worker(struct btrfs_worker_thread *worker)
{
if (atomic_dec_and_test(&worker->refs))
kfree(worker);
}

static int try_worker_shutdown(struct btrfs_worker_thread *worker)
{
int freeit = 0;

spin_lock_irq(&worker->lock);
spin_lock_irq(&worker->workers->lock);
if (worker->workers->num_workers > 1 &&
worker->idle &&
!worker->working &&
!list_empty(&worker->worker_list) &&
list_empty(&worker->prio_pending) &&
list_empty(&worker->pending)) {
freeit = 1;
list_del_init(&worker->worker_list);
worker->workers->num_workers--;
}
spin_unlock_irq(&worker->workers->lock);
spin_unlock_irq(&worker->lock);

if (freeit)
put_worker(worker);
return freeit;
}

static struct btrfs_work *get_next_work(struct btrfs_worker_thread *worker,
struct list_head *prio_head,
struct list_head *head)
{
struct btrfs_work *work = NULL;
struct list_head *cur = NULL;

if(!list_empty(prio_head))
cur = prio_head->next;

smp_mb();
if (!list_empty(&worker->prio_pending))
goto refill;

if (!list_empty(head))
cur = head->next;

if (cur)
goto out;

refill:
spin_lock_irq(&worker->lock);
list_splice_tail_init(&worker->prio_pending, prio_head);
list_splice_tail_init(&worker->pending, head);

if (!list_empty(prio_head))
cur = prio_head->next;
else if (!list_empty(head))
cur = head->next;
spin_unlock_irq(&worker->lock);

if (!cur)
goto out_fail;

out:
work = list_entry(cur, struct btrfs_work, list);

out_fail:
return work;
}

/*
* main loop for servicing work items
*/
static int worker_loop(void *arg)
{
struct btrfs_worker_thread *worker = arg;
struct list_head *cur;
struct list_head head;
struct list_head prio_head;
struct btrfs_work *work;

INIT_LIST_HEAD(&head);
INIT_LIST_HEAD(&prio_head);

do {
spin_lock_irq(&worker->lock);
again_locked:
again:
while (1) {
if (!list_empty(&worker->prio_pending))
cur = worker->prio_pending.next;
else if (!list_empty(&worker->pending))
cur = worker->pending.next;
else


work = get_next_work(worker, &prio_head, &head);
if (!work)
break;

work = list_entry(cur, struct btrfs_work, list);
list_del(&work->list);
clear_bit(WORK_QUEUED_BIT, &work->flags);

work->worker = worker;
spin_unlock_irq(&worker->lock);

work->func(work);

Expand All @@ -175,9 +273,13 @@ static int worker_loop(void *arg)
*/
run_ordered_completions(worker->workers, work);

spin_lock_irq(&worker->lock);
check_idle_worker(worker);
check_pending_worker_creates(worker);

}

spin_lock_irq(&worker->lock);
check_idle_worker(worker);

if (freezing(current)) {
worker->working = 0;
spin_unlock_irq(&worker->lock);
Expand Down Expand Up @@ -216,8 +318,10 @@ static int worker_loop(void *arg)
spin_lock_irq(&worker->lock);
set_current_state(TASK_INTERRUPTIBLE);
if (!list_empty(&worker->pending) ||
!list_empty(&worker->prio_pending))
goto again_locked;
!list_empty(&worker->prio_pending)) {
spin_unlock_irq(&worker->lock);
goto again;
}

/*
* this makes sure we get a wakeup when someone
Expand All @@ -226,8 +330,13 @@ static int worker_loop(void *arg)
worker->working = 0;
spin_unlock_irq(&worker->lock);

if (!kthread_should_stop())
schedule();
if (!kthread_should_stop()) {
schedule_timeout(HZ * 120);
if (!worker->working &&
try_worker_shutdown(worker)) {
return 0;
}
}
}
__set_current_state(TASK_RUNNING);
}
Expand All @@ -242,16 +351,30 @@ int btrfs_stop_workers(struct btrfs_workers *workers)
{
struct list_head *cur;
struct btrfs_worker_thread *worker;
int can_stop;

spin_lock_irq(&workers->lock);
list_splice_init(&workers->idle_list, &workers->worker_list);
while (!list_empty(&workers->worker_list)) {
cur = workers->worker_list.next;
worker = list_entry(cur, struct btrfs_worker_thread,
worker_list);
kthread_stop(worker->task);
list_del(&worker->worker_list);
kfree(worker);

atomic_inc(&worker->refs);
workers->num_workers -= 1;
if (!list_empty(&worker->worker_list)) {
list_del_init(&worker->worker_list);
put_worker(worker);
can_stop = 1;
} else
can_stop = 0;
spin_unlock_irq(&workers->lock);
if (can_stop)
kthread_stop(worker->task);
spin_lock_irq(&workers->lock);
put_worker(worker);
}
spin_unlock_irq(&workers->lock);
return 0;
}

Expand All @@ -266,10 +389,13 @@ void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max)
INIT_LIST_HEAD(&workers->order_list);
INIT_LIST_HEAD(&workers->prio_order_list);
spin_lock_init(&workers->lock);
spin_lock_init(&workers->order_lock);
workers->max_workers = max;
workers->idle_thresh = 32;
workers->name = name;
workers->ordered = 0;
workers->atomic_start_pending = 0;
workers->atomic_worker_start = 0;
}

/*
Expand All @@ -293,7 +419,9 @@ int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
INIT_LIST_HEAD(&worker->prio_pending);
INIT_LIST_HEAD(&worker->worker_list);
spin_lock_init(&worker->lock);

atomic_set(&worker->num_pending, 0);
atomic_set(&worker->refs, 1);
worker->workers = workers;
worker->task = kthread_run(worker_loop, worker,
"btrfs-%s-%d", workers->name,
Expand All @@ -303,7 +431,6 @@ int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
kfree(worker);
goto fail;
}

spin_lock_irq(&workers->lock);
list_add_tail(&worker->worker_list, &workers->idle_list);
worker->idle = 1;
Expand Down Expand Up @@ -367,35 +494,42 @@ static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
{
struct btrfs_worker_thread *worker;
unsigned long flags;
struct list_head *fallback;

again:
spin_lock_irqsave(&workers->lock, flags);
worker = next_worker(workers);
spin_unlock_irqrestore(&workers->lock, flags);

if (!worker) {
spin_lock_irqsave(&workers->lock, flags);
if (workers->num_workers >= workers->max_workers) {
struct list_head *fallback = NULL;
/*
* we have failed to find any workers, just
* return the force one
*/
if (!list_empty(&workers->worker_list))
fallback = workers->worker_list.next;
if (!list_empty(&workers->idle_list))
fallback = workers->idle_list.next;
BUG_ON(!fallback);
worker = list_entry(fallback,
struct btrfs_worker_thread, worker_list);
spin_unlock_irqrestore(&workers->lock, flags);
goto fallback;
} else if (workers->atomic_worker_start) {
workers->atomic_start_pending = 1;
goto fallback;
} else {
spin_unlock_irqrestore(&workers->lock, flags);
/* we're below the limit, start another worker */
btrfs_start_workers(workers, 1);
goto again;
}
}
spin_unlock_irqrestore(&workers->lock, flags);
return worker;

fallback:
fallback = NULL;
/*
* we have failed to find any workers, just
* return the first one we can find.
*/
if (!list_empty(&workers->worker_list))
fallback = workers->worker_list.next;
if (!list_empty(&workers->idle_list))
fallback = workers->idle_list.next;
BUG_ON(!fallback);
worker = list_entry(fallback,
struct btrfs_worker_thread, worker_list);
spin_unlock_irqrestore(&workers->lock, flags);
return worker;
}

Expand Down Expand Up @@ -435,9 +569,9 @@ int btrfs_requeue_work(struct btrfs_work *work)
worker->working = 1;
}

spin_unlock_irqrestore(&worker->lock, flags);
if (wake)
wake_up_process(worker->task);
spin_unlock_irqrestore(&worker->lock, flags);
out:

return 0;
Expand All @@ -463,14 +597,18 @@ int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)

worker = find_worker(workers);
if (workers->ordered) {
spin_lock_irqsave(&workers->lock, flags);
/*
* you're not allowed to do ordered queues from an
* interrupt handler
*/
spin_lock(&workers->order_lock);
if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) {
list_add_tail(&work->order_list,
&workers->prio_order_list);
} else {
list_add_tail(&work->order_list, &workers->order_list);
}
spin_unlock_irqrestore(&workers->lock, flags);
spin_unlock(&workers->order_lock);
} else {
INIT_LIST_HEAD(&work->order_list);
}
Expand All @@ -492,10 +630,10 @@ int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
wake = 1;
worker->working = 1;

spin_unlock_irqrestore(&worker->lock, flags);

if (wake)
wake_up_process(worker->task);
spin_unlock_irqrestore(&worker->lock, flags);

out:
return 0;
}
Loading

0 comments on commit 83ebade

Please sign in to comment.