Skip to content

Commit

Permalink
rxrpc: Do zerocopy using MSG_SPLICE_PAGES and page frags
Browse files Browse the repository at this point in the history
Switch from keeping the transmission buffers in the rxrpc_txbuf struct and
allocated from the slab, to allocating them using page fragment allocators
(which uses raw pages), thereby allowing them to be passed to
MSG_SPLICE_PAGES and avoid copying into the UDP buffers.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: "David S. Miller" <davem@davemloft.net>
cc: Eric Dumazet <edumazet@google.com>
cc: Jakub Kicinski <kuba@kernel.org>
cc: Paolo Abeni <pabeni@redhat.com>
cc: linux-afs@lists.infradead.org
cc: netdev@vger.kernel.org
  • Loading branch information
David Howells committed Mar 5, 2024
1 parent 8985f2b commit 49489bb
Show file tree
Hide file tree
Showing 8 changed files with 219 additions and 144 deletions.
32 changes: 10 additions & 22 deletions net/rxrpc/ar-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,10 +248,9 @@ struct rxrpc_security {
struct rxrpc_key_token *);

/* Work out how much data we can store in a packet, given an estimate
* of the amount of data remaining.
* of the amount of data remaining and allocate a data buffer.
*/
int (*how_much_data)(struct rxrpc_call *, size_t,
size_t *, size_t *, size_t *);
struct rxrpc_txbuf *(*alloc_txbuf)(struct rxrpc_call *call, size_t remaining, gfp_t gfp);

/* impose security on a packet */
int (*secure_packet)(struct rxrpc_call *, struct rxrpc_txbuf *);
Expand Down Expand Up @@ -292,6 +291,7 @@ struct rxrpc_local {
struct socket *socket; /* my UDP socket */
struct task_struct *io_thread;
struct completion io_thread_ready; /* Indication that the I/O thread started */
struct page_frag_cache tx_alloc; /* Tx control packet allocation (I/O thread only) */
struct rxrpc_sock *service; /* Service(s) listening on this endpoint */
#ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
struct sk_buff_head rx_delay_queue; /* Delay injection queue */
Expand Down Expand Up @@ -500,6 +500,8 @@ struct rxrpc_connection {
struct list_head proc_link; /* link in procfs list */
struct list_head link; /* link in master connection list */
struct sk_buff_head rx_queue; /* received conn-level packets */
struct page_frag_cache tx_data_alloc; /* Tx DATA packet allocation */
struct mutex tx_data_alloc_lock;

struct mutex security_lock; /* Lock for security management */
const struct rxrpc_security *security; /* applied security module */
Expand Down Expand Up @@ -788,7 +790,6 @@ struct rxrpc_send_params {
* Buffer of data to be output as a packet.
*/
struct rxrpc_txbuf {
struct rcu_head rcu;
struct list_head call_link; /* Link in call->tx_sendmsg/tx_buffer */
struct list_head tx_link; /* Link in live Enc queue or Tx queue */
ktime_t last_sent; /* Time at which last transmitted */
Expand All @@ -806,22 +807,8 @@ struct rxrpc_txbuf {
__be16 cksum; /* Checksum to go in header */
unsigned short ack_rwind; /* ACK receive window */
u8 /*enum rxrpc_propose_ack_trace*/ ack_why; /* If ack, why */
u8 nr_kvec;
struct kvec kvec[1];
struct {
/* The packet for encrypting and DMA'ing. We align it such
* that data[] aligns correctly for any crypto blocksize.
*/
u8 pad[64 - sizeof(struct rxrpc_wire_header)];
struct rxrpc_wire_header _wire; /* Network-ready header */
union {
u8 data[RXRPC_JUMBO_DATALEN]; /* Data packet */
struct {
struct rxrpc_ackpacket _ack;
DECLARE_FLEX_ARRAY(u8, acks);
};
};
} __aligned(64);
u8 nr_kvec; /* Amount of kvec[] used */
struct kvec kvec[3];
};

static inline bool rxrpc_sending_to_server(const struct rxrpc_txbuf *txb)
Expand Down Expand Up @@ -1299,8 +1286,9 @@ static inline void rxrpc_sysctl_exit(void) {}
* txbuf.c
*/
extern atomic_t rxrpc_nr_txbuf;
struct rxrpc_txbuf *rxrpc_alloc_txbuf(struct rxrpc_call *call, u8 packet_type,
gfp_t gfp);
struct rxrpc_txbuf *rxrpc_alloc_data_txbuf(struct rxrpc_call *call, size_t data_size,
size_t data_align, gfp_t gfp);
struct rxrpc_txbuf *rxrpc_alloc_ack_txbuf(struct rxrpc_call *call, size_t sack_size);
void rxrpc_get_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what);
void rxrpc_see_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what);
void rxrpc_put_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what);
Expand Down
4 changes: 4 additions & 0 deletions net/rxrpc/conn_object.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ struct rxrpc_connection *rxrpc_alloc_connection(struct rxrpc_net *rxnet,
INIT_LIST_HEAD(&conn->proc_link);
INIT_LIST_HEAD(&conn->link);
mutex_init(&conn->security_lock);
mutex_init(&conn->tx_data_alloc_lock);
skb_queue_head_init(&conn->rx_queue);
conn->rxnet = rxnet;
conn->security = &rxrpc_no_security;
Expand Down Expand Up @@ -341,6 +342,9 @@ static void rxrpc_clean_up_connection(struct work_struct *work)
*/
rxrpc_purge_queue(&conn->rx_queue);

if (conn->tx_data_alloc.va)
__page_frag_cache_drain(virt_to_page(conn->tx_data_alloc.va),
conn->tx_data_alloc.pagecnt_bias);
call_rcu(&conn->rcu, rxrpc_rcu_free_connection);
}

Expand Down
11 changes: 4 additions & 7 deletions net/rxrpc/insecure.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,11 @@ static int none_init_connection_security(struct rxrpc_connection *conn,
}

/*
* Work out how much data we can put in an unsecured packet.
* Allocate an appropriately sized buffer for the amount of data remaining.
*/
static int none_how_much_data(struct rxrpc_call *call, size_t remain,
size_t *_buf_size, size_t *_data_size, size_t *_offset)
static struct rxrpc_txbuf *none_alloc_txbuf(struct rxrpc_call *call, size_t remain, gfp_t gfp)
{
*_buf_size = *_data_size = min_t(size_t, remain, RXRPC_JUMBO_DATALEN);
*_offset = 0;
return 0;
return rxrpc_alloc_data_txbuf(call, min_t(size_t, remain, RXRPC_JUMBO_DATALEN), 0, gfp);
}

static int none_secure_packet(struct rxrpc_call *call, struct rxrpc_txbuf *txb)
Expand Down Expand Up @@ -79,7 +76,7 @@ const struct rxrpc_security rxrpc_no_security = {
.exit = none_exit,
.init_connection_security = none_init_connection_security,
.free_call_crypto = none_free_call_crypto,
.how_much_data = none_how_much_data,
.alloc_txbuf = none_alloc_txbuf,
.secure_packet = none_secure_packet,
.verify_packet = none_verify_packet,
.respond_to_challenge = none_respond_to_challenge,
Expand Down
3 changes: 3 additions & 0 deletions net/rxrpc/local_object.c
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,9 @@ void rxrpc_destroy_local(struct rxrpc_local *local)
#endif
rxrpc_purge_queue(&local->rx_queue);
rxrpc_purge_client_connections(local);
if (local->tx_alloc.va)
__page_frag_cache_drain(virt_to_page(local->tx_alloc.va),
local->tx_alloc.pagecnt_bias);
}

/*
Expand Down
65 changes: 31 additions & 34 deletions net/rxrpc/output.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,16 @@ static void rxrpc_fill_out_ack(struct rxrpc_call *call,
rxrpc_serial_t serial)
{
struct rxrpc_wire_header *whdr = txb->kvec[0].iov_base;
struct rxrpc_acktrailer *trailer = txb->kvec[2].iov_base + 3;
struct rxrpc_ackpacket *ack = (struct rxrpc_ackpacket *)(whdr + 1);
struct rxrpc_acktrailer trailer;
unsigned int qsize, sack, wrap, to;
rxrpc_seq_t window, wtop;
int rsize;
u32 mtu, jmax;
u8 *ackp = txb->acks;
u8 *filler = txb->kvec[2].iov_base;
u8 *sackp = txb->kvec[1].iov_base;

call->ackr_nr_unacked = 0;
atomic_set(&call->ackr_nr_consumed, 0);
rxrpc_inc_stat(call->rxnet, stat_tx_ack_fill);
clear_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags);

window = call->ackr_window;
wtop = call->ackr_wtop;
Expand All @@ -110,20 +108,27 @@ static void rxrpc_fill_out_ack(struct rxrpc_call *call,
ack->serial = htonl(serial);
ack->reason = ack_reason;
ack->nAcks = wtop - window;
filler[0] = 0;
filler[1] = 0;
filler[2] = 0;

if (ack_reason == RXRPC_ACK_PING)
txb->flags |= RXRPC_REQUEST_ACK;

if (after(wtop, window)) {
txb->len += ack->nAcks;
txb->kvec[1].iov_base = sackp;
txb->kvec[1].iov_len = ack->nAcks;

wrap = RXRPC_SACK_SIZE - sack;
to = min_t(unsigned int, ack->nAcks, RXRPC_SACK_SIZE);

if (sack + ack->nAcks <= RXRPC_SACK_SIZE) {
memcpy(txb->acks, call->ackr_sack_table + sack, ack->nAcks);
memcpy(sackp, call->ackr_sack_table + sack, ack->nAcks);
} else {
memcpy(txb->acks, call->ackr_sack_table + sack, wrap);
memcpy(txb->acks + wrap, call->ackr_sack_table,
to - wrap);
memcpy(sackp, call->ackr_sack_table + sack, wrap);
memcpy(sackp + wrap, call->ackr_sack_table, to - wrap);
}

ackp += to;
} else if (before(wtop, window)) {
pr_warn("ack window backward %x %x", window, wtop);
} else if (ack->reason == RXRPC_ACK_DELAY) {
Expand All @@ -135,18 +140,11 @@ static void rxrpc_fill_out_ack(struct rxrpc_call *call,
jmax = rxrpc_rx_jumbo_max;
qsize = (window - 1) - call->rx_consumed;
rsize = max_t(int, call->rx_winsize - qsize, 0);
txb->ack_rwind = rsize;
trailer.maxMTU = htonl(rxrpc_rx_mtu);
trailer.ifMTU = htonl(mtu);
trailer.rwind = htonl(rsize);
trailer.jumbo_max = htonl(jmax);

*ackp++ = 0;
*ackp++ = 0;
*ackp++ = 0;
memcpy(ackp, &trailer, sizeof(trailer));
txb->kvec[0].iov_len += sizeof(*ack) + ack->nAcks + 3 + sizeof(trailer);
txb->len = txb->kvec[0].iov_len;
txb->ack_rwind = rsize;
trailer->maxMTU = htonl(rxrpc_rx_mtu);
trailer->ifMTU = htonl(mtu);
trailer->rwind = htonl(rsize);
trailer->jumbo_max = htonl(jmax);
}

/*
Expand Down Expand Up @@ -195,7 +193,7 @@ static void rxrpc_cancel_rtt_probe(struct rxrpc_call *call,
/*
* Transmit an ACK packet.
*/
static int rxrpc_send_ack_packet(struct rxrpc_call *call, struct rxrpc_txbuf *txb)
static void rxrpc_send_ack_packet(struct rxrpc_call *call, struct rxrpc_txbuf *txb)
{
struct rxrpc_wire_header *whdr = txb->kvec[0].iov_base;
struct rxrpc_connection *conn;
Expand All @@ -204,18 +202,16 @@ static int rxrpc_send_ack_packet(struct rxrpc_call *call, struct rxrpc_txbuf *tx
int ret, rtt_slot = -1;

if (test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
return -ECONNRESET;
return;

conn = call->conn;

msg.msg_name = &call->peer->srx.transport;
msg.msg_namelen = call->peer->srx.transport_len;
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = 0;
msg.msg_flags = MSG_SPLICE_PAGES;

if (ack->reason == RXRPC_ACK_PING)
txb->flags |= RXRPC_REQUEST_ACK;
whdr->flags = txb->flags & RXRPC_TXBUF_WIRE_FLAGS;

txb->serial = rxrpc_get_next_serial(conn);
Expand Down Expand Up @@ -250,8 +246,6 @@ static int rxrpc_send_ack_packet(struct rxrpc_call *call, struct rxrpc_txbuf *tx
rxrpc_cancel_rtt_probe(call, txb->serial, rtt_slot);
rxrpc_set_keepalive(call);
}

return ret;
}

/*
Expand All @@ -267,16 +261,19 @@ void rxrpc_send_ACK(struct rxrpc_call *call, u8 ack_reason,

rxrpc_inc_stat(call->rxnet, stat_tx_acks[ack_reason]);

txb = rxrpc_alloc_txbuf(call, RXRPC_PACKET_TYPE_ACK,
rcu_read_lock_held() ? GFP_ATOMIC | __GFP_NOWARN : GFP_NOFS);
txb = rxrpc_alloc_ack_txbuf(call, call->ackr_wtop - call->ackr_window);
if (!txb) {
kleave(" = -ENOMEM");
return;
}

txb->ack_why = why;

rxrpc_fill_out_ack(call, txb, ack_reason, serial);
call->ackr_nr_unacked = 0;
atomic_set(&call->ackr_nr_consumed, 0);
clear_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags);

txb->ack_why = why;
trace_rxrpc_send_ack(call, why, ack_reason, serial);
rxrpc_send_ack_packet(call, txb);
rxrpc_put_txbuf(txb, rxrpc_txbuf_put_ack_tx);
Expand Down Expand Up @@ -465,7 +462,7 @@ static int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *t
msg.msg_namelen = call->peer->srx.transport_len;
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = 0;
msg.msg_flags = MSG_SPLICE_PAGES;

/* Track what we've attempted to transmit at least once so that the
* retransmission algorithm doesn't try to resend what we haven't sent
Expand Down
46 changes: 24 additions & 22 deletions net/rxrpc/rxkad.c
Original file line number Diff line number Diff line change
Expand Up @@ -145,16 +145,17 @@ static int rxkad_init_connection_security(struct rxrpc_connection *conn,
/*
* Work out how much data we can put in a packet.
*/
static int rxkad_how_much_data(struct rxrpc_call *call, size_t remain,
size_t *_buf_size, size_t *_data_size, size_t *_offset)
static struct rxrpc_txbuf *rxkad_alloc_txbuf(struct rxrpc_call *call, size_t remain, gfp_t gfp)
{
size_t shdr, buf_size, chunk;
struct rxrpc_txbuf *txb;
size_t shdr, space;

remain = min(remain, 65535 - sizeof(struct rxrpc_wire_header));

switch (call->conn->security_level) {
default:
buf_size = chunk = min_t(size_t, remain, RXRPC_JUMBO_DATALEN);
shdr = 0;
goto out;
space = min_t(size_t, remain, RXRPC_JUMBO_DATALEN);
return rxrpc_alloc_data_txbuf(call, space, 0, gfp);
case RXRPC_SECURITY_AUTH:
shdr = sizeof(struct rxkad_level1_hdr);
break;
Expand All @@ -163,17 +164,16 @@ static int rxkad_how_much_data(struct rxrpc_call *call, size_t remain,
break;
}

buf_size = round_down(RXRPC_JUMBO_DATALEN, RXKAD_ALIGN);
space = min_t(size_t, round_down(RXRPC_JUMBO_DATALEN, RXKAD_ALIGN), remain + shdr);
space = round_up(space, RXKAD_ALIGN);

chunk = buf_size - shdr;
if (remain < chunk)
buf_size = round_up(shdr + remain, RXKAD_ALIGN);
txb = rxrpc_alloc_data_txbuf(call, space, RXKAD_ALIGN, gfp);
if (!txb)
return NULL;

out:
*_buf_size = buf_size;
*_data_size = chunk;
*_offset = shdr;
return 0;
txb->offset += shdr;
txb->space -= shdr;
return txb;
}

/*
Expand Down Expand Up @@ -251,7 +251,8 @@ static int rxkad_secure_packet_auth(const struct rxrpc_call *call,
struct rxrpc_txbuf *txb,
struct skcipher_request *req)
{
struct rxkad_level1_hdr *hdr = (void *)txb->data;
struct rxrpc_wire_header *whdr = txb->kvec[0].iov_base;
struct rxkad_level1_hdr *hdr = (void *)(whdr + 1);
struct rxrpc_crypt iv;
struct scatterlist sg;
size_t pad;
Expand All @@ -267,14 +268,14 @@ static int rxkad_secure_packet_auth(const struct rxrpc_call *call,
pad = RXKAD_ALIGN - pad;
pad &= RXKAD_ALIGN - 1;
if (pad) {
memset(txb->data + txb->offset, 0, pad);
memset(txb->kvec[0].iov_base + txb->offset, 0, pad);
txb->len += pad;
}

/* start the encryption afresh */
memset(&iv, 0, sizeof(iv));

sg_init_one(&sg, txb->data, 8);
sg_init_one(&sg, hdr, 8);
skcipher_request_set_sync_tfm(req, call->conn->rxkad.cipher);
skcipher_request_set_callback(req, 0, NULL, NULL);
skcipher_request_set_crypt(req, &sg, &sg, 8, iv.x);
Expand All @@ -293,7 +294,8 @@ static int rxkad_secure_packet_encrypt(const struct rxrpc_call *call,
struct skcipher_request *req)
{
const struct rxrpc_key_token *token;
struct rxkad_level2_hdr *rxkhdr = (void *)txb->data;
struct rxrpc_wire_header *whdr = txb->kvec[0].iov_base;
struct rxkad_level2_hdr *rxkhdr = (void *)(whdr + 1);
struct rxrpc_crypt iv;
struct scatterlist sg;
size_t pad;
Expand All @@ -312,15 +314,15 @@ static int rxkad_secure_packet_encrypt(const struct rxrpc_call *call,
pad = RXKAD_ALIGN - pad;
pad &= RXKAD_ALIGN - 1;
if (pad) {
memset(txb->data + txb->offset, 0, pad);
memset(txb->kvec[0].iov_base + txb->offset, 0, pad);
txb->len += pad;
}

/* encrypt from the session key */
token = call->conn->key->payload.data[0];
memcpy(&iv, token->kad->session_key, sizeof(iv));

sg_init_one(&sg, txb->data, txb->len);
sg_init_one(&sg, rxkhdr, txb->len);
skcipher_request_set_sync_tfm(req, call->conn->rxkad.cipher);
skcipher_request_set_callback(req, 0, NULL, NULL);
skcipher_request_set_crypt(req, &sg, &sg, txb->len, iv.x);
Expand Down Expand Up @@ -1255,7 +1257,7 @@ const struct rxrpc_security rxkad = {
.free_preparse_server_key = rxkad_free_preparse_server_key,
.destroy_server_key = rxkad_destroy_server_key,
.init_connection_security = rxkad_init_connection_security,
.how_much_data = rxkad_how_much_data,
.alloc_txbuf = rxkad_alloc_txbuf,
.secure_packet = rxkad_secure_packet,
.verify_packet = rxkad_verify_packet,
.free_call_crypto = rxkad_free_call_crypto,
Expand Down
Loading

0 comments on commit 49489bb

Please sign in to comment.