Skip to content

Commit

Permalink
Merge branch 'af-xdp-tx-batch'
Browse files Browse the repository at this point in the history
Magnus Karlsson says:

====================
This patch set improves the performance of mainly the Tx processing of
AF_XDP sockets. Though, patch 3 also improves the Rx path. All in all,
this patch set improves the throughput of the l2fwd xdpsock application
by around 11%. If we just take a look at Tx processing part, it is
improved by 35% to 40%.

Hopefully the new batched Tx interfaces should be of value to other
drivers implementing AF_XDP zero-copy support. But patch #3 is generic
and will improve performance of all drivers when using AF_XDP sockets
(under the premises explained in that patch).

@Daniel. In patch 3, I apply all the padding required to hinder the
adjacency prefetcher to prefetch the wrong things. After this patch
set, I will submit another patch set that introduces
____cacheline_padding_in_smp in include/linux/cache.h according to your
suggestions. The last patch in that patch set will then convert the
explicit paddings that we have now to ____cacheline_padding_in_smp.

v2 -> v3:
* Fixed #pragma warning with clang and defined a loop_unrolled_for macro
  for easier readability [lkp, Nick]
* Simplified invalid descriptor handling in xskq_cons_read_desc_batch()

v1 -> v2:
* Removed added parameter in i40e_setup_tx_descriptors and adopted a
  simpler solution [Maciej]
* Added test for !xs in xsk_tx_peek_release_desc_batch() [John]
* Simplified return path in xsk_tx_peek_release_desc_batch() [John]
* Dropped patch #1 in v1 that introduced lazy completions. Hopefully
  this is not needed when we get busy poll [Jakub]
* Iterate over local variable in xskq_prod_reserve_addr_batch() for
  improved performance
* Fixed the fallback path in xsk_tx_peek_release_desc_batch() so that
  it also produces a batch of descriptors, albeit by using the slower
  (but more general) older code. This improves the performance of the
  case when multiple sockets are sharing the same device and queue id.
====================

Signed-off-by: Daniel Borkmann <daniel@iogearbox.net>
  • Loading branch information
Daniel Borkmann committed Nov 17, 2020
2 parents de91e63 + 3106c58 commit cbf398d
Show file tree
Hide file tree
Showing 8 changed files with 258 additions and 56 deletions.
11 changes: 11 additions & 0 deletions drivers/net/ethernet/intel/i40e/i40e_txrx.c
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,8 @@ void i40e_free_tx_resources(struct i40e_ring *tx_ring)
i40e_clean_tx_ring(tx_ring);
kfree(tx_ring->tx_bi);
tx_ring->tx_bi = NULL;
kfree(tx_ring->xsk_descs);
tx_ring->xsk_descs = NULL;

if (tx_ring->desc) {
dma_free_coherent(tx_ring->dev, tx_ring->size,
Expand Down Expand Up @@ -1277,6 +1279,13 @@ int i40e_setup_tx_descriptors(struct i40e_ring *tx_ring)
if (!tx_ring->tx_bi)
goto err;

if (ring_is_xdp(tx_ring)) {
tx_ring->xsk_descs = kcalloc(I40E_MAX_NUM_DESCRIPTORS, sizeof(*tx_ring->xsk_descs),
GFP_KERNEL);
if (!tx_ring->xsk_descs)
goto err;
}

u64_stats_init(&tx_ring->syncp);

/* round up to nearest 4K */
Expand All @@ -1300,6 +1309,8 @@ int i40e_setup_tx_descriptors(struct i40e_ring *tx_ring)
return 0;

err:
kfree(tx_ring->xsk_descs);
tx_ring->xsk_descs = NULL;
kfree(tx_ring->tx_bi);
tx_ring->tx_bi = NULL;
return -ENOMEM;
Expand Down
1 change: 1 addition & 0 deletions drivers/net/ethernet/intel/i40e/i40e_txrx.h
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ struct i40e_ring {
struct i40e_channel *ch;
struct xdp_rxq_info xdp_rxq;
struct xsk_buff_pool *xsk_pool;
struct xdp_desc *xsk_descs; /* For storing descriptors in the AF_XDP ZC path */
} ____cacheline_internodealigned_in_smp;

static inline bool ring_uses_build_skb(struct i40e_ring *ring)
Expand Down
123 changes: 84 additions & 39 deletions drivers/net/ethernet/intel/i40e/i40e_xsk.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
/* Copyright(c) 2018 Intel Corporation. */

#include <linux/bpf_trace.h>
#include <linux/stringify.h>
#include <net/xdp_sock_drv.h>
#include <net/xdp.h>

Expand Down Expand Up @@ -381,58 +382,102 @@ int i40e_clean_rx_irq_zc(struct i40e_ring *rx_ring, int budget)
return failure ? budget : (int)total_rx_packets;
}

/**
* i40e_xmit_zc - Performs zero-copy Tx AF_XDP
* @xdp_ring: XDP Tx ring
* @budget: NAPI budget
*
* Returns true if the work is finished.
**/
static bool i40e_xmit_zc(struct i40e_ring *xdp_ring, unsigned int budget)
static void i40e_xmit_pkt(struct i40e_ring *xdp_ring, struct xdp_desc *desc,
unsigned int *total_bytes)
{
unsigned int sent_frames = 0, total_bytes = 0;
struct i40e_tx_desc *tx_desc = NULL;
struct i40e_tx_buffer *tx_bi;
struct xdp_desc desc;
struct i40e_tx_desc *tx_desc;
dma_addr_t dma;

while (budget-- > 0) {
if (!xsk_tx_peek_desc(xdp_ring->xsk_pool, &desc))
break;
dma = xsk_buff_raw_get_dma(xdp_ring->xsk_pool, desc->addr);
xsk_buff_raw_dma_sync_for_device(xdp_ring->xsk_pool, dma, desc->len);

dma = xsk_buff_raw_get_dma(xdp_ring->xsk_pool, desc.addr);
xsk_buff_raw_dma_sync_for_device(xdp_ring->xsk_pool, dma,
desc.len);
tx_desc = I40E_TX_DESC(xdp_ring, xdp_ring->next_to_use++);
tx_desc->buffer_addr = cpu_to_le64(dma);
tx_desc->cmd_type_offset_bsz = build_ctob(I40E_TX_DESC_CMD_ICRC | I40E_TX_DESC_CMD_EOP,
0, desc->len, 0);

tx_bi = &xdp_ring->tx_bi[xdp_ring->next_to_use];
tx_bi->bytecount = desc.len;
*total_bytes += desc->len;
}

tx_desc = I40E_TX_DESC(xdp_ring, xdp_ring->next_to_use);
tx_desc->buffer_addr = cpu_to_le64(dma);
tx_desc->cmd_type_offset_bsz =
build_ctob(I40E_TX_DESC_CMD_ICRC
| I40E_TX_DESC_CMD_EOP,
0, desc.len, 0);
static void i40e_xmit_pkt_batch(struct i40e_ring *xdp_ring, struct xdp_desc *desc,
unsigned int *total_bytes)
{
u16 ntu = xdp_ring->next_to_use;
struct i40e_tx_desc *tx_desc;
dma_addr_t dma;
u32 i;

sent_frames++;
total_bytes += tx_bi->bytecount;
loop_unrolled_for(i = 0; i < PKTS_PER_BATCH; i++) {
dma = xsk_buff_raw_get_dma(xdp_ring->xsk_pool, desc[i].addr);
xsk_buff_raw_dma_sync_for_device(xdp_ring->xsk_pool, dma, desc[i].len);

xdp_ring->next_to_use++;
if (xdp_ring->next_to_use == xdp_ring->count)
xdp_ring->next_to_use = 0;
tx_desc = I40E_TX_DESC(xdp_ring, ntu++);
tx_desc->buffer_addr = cpu_to_le64(dma);
tx_desc->cmd_type_offset_bsz = build_ctob(I40E_TX_DESC_CMD_ICRC |
I40E_TX_DESC_CMD_EOP,
0, desc[i].len, 0);

*total_bytes += desc[i].len;
}

if (tx_desc) {
/* Request an interrupt for the last frame and bump tail ptr. */
tx_desc->cmd_type_offset_bsz |= (I40E_TX_DESC_CMD_RS <<
I40E_TXD_QW1_CMD_SHIFT);
i40e_xdp_ring_update_tail(xdp_ring);
xdp_ring->next_to_use = ntu;
}

static void i40e_fill_tx_hw_ring(struct i40e_ring *xdp_ring, struct xdp_desc *descs, u32 nb_pkts,
unsigned int *total_bytes)
{
u32 batched, leftover, i;

batched = nb_pkts & ~(PKTS_PER_BATCH - 1);
leftover = nb_pkts & (PKTS_PER_BATCH - 1);
for (i = 0; i < batched; i += PKTS_PER_BATCH)
i40e_xmit_pkt_batch(xdp_ring, &descs[i], total_bytes);
for (i = batched; i < batched + leftover; i++)
i40e_xmit_pkt(xdp_ring, &descs[i], total_bytes);
}

xsk_tx_release(xdp_ring->xsk_pool);
i40e_update_tx_stats(xdp_ring, sent_frames, total_bytes);
static void i40e_set_rs_bit(struct i40e_ring *xdp_ring)
{
u16 ntu = xdp_ring->next_to_use ? xdp_ring->next_to_use - 1 : xdp_ring->count - 1;
struct i40e_tx_desc *tx_desc;

tx_desc = I40E_TX_DESC(xdp_ring, ntu);
tx_desc->cmd_type_offset_bsz |= (I40E_TX_DESC_CMD_RS << I40E_TXD_QW1_CMD_SHIFT);
}

/**
* i40e_xmit_zc - Performs zero-copy Tx AF_XDP
* @xdp_ring: XDP Tx ring
* @budget: NAPI budget
*
* Returns true if the work is finished.
**/
static bool i40e_xmit_zc(struct i40e_ring *xdp_ring, unsigned int budget)
{
struct xdp_desc *descs = xdp_ring->xsk_descs;
u32 nb_pkts, nb_processed = 0;
unsigned int total_bytes = 0;

nb_pkts = xsk_tx_peek_release_desc_batch(xdp_ring->xsk_pool, descs, budget);
if (!nb_pkts)
return false;

if (xdp_ring->next_to_use + nb_pkts >= xdp_ring->count) {
nb_processed = xdp_ring->count - xdp_ring->next_to_use;
i40e_fill_tx_hw_ring(xdp_ring, descs, nb_processed, &total_bytes);
xdp_ring->next_to_use = 0;
}

return !!budget;
i40e_fill_tx_hw_ring(xdp_ring, &descs[nb_processed], nb_pkts - nb_processed,
&total_bytes);

/* Request an interrupt for the last frame and bump tail ptr. */
i40e_set_rs_bit(xdp_ring);
i40e_xdp_ring_update_tail(xdp_ring);

i40e_update_tx_stats(xdp_ring, nb_pkts, total_bytes);

return true;
}

/**
Expand Down
16 changes: 16 additions & 0 deletions drivers/net/ethernet/intel/i40e/i40e_xsk.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,22 @@
#ifndef _I40E_XSK_H_
#define _I40E_XSK_H_

/* This value should match the pragma in the loop_unrolled_for
* macro. Why 4? It is strictly empirical. It seems to be a good
* compromise between the advantage of having simultaneous outstanding
* reads to the DMA array that can hide each others latency and the
* disadvantage of having a larger code path.
*/
#define PKTS_PER_BATCH 4

#ifdef __clang__
#define loop_unrolled_for _Pragma("clang loop unroll_count(4)") for
#elif __GNUC__ >= 8
#define loop_unrolled_for _Pragma("GCC unroll 4") for
#else
#define loop_unrolled_for for
#endif

struct i40e_vsi;
struct xsk_buff_pool;
struct zero_copy_allocator;
Expand Down
7 changes: 7 additions & 0 deletions include/net/xdp_sock_drv.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

void xsk_tx_completed(struct xsk_buff_pool *pool, u32 nb_entries);
bool xsk_tx_peek_desc(struct xsk_buff_pool *pool, struct xdp_desc *desc);
u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, struct xdp_desc *desc, u32 max);
void xsk_tx_release(struct xsk_buff_pool *pool);
struct xsk_buff_pool *xsk_get_pool_from_qid(struct net_device *dev,
u16 queue_id);
Expand Down Expand Up @@ -128,6 +129,12 @@ static inline bool xsk_tx_peek_desc(struct xsk_buff_pool *pool,
return false;
}

static inline u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, struct xdp_desc *desc,
u32 max)
{
return 0;
}

static inline void xsk_tx_release(struct xsk_buff_pool *pool)
{
}
Expand Down
57 changes: 57 additions & 0 deletions net/xdp/xsk.c
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,63 @@ bool xsk_tx_peek_desc(struct xsk_buff_pool *pool, struct xdp_desc *desc)
}
EXPORT_SYMBOL(xsk_tx_peek_desc);

static u32 xsk_tx_peek_release_fallback(struct xsk_buff_pool *pool, struct xdp_desc *descs,
u32 max_entries)
{
u32 nb_pkts = 0;

while (nb_pkts < max_entries && xsk_tx_peek_desc(pool, &descs[nb_pkts]))
nb_pkts++;

xsk_tx_release(pool);
return nb_pkts;
}

u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, struct xdp_desc *descs,
u32 max_entries)
{
struct xdp_sock *xs;
u32 nb_pkts;

rcu_read_lock();
if (!list_is_singular(&pool->xsk_tx_list)) {
/* Fallback to the non-batched version */
rcu_read_unlock();
return xsk_tx_peek_release_fallback(pool, descs, max_entries);
}

xs = list_first_or_null_rcu(&pool->xsk_tx_list, struct xdp_sock, tx_list);
if (!xs) {
nb_pkts = 0;
goto out;
}

nb_pkts = xskq_cons_peek_desc_batch(xs->tx, descs, pool, max_entries);
if (!nb_pkts) {
xs->tx->queue_empty_descs++;
goto out;
}

/* This is the backpressure mechanism for the Tx path. Try to
* reserve space in the completion queue for all packets, but
* if there are fewer slots available, just process that many
* packets. This avoids having to implement any buffering in
* the Tx path.
*/
nb_pkts = xskq_prod_reserve_addr_batch(pool->cq, descs, nb_pkts);
if (!nb_pkts)
goto out;

xskq_cons_release_n(xs->tx, nb_pkts);
__xskq_cons_release(xs->tx);
xs->sk.sk_write_space(&xs->sk);

out:
rcu_read_unlock();
return nb_pkts;
}
EXPORT_SYMBOL(xsk_tx_peek_release_desc_batch);

static int xsk_wakeup(struct xdp_sock *xs, u8 flags)
{
struct net_device *dev = xs->dev;
Expand Down
Loading

0 comments on commit cbf398d

Please sign in to comment.