Merge branch 'skb_array'
Michael S. Tsirkin says:

====================
skb_array: array based FIFO for skbs

This is in response to the proposal by Jason to make the tun
rx packet queue lockless using a circular buffer.
My testing seems to show that, at least for the common use case
in networking, which isn't lockless, a circular buffer
with indices does not perform that well: each index access
causes a cache line to bounce between the CPUs, and the
dependency on the index stalls the subsequent data access.

By comparison, an array of pointers where NULL means invalid
and !NULL means valid can be updated without messing with barriers
at all, and does not have this issue.
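
In code, the core of the idea looks like this (a simplified sketch,
omitting wrap-around; the real thing is in ptr_ring.h below):

	/* Producer: a NULL slot means free - no consumer index needed. */
	if (queue[producer])
		return -ENOSPC;	/* full */
	queue[producer++] = ptr;

	/* Consumer: a !NULL slot means valid - no producer index needed. */
	ptr = queue[consumer];
	if (ptr)
		queue[consumer++] = NULL;	/* mark the slot free again */

Each side reads and writes only its own index, so the indices never
bounce between the producer and consumer CPUs.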

On the flip side, large queues can cause cache pressure:
tun has a queue of 1000 entries by default, and that's 8K
(1000 pointers at 8 bytes each on 64-bit).
At this point I'm not sure this can be solved efficiently;
the correct solution might be to size the queues appropriately.

Here's an implementation of this idea: it can be used more
or less wherever sk_buff_head can be used, except that you need
to know the queue size in advance.

As this might be useful outside of networking, I implemented
a generic array of void pointers, with a type-safe wrapper for skbs.
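
The wrapper (include/linux/skb_array.h in this series; only
ptr_ring.h is shown in the diff below) is a thin type-safe layer
along these lines:

	struct skb_array {
		struct ptr_ring ring;
	};

	static inline int skb_array_produce(struct skb_array *a,
					    struct sk_buff *skb)
	{
		return ptr_ring_produce(&a->ring, skb);
	}

	static inline struct sk_buff *skb_array_consume(struct skb_array *a)
	{
		return ptr_ring_consume(&a->ring);
	}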

It remains to be seen whether resizing is required; in case it is,
I included patches implementing resizing by holding both the
consumer and the producer locks.

I think this code works fine without any extra memory barriers, since we
always read and write the same location, so the accesses cannot be
reordered.
Multiple writes of the same value into memory would mess things up
for us; I don't think compilers would do that, though.
But if people feel it's better to be safe with respect to compiler
optimizations, specifying the queue as volatile would probably do it
more cleanly than converting all accesses to READ_ONCE/WRITE_ONCE. Thoughts?
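
For reference, the READ_ONCE/WRITE_ONCE conversion would make the
produce fast path look something like this (a sketch of the alternative
under discussion, not what the patches do):

	if (READ_ONCE(r->queue[r->producer]))
		return -ENOSPC;
	WRITE_ONCE(r->queue[r->producer], ptr);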

The only issue is with calls within a loop using the __ptr_ring_XXX
accessors - in theory the compiler could hoist the accesses out of the loop.

Following volatile-considered-harmful.txt, I merely
documented that callers that busy-poll should invoke cpu_relax().
Most people will use the external skb_array_XXX APIs with a spinlock,
so this should not be an issue for them.
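
A busy-polling consumer would then look roughly like this
(hypothetical caller code):

	spin_lock(&r->consumer_lock);
	while (!(ptr = __ptr_ring_consume(r)))
		cpu_relax();	/* compiler barrier: keeps the load in the loop */
	spin_unlock(&r->consumer_lock);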

Eric Dumazet suggested adding an extra skb pointer for when
we have a single outstanding packet. I could not figure out
a way to implement this without a shared consumer/producer lock
though, and that lock would cause cache line bounces by itself.

Jesper, Jason, I know that both of you tested this,
please post Tested-by tags for whatever was tested.

changes since v7
	fix typos noticed by Jesper Brouer

changes since v6
	resize implemented. peek/full calls are no longer lockless

	replaced _FIELD macros with _CALL which invoke a function
	on the pointer rather than just returning a value

	destroy now scans the array and frees all queued skbs

changes since v5
	implemented a generic ptr_ring API, and
		made skb_array a type-safe wrapper
	APIs for taking the spinlock in different contexts,
		following the expected use case in tun

changes since v4 (v3 was never posted)
	documentation
	dropped SKB_ARRAY_MIN_SIZE heuristic
	unit test (in userspace, included as patch 2)

changes since v2
	fixed integer overflow pointed out by Eric
	added some comments

changes since v1
	fixed bug pointed out by Eric
====================

Tested-by: Jesper Dangaard Brouer <brouer@redhat.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
David S. Miller committed Jun 15, 2016
2 parents b231307 + 7d7072e commit 829e64d
Showing 4 changed files with 758 additions and 1 deletion.
393 changes: 393 additions & 0 deletions include/linux/ptr_ring.h
@@ -0,0 +1,393 @@
/*
 * Definitions for the 'struct ptr_ring' data structure.
 *
 * Author:
 *	Michael S. Tsirkin <mst@redhat.com>
 *
 * Copyright (C) 2016 Red Hat, Inc.
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2 of the License, or (at your
 * option) any later version.
 *
 * This is a limited-size FIFO maintaining pointers in FIFO order, with
 * one CPU producing entries and another consuming them.
 *
 * This implementation tries to minimize cache-contention when there is a
 * single producer and a single consumer CPU.
 */

#ifndef _LINUX_PTR_RING_H
#define _LINUX_PTR_RING_H 1

#ifdef __KERNEL__
#include <linux/spinlock.h>
#include <linux/cache.h>
#include <linux/types.h>
#include <linux/compiler.h>
#include <linux/slab.h>
#include <asm/errno.h>
#endif

struct ptr_ring {
	int producer ____cacheline_aligned_in_smp;
	spinlock_t producer_lock;
	int consumer ____cacheline_aligned_in_smp;
	spinlock_t consumer_lock;
	/* Shared consumer/producer data */
	/* Read-only by both the producer and the consumer */
	int size ____cacheline_aligned_in_smp; /* max entries in queue */
	void **queue;
};

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). If the ring is ever resized, callers must hold
 * producer_lock - see e.g. ptr_ring_full. Otherwise, if callers don't hold
 * producer_lock, the next call to __ptr_ring_produce may fail.
 */
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
	return r->queue[r->producer];
}

static inline bool ptr_ring_full(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_full(r);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline bool ptr_ring_full_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must hold producer_lock.
 */
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	if (r->queue[r->producer])
		return -ENOSPC;

	r->queue[r->producer++] = ptr;
	if (unlikely(r->producer >= r->size))
		r->producer = 0;
	return 0;
}

static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must take consumer_lock
 * if they dereference the pointer - see e.g. PTR_RING_PEEK_CALL.
 * If the ring is never resized, and if the pointer is merely
 * tested, there's no need to take the lock - see e.g. __ptr_ring_empty.
 */
static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
	return r->queue[r->consumer];
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must take consumer_lock
 * if the ring is ever resized - see e.g. ptr_ring_empty.
 */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
	return !__ptr_ring_peek(r);
}

static inline bool ptr_ring_empty(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_empty(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}

/* Must only be called after __ptr_ring_peek returned !NULL */
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{
	r->queue[r->consumer++] = NULL;
	if (unlikely(r->consumer >= r->size))
		r->consumer = 0;
}

static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	ptr = __ptr_ring_peek(r);
	if (ptr)
		__ptr_ring_discard_one(r);

	return ptr;
}

static inline void *ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	spin_lock(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_irq(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irq(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_any(struct ptr_ring *r)
{
	unsigned long flags;
	void *ptr;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ptr;
}

static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_bh(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_bh(&r->consumer_lock);

	return ptr;
}

/* Cast to structure type and call a function without discarding from FIFO.
 * Function must return a value.
 * Callers must take consumer_lock.
 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))

#define PTR_RING_PEEK_CALL(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	unsigned long __PTR_RING_PEEK_CALL_f; \
	\
	spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v; \
})
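
/* Example (hypothetical usage sketch): peek at the length of the head
 * entry without consuming it. Note f is invoked on the raw pointer,
 * which may be NULL when the ring is empty, so it must handle NULL:
 *
 *	static int entry_len(void *ptr)
 *	{
 *		return ptr ? ((struct sk_buff *)ptr)->len : 0;
 *	}
 *
 *	len = PTR_RING_PEEK_CALL(&ring, entry_len);
 */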

static inline void **__ptr_ring_init_queue_alloc(int size, gfp_t gfp)
{
	return kzalloc(ALIGN(size * sizeof(void *), SMP_CACHE_BYTES), gfp);
}

static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
{
	r->queue = __ptr_ring_init_queue_alloc(size, gfp);
	if (!r->queue)
		return -ENOMEM;

	r->size = size;
	r->producer = r->consumer = 0;
	spin_lock_init(&r->producer_lock);
	spin_lock_init(&r->consumer_lock);

	return 0;
}
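
/* Example (hypothetical usage sketch): a 16-entry ring passing one
 * pointer from producer to consumer:
 *
 *	struct ptr_ring ring;
 *	void *obj;
 *
 *	if (ptr_ring_init(&ring, 16, GFP_KERNEL))
 *		return -ENOMEM;
 *	ptr_ring_produce(&ring, some_ptr);
 *	obj = ptr_ring_consume(&ring);
 *	ptr_ring_cleanup(&ring, NULL);
 */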

static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
				  void (*destroy)(void *))
{
	unsigned long flags;
	int producer = 0;
	void **queue = __ptr_ring_init_queue_alloc(size, gfp);
	void **old;
	void *ptr;

	if (!queue)
		return -ENOMEM;

	spin_lock_irqsave(&(r)->producer_lock, flags);

	while ((ptr = ptr_ring_consume(r)))
		if (producer < size)
			queue[producer++] = ptr;
		else if (destroy)
			destroy(ptr);

	r->size = size;
	r->producer = producer;
	r->consumer = 0;
	old = r->queue;
	r->queue = queue;

	spin_unlock_irqrestore(&(r)->producer_lock, flags);

	kfree(old);

	return 0;
}
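
/* Example (hypothetical usage sketch): shrink the ring to 8 entries,
 * freeing any queued entries that no longer fit:
 *
 *	err = ptr_ring_resize(&ring, 8, GFP_KERNEL, kfree);
 */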

static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
{
	void *ptr;

	if (destroy)
		while ((ptr = ptr_ring_consume(r)))
			destroy(ptr);
	kfree(r->queue);
}

#endif /* _LINUX_PTR_RING_H */