aio: make the lookup_ioctx() lockless
The mm->ioctx_list is currently protected by a reader-writer lock,
so we always grab that lock on the read side for doing ioctx
lookups. As the workload is extremely reader biased, turn this into
an rcu hlist so we can make lookup_ioctx() lockless. Get rid of
the rwlock and use a spinlock for providing update side exclusion.

There's usually only 1 entry on this list, so it doesn't make sense
to look into fancier data structures.

Reviewed-by: Jeff Moyer <jmoyer@redhat.com>
Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
Jens Axboe committed Dec 29, 2008
1 parent 392ddc3 commit abf137d
Showing 5 changed files with 67 additions and 51 deletions.
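
Before the per-file diffs, a quick illustration of the pattern the commit message describes. The sketch below is not part of the commit; it condenses the rwlock-protected-list to RCU-hlist conversion into a self-contained example with made-up names (struct demo_item, demo_head, demo_lock, demo_*()), using the four-argument hlist_for_each_entry_rcu() form of kernels from this era, as seen in the diff. Readers walk the hlist under rcu_read_lock() with no lock at all, writers serialize only against each other with a spinlock, and frees are deferred through call_rcu() so lockless readers can never touch freed memory.

/* Illustrative only -- mirrors the mm->ioctx_list conversion, not taken from it. */
#include <linux/rculist.h>
#include <linux/slab.h>
#include <linux/spinlock.h>

struct demo_item {
        unsigned long           id;
        struct hlist_node       list;
        struct rcu_head         rcu_head;
};

static HLIST_HEAD(demo_head);
static DEFINE_SPINLOCK(demo_lock);      /* update-side exclusion only */

/* Read side: no lock taken, just an RCU read-side critical section. */
static struct demo_item *demo_lookup(unsigned long id)
{
        struct demo_item *pos, *found = NULL;
        struct hlist_node *n;

        rcu_read_lock();
        hlist_for_each_entry_rcu(pos, n, &demo_head, list) {
                if (pos->id == id) {
                        /* real code takes a reference here, as lookup_ioctx()
                         * does with get_ioctx(), before dropping the read lock */
                        found = pos;
                        break;
                }
        }
        rcu_read_unlock();
        return found;
}

/* Update side: writers exclude each other with the spinlock. */
static void demo_add(struct demo_item *item)
{
        spin_lock(&demo_lock);
        hlist_add_head_rcu(&item->list, &demo_head);
        spin_unlock(&demo_lock);
}

static void demo_free_rcu(struct rcu_head *head)
{
        kfree(container_of(head, struct demo_item, rcu_head));
}

static void demo_del(struct demo_item *item)
{
        spin_lock(&demo_lock);
        hlist_del_rcu(&item->list);
        spin_unlock(&demo_lock);
        /* defer the actual free until all pre-existing readers are done */
        call_rcu(&item->rcu_head, demo_free_rcu);
}

The diff below applies exactly this split to the AIO context list: hlist_for_each_entry_rcu() in lookup_ioctx(), hlist_add_head_rcu()/hlist_del_rcu() under mm->ioctx_lock, and call_rcu(&ctx->rcu_head, ctx_rcu_free) in __put_ioctx().
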
4 changes: 2 additions & 2 deletions arch/s390/mm/pgtable.c
@@ -263,7 +263,7 @@ int s390_enable_sie(void)
         /* lets check if we are allowed to replace the mm */
         task_lock(tsk);
         if (!tsk->mm || atomic_read(&tsk->mm->mm_users) > 1 ||
-            tsk->mm != tsk->active_mm || tsk->mm->ioctx_list) {
+            tsk->mm != tsk->active_mm || !hlist_empty(&tsk->mm->ioctx_list)) {
                 task_unlock(tsk);
                 return -EINVAL;
         }
@@ -279,7 +279,7 @@ int s390_enable_sie(void)
         /* Now lets check again if something happened */
         task_lock(tsk);
         if (!tsk->mm || atomic_read(&tsk->mm->mm_users) > 1 ||
-            tsk->mm != tsk->active_mm || tsk->mm->ioctx_list) {
+            tsk->mm != tsk->active_mm || !hlist_empty(&tsk->mm->ioctx_list)) {
                 mmput(mm);
                 task_unlock(tsk);
                 return -EINVAL;
100 changes: 56 additions & 44 deletions fs/aio.c
@@ -191,15 +191,27 @@ static int aio_setup_ring(struct kioctx *ctx)
         kunmap_atomic((void *)((unsigned long)__event & PAGE_MASK), km); \
 } while(0)
 
+static void ctx_rcu_free(struct rcu_head *head)
+{
+        struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
+        unsigned nr_events = ctx->max_reqs;
+
+        kmem_cache_free(kioctx_cachep, ctx);
+
+        if (nr_events) {
+                spin_lock(&aio_nr_lock);
+                BUG_ON(aio_nr - nr_events > aio_nr);
+                aio_nr -= nr_events;
+                spin_unlock(&aio_nr_lock);
+        }
+}
+
 /* __put_ioctx
  *      Called when the last user of an aio context has gone away,
  *      and the struct needs to be freed.
  */
 static void __put_ioctx(struct kioctx *ctx)
 {
-        unsigned nr_events = ctx->max_reqs;
-
         BUG_ON(ctx->reqs_active);
 
         cancel_delayed_work(&ctx->wq);
@@ -208,14 +220,7 @@ static void __put_ioctx(struct kioctx *ctx)
         mmdrop(ctx->mm);
         ctx->mm = NULL;
         pr_debug("__put_ioctx: freeing %p\n", ctx);
-        kmem_cache_free(kioctx_cachep, ctx);
-
-        if (nr_events) {
-                spin_lock(&aio_nr_lock);
-                BUG_ON(aio_nr - nr_events > aio_nr);
-                aio_nr -= nr_events;
-                spin_unlock(&aio_nr_lock);
-        }
+        call_rcu(&ctx->rcu_head, ctx_rcu_free);
 }
 
 #define get_ioctx(kioctx) do { \
@@ -235,6 +240,7 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 {
         struct mm_struct *mm;
         struct kioctx *ctx;
+        int did_sync = 0;
 
         /* Prevent overflows */
         if ((nr_events > (0x10000000U / sizeof(struct io_event))) ||
@@ -267,21 +273,30 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
                 goto out_freectx;
 
         /* limit the number of system wide aios */
-        spin_lock(&aio_nr_lock);
-        if (aio_nr + ctx->max_reqs > aio_max_nr ||
-            aio_nr + ctx->max_reqs < aio_nr)
-                ctx->max_reqs = 0;
-        else
-                aio_nr += ctx->max_reqs;
-        spin_unlock(&aio_nr_lock);
+        do {
+                spin_lock_bh(&aio_nr_lock);
+                if (aio_nr + nr_events > aio_max_nr ||
+                    aio_nr + nr_events < aio_nr)
+                        ctx->max_reqs = 0;
+                else
+                        aio_nr += ctx->max_reqs;
+                spin_unlock_bh(&aio_nr_lock);
+                if (ctx->max_reqs || did_sync)
+                        break;
+
+                /* wait for rcu callbacks to have completed before giving up */
+                synchronize_rcu();
+                did_sync = 1;
+                ctx->max_reqs = nr_events;
+        } while (1);
+
         if (ctx->max_reqs == 0)
                 goto out_cleanup;
 
         /* now link into global list. */
-        write_lock(&mm->ioctx_list_lock);
-        ctx->next = mm->ioctx_list;
-        mm->ioctx_list = ctx;
-        write_unlock(&mm->ioctx_list_lock);
+        spin_lock(&mm->ioctx_lock);
+        hlist_add_head_rcu(&ctx->list, &mm->ioctx_list);
+        spin_unlock(&mm->ioctx_lock);
 
         dprintk("aio: allocated ioctx %p[%ld]: mm=%p mask=0x%x\n",
                 ctx, ctx->user_id, current->mm, ctx->ring_info.nr);
@@ -375,11 +390,12 @@ ssize_t wait_on_sync_kiocb(struct kiocb *iocb)
  */
 void exit_aio(struct mm_struct *mm)
 {
-        struct kioctx *ctx = mm->ioctx_list;
-        mm->ioctx_list = NULL;
-        while (ctx) {
-                struct kioctx *next = ctx->next;
-                ctx->next = NULL;
+        struct kioctx *ctx;
+
+        while (!hlist_empty(&mm->ioctx_list)) {
+                ctx = hlist_entry(mm->ioctx_list.first, struct kioctx, list);
+                hlist_del_rcu(&ctx->list);
+
                 aio_cancel_all(ctx);
 
                 wait_for_all_aios(ctx);
@@ -394,7 +410,6 @@ void exit_aio(struct mm_struct *mm)
                                 atomic_read(&ctx->users), ctx->dead,
                                 ctx->reqs_active);
                 put_ioctx(ctx);
-                ctx = next;
         }
 }
 
@@ -555,19 +570,21 @@ int aio_put_req(struct kiocb *req)
 
 static struct kioctx *lookup_ioctx(unsigned long ctx_id)
 {
-        struct kioctx *ioctx;
-        struct mm_struct *mm;
+        struct mm_struct *mm = current->mm;
+        struct kioctx *ctx = NULL;
+        struct hlist_node *n;
 
-        mm = current->mm;
-        read_lock(&mm->ioctx_list_lock);
-        for (ioctx = mm->ioctx_list; ioctx; ioctx = ioctx->next)
-                if (likely(ioctx->user_id == ctx_id && !ioctx->dead)) {
-                        get_ioctx(ioctx);
+        rcu_read_lock();
+
+        hlist_for_each_entry_rcu(ctx, n, &mm->ioctx_list, list) {
+                if (ctx->user_id == ctx_id && !ctx->dead) {
+                        get_ioctx(ctx);
                         break;
                 }
-        read_unlock(&mm->ioctx_list_lock);
+        }
 
-        return ioctx;
+        rcu_read_unlock();
+        return ctx;
 }
 
 /*
@@ -1215,19 +1232,14 @@ static int read_events(struct kioctx *ctx,
 static void io_destroy(struct kioctx *ioctx)
 {
         struct mm_struct *mm = current->mm;
-        struct kioctx **tmp;
         int was_dead;
 
         /* delete the entry from the list is someone else hasn't already */
-        write_lock(&mm->ioctx_list_lock);
+        spin_lock(&mm->ioctx_lock);
         was_dead = ioctx->dead;
         ioctx->dead = 1;
-        for (tmp = &mm->ioctx_list; *tmp && *tmp != ioctx;
-             tmp = &(*tmp)->next)
-                ;
-        if (*tmp)
-                *tmp = ioctx->next;
-        write_unlock(&mm->ioctx_list_lock);
+        hlist_del_rcu(&ioctx->list);
+        spin_unlock(&mm->ioctx_lock);
 
         dprintk("aio_release(%p)\n", ioctx);
         if (likely(!was_dead))
5 changes: 4 additions & 1 deletion include/linux/aio.h
@@ -5,6 +5,7 @@
 #include <linux/workqueue.h>
 #include <linux/aio_abi.h>
 #include <linux/uio.h>
+#include <linux/rcupdate.h>
 
 #include <asm/atomic.h>
 
@@ -183,7 +184,7 @@ struct kioctx {
 
         /* This needs improving */
         unsigned long           user_id;
-        struct kioctx           *next;
+        struct hlist_node       list;
 
         wait_queue_head_t       wait;
 
@@ -199,6 +200,8 @@ struct kioctx {
         struct aio_ring_info    ring_info;
 
         struct delayed_work     wq;
+
+        struct rcu_head         rcu_head;
 };
 
 /* prototypes */
5 changes: 3 additions & 2 deletions include/linux/mm_types.h
@@ -232,8 +232,9 @@ struct mm_struct {
         struct core_state *core_state; /* coredumping support */
 
         /* aio bits */
-        rwlock_t                ioctx_list_lock;        /* aio lock */
-        struct kioctx           *ioctx_list;
+        spinlock_t              ioctx_lock;
+        struct hlist_head       ioctx_list;
+
 #ifdef CONFIG_MM_OWNER
         /*
          * "owner" points to a task that is regarded as the canonical
4 changes: 2 additions & 2 deletions kernel/fork.c
@@ -415,8 +415,8 @@ static struct mm_struct * mm_init(struct mm_struct * mm, struct task_struct *p)
         set_mm_counter(mm, file_rss, 0);
         set_mm_counter(mm, anon_rss, 0);
         spin_lock_init(&mm->page_table_lock);
-        rwlock_init(&mm->ioctx_list_lock);
-        mm->ioctx_list = NULL;
+        spin_lock_init(&mm->ioctx_lock);
+        INIT_HLIST_HEAD(&mm->ioctx_list);
         mm->free_area_cache = TASK_UNMAPPED_BASE;
         mm->cached_hole_size = ~0UL;
         mm_init_owner(mm, p);
