Skip to content

Commit

Permalink
Merge branch 'sockmap: introduce BPF_SK_SKB_VERDICT and support UDP'
Browse files Browse the repository at this point in the history
Cong Wang says:

====================

From: Cong Wang <cong.wang@bytedance.com>

We have thousands of services connected to a daemon on every host
via AF_UNIX dgram sockets, after they are moved into VM, we have to
add a proxy to forward these communications from VM to host, because
rewriting thousands of them is not practical. This proxy uses an
AF_UNIX socket connected to services and a UDP socket to connect to
the host. It is inefficient because data is copied between kernel
space and user space twice, and we can not use splice() which only
supports TCP. Therefore, we want to use sockmap to do the splicing
without going to user-space at all (after the initial setup).

Currently sockmap only fully supports TCP, UDP is partially supported
as it is only allowed to add into sockmap. This patchset, as the second
part of the original large patchset, extends sockmap with:
1) cross-protocol support with BPF_SK_SKB_VERDICT; 2) full UDP support.

On the high level, ->read_sock() is required for each protocol to support
sockmap redirection, and in order to do sock proto update, a new ops
->psock_update_sk_prot() is introduced, which is also required. And the
BPF ->recvmsg() is also needed to replace the original ->recvmsg() to
retrieve skmsg. To make life easier, we have to get rid of lock_sock()
in sk_psock_handle_skb(), otherwise we would have to implement
->sendmsg_locked() on top of ->sendmsg(), which is ugly.

Please see each patch for more details.

To see the big picture, the original patchset is available here:
https://github.com/congwang/linux/tree/sockmap
this patchset is also available:
https://github.com/congwang/linux/tree/sockmap2
---
v8: get rid of 'offset' in udp_read_sock()
    add checks for skb_verdict/stream_verdict conflict
    add two cleanup patches for sock_map_link()
    add a new test case

v7: use work_mutex to protect psock->work
    return err in udp_read_sock()
    add patch 6/13
    clean up test case

v6: get rid of sk_psock_zap_ingress()
    add rcu work patch

v5: use INDIRECT_CALL_2() for function pointers
    use ingress_lock to fix a race condition found by Jacub
    rename two helper functions

v4: get rid of lock_sock() in sk_psock_handle_skb()
    get rid of udp_sendmsg_locked()
    remove an empty line
    update cover letter

v3: export tcp/udp_update_proto()
    rename sk->sk_prot->psock_update_sk_prot()
    improve changelogs

v2: separate from the original large patchset
    rebase to the latest bpf-next
    split UDP test case
    move inet_csk_has_ulp() check to tcp_bpf.c
    clean up udp_read_sock()
====================

Signed-off-by: Alexei Starovoitov <ast@kernel.org>
  • Loading branch information
Alexei Starovoitov committed Apr 1, 2021
2 parents e27bfef + 8d7cb74 commit 89d69c5
Show file tree
Hide file tree
Showing 26 changed files with 677 additions and 237 deletions.
1 change: 1 addition & 0 deletions include/linux/skbuff.h
Original file line number Diff line number Diff line change
Expand Up @@ -3626,6 +3626,7 @@ int skb_splice_bits(struct sk_buff *skb, struct sock *sk, unsigned int offset,
unsigned int flags);
int skb_send_sock_locked(struct sock *sk, struct sk_buff *skb, int offset,
int len);
int skb_send_sock(struct sock *sk, struct sk_buff *skb, int offset, int len);
void skb_copy_and_csum_dev(const struct sk_buff *skb, u8 *to);
unsigned int skb_zerocopy_headlen(const struct sk_buff *from);
int skb_zerocopy(struct sk_buff *to, struct sk_buff *from,
Expand Down
77 changes: 58 additions & 19 deletions include/linux/skmsg.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ struct sk_psock_progs {
struct bpf_prog *msg_parser;
struct bpf_prog *stream_parser;
struct bpf_prog *stream_verdict;
struct bpf_prog *skb_verdict;
};

enum sk_psock_state_bits {
Expand Down Expand Up @@ -89,6 +90,7 @@ struct sk_psock {
#endif
struct sk_buff_head ingress_skb;
struct list_head ingress_msg;
spinlock_t ingress_lock;
unsigned long state;
struct list_head link;
spinlock_t link_lock;
Expand All @@ -97,13 +99,12 @@ struct sk_psock {
void (*saved_close)(struct sock *sk, long timeout);
void (*saved_write_space)(struct sock *sk);
void (*saved_data_ready)(struct sock *sk);
int (*psock_update_sk_prot)(struct sock *sk, bool restore);
struct proto *sk_proto;
struct mutex work_mutex;
struct sk_psock_work_state work_state;
struct work_struct work;
union {
struct rcu_head rcu;
struct work_struct gc;
};
struct rcu_work rwork;
};

int sk_msg_alloc(struct sock *sk, struct sk_msg *msg, int len,
Expand All @@ -124,6 +125,10 @@ int sk_msg_zerocopy_from_iter(struct sock *sk, struct iov_iter *from,
struct sk_msg *msg, u32 bytes);
int sk_msg_memcopy_from_iter(struct sock *sk, struct iov_iter *from,
struct sk_msg *msg, u32 bytes);
int sk_msg_wait_data(struct sock *sk, struct sk_psock *psock, int flags,
long timeo, int *err);
int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg,
int len, int flags);

static inline void sk_msg_check_to_free(struct sk_msg *msg, u32 i, u32 bytes)
{
Expand Down Expand Up @@ -284,14 +289,59 @@ static inline struct sk_psock *sk_psock(const struct sock *sk)
static inline void sk_psock_queue_msg(struct sk_psock *psock,
struct sk_msg *msg)
{
spin_lock_bh(&psock->ingress_lock);
list_add_tail(&msg->list, &psock->ingress_msg);
spin_unlock_bh(&psock->ingress_lock);
}

static inline struct sk_msg *sk_psock_dequeue_msg(struct sk_psock *psock)
{
struct sk_msg *msg;

spin_lock_bh(&psock->ingress_lock);
msg = list_first_entry_or_null(&psock->ingress_msg, struct sk_msg, list);
if (msg)
list_del(&msg->list);
spin_unlock_bh(&psock->ingress_lock);
return msg;
}

static inline struct sk_msg *sk_psock_peek_msg(struct sk_psock *psock)
{
struct sk_msg *msg;

spin_lock_bh(&psock->ingress_lock);
msg = list_first_entry_or_null(&psock->ingress_msg, struct sk_msg, list);
spin_unlock_bh(&psock->ingress_lock);
return msg;
}

static inline struct sk_msg *sk_psock_next_msg(struct sk_psock *psock,
struct sk_msg *msg)
{
struct sk_msg *ret;

spin_lock_bh(&psock->ingress_lock);
if (list_is_last(&msg->list, &psock->ingress_msg))
ret = NULL;
else
ret = list_next_entry(msg, list);
spin_unlock_bh(&psock->ingress_lock);
return ret;
}

static inline bool sk_psock_queue_empty(const struct sk_psock *psock)
{
return psock ? list_empty(&psock->ingress_msg) : true;
}

static inline void kfree_sk_msg(struct sk_msg *msg)
{
if (msg->skb)
consume_skb(msg->skb);
kfree(msg);
}

static inline void sk_psock_report_error(struct sk_psock *psock, int err)
{
struct sock *sk = psock->sk;
Expand All @@ -301,6 +351,7 @@ static inline void sk_psock_report_error(struct sk_psock *psock, int err)
}

struct sk_psock *sk_psock_init(struct sock *sk, int node);
void sk_psock_stop(struct sk_psock *psock, bool wait);

#if IS_ENABLED(CONFIG_BPF_STREAM_PARSER)
int sk_psock_init_strp(struct sock *sk, struct sk_psock *psock);
Expand Down Expand Up @@ -349,25 +400,12 @@ static inline void sk_psock_cork_free(struct sk_psock *psock)
}
}

static inline void sk_psock_update_proto(struct sock *sk,
struct sk_psock *psock,
struct proto *ops)
{
/* Pairs with lockless read in sk_clone_lock() */
WRITE_ONCE(sk->sk_prot, ops);
}

static inline void sk_psock_restore_proto(struct sock *sk,
struct sk_psock *psock)
{
sk->sk_prot->unhash = psock->saved_unhash;
if (inet_csk_has_ulp(sk)) {
tcp_update_ulp(sk, psock->sk_proto, psock->saved_write_space);
} else {
sk->sk_write_space = psock->saved_write_space;
/* Pairs with lockless read in sk_clone_lock() */
WRITE_ONCE(sk->sk_prot, psock->sk_proto);
}
if (psock->psock_update_sk_prot)
psock->psock_update_sk_prot(sk, true);
}

static inline void sk_psock_set_state(struct sk_psock *psock,
Expand Down Expand Up @@ -442,6 +480,7 @@ static inline void psock_progs_drop(struct sk_psock_progs *progs)
psock_set_prog(&progs->msg_parser, NULL);
psock_set_prog(&progs->stream_parser, NULL);
psock_set_prog(&progs->stream_verdict, NULL);
psock_set_prog(&progs->skb_verdict, NULL);
}

int sk_psock_tls_strp_read(struct sk_psock *psock, struct sk_buff *skb);
Expand Down
3 changes: 3 additions & 0 deletions include/net/sock.h
Original file line number Diff line number Diff line change
Expand Up @@ -1184,6 +1184,9 @@ struct proto {
void (*unhash)(struct sock *sk);
void (*rehash)(struct sock *sk);
int (*get_port)(struct sock *sk, unsigned short snum);
#ifdef CONFIG_BPF_SYSCALL
int (*psock_update_sk_prot)(struct sock *sk, bool restore);
#endif

/* Keeping track of sockets in use */
#ifdef CONFIG_PROC_FS
Expand Down
3 changes: 1 addition & 2 deletions include/net/tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -2203,13 +2203,12 @@ struct sk_psock;

#ifdef CONFIG_BPF_SYSCALL
struct proto *tcp_bpf_get_proto(struct sock *sk, struct sk_psock *psock);
int tcp_bpf_update_proto(struct sock *sk, bool restore);
void tcp_bpf_clone(const struct sock *sk, struct sock *newsk);
#endif /* CONFIG_BPF_SYSCALL */

int tcp_bpf_sendmsg_redir(struct sock *sk, struct sk_msg *msg, u32 bytes,
int flags);
int __tcp_bpf_recvmsg(struct sock *sk, struct sk_psock *psock,
struct msghdr *msg, int len, int flags);
#endif /* CONFIG_NET_SOCK_MSG */

#if !defined(CONFIG_BPF_SYSCALL) || !defined(CONFIG_NET_SOCK_MSG)
Expand Down
3 changes: 3 additions & 0 deletions include/net/udp.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,8 @@ struct sock *__udp6_lib_lookup(struct net *net,
struct sk_buff *skb);
struct sock *udp6_lib_lookup_skb(const struct sk_buff *skb,
__be16 sport, __be16 dport);
int udp_read_sock(struct sock *sk, read_descriptor_t *desc,
sk_read_actor_t recv_actor);

/* UDP uses skb->dev_scratch to cache as much information as possible and avoid
* possibly multiple cache miss on dequeue()
Expand Down Expand Up @@ -518,6 +520,7 @@ static inline struct sk_buff *udp_rcv_segment(struct sock *sk,
#ifdef CONFIG_BPF_SYSCALL
struct sk_psock;
struct proto *udp_bpf_get_proto(struct sock *sk, struct sk_psock *psock);
int udp_bpf_update_proto(struct sock *sk, bool restore);
#endif

#endif /* _UDP_H */
1 change: 1 addition & 0 deletions include/uapi/linux/bpf.h
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,7 @@ enum bpf_attach_type {
BPF_XDP_CPUMAP,
BPF_SK_LOOKUP,
BPF_XDP,
BPF_SK_SKB_VERDICT,
__MAX_BPF_ATTACH_TYPE
};

Expand Down
1 change: 1 addition & 0 deletions kernel/bpf/syscall.c
Original file line number Diff line number Diff line change
Expand Up @@ -2948,6 +2948,7 @@ attach_type_to_prog_type(enum bpf_attach_type attach_type)
return BPF_PROG_TYPE_SK_MSG;
case BPF_SK_SKB_STREAM_PARSER:
case BPF_SK_SKB_STREAM_VERDICT:
case BPF_SK_SKB_VERDICT:
return BPF_PROG_TYPE_SK_SKB;
case BPF_LIRC_MODE2:
return BPF_PROG_TYPE_LIRC_MODE2;
Expand Down
55 changes: 48 additions & 7 deletions net/core/skbuff.c
Original file line number Diff line number Diff line change
Expand Up @@ -2500,9 +2500,32 @@ int skb_splice_bits(struct sk_buff *skb, struct sock *sk, unsigned int offset,
}
EXPORT_SYMBOL_GPL(skb_splice_bits);

/* Send skb data on a socket. Socket must be locked. */
int skb_send_sock_locked(struct sock *sk, struct sk_buff *skb, int offset,
int len)
static int sendmsg_unlocked(struct sock *sk, struct msghdr *msg,
struct kvec *vec, size_t num, size_t size)
{
struct socket *sock = sk->sk_socket;

if (!sock)
return -EINVAL;
return kernel_sendmsg(sock, msg, vec, num, size);
}

static int sendpage_unlocked(struct sock *sk, struct page *page, int offset,
size_t size, int flags)
{
struct socket *sock = sk->sk_socket;

if (!sock)
return -EINVAL;
return kernel_sendpage(sock, page, offset, size, flags);
}

typedef int (*sendmsg_func)(struct sock *sk, struct msghdr *msg,
struct kvec *vec, size_t num, size_t size);
typedef int (*sendpage_func)(struct sock *sk, struct page *page, int offset,
size_t size, int flags);
static int __skb_send_sock(struct sock *sk, struct sk_buff *skb, int offset,
int len, sendmsg_func sendmsg, sendpage_func sendpage)
{
unsigned int orig_len = len;
struct sk_buff *head = skb;
Expand All @@ -2522,7 +2545,8 @@ int skb_send_sock_locked(struct sock *sk, struct sk_buff *skb, int offset,
memset(&msg, 0, sizeof(msg));
msg.msg_flags = MSG_DONTWAIT;

ret = kernel_sendmsg_locked(sk, &msg, &kv, 1, slen);
ret = INDIRECT_CALL_2(sendmsg, kernel_sendmsg_locked,
sendmsg_unlocked, sk, &msg, &kv, 1, slen);
if (ret <= 0)
goto error;

Expand Down Expand Up @@ -2553,9 +2577,11 @@ int skb_send_sock_locked(struct sock *sk, struct sk_buff *skb, int offset,
slen = min_t(size_t, len, skb_frag_size(frag) - offset);

while (slen) {
ret = kernel_sendpage_locked(sk, skb_frag_page(frag),
skb_frag_off(frag) + offset,
slen, MSG_DONTWAIT);
ret = INDIRECT_CALL_2(sendpage, kernel_sendpage_locked,
sendpage_unlocked, sk,
skb_frag_page(frag),
skb_frag_off(frag) + offset,
slen, MSG_DONTWAIT);
if (ret <= 0)
goto error;

Expand Down Expand Up @@ -2587,8 +2613,23 @@ int skb_send_sock_locked(struct sock *sk, struct sk_buff *skb, int offset,
error:
return orig_len == len ? ret : orig_len - len;
}

/* Send skb data on a socket. Socket must be locked. */
int skb_send_sock_locked(struct sock *sk, struct sk_buff *skb, int offset,
int len)
{
return __skb_send_sock(sk, skb, offset, len, kernel_sendmsg_locked,
kernel_sendpage_locked);
}
EXPORT_SYMBOL_GPL(skb_send_sock_locked);

/* Send skb data on a socket. Socket must be unlocked. */
int skb_send_sock(struct sock *sk, struct sk_buff *skb, int offset, int len)
{
return __skb_send_sock(sk, skb, offset, len, sendmsg_unlocked,
sendpage_unlocked);
}

/**
* skb_store_bits - store bits from kernel buffer to skb
* @skb: destination buffer
Expand Down
Loading

0 comments on commit 89d69c5

Please sign in to comment.