Skip to content

Commit

Permalink
xsk: support for Tx
Browse files Browse the repository at this point in the history
Here, Tx support is added. The user fills the Tx queue with frames to
be sent by the kernel, and let's the kernel know using the sendmsg
syscall.

Signed-off-by: Magnus Karlsson <magnus.karlsson@intel.com>
Signed-off-by: Alexei Starovoitov <ast@kernel.org>
  • Loading branch information
Magnus Karlsson authored and Alexei Starovoitov committed May 3, 2018
1 parent 865b03f commit 35fcde7
Show file tree
Hide file tree
Showing 2 changed files with 200 additions and 4 deletions.
111 changes: 108 additions & 3 deletions net/xdp/xsk.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
#include "xsk_queue.h"
#include "xdp_umem.h"

#define TX_BATCH_SIZE 16

static struct xdp_sock *xdp_sk(struct sock *sk)
{
return (struct xdp_sock *)sk;
Expand Down Expand Up @@ -101,6 +103,108 @@ int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
return err;
}

static void xsk_destruct_skb(struct sk_buff *skb)
{
u32 id = (u32)(long)skb_shinfo(skb)->destructor_arg;
struct xdp_sock *xs = xdp_sk(skb->sk);

WARN_ON_ONCE(xskq_produce_id(xs->umem->cq, id));

sock_wfree(skb);
}

static int xsk_generic_xmit(struct sock *sk, struct msghdr *m,
size_t total_len)
{
bool need_wait = !(m->msg_flags & MSG_DONTWAIT);
u32 max_batch = TX_BATCH_SIZE;
struct xdp_sock *xs = xdp_sk(sk);
bool sent_frame = false;
struct xdp_desc desc;
struct sk_buff *skb;
int err = 0;

if (unlikely(!xs->tx))
return -ENOBUFS;
if (need_wait)
return -EOPNOTSUPP;

mutex_lock(&xs->mutex);

while (xskq_peek_desc(xs->tx, &desc)) {
char *buffer;
u32 id, len;

if (max_batch-- == 0) {
err = -EAGAIN;
goto out;
}

if (xskq_reserve_id(xs->umem->cq)) {
err = -EAGAIN;
goto out;
}

len = desc.len;
if (unlikely(len > xs->dev->mtu)) {
err = -EMSGSIZE;
goto out;
}

skb = sock_alloc_send_skb(sk, len, !need_wait, &err);
if (unlikely(!skb)) {
err = -EAGAIN;
goto out;
}

skb_put(skb, len);
id = desc.idx;
buffer = xdp_umem_get_data(xs->umem, id) + desc.offset;
err = skb_store_bits(skb, 0, buffer, len);
if (unlikely(err)) {
kfree_skb(skb);
goto out;
}

skb->dev = xs->dev;
skb->priority = sk->sk_priority;
skb->mark = sk->sk_mark;
skb_shinfo(skb)->destructor_arg = (void *)(long)id;
skb->destructor = xsk_destruct_skb;

err = dev_direct_xmit(skb, xs->queue_id);
/* Ignore NET_XMIT_CN as packet might have been sent */
if (err == NET_XMIT_DROP || err == NETDEV_TX_BUSY) {
err = -EAGAIN;
/* SKB consumed by dev_direct_xmit() */
goto out;
}

sent_frame = true;
xskq_discard_desc(xs->tx);
}

out:
if (sent_frame)
sk->sk_write_space(sk);

mutex_unlock(&xs->mutex);
return err;
}

static int xsk_sendmsg(struct socket *sock, struct msghdr *m, size_t total_len)
{
struct sock *sk = sock->sk;
struct xdp_sock *xs = xdp_sk(sk);

if (unlikely(!xs->dev))
return -ENXIO;
if (unlikely(!(xs->dev->flags & IFF_UP)))
return -ENETDOWN;

return xsk_generic_xmit(sk, m, total_len);
}

static unsigned int xsk_poll(struct file *file, struct socket *sock,
struct poll_table_struct *wait)
{
Expand All @@ -110,6 +214,8 @@ static unsigned int xsk_poll(struct file *file, struct socket *sock,

if (xs->rx && !xskq_empty_desc(xs->rx))
mask |= POLLIN | POLLRDNORM;
if (xs->tx && !xskq_full_desc(xs->tx))
mask |= POLLOUT | POLLWRNORM;

return mask;
}
Expand Down Expand Up @@ -270,6 +376,7 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
xs->queue_id = sxdp->sxdp_queue_id;

xskq_set_umem(xs->rx, &xs->umem->props);
xskq_set_umem(xs->tx, &xs->umem->props);

out_unlock:
if (err)
Expand Down Expand Up @@ -383,8 +490,6 @@ static int xsk_mmap(struct file *file, struct socket *sock,
q = xs->umem->fq;
else if (offset == XDP_UMEM_PGOFF_COMPLETION_RING)
q = xs->umem->cq;
else
return -EINVAL;
}

if (!q)
Expand Down Expand Up @@ -420,7 +525,7 @@ static const struct proto_ops xsk_proto_ops = {
.shutdown = sock_no_shutdown,
.setsockopt = xsk_setsockopt,
.getsockopt = sock_no_getsockopt,
.sendmsg = sock_no_sendmsg,
.sendmsg = xsk_sendmsg,
.recvmsg = sock_no_recvmsg,
.mmap = xsk_mmap,
.sendpage = sock_no_sendpage,
Expand Down
93 changes: 92 additions & 1 deletion net/xdp/xsk_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,93 @@ static inline void xskq_discard_id(struct xsk_queue *q)
(void)xskq_validate_id(q);
}

/* Rx queue */
static inline int xskq_produce_id(struct xsk_queue *q, u32 id)
{
struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;

ring->desc[q->prod_tail++ & q->ring_mask] = id;

/* Order producer and data */
smp_wmb();

WRITE_ONCE(q->ring->producer, q->prod_tail);
return 0;
}

static inline int xskq_reserve_id(struct xsk_queue *q)
{
if (xskq_nb_free(q, q->prod_head, 1) == 0)
return -ENOSPC;

q->prod_head++;
return 0;
}

/* Rx/Tx queue */

static inline bool xskq_is_valid_desc(struct xsk_queue *q, struct xdp_desc *d)
{
u32 buff_len;

if (unlikely(d->idx >= q->umem_props.nframes)) {
q->invalid_descs++;
return false;
}

buff_len = q->umem_props.frame_size;
if (unlikely(d->len > buff_len || d->len == 0 ||
d->offset > buff_len || d->offset + d->len > buff_len)) {
q->invalid_descs++;
return false;
}

return true;
}

static inline struct xdp_desc *xskq_validate_desc(struct xsk_queue *q,
struct xdp_desc *desc)
{
while (q->cons_tail != q->cons_head) {
struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
unsigned int idx = q->cons_tail & q->ring_mask;

if (xskq_is_valid_desc(q, &ring->desc[idx])) {
if (desc)
*desc = ring->desc[idx];
return desc;
}

q->cons_tail++;
}

return NULL;
}

static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q,
struct xdp_desc *desc)
{
struct xdp_rxtx_ring *ring;

if (q->cons_tail == q->cons_head) {
WRITE_ONCE(q->ring->consumer, q->cons_tail);
q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);

/* Order consumer and data */
smp_rmb();

return xskq_validate_desc(q, desc);
}

ring = (struct xdp_rxtx_ring *)q->ring;
*desc = ring->desc[q->cons_tail & q->ring_mask];
return desc;
}

static inline void xskq_discard_desc(struct xsk_queue *q)
{
q->cons_tail++;
(void)xskq_validate_desc(q, NULL);
}

static inline int xskq_produce_batch_desc(struct xsk_queue *q,
u32 id, u32 len, u16 offset)
Expand Down Expand Up @@ -139,6 +225,11 @@ static inline void xskq_produce_flush_desc(struct xsk_queue *q)
WRITE_ONCE(q->ring->producer, q->prod_tail);
}

static inline bool xskq_full_desc(struct xsk_queue *q)
{
return (xskq_nb_avail(q, q->nentries) == q->nentries);
}

static inline bool xskq_empty_desc(struct xsk_queue *q)
{
return (xskq_nb_free(q, q->prod_tail, 1) == q->nentries);
Expand Down

0 comments on commit 35fcde7

Please sign in to comment.