Skip to content

Commit

Permalink
rxrpc: Fix error distribution
Browse files Browse the repository at this point in the history
Fix error distribution by immediately delivering the errors to all the
affected calls rather than deferring them to a worker thread.  The problem
with deferral is that retries and other activity can still happen on those
calls in the meantime, when we want to stop them as soon as possible.

To this end:

 (1) Stop the error distributor from removing calls from the error_targets
     list so that peer->lock isn't needed to synchronise against other adds
     and removals.

 (2) Require the peer's error_targets list to be accessed with RCU, thereby
     avoiding the need to take peer->lock over distribution.

 (3) Don't attempt to affect a call's state if it is already marked complete.

Signed-off-by: David Howells <dhowells@redhat.com>
  • Loading branch information
David Howells committed Sep 28, 2018
1 parent 37a675e commit f334430
Showing 7 changed files with 16 additions and 64 deletions.
4 changes: 1 addition & 3 deletions include/trace/events/rxrpc.h
Original file line number Diff line number Diff line change
@@ -56,7 +56,6 @@ enum rxrpc_peer_trace {
rxrpc_peer_new,
rxrpc_peer_processing,
rxrpc_peer_put,
rxrpc_peer_queued_error,
};

enum rxrpc_conn_trace {
@@ -257,8 +256,7 @@ enum rxrpc_tx_point {
EM(rxrpc_peer_got, "GOT") \
EM(rxrpc_peer_new, "NEW") \
EM(rxrpc_peer_processing, "PRO") \
EM(rxrpc_peer_put, "PUT") \
E_(rxrpc_peer_queued_error, "QER")
E_(rxrpc_peer_put, "PUT")

#define rxrpc_conn_traces \
EM(rxrpc_conn_got, "GOT") \
5 changes: 0 additions & 5 deletions net/rxrpc/ar-internal.h
Original file line number Diff line number Diff line change
@@ -288,7 +288,6 @@ struct rxrpc_peer {
struct hlist_node hash_link;
struct rxrpc_local *local;
struct hlist_head error_targets; /* targets for net error distribution */
struct work_struct error_distributor;
struct rb_root service_conns; /* Service connections */
struct list_head keepalive_link; /* Link in net->peer_keepalive[] */
time64_t last_tx_at; /* Last time packet sent here */
@@ -299,8 +298,6 @@ struct rxrpc_peer {
unsigned int maxdata; /* data size (MTU - hdrsize) */
unsigned short hdrsize; /* header size (IP + UDP + RxRPC) */
int debug_id; /* debug ID for printks */
int error_report; /* Net (+0) or local (+1000000) to distribute */
#define RXRPC_LOCAL_ERROR_OFFSET 1000000
struct sockaddr_rxrpc srx; /* remote address */

/* calculated RTT cache */
@@ -1039,7 +1036,6 @@ void rxrpc_send_keepalive(struct rxrpc_peer *);
* peer_event.c
*/
void rxrpc_error_report(struct sock *);
void rxrpc_peer_error_distributor(struct work_struct *);
void rxrpc_peer_add_rtt(struct rxrpc_call *, enum rxrpc_rtt_rx_trace,
rxrpc_serial_t, rxrpc_serial_t, ktime_t, ktime_t);
void rxrpc_peer_keepalive_worker(struct work_struct *);
@@ -1057,7 +1053,6 @@ void rxrpc_destroy_all_peers(struct rxrpc_net *);
struct rxrpc_peer *rxrpc_get_peer(struct rxrpc_peer *);
struct rxrpc_peer *rxrpc_get_peer_maybe(struct rxrpc_peer *);
void rxrpc_put_peer(struct rxrpc_peer *);
void __rxrpc_queue_peer_error(struct rxrpc_peer *);

/*
* proc.c
2 changes: 1 addition & 1 deletion net/rxrpc/call_object.c
Original file line number Diff line number Diff line change
@@ -400,7 +400,7 @@ void rxrpc_incoming_call(struct rxrpc_sock *rx,
rcu_assign_pointer(conn->channels[chan].call, call);

spin_lock(&conn->params.peer->lock);
hlist_add_head(&call->error_link, &conn->params.peer->error_targets);
hlist_add_head_rcu(&call->error_link, &conn->params.peer->error_targets);
spin_unlock(&conn->params.peer->lock);

_net("CALL incoming %d on CONN %d", call->debug_id, call->conn->debug_id);
4 changes: 2 additions & 2 deletions net/rxrpc/conn_client.c
Original file line number Diff line number Diff line change
@@ -710,8 +710,8 @@ int rxrpc_connect_call(struct rxrpc_call *call,
}

spin_lock_bh(&call->conn->params.peer->lock);
hlist_add_head(&call->error_link,
&call->conn->params.peer->error_targets);
hlist_add_head_rcu(&call->error_link,
&call->conn->params.peer->error_targets);
spin_unlock_bh(&call->conn->params.peer->lock);

out:
2 changes: 1 addition & 1 deletion net/rxrpc/conn_object.c
Original file line number Diff line number Diff line change
@@ -216,7 +216,7 @@ void rxrpc_disconnect_call(struct rxrpc_call *call)
call->peer->cong_cwnd = call->cong_cwnd;

spin_lock_bh(&conn->params.peer->lock);
hlist_del_init(&call->error_link);
hlist_del_rcu(&call->error_link);
spin_unlock_bh(&conn->params.peer->lock);

if (rxrpc_is_client_call(call))
46 changes: 11 additions & 35 deletions net/rxrpc/peer_event.c
Original file line number Diff line number Diff line change
@@ -23,6 +23,8 @@
#include "ar-internal.h"

static void rxrpc_store_error(struct rxrpc_peer *, struct sock_exterr_skb *);
static void rxrpc_distribute_error(struct rxrpc_peer *, int,
enum rxrpc_call_completion);

/*
* Find the peer associated with an ICMP packet.
@@ -194,8 +196,6 @@ void rxrpc_error_report(struct sock *sk)
rcu_read_unlock();
rxrpc_free_skb(skb, rxrpc_skb_rx_freed);

/* The ref we obtained is passed off to the work item */
__rxrpc_queue_peer_error(peer);
_leave("");
}

@@ -205,6 +205,7 @@ void rxrpc_error_report(struct sock *sk)
static void rxrpc_store_error(struct rxrpc_peer *peer,
struct sock_exterr_skb *serr)
{
enum rxrpc_call_completion compl = RXRPC_CALL_NETWORK_ERROR;
struct sock_extended_err *ee;
int err;

@@ -255,7 +256,7 @@ static void rxrpc_store_error(struct rxrpc_peer *peer,
case SO_EE_ORIGIN_NONE:
case SO_EE_ORIGIN_LOCAL:
_proto("Rx Received local error { error=%d }", err);
err += RXRPC_LOCAL_ERROR_OFFSET;
compl = RXRPC_CALL_LOCAL_ERROR;
break;

case SO_EE_ORIGIN_ICMP6:
@@ -264,48 +265,23 @@ static void rxrpc_store_error(struct rxrpc_peer *peer,
break;
}

peer->error_report = err;
rxrpc_distribute_error(peer, err, compl);
}

/*
* Distribute an error that occurred on a peer
* Distribute an error that occurred on a peer.
*/
void rxrpc_peer_error_distributor(struct work_struct *work)
static void rxrpc_distribute_error(struct rxrpc_peer *peer, int error,
enum rxrpc_call_completion compl)
{
struct rxrpc_peer *peer =
container_of(work, struct rxrpc_peer, error_distributor);
struct rxrpc_call *call;
enum rxrpc_call_completion compl;
int error;

_enter("");

error = READ_ONCE(peer->error_report);
if (error < RXRPC_LOCAL_ERROR_OFFSET) {
compl = RXRPC_CALL_NETWORK_ERROR;
} else {
compl = RXRPC_CALL_LOCAL_ERROR;
error -= RXRPC_LOCAL_ERROR_OFFSET;
}

_debug("ISSUE ERROR %s %d", rxrpc_call_completions[compl], error);

spin_lock_bh(&peer->lock);

while (!hlist_empty(&peer->error_targets)) {
call = hlist_entry(peer->error_targets.first,
struct rxrpc_call, error_link);
hlist_del_init(&call->error_link);
hlist_for_each_entry_rcu(call, &peer->error_targets, error_link) {
rxrpc_see_call(call);

if (rxrpc_set_call_completion(call, compl, 0, -error))
if (call->state < RXRPC_CALL_COMPLETE &&
rxrpc_set_call_completion(call, compl, 0, -error))
rxrpc_notify_socket(call);
}

spin_unlock_bh(&peer->lock);

rxrpc_put_peer(peer);
_leave("");
}

/*
17 changes: 0 additions & 17 deletions net/rxrpc/peer_object.c
Original file line number Diff line number Diff line change
@@ -220,8 +220,6 @@ struct rxrpc_peer *rxrpc_alloc_peer(struct rxrpc_local *local, gfp_t gfp)
atomic_set(&peer->usage, 1);
peer->local = local;
INIT_HLIST_HEAD(&peer->error_targets);
INIT_WORK(&peer->error_distributor,
&rxrpc_peer_error_distributor);
peer->service_conns = RB_ROOT;
seqlock_init(&peer->service_conn_lock);
spin_lock_init(&peer->lock);
@@ -402,21 +400,6 @@ struct rxrpc_peer *rxrpc_get_peer_maybe(struct rxrpc_peer *peer)
return peer;
}

/*
* Queue a peer record's error_distributor work item.  This passes the
* caller's ref on the peer to the workqueue.
*
* If rxrpc_queue_work() returns true, the hand-off is recorded with an
* rxrpc_peer_queued_error trace event; otherwise the caller's ref is
* dropped here via rxrpc_put_peer().
*/
void __rxrpc_queue_peer_error(struct rxrpc_peer *peer)
{
/* Caller's return address, passed to the tracepoint. */
const void *here = __builtin_return_address(0);
int n;

/* Sample the usage count before queueing; it is only used by the trace. */
n = atomic_read(&peer->usage);
if (rxrpc_queue_work(&peer->error_distributor))
trace_rxrpc_peer(peer, rxrpc_peer_queued_error, n, here);
else
/* NOTE(review): presumably the work item was already pending, so the
 * ref was not handed off and must be released - confirm against
 * rxrpc_queue_work().
 */
rxrpc_put_peer(peer);
}

/*
* Discard a peer record.
*/

0 comments on commit f334430

Please sign in to comment.