mptcp: recvmsg() can drain data from multiple subflows
With the previous patch in place, the msk can detect which subflow
has the current map with a simple walk, so let's update the main
loop to always select the 'current' subflow. The exit conditions now
closely mirror tcp_recvmsg(), giving the expected timeout and signal
behavior.
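
As a minimal sketch of the loop described above (not the committed kernel
code): find the subflow that currently has data available, drain it, and
fall back to waiting when none does. The standalone userspace C toy below
models only that shape; toy_subflow, recv_lookup and the byte counts are
illustrative stand-ins, not the kernel API; the real implementation is in
the diff further down.

#include <stddef.h>
#include <stdio.h>

struct toy_subflow {
	const char *name;
	size_t data_avail;	/* bytes currently readable on this subflow */
};

/* loosely mirrors mptcp_subflow_recv_lookup(): first subflow with data wins */
static struct toy_subflow *recv_lookup(struct toy_subflow *sf, size_t n)
{
	for (size_t i = 0; i < n; i++)
		if (sf[i].data_avail)
			return &sf[i];
	return NULL;
}

int main(void)
{
	struct toy_subflow subflows[] = {
		{ "subflow0", 0 }, { "subflow1", 300 }, { "subflow2", 150 },
	};
	size_t want = 400, copied = 0;

	while (copied < want) {
		struct toy_subflow *ssk = recv_lookup(subflows, 3);

		if (!ssk) {
			/* the real patch checks sk_err, shutdown, timeout and
			 * pending signals here (mirroring tcp_recvmsg()) and
			 * then sleeps in mptcp_wait_data(); the toy just stops
			 */
			printf("no subflow has data, would wait\n");
			break;
		}

		size_t chunk = ssk->data_avail < want - copied ?
			       ssk->data_avail : want - copied;
		ssk->data_avail -= chunk;
		copied += chunk;
		printf("drained %zu bytes from %s (total %zu)\n",
		       chunk, ssk->name, copied);
	}
	return 0;
}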

Co-developed-by: Peter Krystad <peter.krystad@linux.intel.com>
Signed-off-by: Peter Krystad <peter.krystad@linux.intel.com>
Co-developed-by: Davide Caratti <dcaratti@redhat.com>
Signed-off-by: Davide Caratti <dcaratti@redhat.com>
Co-developed-by: Matthieu Baerts <matthieu.baerts@tessares.net>
Signed-off-by: Matthieu Baerts <matthieu.baerts@tessares.net>
Co-developed-by: Mat Martineau <mathew.j.martineau@linux.intel.com>
Signed-off-by: Mat Martineau <mathew.j.martineau@linux.intel.com>
Co-developed-by: Florian Westphal <fw@strlen.de>
Signed-off-by: Florian Westphal <fw@strlen.de>
Signed-off-by: Paolo Abeni <pabeni@redhat.com>
Signed-off-by: Christoph Paasch <cpaasch@apple.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
Paolo Abeni authored and David S. Miller committed Jan 24, 2020
1 parent 1891c4a commit 7a6a6cb
178 changes: 168 additions & 10 deletions net/mptcp/protocol.c
@@ -9,6 +9,8 @@
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/netdevice.h>
#include <linux/sched/signal.h>
#include <linux/atomic.h>
#include <net/sock.h>
#include <net/inet_common.h>
#include <net/inet_hashtables.h>
@@ -105,6 +107,21 @@ static bool mptcp_ext_cache_refill(struct mptcp_sock *msk)
return !!msk->cached_ext;
}

static struct sock *mptcp_subflow_recv_lookup(const struct mptcp_sock *msk)
{
struct mptcp_subflow_context *subflow;
struct sock *sk = (struct sock *)msk;

sock_owned_by_me(sk);

mptcp_for_each_subflow(msk, subflow) {
if (subflow->data_avail)
return mptcp_subflow_tcp_sock(subflow);
}

return NULL;
}

static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
struct msghdr *msg, long *timeo)
{
@@ -269,13 +286,37 @@ int mptcp_read_actor(read_descriptor_t *desc, struct sk_buff *skb,
return copy_len;
}

static void mptcp_wait_data(struct sock *sk, long *timeo)
{
DEFINE_WAIT_FUNC(wait, woken_wake_function);
struct mptcp_sock *msk = mptcp_sk(sk);

add_wait_queue(sk_sleep(sk), &wait);
sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk);

sk_wait_event(sk, timeo,
test_and_clear_bit(MPTCP_DATA_READY, &msk->flags), &wait);

sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk);
remove_wait_queue(sk_sleep(sk), &wait);
}

static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
int nonblock, int flags, int *addr_len)
{
struct mptcp_sock *msk = mptcp_sk(sk);
struct mptcp_subflow_context *subflow;
bool more_data_avail = false;
struct mptcp_read_arg arg;
read_descriptor_t desc;
bool wait_data = false;
struct socket *ssock;
struct tcp_sock *tp;
bool done = false;
struct sock *ssk;
int copied = 0;
int target;
long timeo;

if (msg->msg_flags & ~(MSG_WAITALL | MSG_DONTWAIT))
return -EOPNOTSUPP;
@@ -290,16 +331,124 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
return copied;
}

ssk = mptcp_subflow_get(msk);
if (!ssk) {
release_sock(sk);
return -ENOTCONN;
arg.msg = msg;
desc.arg.data = &arg;
desc.error = 0;

timeo = sock_rcvtimeo(sk, nonblock);

len = min_t(size_t, len, INT_MAX);
target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);

while (!done) {
u32 map_remaining;
int bytes_read;

ssk = mptcp_subflow_recv_lookup(msk);
pr_debug("msk=%p ssk=%p", msk, ssk);
if (!ssk)
goto wait_for_data;

subflow = mptcp_subflow_ctx(ssk);
tp = tcp_sk(ssk);

lock_sock(ssk);
do {
/* try to read as much data as available */
map_remaining = subflow->map_data_len -
mptcp_subflow_get_map_offset(subflow);
desc.count = min_t(size_t, len - copied, map_remaining);
pr_debug("reading %zu bytes, copied %d", desc.count,
copied);
bytes_read = tcp_read_sock(ssk, &desc,
mptcp_read_actor);
if (bytes_read < 0) {
if (!copied)
copied = bytes_read;
done = true;
goto next;
}

pr_debug("msk ack_seq=%llx -> %llx", msk->ack_seq,
msk->ack_seq + bytes_read);
msk->ack_seq += bytes_read;
copied += bytes_read;
if (copied >= len) {
done = true;
goto next;
}
if (tp->urg_data && tp->urg_seq == tp->copied_seq) {
pr_err("Urgent data present, cannot proceed");
done = true;
goto next;
}
next:
more_data_avail = mptcp_subflow_data_available(ssk);
} while (more_data_avail && !done);
release_sock(ssk);
continue;

wait_for_data:
more_data_avail = false;

/* only the master socket status is relevant here. The exit
* conditions mirror closely tcp_recvmsg()
*/
if (copied >= target)
break;

if (copied) {
if (sk->sk_err ||
sk->sk_state == TCP_CLOSE ||
(sk->sk_shutdown & RCV_SHUTDOWN) ||
!timeo ||
signal_pending(current))
break;
} else {
if (sk->sk_err) {
copied = sock_error(sk);
break;
}

if (sk->sk_shutdown & RCV_SHUTDOWN)
break;

if (sk->sk_state == TCP_CLOSE) {
copied = -ENOTCONN;
break;
}

if (!timeo) {
copied = -EAGAIN;
break;
}

if (signal_pending(current)) {
copied = sock_intr_errno(timeo);
break;
}
}

pr_debug("block timeout %ld", timeo);
wait_data = true;
mptcp_wait_data(sk, &timeo);
}

copied = sock_recvmsg(ssk->sk_socket, msg, flags);
if (more_data_avail) {
if (!test_bit(MPTCP_DATA_READY, &msk->flags))
set_bit(MPTCP_DATA_READY, &msk->flags);
} else if (!wait_data) {
clear_bit(MPTCP_DATA_READY, &msk->flags);

release_sock(sk);
/* .. race-breaker: ssk might get new data after last
* data_available() returns false.
*/
ssk = mptcp_subflow_recv_lookup(msk);
if (unlikely(ssk))
set_bit(MPTCP_DATA_READY, &msk->flags);
}

release_sock(sk);
return copied;
}

@@ -460,10 +609,6 @@ static struct sock *mptcp_accept(struct sock *sk, int flags, int *err,
msk->write_seq = subflow->idsn + 1;
ack_seq++;
msk->ack_seq = ack_seq;
subflow->map_seq = ack_seq;
subflow->map_subflow_seq = 1;
subflow->rel_write_seq = 1;
subflow->tcp_sock = ssk;
newsk = new_mptcp_sock;
mptcp_copy_inaddrs(newsk, ssk);
list_add(&subflow->node, &msk->conn_list);
@@ -475,6 +620,19 @@ static struct sock *mptcp_accept(struct sock *sk, int flags, int *err,
bh_unlock_sock(new_mptcp_sock);
local_bh_enable();
release_sock(sk);

/* the subflow can already receive packet, avoid racing with
* the receive path and process the pending ones
*/
lock_sock(ssk);
subflow->map_seq = ack_seq;
subflow->map_subflow_seq = 1;
subflow->rel_write_seq = 1;
subflow->tcp_sock = ssk;
subflow->conn = new_mptcp_sock;
if (unlikely(!skb_queue_empty(&ssk->sk_receive_queue)))
mptcp_subflow_data_available(ssk);
release_sock(ssk);
}

return newsk;
