io-wq: fix races around manager/worker creation and task exit
These races have always been there; they are just more apparent now that
we do early cancel of io-wq when the task exits.

Ensure that the io-wq manager sets task state correctly to not miss
wakeups for task creation. This is important if we get a wakeup after
having marked ourselves as TASK_INTERRUPTIBLE. If we do end up creating
workers, then we flip the state back to running, making the subsequent
schedule() a no-op. Also increment the wq ref count before forking the
thread, to avoid a use-after-free.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
Jens Axboe committed Feb 24, 2021
1 parent 8a378fb commit 8b3e78b
Showing 1 changed file with 35 additions and 22 deletions.
diff --git a/fs/io-wq.c b/fs/io-wq.c
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -605,6 +605,8 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 	struct io_worker *worker;
 	pid_t pid;
 
+	__set_current_state(TASK_RUNNING);
+
 	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
 	if (!worker)
 		return false;
@@ -614,15 +616,18 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 	worker->wqe = wqe;
 	spin_lock_init(&worker->lock);
 
+	refcount_inc(&wq->refs);
+
 	if (index == IO_WQ_ACCT_BOUND)
 		pid = io_wq_fork_thread(task_thread_bound, worker);
 	else
 		pid = io_wq_fork_thread(task_thread_unbound, worker);
 	if (pid < 0) {
+		if (refcount_dec_and_test(&wq->refs))
+			complete(&wq->done);
 		kfree(worker);
 		return false;
 	}
-	refcount_inc(&wq->refs);
 	return true;
 }
 
@@ -668,6 +673,30 @@ static bool io_wq_worker_wake(struct io_worker *worker, void *data)
 	return false;
 }
 
+static void io_wq_check_workers(struct io_wq *wq)
+{
+	int node;
+
+	for_each_node(node) {
+		struct io_wqe *wqe = wq->wqes[node];
+		bool fork_worker[2] = { false, false };
+
+		if (!node_online(node))
+			continue;
+
+		raw_spin_lock_irq(&wqe->lock);
+		if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
+			fork_worker[IO_WQ_ACCT_BOUND] = true;
+		if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
+			fork_worker[IO_WQ_ACCT_UNBOUND] = true;
+		raw_spin_unlock_irq(&wqe->lock);
+		if (fork_worker[IO_WQ_ACCT_BOUND])
+			create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
+		if (fork_worker[IO_WQ_ACCT_UNBOUND])
+			create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
+	}
+}
+
 /*
  * Manager thread. Tasked with creating new workers, if we need them.
  */
@@ -684,30 +713,15 @@ static int io_wq_manager(void *data)
 
 	complete(&wq->done);
 
-	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
-		for_each_node(node) {
-			struct io_wqe *wqe = wq->wqes[node];
-			bool fork_worker[2] = { false, false };
-
-			if (!node_online(node))
-				continue;
-
-			raw_spin_lock_irq(&wqe->lock);
-			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
-				fork_worker[IO_WQ_ACCT_BOUND] = true;
-			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
-				fork_worker[IO_WQ_ACCT_UNBOUND] = true;
-			raw_spin_unlock_irq(&wqe->lock);
-			if (fork_worker[IO_WQ_ACCT_BOUND])
-				create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
-			if (fork_worker[IO_WQ_ACCT_UNBOUND])
-				create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
-		}
+	do {
 		set_current_state(TASK_INTERRUPTIBLE);
+		io_wq_check_workers(wq);
 		schedule_timeout(HZ);
 		if (fatal_signal_pending(current))
 			set_bit(IO_WQ_BIT_EXIT, &wq->state);
-	}
+	} while (!test_bit(IO_WQ_BIT_EXIT, &wq->state));
+
+	io_wq_check_workers(wq);
 
 	if (refcount_dec_and_test(&wq->refs)) {
 		complete(&wq->done);
@@ -970,7 +984,6 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 	current->flags &= ~PF_IO_WORKER;
 	if (ret >= 0) {
 		wait_for_completion(&wq->done);
-		reinit_completion(&wq->done);
 		return wq;
 	}
 
