Merge branch 'bpf-xsk-rx-batch'
Magnus Karlsson says:

====================
This patch set introduces a batched interface for Rx buffer allocation
in the AF_XDP buffer pool. Instead of calling xsk_buff_alloc(pool) once
per buffer, drivers can now call xsk_buff_alloc_batch(pool,
xdp_buff_array, max). Rather than returning a pointer to a single
xdp_buff, it returns the number of xdp_buffs it managed to allocate, up
to the value of the max parameter. Pointers to the allocated xdp_buffs
are written into the xdp_buff array supplied in the call. This array
can be a SW ring that already exists in the driver or a new structure
that the driver allocates.

  u32 xsk_buff_alloc_batch(struct xsk_buff_pool *pool,
                           struct xdp_buff **xdp,
                           u32 max);
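
For illustration, here is a minimal refill sketch of how a driver might
consume the batched API. This is not part of the patch set: my_ring,
my_rx_desc and MY_RX_DESC() are hypothetical placeholders, and only
xsk_buff_alloc_batch() and xsk_buff_xdp_get_dma() are the kernel
helpers under discussion. The real i40e and ice conversions follow in
the diff below.

  /* Sketch only: placeholder driver types, not real hardware code. */
  static bool my_alloc_rx_buffers_zc(struct my_ring *rx_ring, u16 count)
  {
          u16 ntu = rx_ring->next_to_use;
          struct my_rx_desc *rx_desc = MY_RX_DESC(rx_ring, ntu);
          struct xdp_buff **xdp = &rx_ring->xdp_buf[ntu];
          u32 nb_buffs, i;

          /* Ask only for what fits before the ring wraps; the pool hands
           * back as many buffers as it can right now (0..nb_buffs).
           */
          nb_buffs = min_t(u16, count, rx_ring->count - ntu);
          nb_buffs = xsk_buff_alloc_batch(rx_ring->xsk_pool, xdp, nb_buffs);
          if (!nb_buffs)
                  return false;

          /* Program one HW descriptor per allocated buffer. */
          for (i = 0; i < nb_buffs; i++) {
                  dma_addr_t dma = xsk_buff_xdp_get_dma(xdp[i]);

                  rx_desc[i].pkt_addr = cpu_to_le64(dma);
                  rx_desc[i].hdr_addr = 0;
          }

          ntu += nb_buffs;
          if (ntu == rx_ring->count)
                  ntu = 0;
          rx_ring->next_to_use = ntu;     /* tail bump to HW omitted */
          return nb_buffs == count;
  }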

When using this interface, the driver should also use the new helper
below to set the relevant fields in the struct xdp_buff. The reason is
that xsk_buff_alloc_batch(), unlike xsk_buff_alloc(), does not fill in
the data and data_meta fields for you, so it is no longer sufficient
for the driver to set only data_end (effectively the size). This is
done for performance, as explained in detail in the commit message.

  void xsk_buff_set_size(struct xdp_buff *xdp, u32 size);
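
As a hedged sketch of the completion side (ntc, rx_desc and the length
field below are placeholders; only xsk_buff_set_size() and
xsk_buff_dma_sync_for_cpu() are the helpers in question):

  /* Sketch only: once HW reports the frame length, prepare the xdp_buff
   * with the new helper instead of poking data/data_end by hand.
   */
  struct xdp_buff *xdp = rx_ring->xdp_buf[ntc];      /* stored at alloc time */
  u32 size = le16_to_cpu(rx_desc->length);           /* placeholder length field */

  xsk_buff_set_size(xdp, size);                      /* sets data, data_meta, data_end */
  xsk_buff_dma_sync_for_cpu(xdp, rx_ring->xsk_pool);
  /* ... hand xdp to the XDP program ... */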

Patch 6 also optimizes buffer allocation in the aligned case. Here we
can skip the reinitialization of most fields in the xdp_buff_xsk
struct at allocation time. Since the number of elements in the heads
array equals the number of possible buffers in the umem, we can
initialize them once and for all at bind time and then just point to
the correct one in the xdp_buff array returned to the driver; there is
no need for a stack of free head entries. In the unaligned case, a
buffer can reside anywhere in the umem, so this optimization is not
possible: the right information still has to be filled into the
xdp_buff every single time one is allocated.
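
A simplified, purely conceptual sketch of the aligned-case idea (this
is not the actual xp_alloc_batch() implementation; the pool type,
heads[] indexing and fill-queue helpers below are illustrative only):

  /* Conceptual sketch: in aligned mode each buffer address maps to exactly
   * one chunk, so a pre-initialized head entry can be handed out directly
   * instead of popping a free-list entry and re-initializing its fields.
   */
  u32 alloc_batch_aligned(struct pool *pool, struct xdp_buff **xdp, u32 max)
  {
          u32 n = peek_fill_queue(pool, max);      /* addresses posted by user space */
          u32 i;

          for (i = 0; i < n; i++) {
                  u64 addr = fill_queue_addr(pool, i);  /* aligned to the chunk size */
                  u32 idx = addr >> pool->chunk_shift;  /* chunk index == heads[] index */

                  xdp[i] = &pool->heads[idx].xdp;       /* fields set once at bind time */
          }
          release_fill_queue(pool, n);
          return n;
  }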

I have updated i40e and ice to use this new batched interface.

These are the throughput results on my 2.1 GHz Cascade Lake system:

Aligned mode:
ice: +11% / -9 cycles/pkt
i40e: +12% / -9 cycles/pkt

Unaligned mode:
ice: +1.5% / -1 cycle/pkt
i40e: +1% / -1 cycle/pkt

For the aligned case, batching provides around 40% of the performance
improvement and the aligned optimization the remaining 60%. Based on
that split, I would have expected a ~4% boost for unaligned mode
(which benefits from batching but not from the aligned optimization),
yet I only get around 1% and do not know why. Note that this patch set
also reduces memory consumption in aligned mode.
====================

Signed-off-by: Daniel Borkmann <daniel@iogearbox.net>
Daniel Borkmann committed Sep 27, 2021
2 parents e7d5184 + e34087f commit 4c9f093
Showing 10 changed files with 376 additions and 156 deletions.
52 changes: 25 additions & 27 deletions drivers/net/ethernet/intel/i40e/i40e_xsk.c
@@ -193,42 +193,40 @@ bool i40e_alloc_rx_buffers_zc(struct i40e_ring *rx_ring, u16 count)
{
u16 ntu = rx_ring->next_to_use;
union i40e_rx_desc *rx_desc;
struct xdp_buff **bi, *xdp;
struct xdp_buff **xdp;
u32 nb_buffs, i;
dma_addr_t dma;
bool ok = true;

rx_desc = I40E_RX_DESC(rx_ring, ntu);
bi = i40e_rx_bi(rx_ring, ntu);
do {
xdp = xsk_buff_alloc(rx_ring->xsk_pool);
if (!xdp) {
ok = false;
goto no_buffers;
}
*bi = xdp;
dma = xsk_buff_xdp_get_dma(xdp);
xdp = i40e_rx_bi(rx_ring, ntu);

nb_buffs = min_t(u16, count, rx_ring->count - ntu);
nb_buffs = xsk_buff_alloc_batch(rx_ring->xsk_pool, xdp, nb_buffs);
if (!nb_buffs)
return false;

i = nb_buffs;
while (i--) {
dma = xsk_buff_xdp_get_dma(*xdp);
rx_desc->read.pkt_addr = cpu_to_le64(dma);
rx_desc->read.hdr_addr = 0;

rx_desc++;
bi++;
ntu++;

if (unlikely(ntu == rx_ring->count)) {
rx_desc = I40E_RX_DESC(rx_ring, 0);
bi = i40e_rx_bi(rx_ring, 0);
ntu = 0;
}
} while (--count);
xdp++;
}

no_buffers:
if (rx_ring->next_to_use != ntu) {
/* clear the status bits for the next_to_use descriptor */
rx_desc->wb.qword1.status_error_len = 0;
i40e_release_rx_desc(rx_ring, ntu);
ntu += nb_buffs;
if (ntu == rx_ring->count) {
rx_desc = I40E_RX_DESC(rx_ring, 0);
xdp = i40e_rx_bi(rx_ring, 0);
ntu = 0;
}

return ok;
/* clear the status bits for the next_to_use descriptor */
rx_desc->wb.qword1.status_error_len = 0;
i40e_release_rx_desc(rx_ring, ntu);

return count == nb_buffs ? true : false;
}

/**
@@ -365,7 +363,7 @@ int i40e_clean_rx_irq_zc(struct i40e_ring *rx_ring, int budget)
break;

bi = *i40e_rx_bi(rx_ring, next_to_clean);
bi->data_end = bi->data + size;
xsk_buff_set_size(bi, size);
xsk_buff_dma_sync_for_cpu(bi, rx_ring->xsk_pool);

xdp_res = i40e_run_xdp_zc(rx_ring, bi);
16 changes: 5 additions & 11 deletions drivers/net/ethernet/intel/ice/ice_txrx.h
@@ -164,17 +164,10 @@ struct ice_tx_offload_params {
};

struct ice_rx_buf {
union {
struct {
dma_addr_t dma;
struct page *page;
unsigned int page_offset;
u16 pagecnt_bias;
};
struct {
struct xdp_buff *xdp;
};
};
dma_addr_t dma;
struct page *page;
unsigned int page_offset;
u16 pagecnt_bias;
};

struct ice_q_stats {
@@ -270,6 +263,7 @@ struct ice_ring {
union {
struct ice_tx_buf *tx_buf;
struct ice_rx_buf *rx_buf;
struct xdp_buff **xdp_buf;
};
/* CL2 - 2nd cacheline starts here */
u16 q_index; /* Queue number of ring */
92 changes: 43 additions & 49 deletions drivers/net/ethernet/intel/ice/ice_xsk.c
@@ -364,45 +364,39 @@ bool ice_alloc_rx_bufs_zc(struct ice_ring *rx_ring, u16 count)
{
union ice_32b_rx_flex_desc *rx_desc;
u16 ntu = rx_ring->next_to_use;
struct ice_rx_buf *rx_buf;
bool ok = true;
struct xdp_buff **xdp;
u32 nb_buffs, i;
dma_addr_t dma;

if (!count)
return true;

rx_desc = ICE_RX_DESC(rx_ring, ntu);
rx_buf = &rx_ring->rx_buf[ntu];
xdp = &rx_ring->xdp_buf[ntu];

do {
rx_buf->xdp = xsk_buff_alloc(rx_ring->xsk_pool);
if (!rx_buf->xdp) {
ok = false;
break;
}
nb_buffs = min_t(u16, count, rx_ring->count - ntu);
nb_buffs = xsk_buff_alloc_batch(rx_ring->xsk_pool, xdp, nb_buffs);
if (!nb_buffs)
return false;

dma = xsk_buff_xdp_get_dma(rx_buf->xdp);
i = nb_buffs;
while (i--) {
dma = xsk_buff_xdp_get_dma(*xdp);
rx_desc->read.pkt_addr = cpu_to_le64(dma);
rx_desc->wb.status_error0 = 0;

rx_desc++;
rx_buf++;
ntu++;

if (unlikely(ntu == rx_ring->count)) {
rx_desc = ICE_RX_DESC(rx_ring, 0);
rx_buf = rx_ring->rx_buf;
ntu = 0;
}
} while (--count);
xdp++;
}

if (rx_ring->next_to_use != ntu) {
/* clear the status bits for the next_to_use descriptor */
rx_desc->wb.status_error0 = 0;
ice_release_rx_desc(rx_ring, ntu);
ntu += nb_buffs;
if (ntu == rx_ring->count) {
rx_desc = ICE_RX_DESC(rx_ring, 0);
xdp = rx_ring->xdp_buf;
ntu = 0;
}

return ok;
/* clear the status bits for the next_to_use descriptor */
rx_desc->wb.status_error0 = 0;
ice_release_rx_desc(rx_ring, ntu);

return count == nb_buffs ? true : false;
}

/**
Expand All @@ -421,33 +415,33 @@ static void ice_bump_ntc(struct ice_ring *rx_ring)
/**
* ice_construct_skb_zc - Create an sk_buff from zero-copy buffer
* @rx_ring: Rx ring
* @rx_buf: zero-copy Rx buffer
* @xdp_arr: Pointer to the SW ring of xdp_buff pointers
*
* This function allocates a new skb from a zero-copy Rx buffer.
*
* Returns the skb on success, NULL on failure.
*/
static struct sk_buff *
ice_construct_skb_zc(struct ice_ring *rx_ring, struct ice_rx_buf *rx_buf)
ice_construct_skb_zc(struct ice_ring *rx_ring, struct xdp_buff **xdp_arr)
{
unsigned int metasize = rx_buf->xdp->data - rx_buf->xdp->data_meta;
unsigned int datasize = rx_buf->xdp->data_end - rx_buf->xdp->data;
unsigned int datasize_hard = rx_buf->xdp->data_end -
rx_buf->xdp->data_hard_start;
struct xdp_buff *xdp = *xdp_arr;
unsigned int metasize = xdp->data - xdp->data_meta;
unsigned int datasize = xdp->data_end - xdp->data;
unsigned int datasize_hard = xdp->data_end - xdp->data_hard_start;
struct sk_buff *skb;

skb = __napi_alloc_skb(&rx_ring->q_vector->napi, datasize_hard,
GFP_ATOMIC | __GFP_NOWARN);
if (unlikely(!skb))
return NULL;

skb_reserve(skb, rx_buf->xdp->data - rx_buf->xdp->data_hard_start);
memcpy(__skb_put(skb, datasize), rx_buf->xdp->data, datasize);
skb_reserve(skb, xdp->data - xdp->data_hard_start);
memcpy(__skb_put(skb, datasize), xdp->data, datasize);
if (metasize)
skb_metadata_set(skb, metasize);

xsk_buff_free(rx_buf->xdp);
rx_buf->xdp = NULL;
xsk_buff_free(xdp);
*xdp_arr = NULL;
return skb;
}

@@ -521,7 +515,7 @@ int ice_clean_rx_irq_zc(struct ice_ring *rx_ring, int budget)
while (likely(total_rx_packets < (unsigned int)budget)) {
union ice_32b_rx_flex_desc *rx_desc;
unsigned int size, xdp_res = 0;
struct ice_rx_buf *rx_buf;
struct xdp_buff **xdp;
struct sk_buff *skb;
u16 stat_err_bits;
u16 vlan_tag = 0;
@@ -544,18 +538,18 @@ int ice_clean_rx_irq_zc(struct ice_ring *rx_ring, int budget)
if (!size)
break;

rx_buf = &rx_ring->rx_buf[rx_ring->next_to_clean];
rx_buf->xdp->data_end = rx_buf->xdp->data + size;
xsk_buff_dma_sync_for_cpu(rx_buf->xdp, rx_ring->xsk_pool);
xdp = &rx_ring->xdp_buf[rx_ring->next_to_clean];
xsk_buff_set_size(*xdp, size);
xsk_buff_dma_sync_for_cpu(*xdp, rx_ring->xsk_pool);

xdp_res = ice_run_xdp_zc(rx_ring, rx_buf->xdp);
xdp_res = ice_run_xdp_zc(rx_ring, *xdp);
if (xdp_res) {
if (xdp_res & (ICE_XDP_TX | ICE_XDP_REDIR))
xdp_xmit |= xdp_res;
else
xsk_buff_free(rx_buf->xdp);
xsk_buff_free(*xdp);

rx_buf->xdp = NULL;
*xdp = NULL;
total_rx_bytes += size;
total_rx_packets++;
cleaned_count++;
@@ -565,7 +559,7 @@ int ice_clean_rx_irq_zc(struct ice_ring *rx_ring, int budget)
}

/* XDP_PASS path */
skb = ice_construct_skb_zc(rx_ring, rx_buf);
skb = ice_construct_skb_zc(rx_ring, xdp);
if (!skb) {
rx_ring->rx_stats.alloc_buf_failed++;
break;
@@ -813,12 +807,12 @@ void ice_xsk_clean_rx_ring(struct ice_ring *rx_ring)
u16 i;

for (i = 0; i < rx_ring->count; i++) {
struct ice_rx_buf *rx_buf = &rx_ring->rx_buf[i];
struct xdp_buff **xdp = &rx_ring->xdp_buf[i];

if (!rx_buf->xdp)
if (!xdp)
continue;

rx_buf->xdp = NULL;
*xdp = NULL;
}
}

22 changes: 22 additions & 0 deletions include/net/xdp_sock_drv.h
@@ -77,6 +77,12 @@ static inline struct xdp_buff *xsk_buff_alloc(struct xsk_buff_pool *pool)
return xp_alloc(pool);
}

/* Returns as many entries as possible up to max. 0 <= N <= max. */
static inline u32 xsk_buff_alloc_batch(struct xsk_buff_pool *pool, struct xdp_buff **xdp, u32 max)
{
return xp_alloc_batch(pool, xdp, max);
}

static inline bool xsk_buff_can_alloc(struct xsk_buff_pool *pool, u32 count)
{
return xp_can_alloc(pool, count);
@@ -89,6 +95,13 @@ static inline void xsk_buff_free(struct xdp_buff *xdp)
xp_free(xskb);
}

static inline void xsk_buff_set_size(struct xdp_buff *xdp, u32 size)
{
xdp->data = xdp->data_hard_start + XDP_PACKET_HEADROOM;
xdp->data_meta = xdp->data;
xdp->data_end = xdp->data + size;
}

static inline dma_addr_t xsk_buff_raw_get_dma(struct xsk_buff_pool *pool,
u64 addr)
{
@@ -212,6 +225,11 @@ static inline struct xdp_buff *xsk_buff_alloc(struct xsk_buff_pool *pool)
return NULL;
}

static inline u32 xsk_buff_alloc_batch(struct xsk_buff_pool *pool, struct xdp_buff **xdp, u32 max)
{
return 0;
}

static inline bool xsk_buff_can_alloc(struct xsk_buff_pool *pool, u32 count)
{
return false;
@@ -221,6 +239,10 @@ static inline void xsk_buff_free(struct xdp_buff *xdp)
{
}

static inline void xsk_buff_set_size(struct xdp_buff *xdp, u32 size)
{
}

static inline dma_addr_t xsk_buff_raw_get_dma(struct xsk_buff_pool *pool,
u64 addr)
{
