Skip to content

Commit

Permalink
rxrpc: Wrap accesses to get call state to put the barrier in one place
Browse files Browse the repository at this point in the history
Wrap accesses to get the state of a call from outside of the I/O thread in
a single place so that the barrier needed to order wrt the error code and
abort code is in just that place.

Also use a barrier when setting the call state and again when reading the
call state such that the auxiliary completion info (error code, abort code)
can be read without taking a read lock on the call state lock.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
  • Loading branch information
David Howells committed Jan 6, 2023
1 parent 0b9bb32 commit d41b3f5
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 24 deletions.
2 changes: 1 addition & 1 deletion net/rxrpc/af_rxrpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ EXPORT_SYMBOL(rxrpc_kernel_end_call);
bool rxrpc_kernel_check_life(const struct socket *sock,
const struct rxrpc_call *call)
{
return call->state != RXRPC_CALL_COMPLETE;
return !rxrpc_call_is_complete(call);
}
EXPORT_SYMBOL(rxrpc_kernel_check_life);

Expand Down
16 changes: 16 additions & 0 deletions net/rxrpc/ar-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -903,6 +903,22 @@ bool __rxrpc_abort_call(struct rxrpc_call *call, rxrpc_seq_t seq,
bool rxrpc_abort_call(struct rxrpc_call *call, rxrpc_seq_t seq,
u32 abort_code, int error, enum rxrpc_abort_reason why);

static inline enum rxrpc_call_state rxrpc_call_state(const struct rxrpc_call *call)
{
/* Order read ->state before read ->error. */
return smp_load_acquire(&call->state);
}

static inline bool rxrpc_call_is_complete(const struct rxrpc_call *call)
{
return rxrpc_call_state(call) == RXRPC_CALL_COMPLETE;
}

static inline bool rxrpc_call_has_failed(const struct rxrpc_call *call)
{
return rxrpc_call_is_complete(call) && call->completion != RXRPC_CALL_SUCCEEDED;
}

/*
* conn_client.c
*/
Expand Down
3 changes: 2 additions & 1 deletion net/rxrpc/call_state.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ bool __rxrpc_set_call_completion(struct rxrpc_call *call,
call->abort_code = abort_code;
call->error = error;
call->completion = compl;
call->state = RXRPC_CALL_COMPLETE;
/* Allow reader of completion state to operate locklessly */
smp_store_release(&call->state, RXRPC_CALL_COMPLETE);
trace_rxrpc_call_complete(call);
wake_up(&call->waitq);
rxrpc_notify_socket(call);
Expand Down
12 changes: 6 additions & 6 deletions net/rxrpc/recvmsg.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
break;
default:
pr_err("Invalid terminal call state %u\n", call->state);
pr_err("Invalid terminal call state %u\n", call->completion);
BUG();
break;
}
Expand All @@ -111,7 +111,7 @@ static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)

trace_rxrpc_receive(call, rxrpc_receive_end, 0, whigh);

if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY)
if (rxrpc_call_state(call) == RXRPC_CALL_CLIENT_RECV_REPLY)
rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_terminal_ack);

write_lock(&call->state_lock);
Expand Down Expand Up @@ -210,7 +210,7 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
rx_pkt_offset = call->rx_pkt_offset;
rx_pkt_len = call->rx_pkt_len;

if (call->state >= RXRPC_CALL_SERVER_ACK_REQUEST) {
if (rxrpc_call_state(call) >= RXRPC_CALL_SERVER_ACK_REQUEST) {
seq = lower_32_bits(atomic64_read(&call->ackr_window)) - 1;
ret = 1;
goto done;
Expand Down Expand Up @@ -416,7 +416,7 @@ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
msg->msg_namelen = len;
}

switch (READ_ONCE(call->state)) {
switch (rxrpc_call_state(call)) {
case RXRPC_CALL_CLIENT_RECV_REPLY:
case RXRPC_CALL_SERVER_RECV_REQUEST:
case RXRPC_CALL_SERVER_ACK_REQUEST:
Expand All @@ -436,7 +436,7 @@ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
if (ret < 0)
goto error_unlock_call;

if (call->state == RXRPC_CALL_COMPLETE) {
if (rxrpc_call_is_complete(call)) {
ret = rxrpc_recvmsg_term(call, msg);
if (ret < 0)
goto error_unlock_call;
Expand Down Expand Up @@ -516,7 +516,7 @@ int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,

mutex_lock(&call->user_mutex);

switch (READ_ONCE(call->state)) {
switch (rxrpc_call_state(call)) {
case RXRPC_CALL_CLIENT_RECV_REPLY:
case RXRPC_CALL_SERVER_RECV_REQUEST:
case RXRPC_CALL_SERVER_ACK_REQUEST:
Expand Down
29 changes: 13 additions & 16 deletions net/rxrpc/sendmsg.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ bool rxrpc_propose_abort(struct rxrpc_call *call, s32 abort_code, int error,
{
_enter("{%d},%d,%d,%u", call->debug_id, abort_code, error, why);

if (!call->send_abort && call->state < RXRPC_CALL_COMPLETE) {
if (!call->send_abort && !rxrpc_call_is_complete(call)) {
call->send_abort_why = why;
call->send_abort_err = error;
call->send_abort_seq = 0;
Expand Down Expand Up @@ -60,7 +60,7 @@ static int rxrpc_wait_for_tx_window_intr(struct rxrpc_sock *rx,
if (rxrpc_check_tx_space(call, NULL))
return 0;

if (call->state >= RXRPC_CALL_COMPLETE)
if (rxrpc_call_is_complete(call))
return call->error;

if (signal_pending(current))
Expand Down Expand Up @@ -95,7 +95,7 @@ static int rxrpc_wait_for_tx_window_waitall(struct rxrpc_sock *rx,
if (rxrpc_check_tx_space(call, &tx_win))
return 0;

if (call->state >= RXRPC_CALL_COMPLETE)
if (rxrpc_call_is_complete(call))
return call->error;

if (timeout == 0 &&
Expand Down Expand Up @@ -124,7 +124,7 @@ static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx,
if (rxrpc_check_tx_space(call, NULL))
return 0;

if (call->state >= RXRPC_CALL_COMPLETE)
if (rxrpc_call_is_complete(call))
return call->error;

trace_rxrpc_txqueue(call, rxrpc_txqueue_wait);
Expand Down Expand Up @@ -273,7 +273,7 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
ret = -EPIPE;
if (sk->sk_shutdown & SEND_SHUTDOWN)
goto maybe_error;
state = READ_ONCE(call->state);
state = rxrpc_call_state(call);
ret = -ESHUTDOWN;
if (state >= RXRPC_CALL_COMPLETE)
goto maybe_error;
Expand Down Expand Up @@ -350,7 +350,7 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,

/* check for the far side aborting the call or a network error
* occurring */
if (call->state == RXRPC_CALL_COMPLETE)
if (rxrpc_call_is_complete(call))
goto call_terminated;

/* add the packet to the send queue if it's now full */
Expand All @@ -375,12 +375,9 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,

success:
ret = copied;
if (READ_ONCE(call->state) == RXRPC_CALL_COMPLETE) {
read_lock(&call->state_lock);
if (call->error < 0)
ret = call->error;
read_unlock(&call->state_lock);
}
if (rxrpc_call_is_complete(call) &&
call->error < 0)
ret = call->error;
out:
call->tx_pending = txb;
_leave(" = %d", ret);
Expand Down Expand Up @@ -618,10 +615,10 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
return PTR_ERR(call);
/* ... and we have the call lock. */
ret = 0;
if (READ_ONCE(call->state) == RXRPC_CALL_COMPLETE)
if (rxrpc_call_is_complete(call))
goto out_put_unlock;
} else {
switch (READ_ONCE(call->state)) {
switch (rxrpc_call_state(call)) {
case RXRPC_CALL_UNINITIALISED:
case RXRPC_CALL_CLIENT_AWAIT_CONN:
case RXRPC_CALL_SERVER_PREALLOC:
Expand Down Expand Up @@ -675,7 +672,7 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
break;
}

state = READ_ONCE(call->state);
state = rxrpc_call_state(call);
_debug("CALL %d USR %lx ST %d on CONN %p",
call->debug_id, call->user_call_ID, state, call->conn);

Expand Down Expand Up @@ -735,7 +732,7 @@ int rxrpc_kernel_send_data(struct socket *sock, struct rxrpc_call *call,
_debug("CALL %d USR %lx ST %d on CONN %p",
call->debug_id, call->user_call_ID, call->state, call->conn);

switch (READ_ONCE(call->state)) {
switch (rxrpc_call_state(call)) {
case RXRPC_CALL_CLIENT_SEND_REQUEST:
case RXRPC_CALL_SERVER_ACK_REQUEST:
case RXRPC_CALL_SERVER_SEND_REPLY:
Expand Down

0 comments on commit d41b3f5

Please sign in to comment.