xsk: Build skb by page (aka generic zerocopy xmit)
This patch builds the skb directly from pages in order to avoid the
memory copy overhead of the generic XDP socket transmit path.

The function is implemented on top of IFF_TX_SKB_NO_LINEAR: only
network devices that set IFF_TX_SKB_NO_LINEAR in priv_flags get the
skb constructed directly from umem pages. If the device does not
support this flag, the data is still copied into a linear skb as
before.
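
As a rough, hypothetical sketch (not part of this patch) of how a driver
opts in: a driver whose TX path reads packet data purely from frags would
advertise the flag on its net_device. The example_setup_netdev() helper
below is invented for illustration; IFF_TX_SKB_NO_LINEAR and priv_flags
are the real netdevice fields this patch keys off.

```
/* Hypothetical driver snippet, for illustration only: the rest of the
 * driver (allocation, ndo ops, registration) is omitted.
 */
#include <linux/netdevice.h>

static void example_setup_netdev(struct net_device *dev)
{
	/* The TX path of this (imaginary) driver reads packet data purely
	 * from skb frags, so a linear area is not required.  Advertising
	 * this lets xsk_build_skb() take the zerocopy branch and attach
	 * umem pages directly instead of copying the payload.
	 */
	dev->priv_flags |= IFF_TX_SKB_NO_LINEAR;
}
```

Drivers that must read headers from a linear area cannot set the flag,
which is why the copy fallback in xsk_build_skb() below remains.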

  ---------------- Performance Testing ------------

  The test environment is Aliyun ECS server.

  Test cmd (xdpsock from samples/bpf; -t selects tx-only mode, -S forces
  the generic/copy XDP path, -s sets the packet size):
  ```
  xdpsock -i eth0 -t -S -s <msg size>
  ```

  Test result data (throughput in packets per second; "copy" is the
  existing copy path, "page" is the new zerocopy path):

  size    64      512     1024    1500
  copy    1916747 1775988 1600203 1440054
  page    1974058 1953655 1945463 1904478
  percent 3.0%    10.0%   21.58%  32.3%
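
  The percent row is the relative gain of the page path over the copy
  path, (page - copy) / copy; e.g. for 1500-byte frames,
  (1904478 - 1440054) / 1440054 ≈ 32.3%.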

Signed-off-by: Xuan Zhuo <xuanzhuo@linux.alibaba.com>
Signed-off-by: Alexander Lobakin <alobakin@pm.me>
Signed-off-by: Daniel Borkmann <daniel@iogearbox.net>
Reviewed-by: Dust Li <dust.li@linux.alibaba.com>
Acked-by: Magnus Karlsson <magnus.karlsson@intel.com>
Acked-by: John Fastabend <john.fastabend@gmail.com>
Link: https://lore.kernel.org/bpf/20210218204908.5455-6-alobakin@pm.me
Xuan Zhuo authored and Daniel Borkmann committed Feb 24, 2021
1 parent 3914d88 commit 9c8f21e
Showing 1 changed file with 96 additions and 24 deletions: net/xdp/xsk.c
@@ -445,6 +445,97 @@ static void xsk_destruct_skb(struct sk_buff *skb)
 	sock_wfree(skb);
 }
 
+static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs,
+					      struct xdp_desc *desc)
+{
+	struct xsk_buff_pool *pool = xs->pool;
+	u32 hr, len, ts, offset, copy, copied;
+	struct sk_buff *skb;
+	struct page *page;
+	void *buffer;
+	int err, i;
+	u64 addr;
+
+	hr = max(NET_SKB_PAD, L1_CACHE_ALIGN(xs->dev->needed_headroom));
+
+	skb = sock_alloc_send_skb(&xs->sk, hr, 1, &err);
+	if (unlikely(!skb))
+		return ERR_PTR(err);
+
+	skb_reserve(skb, hr);
+
+	addr = desc->addr;
+	len = desc->len;
+	ts = pool->unaligned ? len : pool->chunk_size;
+
+	buffer = xsk_buff_raw_get_data(pool, addr);
+	offset = offset_in_page(buffer);
+	addr = buffer - pool->addrs;
+
+	for (copied = 0, i = 0; copied < len; i++) {
+		page = pool->umem->pgs[addr >> PAGE_SHIFT];
+		get_page(page);
+
+		copy = min_t(u32, PAGE_SIZE - offset, len - copied);
+		skb_fill_page_desc(skb, i, page, offset, copy);
+
+		copied += copy;
+		addr += copy;
+		offset = 0;
+	}
+
+	skb->len += len;
+	skb->data_len += len;
+	skb->truesize += ts;
+
+	refcount_add(ts, &xs->sk.sk_wmem_alloc);
+
+	return skb;
+}
+
+static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
+				     struct xdp_desc *desc)
+{
+	struct net_device *dev = xs->dev;
+	struct sk_buff *skb;
+
+	if (dev->priv_flags & IFF_TX_SKB_NO_LINEAR) {
+		skb = xsk_build_skb_zerocopy(xs, desc);
+		if (IS_ERR(skb))
+			return skb;
+	} else {
+		u32 hr, tr, len;
+		void *buffer;
+		int err;
+
+		hr = max(NET_SKB_PAD, L1_CACHE_ALIGN(dev->needed_headroom));
+		tr = dev->needed_tailroom;
+		len = desc->len;
+
+		skb = sock_alloc_send_skb(&xs->sk, hr + len + tr, 1, &err);
+		if (unlikely(!skb))
+			return ERR_PTR(err);
+
+		skb_reserve(skb, hr);
+		skb_put(skb, len);
+
+		buffer = xsk_buff_raw_get_data(xs->pool, desc->addr);
+		err = skb_store_bits(skb, 0, buffer, len);
+		if (unlikely(err)) {
+			kfree_skb(skb);
+			return ERR_PTR(err);
+		}
+	}
+
+	skb->dev = dev;
+	skb->priority = xs->sk.sk_priority;
+	skb->mark = xs->sk.sk_mark;
+	skb_shinfo(skb)->destructor_arg = (void *)(long)desc->addr;
+	skb->destructor = xsk_destruct_skb;
+
+	return skb;
+}
+
 static int xsk_generic_xmit(struct sock *sk)
 {
 	struct xdp_sock *xs = xdp_sk(sk);
@@ -454,56 +545,37 @@ static int xsk_generic_xmit(struct sock *sk)
 	struct sk_buff *skb;
 	unsigned long flags;
 	int err = 0;
-	u32 hr, tr;
 
 	mutex_lock(&xs->mutex);
 
 	if (xs->queue_id >= xs->dev->real_num_tx_queues)
 		goto out;
 
-	hr = max(NET_SKB_PAD, L1_CACHE_ALIGN(xs->dev->needed_headroom));
-	tr = xs->dev->needed_tailroom;
-
 	while (xskq_cons_peek_desc(xs->tx, &desc, xs->pool)) {
-		char *buffer;
-		u64 addr;
-		u32 len;
-
 		if (max_batch-- == 0) {
 			err = -EAGAIN;
 			goto out;
 		}
 
-		len = desc.len;
-		skb = sock_alloc_send_skb(sk, hr + len + tr, 1, &err);
-		if (unlikely(!skb))
+		skb = xsk_build_skb(xs, &desc);
+		if (IS_ERR(skb)) {
+			err = PTR_ERR(skb);
 			goto out;
+		}
 
-		skb_reserve(skb, hr);
-		skb_put(skb, len);
-
-		addr = desc.addr;
-		buffer = xsk_buff_raw_get_data(xs->pool, addr);
-		err = skb_store_bits(skb, 0, buffer, len);
 		/* This is the backpressure mechanism for the Tx path.
 		 * Reserve space in the completion queue and only proceed
 		 * if there is space in it. This avoids having to implement
 		 * any buffering in the Tx path.
 		 */
 		spin_lock_irqsave(&xs->pool->cq_lock, flags);
-		if (unlikely(err) || xskq_prod_reserve(xs->pool->cq)) {
+		if (xskq_prod_reserve(xs->pool->cq)) {
 			spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
 			kfree_skb(skb);
 			goto out;
 		}
 		spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
 
-		skb->dev = xs->dev;
-		skb->priority = sk->sk_priority;
-		skb->mark = sk->sk_mark;
-		skb_shinfo(skb)->destructor_arg = (void *)(long)desc.addr;
-		skb->destructor = xsk_destruct_skb;
-
 		err = __dev_direct_xmit(skb, xs->queue_id);
 		if (err == NETDEV_TX_BUSY) {
 			/* Tell user-space to retry the send */
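
For context, here is a rough user-space sketch (not part of this commit) of
how the generic TX path above is exercised: an AF_XDP socket running in
copy/SKB mode posts a descriptor to its TX ring and kicks the kernel with
sendto(), which eventually reaches xsk_generic_xmit() and now
xsk_build_skb(). The xsk handle, TX ring, umem layout and the tx_one()
helper are assumed/illustrative; the ring helpers are libbpf's xsk.h API.

```
/* Illustrative sketch only: assumes an already-configured
 * struct xsk_socket *xsk and TX ring "tx" set up with libbpf's xsk
 * helpers, and that "frame_addr" points at "len" bytes of packet data
 * already written into the umem.
 */
#include <errno.h>
#include <sys/socket.h>
#include <bpf/xsk.h>

static int tx_one(struct xsk_socket *xsk, struct xsk_ring_prod *tx,
		  __u64 frame_addr, __u32 len)
{
	__u32 idx;

	/* Reserve one TX descriptor; in copy/SKB mode the kernel later
	 * turns it into an skb via xsk_build_skb() or
	 * xsk_build_skb_zerocopy().
	 */
	if (xsk_ring_prod__reserve(tx, 1, &idx) != 1)
		return -EAGAIN;

	xsk_ring_prod__tx_desc(tx, idx)->addr = frame_addr;
	xsk_ring_prod__tx_desc(tx, idx)->len = len;
	xsk_ring_prod__submit(tx, 1);

	/* The generic path is driven by a syscall "kick". */
	if (sendto(xsk_socket__fd(xsk), NULL, 0, MSG_DONTWAIT, NULL, 0) < 0 &&
	    errno != EAGAIN && errno != EBUSY && errno != ENETDOWN)
		return -errno;

	return 0;
}
```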
