Merge branch 'mptcp-improve-multiple-xmit-streams-support'
Paolo Abeni says:

====================
mptcp: improve multiple xmit streams support

This series improves MPTCP handling of multiple concurrent
xmit streams.

The to-be-transmitted data is enqueued to a subflow only when
the send window is open, keeping the subflows' xmit queues shorter
and allowing for a faster switch-over.

The above requires more accurate msk socket state tracking
and some additional infrastructure, to allow pushing the data
pending in the msk xmit queue as soon as the MPTCP-level send
window opens (patches 6-10).

As a side effect, the MPTCP socket can enqueue data to subflows
after close() time, to completely spool the data sitting in the
msk xmit queue. Dealing with this requires some infrastructure and
core TCP changes (patches 1-5).

Finally, patches 11-12 introduce more accurate tracking of the other
end's receive window.

Overall this refactors the MPTCP xmit path without introducing
new features; the new code is covered by the existing self-tests.

v2 -> v3:
 - rebased,
 - fixed checkpatch issue in patch 1/13
 - fixed some state tracking issues in patch 8/13

v1 -> v2:
 - this is just a repost, to cope with patchwork issues, no changes
   at all
====================
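
The window-gating idea at the heart of the series can be sketched in a
few lines. The snippet below is an editorial illustration, not code from
the patches: the field names mirror msk fields the series touches
(snd_nxt, wnd_end), but the struct and helper are hypothetical.

/* Hypothetical sketch: transmit-side gating on the MPTCP-level window.
 * wnd_end tracks snd_una plus the peer's advertised window (see the
 * options.c hunk below); data pending in the msk xmit queue is pushed
 * to a subflow only while room remains.
 */
#include <stdint.h>

struct msk_sketch {
        uint64_t snd_nxt;       /* next MPTCP-level sequence to send */
        uint64_t wnd_end;       /* snd_una + peer's advertised window */
};

static uint64_t msk_window_room(const struct msk_sketch *msk)
{
        /* 64-bit wraparound handling (after64()) omitted for brevity */
        return msk->wnd_end > msk->snd_nxt ? msk->wnd_end - msk->snd_nxt : 0;
}
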

Link: https://lore.kernel.org/r/cover.1605458224.git.pabeni@redhat.com
Signed-off-by: Jakub Kicinski <kuba@kernel.org>
Jakub Kicinski committed Nov 16, 2020
2 parents c0a645a + 7ed9080 commit 72308ec
Showing 8 changed files with 776 additions and 498 deletions.

4 changes: 4 additions & 0 deletions include/net/tcp.h
@@ -322,13 +322,16 @@ void tcp_shutdown(struct sock *sk, int how);
 int tcp_v4_early_demux(struct sk_buff *skb);
 int tcp_v4_rcv(struct sk_buff *skb);
 
+void tcp_remove_empty_skb(struct sock *sk, struct sk_buff *skb);
 int tcp_v4_tw_remember_stamp(struct inet_timewait_sock *tw);
 int tcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t size);
 int tcp_sendmsg_locked(struct sock *sk, struct msghdr *msg, size_t size);
 int tcp_sendpage(struct sock *sk, struct page *page, int offset, size_t size,
                  int flags);
 int tcp_sendpage_locked(struct sock *sk, struct page *page, int offset,
                         size_t size, int flags);
+struct sk_buff *tcp_build_frag(struct sock *sk, int size_goal, int flags,
+                               struct page *page, int offset, size_t *size);
 ssize_t do_tcp_sendpages(struct sock *sk, struct page *page, int offset,
                          size_t size, int flags);
 int tcp_send_mss(struct sock *sk, int *size_goal, int flags);
@@ -392,6 +395,7 @@ void tcp_update_metrics(struct sock *sk);
 void tcp_init_metrics(struct sock *sk);
 void tcp_metrics_init(void);
 bool tcp_peer_is_proven(struct request_sock *req, struct dst_entry *dst);
+void __tcp_close(struct sock *sk, long timeout);
 void tcp_close(struct sock *sk, long timeout);
 void tcp_init_sock(struct sock *sk);
 void tcp_init_transfer(struct sock *sk, int bpf_op, struct sk_buff *skb);

128 changes: 74 additions & 54 deletions net/ipv4/tcp.c
@@ -954,7 +954,7 @@ int tcp_send_mss(struct sock *sk, int *size_goal, int flags)
  * importantly be able to generate EPOLLOUT for Edge Trigger epoll()
  * users.
  */
-static void tcp_remove_empty_skb(struct sock *sk, struct sk_buff *skb)
+void tcp_remove_empty_skb(struct sock *sk, struct sk_buff *skb)
 {
         if (skb && !skb->len) {
                 tcp_unlink_write_queue(skb, sk);
@@ -964,6 +964,68 @@ static void tcp_remove_empty_skb(struct sock *sk, struct sk_buff *skb)
         }
 }
 
+struct sk_buff *tcp_build_frag(struct sock *sk, int size_goal, int flags,
+                               struct page *page, int offset, size_t *size)
+{
+        struct sk_buff *skb = tcp_write_queue_tail(sk);
+        struct tcp_sock *tp = tcp_sk(sk);
+        bool can_coalesce;
+        int copy, i;
+
+        if (!skb || (copy = size_goal - skb->len) <= 0 ||
+            !tcp_skb_can_collapse_to(skb)) {
+new_segment:
+                if (!sk_stream_memory_free(sk))
+                        return NULL;
+
+                skb = sk_stream_alloc_skb(sk, 0, sk->sk_allocation,
+                                          tcp_rtx_and_write_queues_empty(sk));
+                if (!skb)
+                        return NULL;
+
+#ifdef CONFIG_TLS_DEVICE
+                skb->decrypted = !!(flags & MSG_SENDPAGE_DECRYPTED);
+#endif
+                skb_entail(sk, skb);
+                copy = size_goal;
+        }
+
+        if (copy > *size)
+                copy = *size;
+
+        i = skb_shinfo(skb)->nr_frags;
+        can_coalesce = skb_can_coalesce(skb, i, page, offset);
+        if (!can_coalesce && i >= sysctl_max_skb_frags) {
+                tcp_mark_push(tp, skb);
+                goto new_segment;
+        }
+        if (!sk_wmem_schedule(sk, copy))
+                return NULL;
+
+        if (can_coalesce) {
+                skb_frag_size_add(&skb_shinfo(skb)->frags[i - 1], copy);
+        } else {
+                get_page(page);
+                skb_fill_page_desc(skb, i, page, offset, copy);
+        }
+
+        if (!(flags & MSG_NO_SHARED_FRAGS))
+                skb_shinfo(skb)->tx_flags |= SKBTX_SHARED_FRAG;
+
+        skb->len += copy;
+        skb->data_len += copy;
+        skb->truesize += copy;
+        sk_wmem_queued_add(sk, copy);
+        sk_mem_charge(sk, copy);
+        skb->ip_summed = CHECKSUM_PARTIAL;
+        WRITE_ONCE(tp->write_seq, tp->write_seq + copy);
+        TCP_SKB_CB(skb)->end_seq += copy;
+        tcp_skb_pcount_set(skb, 0);
+
+        *size = copy;
+        return skb;
+}
+
 ssize_t do_tcp_sendpages(struct sock *sk, struct page *page, int offset,
                          size_t size, int flags)
 {
@@ -999,60 +1061,13 @@ ssize_t do_tcp_sendpages(struct sock *sk, struct page *page, int offset,
                 goto out_err;
 
         while (size > 0) {
-                struct sk_buff *skb = tcp_write_queue_tail(sk);
-                int copy, i;
-                bool can_coalesce;
+                struct sk_buff *skb;
+                size_t copy = size;
 
-                if (!skb || (copy = size_goal - skb->len) <= 0 ||
-                    !tcp_skb_can_collapse_to(skb)) {
-new_segment:
-                        if (!sk_stream_memory_free(sk))
-                                goto wait_for_space;
-
-                        skb = sk_stream_alloc_skb(sk, 0, sk->sk_allocation,
-                                                  tcp_rtx_and_write_queues_empty(sk));
-                        if (!skb)
-                                goto wait_for_space;
-
-#ifdef CONFIG_TLS_DEVICE
-                        skb->decrypted = !!(flags & MSG_SENDPAGE_DECRYPTED);
-#endif
-                        skb_entail(sk, skb);
-                        copy = size_goal;
-                }
-
-                if (copy > size)
-                        copy = size;
-
-                i = skb_shinfo(skb)->nr_frags;
-                can_coalesce = skb_can_coalesce(skb, i, page, offset);
-                if (!can_coalesce && i >= sysctl_max_skb_frags) {
-                        tcp_mark_push(tp, skb);
-                        goto new_segment;
-                }
-                if (!sk_wmem_schedule(sk, copy))
+                skb = tcp_build_frag(sk, size_goal, flags, page, offset, &copy);
+                if (!skb)
                         goto wait_for_space;
 
-                if (can_coalesce) {
-                        skb_frag_size_add(&skb_shinfo(skb)->frags[i - 1], copy);
-                } else {
-                        get_page(page);
-                        skb_fill_page_desc(skb, i, page, offset, copy);
-                }
-
-                if (!(flags & MSG_NO_SHARED_FRAGS))
-                        skb_shinfo(skb)->tx_flags |= SKBTX_SHARED_FRAG;
-
-                skb->len += copy;
-                skb->data_len += copy;
-                skb->truesize += copy;
-                sk_wmem_queued_add(sk, copy);
-                sk_mem_charge(sk, copy);
-                skb->ip_summed = CHECKSUM_PARTIAL;
-                WRITE_ONCE(tp->write_seq, tp->write_seq + copy);
-                TCP_SKB_CB(skb)->end_seq += copy;
-                tcp_skb_pcount_set(skb, 0);
-
                 if (!copied)
                         TCP_SKB_CB(skb)->tcp_flags &= ~TCPHDR_PSH;
 
@@ -2405,13 +2420,12 @@ bool tcp_check_oom(struct sock *sk, int shift)
         return too_many_orphans || out_of_socket_memory;
 }
 
-void tcp_close(struct sock *sk, long timeout)
+void __tcp_close(struct sock *sk, long timeout)
 {
         struct sk_buff *skb;
         int data_was_unread = 0;
         int state;
 
-        lock_sock(sk);
         sk->sk_shutdown = SHUTDOWN_MASK;
 
         if (sk->sk_state == TCP_LISTEN) {
@@ -2575,6 +2589,12 @@ void tcp_close(struct sock *sk, long timeout)
 out:
         bh_unlock_sock(sk);
         local_bh_enable();
+}
+
+void tcp_close(struct sock *sk, long timeout)
+{
+        lock_sock(sk);
+        __tcp_close(sk, timeout);
         release_sock(sk);
         sock_put(sk);
 }

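The tcp_close() change above follows the kernel's usual lock-split
convention: __tcp_close() runs the teardown and expects the caller to
hold the socket lock, while tcp_close() remains the plain entry point
that takes and releases it. That is what lets MPTCP (patches 1-5 of the
series) invoke the close logic while already holding the msk lock to
spool out pending data. Below is a generic, self-contained sketch of
the pattern, using a plain mutex in place of the socket lock; it is an
editorial illustration, not kernel code.

#include <pthread.h>

static pthread_mutex_t obj_lock = PTHREAD_MUTEX_INITIALIZER;

/* __obj_close(): caller must already hold obj_lock, mirroring how
 * __tcp_close() assumes the socket lock is held.
 */
static void __obj_close(long timeout)
{
        (void)timeout;
        /* ... teardown work performed under the lock ... */
}

/* obj_close(): the unadorned entry point acquires and drops the lock
 * itself, like tcp_close() wrapping __tcp_close().
 */
static void obj_close(long timeout)
{
        pthread_mutex_lock(&obj_lock);
        __obj_close(timeout);
        pthread_mutex_unlock(&obj_lock);
}
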
30 changes: 23 additions & 7 deletions net/mptcp/options.c
@@ -492,7 +492,7 @@ static bool mptcp_established_options_dss(struct sock *sk, struct sk_buff *skb,
         bool ret = false;
 
         mpext = skb ? mptcp_get_ext(skb) : NULL;
-        snd_data_fin_enable = READ_ONCE(msk->snd_data_fin_enable);
+        snd_data_fin_enable = mptcp_data_fin_enabled(msk);
 
         if (!skb || (mpext && mpext->use_map) || snd_data_fin_enable) {
                 unsigned int map_size;
@@ -809,11 +809,14 @@ static u64 expand_ack(u64 old_ack, u64 cur_ack, bool use_64bit)
         return cur_ack;
 }
 
-static void update_una(struct mptcp_sock *msk,
-                       struct mptcp_options_received *mp_opt)
+static void ack_update_msk(struct mptcp_sock *msk,
+                           const struct sock *ssk,
+                           struct mptcp_options_received *mp_opt)
 {
         u64 new_snd_una, snd_una, old_snd_una = atomic64_read(&msk->snd_una);
-        u64 write_seq = READ_ONCE(msk->write_seq);
+        u64 new_wnd_end, wnd_end, old_wnd_end = atomic64_read(&msk->wnd_end);
+        u64 snd_nxt = READ_ONCE(msk->snd_nxt);
+        struct sock *sk = (struct sock *)msk;
 
         /* avoid ack expansion on update conflict, to reduce the risk of
          * wrongly expanding to a future ack sequence number, which is way
@@ -822,15 +825,28 @@ static void update_una(struct mptcp_sock *msk,
         new_snd_una = expand_ack(old_snd_una, mp_opt->data_ack, mp_opt->ack64);
 
         /* ACK for data not even sent yet? Ignore. */
-        if (after64(new_snd_una, write_seq))
+        if (after64(new_snd_una, snd_nxt))
                 new_snd_una = old_snd_una;
 
+        new_wnd_end = new_snd_una + tcp_sk(ssk)->snd_wnd;
+
+        while (after64(new_wnd_end, old_wnd_end)) {
+                wnd_end = old_wnd_end;
+                old_wnd_end = atomic64_cmpxchg(&msk->wnd_end, wnd_end,
+                                               new_wnd_end);
+                if (old_wnd_end == wnd_end) {
+                        if (mptcp_send_head(sk))
+                                mptcp_schedule_work(sk);
+                        break;
+                }
+        }
+
         while (after64(new_snd_una, old_snd_una)) {
                 snd_una = old_snd_una;
                 old_snd_una = atomic64_cmpxchg(&msk->snd_una, snd_una,
                                                new_snd_una);
                 if (old_snd_una == snd_una) {
-                        mptcp_data_acked((struct sock *)msk);
+                        mptcp_data_acked(sk);
                         break;
                 }
         }
@@ -930,7 +946,7 @@ void mptcp_incoming_options(struct sock *sk, struct sk_buff *skb)
          * monodirectional flows will stuck
          */
         if (mp_opt.use_ack)
-                update_una(msk, &mp_opt);
+                ack_update_msk(msk, sk, &mp_opt);
 
         /* Zero-data-length packets are dropped by the caller and not
          * propagated to the MPTCP layer, so the skb extension does not

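Both loops in ack_update_msk() above instantiate the same lock-free
idiom: advance a shared 64-bit cursor strictly forward, retry on
concurrent updates, and let only the winning updater perform the side
effect (scheduling the worker, or signalling acked data). Below is a
standalone C11 sketch of that idiom; it is illustrative only, and the
kernel code additionally uses after64() to handle sequence-number
wraparound, which this plain comparison ignores.

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

/* Advance *cur to new_val only if new_val is ahead. Returns true in
 * the one thread whose compare-exchange actually moved the cursor, so
 * side effects such as scheduling the xmit worker run exactly once
 * per advance.
 */
static bool advance_monotonic(_Atomic uint64_t *cur, uint64_t new_val)
{
        uint64_t old = atomic_load(cur);

        while (new_val > old) {
                /* On failure, 'old' is refreshed with the current
                 * value and the loop condition re-checks whether we
                 * still have something newer to publish.
                 */
                if (atomic_compare_exchange_weak(cur, &old, new_val))
                        return true;
        }
        return false;
}
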
3 changes: 1 addition & 2 deletions net/mptcp/pm.c
@@ -89,8 +89,7 @@ static bool mptcp_pm_schedule_work(struct mptcp_sock *msk,
                 return false;
 
         msk->pm.status |= BIT(new_status);
-        if (schedule_work(&msk->work))
-                sock_hold((struct sock *)msk);
+        mptcp_schedule_work((struct sock *)msk);
         return true;
 }
 

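The pm.c hunk replaces the open-coded schedule_work()/sock_hold() pair
with mptcp_schedule_work(). Judging from the lines being removed, the
helper presumably centralizes exactly that pairing: taking a socket
reference only when the work item was actually queued, so the worker's
eventual sock_put() stays balanced. A sketch of that plausible shape
follows; it is an assumption based on the removed code, not the
verbatim helper.

/* Assumed shape of the consolidated helper: hold a socket reference
 * only if the work was really queued (schedule_work() returns false
 * when the item is already pending), keeping hold/put balanced.
 */
static void sketch_mptcp_schedule_work(struct mptcp_sock *msk)
{
        if (schedule_work(&msk->work))
                sock_hold((struct sock *)msk);
}
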
6 changes: 2 additions & 4 deletions net/mptcp/pm_netlink.c
@@ -416,14 +416,13 @@ void mptcp_pm_nl_rm_addr_received(struct mptcp_sock *msk)
         list_for_each_entry_safe(subflow, tmp, &msk->conn_list, node) {
                 struct sock *ssk = mptcp_subflow_tcp_sock(subflow);
                 int how = RCV_SHUTDOWN | SEND_SHUTDOWN;
-                long timeout = 0;
 
                 if (msk->pm.rm_id != subflow->remote_id)
                         continue;
 
                 spin_unlock_bh(&msk->pm.lock);
                 mptcp_subflow_shutdown(sk, ssk, how);
-                __mptcp_close_ssk(sk, ssk, subflow, timeout);
+                __mptcp_close_ssk(sk, ssk, subflow);
                 spin_lock_bh(&msk->pm.lock);
 
                 msk->pm.add_addr_accepted--;
@@ -452,14 +451,13 @@ void mptcp_pm_nl_rm_subflow_received(struct mptcp_sock *msk, u8 rm_id)
         list_for_each_entry_safe(subflow, tmp, &msk->conn_list, node) {
                 struct sock *ssk = mptcp_subflow_tcp_sock(subflow);
                 int how = RCV_SHUTDOWN | SEND_SHUTDOWN;
-                long timeout = 0;
 
                 if (rm_id != subflow->local_id)
                         continue;
 
                 spin_unlock_bh(&msk->pm.lock);
                 mptcp_subflow_shutdown(sk, ssk, how);
-                __mptcp_close_ssk(sk, ssk, subflow, timeout);
+                __mptcp_close_ssk(sk, ssk, subflow);
                 spin_lock_bh(&msk->pm.lock);
 
                 msk->pm.local_addr_used--;