Skip to content

Commit

Permalink
rxrpc: Reduce the use of RCU in packet input
Browse files Browse the repository at this point in the history
Shrink the region of rxrpc_input_packet() that is covered by the RCU read
lock so that it only covers the connection and call lookup.  This means
that the bits now outside of that can call sleepable functions such as
kmalloc and sendmsg.

Also take a ref on the conn or call we're going to use before we drop the
RCU read lock.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
  • Loading branch information
David Howells committed Dec 1, 2022
1 parent 2d1faf7 commit cd21eff
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 32 deletions.
3 changes: 1 addition & 2 deletions net/rxrpc/ar-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -961,8 +961,7 @@ void rxrpc_unpublish_service_conn(struct rxrpc_connection *);
* input.c
*/
void rxrpc_input_call_event(struct rxrpc_call *, struct sk_buff *);
void rxrpc_input_implicit_end_call(struct rxrpc_sock *, struct rxrpc_connection *,
struct rxrpc_call *);
void rxrpc_input_implicit_end_call(struct rxrpc_connection *, struct rxrpc_call *);

/*
* io_thread.c
Expand Down
13 changes: 3 additions & 10 deletions net/rxrpc/call_accept.c
Original file line number Diff line number Diff line change
Expand Up @@ -336,13 +336,13 @@ static struct rxrpc_call *rxrpc_alloc_incoming_call(struct rxrpc_sock *rx,
* If this is for a kernel service, when we allocate the call, it will have
* three refs on it: (1) the kernel service, (2) the user_call_ID tree, (3) the
* retainer ref obtained from the backlog buffer. Prealloc calls for userspace
* services only have the ref from the backlog buffer. We want to pass this
* ref to non-BH context to dispose of.
* services only have the ref from the backlog buffer. We pass this ref to the
* caller.
*
* If we want to report an error, we mark the skb with the packet type and
* abort code and return NULL.
*
* The call is returned with the user access mutex held.
* The call is returned with the user access mutex held and a ref on it.
*/
struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local,
struct rxrpc_sock *rx,
Expand Down Expand Up @@ -426,13 +426,6 @@ struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local,

rxrpc_send_ping(call, skb);

/* We have to discard the prealloc queue's ref here and rely on a
* combination of the RCU read lock and refs held either by the socket
* (recvmsg queue, to-be-accepted queue or user ID tree) or the kernel
* service to prevent the call from being deallocated too early.
*/
rxrpc_put_call(call, rxrpc_call_put_discard_prealloc);

if (hlist_unhashed(&call->error_link)) {
spin_lock(&call->peer->lock);
hlist_add_head(&call->error_link, &call->peer->error_targets);
Expand Down
7 changes: 3 additions & 4 deletions net/rxrpc/input.c
Original file line number Diff line number Diff line change
Expand Up @@ -1072,8 +1072,7 @@ void rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
*
* TODO: If callNumber > call_id + 1, renegotiate security.
*/
void rxrpc_input_implicit_end_call(struct rxrpc_sock *rx,
struct rxrpc_connection *conn,
void rxrpc_input_implicit_end_call(struct rxrpc_connection *conn,
struct rxrpc_call *call)
{
switch (READ_ONCE(call->state)) {
Expand All @@ -1091,7 +1090,7 @@ void rxrpc_input_implicit_end_call(struct rxrpc_sock *rx,
break;
}

spin_lock(&rx->incoming_lock);
spin_lock(&conn->bundle->channel_lock);
__rxrpc_disconnect_call(conn, call);
spin_unlock(&rx->incoming_lock);
spin_unlock(&conn->bundle->channel_lock);
}
68 changes: 52 additions & 16 deletions net/rxrpc/io_thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -257,14 +257,18 @@ static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff **_skb)
if (sp->hdr.serviceId == 0)
goto bad_message;

rcu_read_lock();

if (rxrpc_to_server(sp)) {
/* Weed out packets to services we're not offering. Packets
* that would begin a call are explicitly rejected and the rest
* are just discarded.
*/
rx = rcu_dereference(local->service);
if (!rx || (sp->hdr.serviceId != rx->srx.srx_service &&
sp->hdr.serviceId != rx->second_service)) {
sp->hdr.serviceId != rx->second_service)
) {
rcu_read_unlock();
if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA &&
sp->hdr.seq == 1)
goto unsupported_service;
Expand Down Expand Up @@ -293,7 +297,12 @@ static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff **_skb)
if (sp->hdr.callNumber == 0) {
/* Connection-level packet */
_debug("CONN %p {%d}", conn, conn->debug_id);
rxrpc_post_packet_to_conn(conn, skb);
conn = rxrpc_get_connection_maybe(conn, rxrpc_conn_get_conn_input);
rcu_read_unlock();
if (conn) {
rxrpc_post_packet_to_conn(conn, skb);
rxrpc_put_connection(conn, rxrpc_conn_put_conn_input);
}
return 0;
}

Expand All @@ -305,20 +314,26 @@ static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff **_skb)
chan = &conn->channels[channel];

/* Ignore really old calls */
if (sp->hdr.callNumber < chan->last_call)
if (sp->hdr.callNumber < chan->last_call) {
rcu_read_unlock();
return 0;
}

if (sp->hdr.callNumber == chan->last_call) {
if (chan->call ||
sp->hdr.type == RXRPC_PACKET_TYPE_ABORT)
sp->hdr.type == RXRPC_PACKET_TYPE_ABORT) {
rcu_read_unlock();
return 0;
}

/* For the previous service call, if completed
* successfully, we discard all further packets.
*/
if (rxrpc_conn_is_service(conn) &&
chan->last_type == RXRPC_PACKET_TYPE_ACK)
chan->last_type == RXRPC_PACKET_TYPE_ACK) {
rcu_read_unlock();
return 0;
}

/* But otherwise we need to retransmit the final packet
* from data cached in the connection record.
Expand All @@ -328,20 +343,32 @@ static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff **_skb)
sp->hdr.seq,
sp->hdr.serial,
sp->hdr.flags);
rxrpc_post_packet_to_conn(conn, skb);
conn = rxrpc_get_connection_maybe(conn, rxrpc_conn_get_call_input);
rcu_read_unlock();
if (conn) {
rxrpc_post_packet_to_conn(conn, skb);
rxrpc_put_connection(conn, rxrpc_conn_put_call_input);
}
return 0;
}

call = rcu_dereference(chan->call);

if (sp->hdr.callNumber > chan->call_id) {
if (rxrpc_to_client(sp))
if (rxrpc_to_client(sp)) {
rcu_read_unlock();
goto reject_packet;
if (call)
rxrpc_input_implicit_end_call(rx, conn, call);
call = NULL;
}
if (call) {
rxrpc_input_implicit_end_call(conn, call);
chan->call = NULL;
call = NULL;
}
}

if (call && !rxrpc_try_get_call(call, rxrpc_call_get_input))
call = NULL;

if (call) {
if (sp->hdr.serviceId != call->dest_srx.srx_service)
call->dest_srx.srx_service = sp->hdr.serviceId;
Expand All @@ -352,23 +379,33 @@ static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff **_skb)
}
}

if (!call || refcount_read(&call->ref) == 0) {
if (!call) {
if (rxrpc_to_client(sp) ||
sp->hdr.type != RXRPC_PACKET_TYPE_DATA)
sp->hdr.type != RXRPC_PACKET_TYPE_DATA) {
rcu_read_unlock();
goto bad_message;
if (sp->hdr.seq != 1)
}
if (sp->hdr.seq != 1) {
rcu_read_unlock();
return 0;
}
call = rxrpc_new_incoming_call(local, rx, skb);
if (!call)
if (!call) {
rcu_read_unlock();
goto reject_packet;
}
}

rcu_read_unlock();

/* Process a call packet. */
rxrpc_input_call_event(call, skb);
rxrpc_put_call(call, rxrpc_call_put_input);
trace_rxrpc_rx_done(0, 0);
return 0;

wrong_security:
rcu_read_unlock();
trace_rxrpc_abort(0, "SEC", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
RXKADINCONSISTENCY, EBADMSG);
skb->priority = RXKADINCONSISTENCY;
Expand All @@ -381,6 +418,7 @@ static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff **_skb)
goto post_abort;

reupgrade:
rcu_read_unlock();
trace_rxrpc_abort(0, "UPG", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
RX_PROTOCOL_ERROR, EBADMSG);
goto protocol_error;
Expand Down Expand Up @@ -433,9 +471,7 @@ int rxrpc_io_thread(void *data)
switch (skb->mark) {
case RXRPC_SKB_MARK_PACKET:
skb->priority = 0;
rcu_read_lock();
rxrpc_input_packet(local, &skb);
rcu_read_unlock();
trace_rxrpc_rx_done(skb->mark, skb->priority);
rxrpc_free_skb(skb, rxrpc_skb_put_input);
break;
Expand Down

0 comments on commit cd21eff

Please sign in to comment.