Merge tag 'for-5.14/io_uring-2021-06-30' of git://git.kernel.dk/linux-block

Pull io_uring updates from Jens Axboe:

 - Multi-queue iopoll improvement (Fam)

 - Allow configurable io-wq CPU masks (me; see the usage sketch after this list)

 - renameat/linkat tightening (me)

 - poll re-arm improvement (Olivier)

 - SQPOLL race fix (Olivier)

 - Cancelation unification (Pavel)

 - SQPOLL cleanups (Pavel)

 - Enable file backed buffers for shmem/memfd (Pavel)

 - A ton of cleanups and performance improvements (Pavel)

 - Followup and misc fixes (Colin, Fam, Hao, Olivier)
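
The configurable io-wq CPU mask item is exposed to userspace through io_uring_register(2). The following is a minimal sketch, assuming a 5.14+ <linux/io_uring.h> that defines IORING_REGISTER_IOWQ_AFF, an already-created ring fd, and CPUs 0-1 as an arbitrary example mask; the helper name pin_iowq_workers is illustrative, not an existing API:

#define _GNU_SOURCE
#include <linux/io_uring.h>	/* IORING_REGISTER_IOWQ_AFF (5.14+) */
#include <sched.h>		/* cpu_set_t, CPU_ZERO, CPU_SET */
#include <sys/syscall.h>	/* __NR_io_uring_register */
#include <unistd.h>		/* syscall() */

/* Illustrative helper: pin the ring's io-wq workers to CPUs 0 and 1.
 * The kernel copies at most cpumask_size() bytes of the mask, so passing
 * the full sizeof(cpu_set_t) is fine.
 */
static int pin_iowq_workers(int ring_fd)
{
	cpu_set_t mask;

	CPU_ZERO(&mask);
	CPU_SET(0, &mask);
	CPU_SET(1, &mask);

	return syscall(__NR_io_uring_register, ring_fd,
		       IORING_REGISTER_IOWQ_AFF, &mask, sizeof(mask));
}

Conversely, IORING_UNREGISTER_IOWQ_AFF (NULL argument, zero length) should fall back to the default per-NUMA-node affinity, i.e. the mask == NULL branch of io_wq_cpu_affinity() in the diff below.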

* tag 'for-5.14/io_uring-2021-06-30' of git://git.kernel.dk/linux-block: (83 commits)
  io_uring: code clean for kiocb_done()
  io_uring: spin in iopoll() only when reqs are in a single queue
  io_uring: pre-initialise some of req fields
  io_uring: refactor io_submit_flush_completions
  io_uring: optimise hot path restricted checks
  io_uring: remove not needed PF_EXITING check
  io_uring: mainstream sqpoll task_work running
  io_uring: refactor io_arm_poll_handler()
  io_uring: reduce latency by reissueing the operation
  io_uring: add IOPOLL and reserved field checks to IORING_OP_UNLINKAT
  io_uring: add IOPOLL and reserved field checks to IORING_OP_RENAMEAT
  io_uring: refactor io_openat2()
  io_uring: simplify struct io_uring_sqe layout
  io_uring: update sqe layout build checks
  io_uring: fix code style problems
  io_uring: refactor io_sq_thread()
  io_uring: don't change sqpoll creds if not needed
  io_uring: Create define to modify a SQPOLL parameter
  io_uring: Fix race condition when sqp thread goes to sleep
  io_uring: improve in tctx_task_work() resubmission
  ...
Linus Torvalds committed Jul 1, 2021
2 parents 911a299 + e149bd7 commit c288d9c
Showing 5 changed files with 874 additions and 690 deletions.
103 changes: 71 additions & 32 deletions fs/io-wq.c
@@ -9,8 +9,6 @@
 #include <linux/init.h>
 #include <linux/errno.h>
 #include <linux/sched/signal.h>
-#include <linux/mm.h>
-#include <linux/sched/mm.h>
 #include <linux/percpu.h>
 #include <linux/slab.h>
 #include <linux/rculist_nulls.h>
@@ -96,28 +94,29 @@ struct io_wqe {
 
 	struct io_wq *wq;
 	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
+
+	cpumask_var_t cpu_mask;
 };
 
 /*
  * Per io_wq state
  */
 struct io_wq {
-	struct io_wqe **wqes;
 	unsigned long state;
 
	free_work_fn *free_work;
 	io_wq_work_fn *do_work;
 
 	struct io_wq_hash *hash;
 
-	refcount_t refs;
-
 	atomic_t worker_refs;
 	struct completion worker_done;
 
 	struct hlist_node cpuhp_node;
 
 	struct task_struct *task;
+
+	struct io_wqe *wqes[];
 };
 
 static enum cpuhp_state io_wq_online;
@@ -241,7 +240,8 @@ static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
 	 * Most likely an attempt to queue unbounded work on an io_wq that
 	 * wasn't setup with any unbounded workers.
 	 */
-	WARN_ON_ONCE(!acct->max_workers);
+	if (unlikely(!acct->max_workers))
+		pr_warn_once("io-wq is not configured for unbound workers");
 
 	rcu_read_lock();
 	ret = io_wqe_activate_free_worker(wqe);
@@ -560,17 +560,13 @@ static int io_wqe_worker(void *data)
 		if (ret)
 			continue;
 		/* timed out, exit unless we're the fixed worker */
-		if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
-		    !(worker->flags & IO_WORKER_F_FIXED))
+		if (!(worker->flags & IO_WORKER_F_FIXED))
 			break;
 	}
 
 	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
 		raw_spin_lock_irq(&wqe->lock);
-		if (!wq_list_empty(&wqe->work_list))
-			io_worker_handle_work(worker);
-		else
-			raw_spin_unlock_irq(&wqe->lock);
+		io_worker_handle_work(worker);
 	}
 
 	io_worker_exit(worker);
@@ -645,7 +641,7 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 
 	tsk->pf_io_worker = worker;
 	worker->task = tsk;
-	set_cpus_allowed_ptr(tsk, cpumask_of_node(wqe->node));
+	set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
 	tsk->flags |= PF_NO_SETAFFINITY;
 
 	raw_spin_lock_irq(&wqe->lock);
@@ -901,23 +897,20 @@ static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
 
 struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 {
-	int ret = -ENOMEM, node;
+	int ret, node;
 	struct io_wq *wq;
 
 	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
 		return ERR_PTR(-EINVAL);
+	if (WARN_ON_ONCE(!bounded))
+		return ERR_PTR(-EINVAL);
 
-	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
+	wq = kzalloc(struct_size(wq, wqes, nr_node_ids), GFP_KERNEL);
 	if (!wq)
 		return ERR_PTR(-ENOMEM);
-
-	wq->wqes = kcalloc(nr_node_ids, sizeof(struct io_wqe *), GFP_KERNEL);
-	if (!wq->wqes)
-		goto err_wq;
-
 	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
 	if (ret)
-		goto err_wqes;
+		goto err_wq;
 
 	refcount_inc(&data->hash->refs);
 	wq->hash = data->hash;
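
A side note on the kzalloc(struct_size(...)) line above: struct io_wq now ends in a flexible array member (struct io_wqe *wqes[]), so the header and the per-node pointer array come from a single allocation. A generic sketch of the idiom, with illustrative names rather than the io-wq types:

#include <linux/overflow.h>	/* struct_size() */
#include <linux/slab.h>		/* kzalloc() */

struct entry;

struct table {
	unsigned long state;
	struct entry *slots[];	/* flexible array member, sized at allocation */
};

static struct table *table_alloc(unsigned int nr)
{
	struct table *tbl;

	/* struct_size() computes sizeof(*tbl) + nr * sizeof(tbl->slots[0])
	 * with overflow checking, replacing a separate kcalloc() for the array.
	 */
	tbl = kzalloc(struct_size(tbl, slots, nr), GFP_KERNEL);
	return tbl;
}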
@@ -934,6 +927,9 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
 		if (!wqe)
 			goto err;
+		if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL))
+			goto err;
+		cpumask_copy(wqe->cpu_mask, cpumask_of_node(node));
 		wq->wqes[node] = wqe;
 		wqe->node = alloc_node;
 		wqe->acct[IO_WQ_ACCT_BOUND].index = IO_WQ_ACCT_BOUND;
@@ -953,17 +949,18 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 	}
 
 	wq->task = get_task_struct(data->task);
-	refcount_set(&wq->refs, 1);
 	atomic_set(&wq->worker_refs, 1);
 	init_completion(&wq->worker_done);
 	return wq;
 err:
 	io_wq_put_hash(data->hash);
 	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
-	for_each_node(node)
+	for_each_node(node) {
+		if (!wq->wqes[node])
+			continue;
+		free_cpumask_var(wq->wqes[node]->cpu_mask);
 		kfree(wq->wqes[node]);
-err_wqes:
-	kfree(wq->wqes);
+	}
 err_wq:
 	kfree(wq);
 	return ERR_PTR(ret);
@@ -1033,10 +1030,10 @@ static void io_wq_destroy(struct io_wq *wq)
 			.cancel_all = true,
 		};
 		io_wqe_cancel_pending_work(wqe, &match);
+		free_cpumask_var(wqe->cpu_mask);
 		kfree(wqe);
 	}
 	io_wq_put_hash(wq->hash);
-	kfree(wq->wqes);
 	kfree(wq);
 }
 
@@ -1045,25 +1042,67 @@ void io_wq_put_and_exit(struct io_wq *wq)
 	WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));
 
 	io_wq_exit_workers(wq);
-	if (refcount_dec_and_test(&wq->refs))
-		io_wq_destroy(wq);
+	io_wq_destroy(wq);
 }
 
+struct online_data {
+	unsigned int cpu;
+	bool online;
+};
+
 static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
 {
-	set_cpus_allowed_ptr(worker->task, cpumask_of_node(worker->wqe->node));
+	struct online_data *od = data;
+
+	if (od->online)
+		cpumask_set_cpu(od->cpu, worker->wqe->cpu_mask);
+	else
+		cpumask_clear_cpu(od->cpu, worker->wqe->cpu_mask);
 	return false;
 }
 
+static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
+{
+	struct online_data od = {
+		.cpu = cpu,
+		.online = online
+	};
+	int i;
+
+	rcu_read_lock();
+	for_each_node(i)
+		io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, &od);
+	rcu_read_unlock();
+	return 0;
+}
+
 static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
 {
 	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);
+
+	return __io_wq_cpu_online(wq, cpu, true);
+}
+
+static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
+{
+	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);
+
+	return __io_wq_cpu_online(wq, cpu, false);
+}
+
+int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
+{
 	int i;
 
 	rcu_read_lock();
-	for_each_node(i)
-		io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, NULL);
+	for_each_node(i) {
+		struct io_wqe *wqe = wq->wqes[i];
+
+		if (mask)
+			cpumask_copy(wqe->cpu_mask, mask);
+		else
+			cpumask_copy(wqe->cpu_mask, cpumask_of_node(i));
+	}
 	rcu_read_unlock();
 	return 0;
 }
@@ -1073,7 +1112,7 @@ static __init int io_wq_init(void)
 	int ret;
 
 	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
-					io_wq_cpu_online, NULL);
+					io_wq_cpu_online, io_wq_cpu_offline);
 	if (ret < 0)
 		return ret;
 	io_wq_online = ret;
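The io_wq_cpu_offline hookup above completes the standard multi-instance CPU-hotplug pattern: one dynamic state is registered with paired online/offline callbacks, and each io_wq attaches itself through its embedded hlist_node (the cpuhp_state_add_instance_nocalls() call in io_wq_create()). A condensed sketch of that pattern with illustrative names, not the io-wq code itself:

#include <linux/cpuhotplug.h>
#include <linux/list.h>		/* hlist_node, hlist_entry_safe() */

static enum cpuhp_state example_state;

struct example_obj {
	struct hlist_node cpuhp_node;	/* one instance per object */
};

static int example_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct example_obj *obj =
		hlist_entry_safe(node, struct example_obj, cpuhp_node);

	/* react to @cpu coming online for this instance @obj */
	return 0;
}

static int example_cpu_offline(unsigned int cpu, struct hlist_node *node)
{
	/* mirror image of the online callback */
	return 0;
}

static int __init example_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "example/online",
				      example_cpu_online, example_cpu_offline);
	if (ret < 0)
		return ret;
	/* instances attach later with cpuhp_state_add_instance_nocalls() */
	example_state = ret;
	return 0;
}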
3 changes: 2 additions & 1 deletion fs/io-wq.h
@@ -87,7 +86,6 @@ static inline void wq_list_del(struct io_wq_work_list *list,
 
 struct io_wq_work {
 	struct io_wq_work_node list;
-	const struct cred *creds;
 	unsigned flags;
 };

@@ -128,6 +127,8 @@ void io_wq_put_and_exit(struct io_wq *wq);
 void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work);
 void io_wq_hash_work(struct io_wq_work *work, void *val);
 
+int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask);
+
 static inline bool io_wq_is_hashed(struct io_wq_work *work)
 {
 	return work->flags & IO_WQ_WORK_HASHED;
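
The header now exports io_wq_cpu_affinity() to the rest of io_uring. Its intended caller is the io_uring_register() path in fs/io_uring.c, which is not part of this diff; the sketch below reconstructs that caller's rough shape (simplified, with an illustrative function name), so treat the details as approximate:

#include <linux/cpumask.h>
#include <linux/slab.h>
#include <linux/uaccess.h>

#include "io-wq.h"

/* Approximate shape of an IORING_REGISTER_IOWQ_AFF handler: copy the
 * user-supplied mask into a kernel cpumask, then retarget the io_wq.
 */
static int example_register_iowq_aff(struct io_wq *wq,
				     void __user *arg, unsigned int len)
{
	cpumask_var_t new_mask;
	int ret;

	if (!alloc_cpumask_var(&new_mask, GFP_KERNEL))
		return -ENOMEM;

	cpumask_clear(new_mask);
	if (len > cpumask_size())
		len = cpumask_size();
	if (copy_from_user(new_mask, arg, len)) {
		free_cpumask_var(new_mask);
		return -EFAULT;
	}

	ret = io_wq_cpu_affinity(wq, new_mask);
	free_cpumask_var(new_mask);
	return ret;
}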