Skip to content

Commit

Permalink
rxrpc: Get rid of the Rx ring
Browse files Browse the repository at this point in the history
Get rid of the Rx ring and replace it with a pair of queues instead.  One
queue gets the packets that are in-sequence and are ready for processing by
recvmsg(); the other queue gets the out-of-sequence packets for addition to
the first queue as the holes get filled.

The annotation ring is removed and replaced with a SACK table.  The SACK
table has the bits set that correspond exactly to the sequence number of
the packet being acked.  The SACK ring is copied when an ACK packet is
being assembled and rotated so that the first ACK is in byte 0.

Flow control handling is altered so that packets that are moved to the
in-sequence queue are hard-ACK'd even before they're consumed - and then
the Rx window size in the ACK packet (rsize) is shrunk down to compensate
(even going to 0 if the window is full).

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
  • Loading branch information
David Howells committed Nov 8, 2022
1 parent d4d02d8 commit 5d7edbc
Show file tree
Hide file tree
Showing 11 changed files with 290 additions and 207 deletions.
19 changes: 11 additions & 8 deletions include/trace/events/rxrpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,12 @@
EM(rxrpc_receive_incoming, "INC") \
EM(rxrpc_receive_queue, "QUE") \
EM(rxrpc_receive_queue_last, "QLS") \
E_(rxrpc_receive_rotate, "ROT")
EM(rxrpc_receive_queue_oos, "QUO") \
EM(rxrpc_receive_queue_oos_last, "QOL") \
EM(rxrpc_receive_oos, "OOS") \
EM(rxrpc_receive_oos_last, "OSL") \
EM(rxrpc_receive_rotate, "ROT") \
E_(rxrpc_receive_rotate_last, "RLS")

#define rxrpc_recvmsg_traces \
EM(rxrpc_recvmsg_cont, "CONT") \
Expand Down Expand Up @@ -860,26 +865,24 @@ TRACE_EVENT(rxrpc_receive,
__field(enum rxrpc_receive_trace, why )
__field(rxrpc_serial_t, serial )
__field(rxrpc_seq_t, seq )
__field(rxrpc_seq_t, hard_ack )
__field(rxrpc_seq_t, top )
__field(u64, window )
),

TP_fast_assign(
__entry->call = call->debug_id;
__entry->why = why;
__entry->serial = serial;
__entry->seq = seq;
__entry->hard_ack = call->rx_hard_ack;
__entry->top = call->rx_top;
__entry->window = atomic64_read(&call->ackr_window);
),

TP_printk("c=%08x %s r=%08x q=%08x w=%08x-%08x",
__entry->call,
__print_symbolic(__entry->why, rxrpc_receive_traces),
__entry->serial,
__entry->seq,
__entry->hard_ack,
__entry->top)
lower_32_bits(__entry->window),
upper_32_bits(__entry->window))
);

TRACE_EVENT(rxrpc_recvmsg,
Expand Down Expand Up @@ -1459,7 +1462,7 @@ TRACE_EVENT(rxrpc_call_reset,
__entry->call_serial = call->rx_serial;
__entry->conn_serial = call->conn->hi_serial;
__entry->tx_seq = call->tx_hard_ack;
__entry->rx_seq = call->rx_hard_ack;
__entry->rx_seq = call->rx_highest_seq;
),

TP_printk("c=%08x %08x:%08x r=%08x/%08x tx=%08x rx=%08x",
Expand Down
29 changes: 18 additions & 11 deletions net/rxrpc/ar-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ struct rxrpc_skb_priv {
u16 remain;
u16 offset; /* Offset of data */
u16 len; /* Length of data */
u8 rx_flags; /* Received packet flags */
u8 flags;
#define RXRPC_RX_VERIFIED 0x01

Expand Down Expand Up @@ -644,8 +643,20 @@ struct rxrpc_call {
rxrpc_seq_t tx_hard_ack; /* Dead slot in buffer; the first transmitted but
* not hard-ACK'd packet follows this.
*/

/* Transmitted data tracking. */
rxrpc_seq_t tx_top; /* Highest Tx slot allocated. */
u16 tx_backoff; /* Delay to insert due to Tx failure */
u8 tx_winsize; /* Maximum size of Tx window */

/* Received data tracking */
struct sk_buff_head recvmsg_queue; /* Queue of packets ready for recvmsg() */
struct sk_buff_head rx_oos_queue; /* Queue of out of sequence packets */

rxrpc_seq_t rx_highest_seq; /* Higest sequence number received */
rxrpc_seq_t rx_consumed; /* Highest packet consumed */
rxrpc_serial_t rx_serial; /* Highest serial received for this call */
u8 rx_winsize; /* Size of Rx window */

/* TCP-style slow-start congestion control [RFC5681]. Since the SMSS
* is fixed, we keep these numbers in terms of segments (ie. DATA
Expand All @@ -660,23 +671,19 @@ struct rxrpc_call {
u8 cong_cumul_acks; /* Cumulative ACK count */
ktime_t cong_tstamp; /* Last time cwnd was changed */

rxrpc_seq_t rx_hard_ack; /* Dead slot in buffer; the first received but not
* consumed packet follows this.
*/
rxrpc_seq_t rx_top; /* Highest Rx slot allocated. */
rxrpc_seq_t rx_expect_next; /* Expected next packet sequence number */
rxrpc_serial_t rx_serial; /* Highest serial received for this call */
u8 rx_winsize; /* Size of Rx window */
u8 tx_winsize; /* Maximum size of Tx window */

spinlock_t input_lock; /* Lock for packet input to this call */

/* Receive-phase ACK management (ACKs we send). */
u8 ackr_reason; /* reason to ACK */
rxrpc_serial_t ackr_serial; /* serial of packet being ACK'd */
rxrpc_seq_t ackr_highest_seq; /* Higest sequence number received */
atomic64_t ackr_window; /* Base (in LSW) and top (in MSW) of SACK window */
atomic_t ackr_nr_unacked; /* Number of unacked packets */
atomic_t ackr_nr_consumed; /* Number of packets needing hard ACK */
struct {
#define RXRPC_SACK_SIZE 256
/* SACK table for soft-acked packets */
u8 ackr_sack_table[RXRPC_SACK_SIZE];
} __aligned(8);

/* RTT management */
rxrpc_serial_t rtt_serial[4]; /* Serial number of DATA or PING sent */
Expand Down
7 changes: 5 additions & 2 deletions net/rxrpc/call_object.c
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
INIT_LIST_HEAD(&call->accept_link);
INIT_LIST_HEAD(&call->recvmsg_link);
INIT_LIST_HEAD(&call->sock_link);
skb_queue_head_init(&call->recvmsg_queue);
skb_queue_head_init(&call->rx_oos_queue);
init_waitqueue_head(&call->waitq);
spin_lock_init(&call->lock);
spin_lock_init(&call->notify_lock);
Expand All @@ -165,13 +167,12 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
call->tx_total_len = -1;
call->next_rx_timo = 20 * HZ;
call->next_req_timo = 1 * HZ;
atomic64_set(&call->ackr_window, 0x100000001ULL);

memset(&call->sock_node, 0xed, sizeof(call->sock_node));

/* Leave space in the ring to handle a maxed-out jumbo packet */
call->rx_winsize = rxrpc_rx_window_size;
call->tx_winsize = 16;
call->rx_expect_next = 1;

call->cong_cwnd = 2;
call->cong_ssthresh = RXRPC_RXTX_BUFF_SIZE - 1;
Expand Down Expand Up @@ -519,6 +520,8 @@ static void rxrpc_cleanup_ring(struct rxrpc_call *call)
rxrpc_free_skb(call->rxtx_buffer[i], rxrpc_skb_cleaned);
call->rxtx_buffer[i] = NULL;
}
skb_queue_purge(&call->recvmsg_queue);
skb_queue_purge(&call->rx_oos_queue);
}

/*
Expand Down
2 changes: 1 addition & 1 deletion net/rxrpc/conn_object.c
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ void __rxrpc_disconnect_call(struct rxrpc_connection *conn,
trace_rxrpc_disconnect_call(call);
switch (call->completion) {
case RXRPC_CALL_SUCCEEDED:
chan->last_seq = call->rx_hard_ack;
chan->last_seq = call->rx_highest_seq;
chan->last_type = RXRPC_PACKET_TYPE_ACK;
break;
case RXRPC_CALL_LOCALLY_ABORTED:
Expand Down
Loading

0 comments on commit 5d7edbc

Please sign in to comment.