Skip to content

Commit

Permalink
rxrpc: Don't use a ring buffer for call Tx queue
Browse files Browse the repository at this point in the history
Change the way the Tx queueing works to make the following ends easier to
achieve:

 (1) The filling of packets, the encryption of packets and the transmission
     of packets can be handled in parallel by separate threads, rather than
     rxrpc_sendmsg() allocating, filling, encrypting and transmitting each
     packet before moving onto the next one.

 (2) Get rid of the fixed-size ring which sets a hard limit on the number
     of packets that can be retained in the ring.  This allows the number
     of packets to increase without having to allocate a very large ring or
     having variable-sized rings.

     [Note: the downside of this is that it's then less efficient to locate
     a packet for retransmission as we then have to step through a list and
     examine each buffer in the list.]

 (3) Allow the filler/encrypter to run ahead of the transmission window.

 (4) Make it easier to do zero copy UDP from the packet buffers.

 (5) Make it easier to do zero copy from userspace to the packet buffers -
     and thence to UDP (only if for unauthenticated connections).

To that end, the following changes are made:

 (1) Use the new rxrpc_txbuf struct instead of sk_buff for keeping packets
     to be transmitted in.  This allows them to be placed on multiple
     queues simultaneously.  An sk_buff isn't really necessary as it's
     never passed on to lower-level networking code.

 (2) Keep the transmissable packets in a linked list on the call struct
     rather than in a ring.  As a consequence, the annotation buffer isn't
     used either; rather a flag is set on the packet to indicate ackedness.

 (3) Use the RXRPC_CALL_TX_LAST flag to indicate that the last packet to be
     transmitted has been queued.  Add RXRPC_CALL_TX_ALL_ACKED to indicate
     that all packets up to and including the last got hard acked.

 (4) Wire headers are now stored in the txbuf rather than being concocted
     on the stack and they're stored immediately before the data, thereby
     allowing zerocopy of a single span.

 (5) Don't bother with instant-resend on transmission failure; rather,
     leave it for a timer or an ACK packet to trigger.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
  • Loading branch information
David Howells committed Nov 8, 2022
1 parent 5d7edbc commit a4ea4c4
Show file tree
Hide file tree
Showing 12 changed files with 349 additions and 461 deletions.
78 changes: 40 additions & 38 deletions include/trace/events/rxrpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
EM(rxrpc_call_got, "GOT") \
EM(rxrpc_call_got_kernel, "Gke") \
EM(rxrpc_call_got_timer, "GTM") \
EM(rxrpc_call_got_tx, "Gtx") \
EM(rxrpc_call_got_userid, "Gus") \
EM(rxrpc_call_new_client, "NWc") \
EM(rxrpc_call_new_service, "NWs") \
Expand All @@ -83,20 +84,22 @@
EM(rxrpc_call_put_noqueue, "PnQ") \
EM(rxrpc_call_put_notimer, "PnT") \
EM(rxrpc_call_put_timer, "PTM") \
EM(rxrpc_call_put_tx, "Ptx") \
EM(rxrpc_call_put_userid, "Pus") \
EM(rxrpc_call_queued, "QUE") \
EM(rxrpc_call_queued_ref, "QUR") \
EM(rxrpc_call_release, "RLS") \
E_(rxrpc_call_seen, "SEE")

#define rxrpc_transmit_traces \
EM(rxrpc_transmit_await_reply, "AWR") \
EM(rxrpc_transmit_end, "END") \
EM(rxrpc_transmit_queue, "QUE") \
EM(rxrpc_transmit_queue_last, "QLS") \
EM(rxrpc_transmit_rotate, "ROT") \
EM(rxrpc_transmit_rotate_last, "RLS") \
E_(rxrpc_transmit_wait, "WAI")
#define rxrpc_txqueue_traces \
EM(rxrpc_txqueue_await_reply, "AWR") \
EM(rxrpc_txqueue_dequeue, "DEQ") \
EM(rxrpc_txqueue_end, "END") \
EM(rxrpc_txqueue_queue, "QUE") \
EM(rxrpc_txqueue_queue_last, "QLS") \
EM(rxrpc_txqueue_rotate, "ROT") \
EM(rxrpc_txqueue_rotate_last, "RLS") \
E_(rxrpc_txqueue_wait, "WAI")

#define rxrpc_receive_traces \
EM(rxrpc_receive_end, "END") \
Expand Down Expand Up @@ -259,13 +262,15 @@
EM(rxrpc_txbuf_alloc_ack, "ALLOC ACK ") \
EM(rxrpc_txbuf_alloc_data, "ALLOC DATA ") \
EM(rxrpc_txbuf_free, "FREE ") \
EM(rxrpc_txbuf_get_buffer, "GET BUFFER ") \
EM(rxrpc_txbuf_get_trans, "GET TRANS ") \
EM(rxrpc_txbuf_get_retrans, "GET RETRANS") \
EM(rxrpc_txbuf_put_ack_tx, "PUT ACK TX ") \
EM(rxrpc_txbuf_put_cleaned, "PUT CLEANED") \
EM(rxrpc_txbuf_put_nomem, "PUT NOMEM ") \
EM(rxrpc_txbuf_put_rotated, "PUT ROTATED") \
EM(rxrpc_txbuf_put_send_aborted, "PUT SEND-X ") \
EM(rxrpc_txbuf_put_trans, "PUT TRANS ") \
EM(rxrpc_txbuf_see_send_more, "SEE SEND+ ") \
E_(rxrpc_txbuf_see_unacked, "SEE UNACKED")

Expand Down Expand Up @@ -295,9 +300,9 @@ enum rxrpc_rtt_rx_trace { rxrpc_rtt_rx_traces } __mode(byte);
enum rxrpc_rtt_tx_trace { rxrpc_rtt_tx_traces } __mode(byte);
enum rxrpc_skb_trace { rxrpc_skb_traces } __mode(byte);
enum rxrpc_timer_trace { rxrpc_timer_traces } __mode(byte);
enum rxrpc_transmit_trace { rxrpc_transmit_traces } __mode(byte);
enum rxrpc_tx_point { rxrpc_tx_points } __mode(byte);
enum rxrpc_txbuf_trace { rxrpc_txbuf_traces } __mode(byte);
enum rxrpc_txqueue_trace { rxrpc_txqueue_traces } __mode(byte);

#endif /* end __RXRPC_DECLARE_TRACE_ENUMS_ONCE_ONLY */

Expand All @@ -323,9 +328,9 @@ rxrpc_rtt_rx_traces;
rxrpc_rtt_tx_traces;
rxrpc_skb_traces;
rxrpc_timer_traces;
rxrpc_transmit_traces;
rxrpc_tx_points;
rxrpc_txbuf_traces;
rxrpc_txqueue_traces;

/*
* Now redefine the EM() and E_() macros to map the enums to the strings that
Expand Down Expand Up @@ -605,32 +610,36 @@ TRACE_EVENT(rxrpc_call_complete,
__entry->abort_code)
);

TRACE_EVENT(rxrpc_transmit,
TP_PROTO(struct rxrpc_call *call, enum rxrpc_transmit_trace why),
TRACE_EVENT(rxrpc_txqueue,
TP_PROTO(struct rxrpc_call *call, enum rxrpc_txqueue_trace why),

TP_ARGS(call, why),

TP_STRUCT__entry(
__field(unsigned int, call )
__field(enum rxrpc_transmit_trace, why )
__field(rxrpc_seq_t, tx_hard_ack )
__field(enum rxrpc_txqueue_trace, why )
__field(rxrpc_seq_t, acks_hard_ack )
__field(rxrpc_seq_t, tx_bottom )
__field(rxrpc_seq_t, tx_top )
__field(int, tx_winsize )
),

TP_fast_assign(
__entry->call = call->debug_id;
__entry->why = why;
__entry->tx_hard_ack = call->tx_hard_ack;
__entry->acks_hard_ack = call->acks_hard_ack;
__entry->tx_bottom = call->tx_bottom;
__entry->tx_top = call->tx_top;
__entry->tx_winsize = call->tx_winsize;
),

TP_printk("c=%08x %s f=%08x n=%u/%u",
TP_printk("c=%08x %s f=%08x h=%08x n=%u/%u/%u",
__entry->call,
__print_symbolic(__entry->why, rxrpc_transmit_traces),
__entry->tx_hard_ack + 1,
__entry->tx_top - __entry->tx_hard_ack,
__print_symbolic(__entry->why, rxrpc_txqueue_traces),
__entry->tx_bottom,
__entry->acks_hard_ack,
__entry->tx_top - __entry->tx_bottom,
__entry->tx_top - __entry->acks_hard_ack,
__entry->tx_winsize)
);

Expand Down Expand Up @@ -1200,29 +1209,25 @@ TRACE_EVENT(rxrpc_drop_ack,
);

TRACE_EVENT(rxrpc_retransmit,
TP_PROTO(struct rxrpc_call *call, rxrpc_seq_t seq, u8 annotation,
s64 expiry),
TP_PROTO(struct rxrpc_call *call, rxrpc_seq_t seq, s64 expiry),

TP_ARGS(call, seq, annotation, expiry),
TP_ARGS(call, seq, expiry),

TP_STRUCT__entry(
__field(unsigned int, call )
__field(rxrpc_seq_t, seq )
__field(u8, annotation )
__field(s64, expiry )
),

TP_fast_assign(
__entry->call = call->debug_id;
__entry->seq = seq;
__entry->annotation = annotation;
__entry->expiry = expiry;
),

TP_printk("c=%08x q=%x a=%02x xp=%lld",
TP_printk("c=%08x q=%x xp=%lld",
__entry->call,
__entry->seq,
__entry->annotation,
__entry->expiry)
);

Expand All @@ -1245,14 +1250,14 @@ TRACE_EVENT(rxrpc_congest,
TP_fast_assign(
__entry->call = call->debug_id;
__entry->change = change;
__entry->hard_ack = call->tx_hard_ack;
__entry->hard_ack = call->acks_hard_ack;
__entry->top = call->tx_top;
__entry->lowest_nak = call->acks_lowest_nak;
__entry->ack_serial = ack_serial;
memcpy(&__entry->sum, summary, sizeof(__entry->sum));
),

TP_printk("c=%08x r=%08x %s q=%08x %s cw=%u ss=%u nr=%u,%u nw=%u,%u r=%u b=%u u=%u d=%u l=%x%s%s%s",
TP_printk("c=%08x r=%08x %s q=%08x %s cw=%u ss=%u nA=%u,%u+%u,%u r=%u b=%u u=%u d=%u l=%x%s%s%s",
__entry->call,
__entry->ack_serial,
__print_symbolic(__entry->sum.ack_reason, rxrpc_ack_names),
Expand Down Expand Up @@ -1362,26 +1367,23 @@ TRACE_EVENT(rxrpc_connect_call,
);

TRACE_EVENT(rxrpc_resend,
TP_PROTO(struct rxrpc_call *call, int ix),
TP_PROTO(struct rxrpc_call *call),

TP_ARGS(call, ix),
TP_ARGS(call),

TP_STRUCT__entry(
__field(unsigned int, call )
__field(int, ix )
__array(u8, anno, 64 )
__field(rxrpc_seq_t, seq )
),

TP_fast_assign(
__entry->call = call->debug_id;
__entry->ix = ix;
memcpy(__entry->anno, call->rxtx_annotations, 64);
__entry->seq = call->acks_hard_ack;
),

TP_printk("c=%08x ix=%u a=%64phN",
TP_printk("c=%08x q=%x",
__entry->call,
__entry->ix,
__entry->anno)
__entry->seq)
);

TRACE_EVENT(rxrpc_rx_icmp,
Expand Down Expand Up @@ -1461,7 +1463,7 @@ TRACE_EVENT(rxrpc_call_reset,
__entry->call_id = call->call_id;
__entry->call_serial = call->rx_serial;
__entry->conn_serial = call->conn->hi_serial;
__entry->tx_seq = call->tx_hard_ack;
__entry->tx_seq = call->acks_hard_ack;
__entry->rx_seq = call->rx_highest_seq;
),

Expand Down
5 changes: 2 additions & 3 deletions net/rxrpc/af_rxrpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ atomic_t rxrpc_debug_id;
EXPORT_SYMBOL(rxrpc_debug_id);

/* count of skbs currently in use */
atomic_t rxrpc_n_tx_skbs, rxrpc_n_rx_skbs;
atomic_t rxrpc_n_rx_skbs;

struct workqueue_struct *rxrpc_workqueue;

Expand Down Expand Up @@ -979,7 +979,7 @@ static int __init af_rxrpc_init(void)
goto error_call_jar;
}

rxrpc_workqueue = alloc_workqueue("krxrpcd", 0, 1);
rxrpc_workqueue = alloc_workqueue("krxrpcd", WQ_HIGHPRI | WQ_MEM_RECLAIM | WQ_UNBOUND, 1);
if (!rxrpc_workqueue) {
pr_notice("Failed to allocate work queue\n");
goto error_work_queue;
Expand Down Expand Up @@ -1059,7 +1059,6 @@ static void __exit af_rxrpc_exit(void)
sock_unregister(PF_RXRPC);
proto_unregister(&rxrpc_proto);
unregister_pernet_device(&rxrpc_net_ops);
ASSERTCMP(atomic_read(&rxrpc_n_tx_skbs), ==, 0);
ASSERTCMP(atomic_read(&rxrpc_n_rx_skbs), ==, 0);

/* Make sure the local and peer records pinned by any dying connections
Expand Down
31 changes: 13 additions & 18 deletions net/rxrpc/ar-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ struct rxrpc_host_header {
* - max 48 bytes (struct sk_buff::cb)
*/
struct rxrpc_skb_priv {
u16 remain;
u16 offset; /* Offset of data */
u16 len; /* Length of data */
u8 flags;
Expand Down Expand Up @@ -243,7 +242,7 @@ struct rxrpc_security {
size_t *, size_t *, size_t *);

/* impose security on a packet */
int (*secure_packet)(struct rxrpc_call *, struct sk_buff *, size_t);
int (*secure_packet)(struct rxrpc_call *, struct rxrpc_txbuf *);

/* verify the security on a received packet */
int (*verify_packet)(struct rxrpc_call *, struct sk_buff *);
Expand Down Expand Up @@ -497,6 +496,7 @@ enum rxrpc_call_flag {
RXRPC_CALL_EXPOSED, /* The call was exposed to the world */
RXRPC_CALL_RX_LAST, /* Received the last packet (at rxtx_top) */
RXRPC_CALL_TX_LAST, /* Last packet in Tx buffer (at rxtx_top) */
RXRPC_CALL_TX_ALL_ACKED, /* Last packet has been hard-acked */
RXRPC_CALL_SEND_PING, /* A ping will need to be sent */
RXRPC_CALL_RETRANS_TIMEOUT, /* Retransmission due to timeout occurred */
RXRPC_CALL_BEGAN_RX_TIMER, /* We began the expect_rx_by timer */
Expand Down Expand Up @@ -594,7 +594,7 @@ struct rxrpc_call {
struct list_head recvmsg_link; /* Link in rx->recvmsg_q */
struct list_head sock_link; /* Link in rx->sock_calls */
struct rb_node sock_node; /* Node in rx->calls */
struct sk_buff *tx_pending; /* Tx socket buffer being filled */
struct rxrpc_txbuf *tx_pending; /* Tx buffer being filled */
wait_queue_head_t waitq; /* Wait queue for channel or Tx */
s64 tx_total_len; /* Total length left to be transmitted (or -1) */
__be32 crypto_buf[2]; /* Temporary packet crypto buffer */
Expand Down Expand Up @@ -632,22 +632,16 @@ struct rxrpc_call {
#define RXRPC_INIT_RX_WINDOW_SIZE 63
struct sk_buff **rxtx_buffer;
u8 *rxtx_annotations;
#define RXRPC_TX_ANNO_ACK 0
#define RXRPC_TX_ANNO_UNACK 1
#define RXRPC_TX_ANNO_NAK 2
#define RXRPC_TX_ANNO_RETRANS 3
#define RXRPC_TX_ANNO_MASK 0x03
#define RXRPC_TX_ANNO_LAST 0x04
#define RXRPC_TX_ANNO_RESENT 0x08

rxrpc_seq_t tx_hard_ack; /* Dead slot in buffer; the first transmitted but
* not hard-ACK'd packet follows this.
*/

/* Transmitted data tracking. */
spinlock_t tx_lock; /* Transmit queue lock */
struct list_head tx_buffer; /* Buffer of transmissible packets */
rxrpc_seq_t tx_bottom; /* First packet in buffer */
rxrpc_seq_t tx_transmitted; /* Highest packet transmitted */
rxrpc_seq_t tx_top; /* Highest Tx slot allocated. */
u16 tx_backoff; /* Delay to insert due to Tx failure */
u8 tx_winsize; /* Maximum size of Tx window */
#define RXRPC_TX_MAX_WINDOW 128

/* Received data tracking */
struct sk_buff_head recvmsg_queue; /* Queue of packets ready for recvmsg() */
Expand All @@ -657,6 +651,7 @@ struct rxrpc_call {
rxrpc_seq_t rx_consumed; /* Highest packet consumed */
rxrpc_serial_t rx_serial; /* Highest serial received for this call */
u8 rx_winsize; /* Size of Rx window */
spinlock_t input_lock; /* Lock for packet input to this call */

/* TCP-style slow-start congestion control [RFC5681]. Since the SMSS
* is fixed, we keep these numbers in terms of segments (ie. DATA
Expand All @@ -671,8 +666,6 @@ struct rxrpc_call {
u8 cong_cumul_acks; /* Cumulative ACK count */
ktime_t cong_tstamp; /* Last time cwnd was changed */

spinlock_t input_lock; /* Lock for packet input to this call */

/* Receive-phase ACK management (ACKs we send). */
u8 ackr_reason; /* reason to ACK */
rxrpc_serial_t ackr_serial; /* serial of packet being ACK'd */
Expand All @@ -697,6 +690,7 @@ struct rxrpc_call {
ktime_t acks_latest_ts; /* Timestamp of latest ACK received */
rxrpc_seq_t acks_first_seq; /* first sequence number received */
rxrpc_seq_t acks_prev_seq; /* Highest previousPacket received */
rxrpc_seq_t acks_hard_ack; /* Latest hard-ack point */
rxrpc_seq_t acks_lowest_nak; /* Lowest NACK in the buffer (or ==tx_hard_ack) */
rxrpc_seq_t acks_lost_top; /* tx_top at the time lost-ack ping sent */
rxrpc_serial_t acks_lost_ping; /* Serial number of probe ACK */
Expand Down Expand Up @@ -809,7 +803,7 @@ static inline bool rxrpc_sending_to_client(const struct rxrpc_txbuf *txb)
/*
* af_rxrpc.c
*/
extern atomic_t rxrpc_n_tx_skbs, rxrpc_n_rx_skbs;
extern atomic_t rxrpc_n_rx_skbs;
extern struct workqueue_struct *rxrpc_workqueue;

/*
Expand All @@ -831,6 +825,7 @@ void rxrpc_propose_ping(struct rxrpc_call *call, u32 serial,
void rxrpc_send_ACK(struct rxrpc_call *, u8, rxrpc_serial_t, enum rxrpc_propose_ack_trace);
void rxrpc_propose_delay_ACK(struct rxrpc_call *, rxrpc_serial_t,
enum rxrpc_propose_ack_trace);
void rxrpc_shrink_call_tx_buffer(struct rxrpc_call *);
void rxrpc_process_call(struct work_struct *);

void rxrpc_reduce_call_timer(struct rxrpc_call *call,
Expand Down Expand Up @@ -1034,7 +1029,7 @@ static inline struct rxrpc_net *rxrpc_net(struct net *net)
*/
void rxrpc_transmit_ack_packets(struct rxrpc_local *);
int rxrpc_send_abort_packet(struct rxrpc_call *);
int rxrpc_send_data_packet(struct rxrpc_call *, struct sk_buff *, bool);
int rxrpc_send_data_packet(struct rxrpc_call *, struct rxrpc_txbuf *);
void rxrpc_reject_packets(struct rxrpc_local *);
void rxrpc_send_keepalive(struct rxrpc_peer *);

Expand Down
Loading

0 comments on commit a4ea4c4

Please sign in to comment.