Skip to content

Commit

Permalink
rxrpc: Add RCU destruction for connections and calls
Browse files Browse the repository at this point in the history
Add RCU destruction for connections and calls as the RCU lookup from the
transport socket data_ready handler is going to come along shortly.

Whilst we're at it, move the cleanup workqueue flushing and RCU barrierage
into the destruction code for the objects that need it (locals and
connections) and add the extra RCU barrier required for connection cleanup.

Signed-off-by: David Howells <dhowells@redhat.com>
  • Loading branch information
David Howells committed Jul 6, 2016
1 parent e653cfe commit dee4636
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 36 deletions.
19 changes: 0 additions & 19 deletions net/rxrpc/af_rxrpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -788,26 +788,7 @@ static void __exit af_rxrpc_exit(void)
proto_unregister(&rxrpc_proto);
rxrpc_destroy_all_calls();
rxrpc_destroy_all_connections();

ASSERTCMP(atomic_read(&rxrpc_n_skbs), ==, 0);

/* We need to flush the scheduled work twice because the local endpoint
* records involve a work item in their destruction as they can only be
* destroyed from process context. However, a connection may have a
* work item outstanding - and this will pin the local endpoint record
* until the connection goes away.
*
* Peers don't pin locals and calls pin sockets - which prevents the
* module from being unloaded - so we should only need two flushes.
*/
_debug("flush scheduled work");
flush_workqueue(rxrpc_workqueue);
_debug("flush scheduled work 2");
flush_workqueue(rxrpc_workqueue);
_debug("synchronise RCU");
rcu_barrier();
_debug("destroy locals");
rxrpc_destroy_client_conn_ids();
rxrpc_destroy_all_locals();

remove_proc_entry("rxrpc_conns", init_net.proc_net);
Expand Down
4 changes: 3 additions & 1 deletion net/rxrpc/ar-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,10 @@ struct rxrpc_connection {
struct rxrpc_conn_parameters params;

spinlock_t channel_lock;
struct rxrpc_call *channels[RXRPC_MAXCALLS]; /* active calls */
struct rxrpc_call __rcu *channels[RXRPC_MAXCALLS]; /* active calls */
wait_queue_head_t channel_wq; /* queue to wait for channel to become available */

struct rcu_head rcu;
struct work_struct processor; /* connection event processor */
union {
struct rb_node client_node; /* Node in local->client_conns */
Expand Down Expand Up @@ -398,6 +399,7 @@ enum rxrpc_call_state {
* - matched by { connection, call_id }
*/
struct rxrpc_call {
struct rcu_head rcu;
struct rxrpc_connection *conn; /* connection carrying call */
struct rxrpc_sock *socket; /* socket responsible */
struct timer_list lifetimer; /* lifetime remaining on call */
Expand Down
18 changes: 15 additions & 3 deletions net/rxrpc/call_object.c
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,8 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
write_lock_bh(&conn->lock);

/* set the channel for this call */
call = conn->channels[candidate->channel];
call = rcu_dereference_protected(conn->channels[candidate->channel],
lockdep_is_held(&conn->lock));
_debug("channel[%u] is %p", candidate->channel, call);
if (call && call->call_id == sp->hdr.callNumber) {
/* already set; must've been a duplicate packet */
Expand Down Expand Up @@ -544,7 +545,7 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
candidate = NULL;
rb_link_node(&call->conn_node, parent, p);
rb_insert_color(&call->conn_node, &conn->calls);
conn->channels[call->channel] = call;
rcu_assign_pointer(conn->channels[call->channel], call);
sock_hold(&rx->sk);
rxrpc_get_connection(conn);
write_unlock_bh(&conn->lock);
Expand Down Expand Up @@ -794,6 +795,17 @@ void __rxrpc_put_call(struct rxrpc_call *call)
_leave("");
}

/*
* Final call destruction under RCU.
*/
static void rxrpc_rcu_destroy_call(struct rcu_head *rcu)
{
struct rxrpc_call *call = container_of(rcu, struct rxrpc_call, rcu);

rxrpc_purge_queue(&call->rx_queue);
kmem_cache_free(rxrpc_call_jar, call);
}

/*
* clean up a call
*/
Expand Down Expand Up @@ -849,7 +861,7 @@ static void rxrpc_cleanup_call(struct rxrpc_call *call)
rxrpc_purge_queue(&call->rx_queue);
ASSERT(skb_queue_empty(&call->rx_oos_queue));
sock_put(&call->socket->sk);
kmem_cache_free(rxrpc_call_jar, call);
call_rcu(&call->rcu, rxrpc_rcu_destroy_call);
}

/*
Expand Down
5 changes: 4 additions & 1 deletion net/rxrpc/conn_event.c
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,10 @@ static int rxrpc_process_event(struct rxrpc_connection *conn,
if (conn->state == RXRPC_CONN_SERVICE_CHALLENGING) {
conn->state = RXRPC_CONN_SERVICE;
for (loop = 0; loop < RXRPC_MAXCALLS; loop++)
rxrpc_call_is_secure(conn->channels[loop]);
rxrpc_call_is_secure(
rcu_dereference_protected(
conn->channels[loop],
lockdep_is_held(&conn->lock)));
}

spin_unlock(&conn->state_lock);
Expand Down
31 changes: 27 additions & 4 deletions net/rxrpc/conn_object.c
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ void rxrpc_disconnect_call(struct rxrpc_call *call)

spin_lock(&conn->channel_lock);

if (conn->channels[chan] == call) {
if (rcu_access_pointer(conn->channels[chan]) == call) {
rcu_assign_pointer(conn->channels[chan], NULL);
atomic_inc(&conn->avail_chans);
wake_up(&conn->channel_wq);
Expand Down Expand Up @@ -580,9 +580,12 @@ void rxrpc_put_connection(struct rxrpc_connection *conn)
/*
* destroy a virtual connection
*/
static void rxrpc_destroy_connection(struct rxrpc_connection *conn)
static void rxrpc_destroy_connection(struct rcu_head *rcu)
{
_enter("%p{%d}", conn, atomic_read(&conn->usage));
struct rxrpc_connection *conn =
container_of(rcu, struct rxrpc_connection, rcu);

_enter("{%d,u=%d}", conn->debug_id, atomic_read(&conn->usage));

ASSERTCMP(atomic_read(&conn->usage), ==, 0);

Expand Down Expand Up @@ -677,7 +680,8 @@ static void rxrpc_connection_reaper(struct work_struct *work)
list_del_init(&conn->link);

ASSERTCMP(atomic_read(&conn->usage), ==, 0);
rxrpc_destroy_connection(conn);
skb_queue_purge(&conn->rx_queue);
call_rcu(&conn->rcu, rxrpc_destroy_connection);
}

_leave("");
Expand All @@ -689,11 +693,30 @@ static void rxrpc_connection_reaper(struct work_struct *work)
*/
void __exit rxrpc_destroy_all_connections(void)
{
struct rxrpc_connection *conn, *_p;
bool leak = false;

_enter("");

rxrpc_connection_expiry = 0;
cancel_delayed_work(&rxrpc_connection_reap);
rxrpc_queue_delayed_work(&rxrpc_connection_reap, 0);
flush_workqueue(rxrpc_workqueue);

write_lock(&rxrpc_connection_lock);
list_for_each_entry_safe(conn, _p, &rxrpc_connections, link) {
pr_err("AF_RXRPC: Leaked conn %p {%d}\n",
conn, atomic_read(&conn->usage));
leak = true;
}
write_unlock(&rxrpc_connection_lock);
BUG_ON(leak);

/* Make sure the local and peer records pinned by any dying connections
* are released.
*/
rcu_barrier();
rxrpc_destroy_client_conn_ids();

_leave("");
}
19 changes: 11 additions & 8 deletions net/rxrpc/local_object.c
Original file line number Diff line number Diff line change
Expand Up @@ -374,14 +374,17 @@ void __exit rxrpc_destroy_all_locals(void)

_enter("");

if (list_empty(&rxrpc_local_endpoints))
return;
flush_workqueue(rxrpc_workqueue);

mutex_lock(&rxrpc_local_mutex);
list_for_each_entry(local, &rxrpc_local_endpoints, link) {
pr_err("AF_RXRPC: Leaked local %p {%d}\n",
local, atomic_read(&local->usage));
if (!list_empty(&rxrpc_local_endpoints)) {
mutex_lock(&rxrpc_local_mutex);
list_for_each_entry(local, &rxrpc_local_endpoints, link) {
pr_err("AF_RXRPC: Leaked local %p {%d}\n",
local, atomic_read(&local->usage));
}
mutex_unlock(&rxrpc_local_mutex);
BUG();
}
mutex_unlock(&rxrpc_local_mutex);
BUG();

rcu_barrier();
}

0 comments on commit dee4636

Please sign in to comment.