Skip to content

Commit

Permalink
rxrpc: Calls should only have one terminal state
Browse files Browse the repository at this point in the history
Condense the terminal states of a call state machine to a single state,
plus a separate completion type value.  The value is then set, along with
error and abort code values, only when the call is transitioned to the
completion state.

Helpers are provided to simplify this.

Signed-off-by: David Howells <dhowells@redhat.com>
  • Loading branch information
David Howells committed Aug 30, 2016
1 parent ccbd3db commit f5c17aa
Show file tree
Hide file tree
Showing 12 changed files with 226 additions and 184 deletions.
116 changes: 90 additions & 26 deletions net/rxrpc/ar-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,6 @@ enum rxrpc_conn_proto_state {
RXRPC_CONN_SERVICE, /* Service secured connection */
RXRPC_CONN_REMOTELY_ABORTED, /* Conn aborted by peer */
RXRPC_CONN_LOCALLY_ABORTED, /* Conn aborted locally */
RXRPC_CONN_NETWORK_ERROR, /* Conn terminated by network error */
RXRPC_CONN_LOCAL_ERROR, /* Conn terminated by local error */
RXRPC_CONN__NR_STATES
};

Expand Down Expand Up @@ -344,7 +342,6 @@ struct rxrpc_connection {
enum rxrpc_conn_proto_state state : 8; /* current state of connection */
u32 local_abort; /* local abort code */
u32 remote_abort; /* remote abort code */
int error; /* local error incurred */
int debug_id; /* debug ID for printks */
atomic_t serial; /* packet serial number counter */
unsigned int hi_serial; /* highest serial number received */
Expand Down Expand Up @@ -411,13 +408,22 @@ enum rxrpc_call_state {
RXRPC_CALL_SERVER_ACK_REQUEST, /* - server pending ACK of request */
RXRPC_CALL_SERVER_SEND_REPLY, /* - server sending reply */
RXRPC_CALL_SERVER_AWAIT_ACK, /* - server awaiting final ACK */
RXRPC_CALL_COMPLETE, /* - call completed */
RXRPC_CALL_COMPLETE, /* - call complete */
RXRPC_CALL_DEAD, /* - call is dead */
NR__RXRPC_CALL_STATES
};

/*
* Call completion condition (state == RXRPC_CALL_COMPLETE).
*/
enum rxrpc_call_completion {
RXRPC_CALL_SUCCEEDED, /* - Normal termination */
RXRPC_CALL_SERVER_BUSY, /* - call rejected by busy server */
RXRPC_CALL_REMOTELY_ABORTED, /* - call aborted by peer */
RXRPC_CALL_LOCALLY_ABORTED, /* - call aborted locally on error or close */
RXRPC_CALL_LOCAL_ERROR, /* - call failed due to local error */
RXRPC_CALL_NETWORK_ERROR, /* - call terminated by network error */
RXRPC_CALL_DEAD, /* - call is dead */
NR__RXRPC_CALL_STATES
NR__RXRPC_CALL_COMPLETIONS
};

/*
Expand Down Expand Up @@ -451,14 +457,13 @@ struct rxrpc_call {
unsigned long events;
spinlock_t lock;
rwlock_t state_lock; /* lock for state transition */
u32 abort_code; /* Local/remote abort code */
int error; /* Local error incurred */
enum rxrpc_call_state state : 8; /* current state of call */
enum rxrpc_call_completion completion : 8; /* Call completion condition */
atomic_t usage;
atomic_t skb_count; /* Outstanding packets on this call */
atomic_t sequence; /* Tx data packet sequence counter */
u32 local_abort; /* local abort code */
u32 remote_abort; /* remote abort code */
int error_report; /* Network error (ICMP/local transport) */
int error; /* Local error incurred */
enum rxrpc_call_state state : 8; /* current state of call */
u16 service_id; /* service ID */
u32 call_id; /* call ID on connection */
u32 cid; /* connection ID plus channel index */
Expand Down Expand Up @@ -493,20 +498,6 @@ struct rxrpc_call {
unsigned long ackr_window[RXRPC_ACKR_WINDOW_ASZ + 1];
};

/*
* locally abort an RxRPC call
*/
static inline void rxrpc_abort_call(struct rxrpc_call *call, u32 abort_code)
{
write_lock_bh(&call->state_lock);
if (call->state < RXRPC_CALL_COMPLETE) {
call->local_abort = abort_code;
call->state = RXRPC_CALL_LOCALLY_ABORTED;
set_bit(RXRPC_CALL_EV_ABORT, &call->events);
}
write_unlock_bh(&call->state_lock);
}

#include <trace/events/rxrpc.h>

/*
Expand Down Expand Up @@ -534,6 +525,8 @@ void rxrpc_process_call(struct work_struct *);
/*
* call_object.c
*/
extern const char *const rxrpc_call_states[];
extern const char *const rxrpc_call_completions[];
extern unsigned int rxrpc_max_call_lifetime;
extern unsigned int rxrpc_dead_call_expiry;
extern struct kmem_cache *rxrpc_call_jar;
Expand Down Expand Up @@ -563,6 +556,78 @@ static inline bool rxrpc_is_client_call(const struct rxrpc_call *call)
return !rxrpc_is_service_call(call);
}

/*
* Transition a call to the complete state.
*/
static inline bool __rxrpc_set_call_completion(struct rxrpc_call *call,
enum rxrpc_call_completion compl,
u32 abort_code,
int error)
{
if (call->state < RXRPC_CALL_COMPLETE) {
call->abort_code = abort_code;
call->error = error;
call->completion = compl,
call->state = RXRPC_CALL_COMPLETE;
return true;
}
return false;
}

static inline bool rxrpc_set_call_completion(struct rxrpc_call *call,
enum rxrpc_call_completion compl,
u32 abort_code,
int error)
{
int ret;

write_lock_bh(&call->state_lock);
ret = __rxrpc_set_call_completion(call, compl, abort_code, error);
write_unlock_bh(&call->state_lock);
return ret;
}

/*
* Record that a call successfully completed.
*/
static inline void __rxrpc_call_completed(struct rxrpc_call *call)
{
__rxrpc_set_call_completion(call, RXRPC_CALL_SUCCEEDED, 0, 0);
}

static inline void rxrpc_call_completed(struct rxrpc_call *call)
{
write_lock_bh(&call->state_lock);
__rxrpc_call_completed(call);
write_unlock_bh(&call->state_lock);
}

/*
* Record that a call is locally aborted.
*/
static inline bool __rxrpc_abort_call(struct rxrpc_call *call,
u32 abort_code, int error)
{
if (__rxrpc_set_call_completion(call,
RXRPC_CALL_LOCALLY_ABORTED,
abort_code, error)) {
set_bit(RXRPC_CALL_EV_ABORT, &call->events);
return true;
}
return false;
}

static inline bool rxrpc_abort_call(struct rxrpc_call *call,
u32 abort_code, int error)
{
bool ret;

write_lock_bh(&call->state_lock);
ret = __rxrpc_abort_call(call, abort_code, error);
write_unlock_bh(&call->state_lock);
return ret;
}

/*
* conn_client.c
*/
Expand Down Expand Up @@ -778,7 +843,6 @@ static inline void rxrpc_put_peer(struct rxrpc_peer *peer)
/*
* proc.c
*/
extern const char *const rxrpc_call_states[];
extern const struct file_operations rxrpc_call_seq_fops;
extern const struct file_operations rxrpc_connection_seq_fops;

Expand Down
19 changes: 6 additions & 13 deletions net/rxrpc/call_accept.c
Original file line number Diff line number Diff line change
Expand Up @@ -329,12 +329,8 @@ struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx,
case RXRPC_CALL_SERVER_ACCEPTING:
call->state = RXRPC_CALL_SERVER_RECV_REQUEST;
break;
case RXRPC_CALL_REMOTELY_ABORTED:
case RXRPC_CALL_LOCALLY_ABORTED:
ret = -ECONNABORTED;
goto out_release;
case RXRPC_CALL_NETWORK_ERROR:
ret = call->conn->error;
case RXRPC_CALL_COMPLETE:
ret = call->error;
goto out_release;
case RXRPC_CALL_DEAD:
ret = -ETIME;
Expand Down Expand Up @@ -403,17 +399,14 @@ int rxrpc_reject_call(struct rxrpc_sock *rx)
write_lock_bh(&call->state_lock);
switch (call->state) {
case RXRPC_CALL_SERVER_ACCEPTING:
call->state = RXRPC_CALL_SERVER_BUSY;
__rxrpc_set_call_completion(call, RXRPC_CALL_SERVER_BUSY,
0, ECONNABORTED);
if (test_and_set_bit(RXRPC_CALL_EV_REJECT_BUSY, &call->events))
rxrpc_queue_call(call);
ret = 0;
goto out_release;
case RXRPC_CALL_REMOTELY_ABORTED:
case RXRPC_CALL_LOCALLY_ABORTED:
ret = -ECONNABORTED;
goto out_release;
case RXRPC_CALL_NETWORK_ERROR:
ret = call->conn->error;
case RXRPC_CALL_COMPLETE:
ret = call->error;
goto out_release;
case RXRPC_CALL_DEAD:
ret = -ETIME;
Expand Down
42 changes: 16 additions & 26 deletions net/rxrpc/call_event.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
_debug("cancel timer %%%u", serial);
try_to_del_timer_sync(&call->ack_timer);
read_lock_bh(&call->state_lock);
if (call->state <= RXRPC_CALL_COMPLETE &&
if (call->state < RXRPC_CALL_COMPLETE &&
!test_and_set_bit(RXRPC_CALL_EV_ACK, &call->events))
rxrpc_queue_call(call);
read_unlock_bh(&call->state_lock);
Expand Down Expand Up @@ -123,7 +123,7 @@ static void rxrpc_set_resend(struct rxrpc_call *call, u8 resend,
unsigned long resend_at)
{
read_lock_bh(&call->state_lock);
if (call->state >= RXRPC_CALL_COMPLETE)
if (call->state == RXRPC_CALL_COMPLETE)
resend = 0;

if (resend & 1) {
Expand Down Expand Up @@ -230,7 +230,7 @@ static void rxrpc_resend_timer(struct rxrpc_call *call)
_enter("%d,%d,%d",
call->acks_tail, call->acks_unacked, call->acks_head);

if (call->state >= RXRPC_CALL_COMPLETE)
if (call->state == RXRPC_CALL_COMPLETE)
return;

resend = 0;
Expand Down Expand Up @@ -711,7 +711,7 @@ static int rxrpc_process_rx_queue(struct rxrpc_call *call,
break;
case RXRPC_CALL_SERVER_AWAIT_ACK:
_debug("srv complete");
call->state = RXRPC_CALL_COMPLETE;
__rxrpc_call_completed(call);
post_ACK = true;
break;
case RXRPC_CALL_CLIENT_SEND_REQUEST:
Expand Down Expand Up @@ -875,32 +875,30 @@ void rxrpc_process_call(struct work_struct *work)
clear_bit(RXRPC_CALL_EV_REJECT_BUSY, &call->events);
clear_bit(RXRPC_CALL_EV_ABORT, &call->events);

error = call->error_report;
if (error < RXRPC_LOCAL_ERROR_OFFSET) {
if (call->completion == RXRPC_CALL_NETWORK_ERROR) {
mark = RXRPC_SKB_MARK_NET_ERROR;
_debug("post net error %d", error);
} else {
mark = RXRPC_SKB_MARK_LOCAL_ERROR;
error -= RXRPC_LOCAL_ERROR_OFFSET;
_debug("post net local error %d", error);
}

if (rxrpc_post_message(call, mark, error, true) < 0)
if (rxrpc_post_message(call, mark, call->error, true) < 0)
goto no_mem;
clear_bit(RXRPC_CALL_EV_RCVD_ERROR, &call->events);
goto kill_ACKs;
}

if (test_bit(RXRPC_CALL_EV_CONN_ABORT, &call->events)) {
ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);
ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);

clear_bit(RXRPC_CALL_EV_REJECT_BUSY, &call->events);
clear_bit(RXRPC_CALL_EV_ABORT, &call->events);

_debug("post conn abort");

if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
call->conn->error, true) < 0)
call->error, true) < 0)
goto no_mem;
clear_bit(RXRPC_CALL_EV_CONN_ABORT, &call->events);
goto kill_ACKs;
Expand All @@ -913,13 +911,13 @@ void rxrpc_process_call(struct work_struct *work)
}

if (test_bit(RXRPC_CALL_EV_ABORT, &call->events)) {
ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);
ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);

if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
ECONNABORTED, true) < 0)
call->error, true) < 0)
goto no_mem;
whdr.type = RXRPC_PACKET_TYPE_ABORT;
data = htonl(call->local_abort);
data = htonl(call->abort_code);
iov[1].iov_base = &data;
iov[1].iov_len = sizeof(data);
genbit = RXRPC_CALL_EV_ABORT;
Expand Down Expand Up @@ -979,13 +977,7 @@ void rxrpc_process_call(struct work_struct *work)
}

if (test_bit(RXRPC_CALL_EV_LIFE_TIMER, &call->events)) {
write_lock_bh(&call->state_lock);
if (call->state <= RXRPC_CALL_COMPLETE) {
call->state = RXRPC_CALL_LOCALLY_ABORTED;
call->local_abort = RX_CALL_TIMEOUT;
set_bit(RXRPC_CALL_EV_ABORT, &call->events);
}
write_unlock_bh(&call->state_lock);
rxrpc_abort_call(call, RX_CALL_TIMEOUT, ETIME);

_debug("post timeout");
if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
Expand All @@ -998,7 +990,8 @@ void rxrpc_process_call(struct work_struct *work)

/* deal with assorted inbound messages */
if (!skb_queue_empty(&call->rx_queue)) {
switch (rxrpc_process_rx_queue(call, &abort_code)) {
ret = rxrpc_process_rx_queue(call, &abort_code);
switch (ret) {
case 0:
case -EAGAIN:
break;
Expand All @@ -1007,7 +1000,7 @@ void rxrpc_process_call(struct work_struct *work)
case -EKEYEXPIRED:
case -EKEYREJECTED:
case -EPROTO:
rxrpc_abort_call(call, abort_code);
rxrpc_abort_call(call, abort_code, -ret);
goto kill_ACKs;
}
}
Expand Down Expand Up @@ -1232,10 +1225,7 @@ void rxrpc_process_call(struct work_struct *work)
goto kill_ACKs;

case RXRPC_CALL_EV_ACK_FINAL:
write_lock_bh(&call->state_lock);
if (call->state == RXRPC_CALL_CLIENT_FINAL_ACK)
call->state = RXRPC_CALL_COMPLETE;
write_unlock_bh(&call->state_lock);
rxrpc_call_completed(call);
goto kill_ACKs;

default:
Expand Down
Loading

0 comments on commit f5c17aa

Please sign in to comment.