Skip to content

Commit

Permalink
SUNRPC: Use poll() to fix up the socket requeue races
Browse files Browse the repository at this point in the history
Because we clear XPRT_SOCK_DATA_READY before reading, we can end up
with a situation where new data arrives, causing xs_data_ready() to
queue up a second receive worker job for the same socket, which then
immediately gets stuck waiting on the transport receive mutex.
The fix is to only clear XPRT_SOCK_DATA_READY once we're done reading,
and then to use poll() to check if we might need to queue up a new
job in order to deal with any new data.

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
  • Loading branch information
Trond Myklebust committed Feb 20, 2019
1 parent a1231fd commit 0ffe86f
Showing 1 changed file with 24 additions and 2 deletions.
26 changes: 24 additions & 2 deletions net/sunrpc/xprtsock.c
Original file line number Diff line number Diff line change
Expand Up @@ -656,13 +656,34 @@ xs_read_stream(struct sock_xprt *transport, int flags)
return ret != 0 ? ret : -ESHUTDOWN;
}

static __poll_t xs_poll_socket(struct sock_xprt *transport)
{
return transport->sock->ops->poll(NULL, transport->sock, NULL);
}

static bool xs_poll_socket_readable(struct sock_xprt *transport)
{
__poll_t events = xs_poll_socket(transport);

return (events & (EPOLLIN | EPOLLRDNORM)) && !(events & EPOLLRDHUP);
}

static void xs_poll_check_readable(struct sock_xprt *transport)
{

clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
if (!xs_poll_socket_readable(transport))
return;
if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
queue_work(xprtiod_workqueue, &transport->recv_worker);
}

static void xs_stream_data_receive(struct sock_xprt *transport)
{
size_t read = 0;
ssize_t ret = 0;

mutex_lock(&transport->recv_mutex);
clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
if (transport->sock == NULL)
goto out;
for (;;) {
Expand All @@ -672,6 +693,7 @@ static void xs_stream_data_receive(struct sock_xprt *transport)
read += ret;
cond_resched();
}
xs_poll_check_readable(transport);
out:
mutex_unlock(&transport->recv_mutex);
trace_xs_stream_read_data(&transport->xprt, ret, read);
Expand Down Expand Up @@ -1362,7 +1384,6 @@ static void xs_udp_data_receive(struct sock_xprt *transport)
int err;

mutex_lock(&transport->recv_mutex);
clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
sk = transport->inet;
if (sk == NULL)
goto out;
Expand All @@ -1374,6 +1395,7 @@ static void xs_udp_data_receive(struct sock_xprt *transport)
consume_skb(skb);
cond_resched();
}
xs_poll_check_readable(transport);
out:
mutex_unlock(&transport->recv_mutex);
}
Expand Down

0 comments on commit 0ffe86f

Please sign in to comment.