Skip to content

Commit

Permalink
rxrpc: Simplify skbuff accounting in receive path
Browse files Browse the repository at this point in the history
A received skbuff needs a ref when it gets put on a call data queue or conn
packet queue, and rxrpc_input_packet() and co. jump through a lot of hoops
to avoid double-dropping the skbuff ref so that we can avoid getting a ref
when we queue the packet.

Change this so that the skbuff ref is unconditionally dropped by the caller
of rxrpc_input_packet().  An additional ref is then taken on the packet if
it is pushed onto a queue.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
  • Loading branch information
David Howells committed Dec 1, 2022
1 parent 29fb4ec commit 2d1faf7
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 62 deletions.
3 changes: 2 additions & 1 deletion include/trace/events/rxrpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
EM(rxrpc_skb_eaten_by_unshare_nomem, "ETN unshar-nm") \
EM(rxrpc_skb_get_ack, "GET ack ") \
EM(rxrpc_skb_get_conn_work, "GET conn-work") \
EM(rxrpc_skb_get_local_work, "GET locl-work") \
EM(rxrpc_skb_get_reject_work, "GET rej-work ") \
EM(rxrpc_skb_get_to_recvmsg, "GET to-recv ") \
EM(rxrpc_skb_get_to_recvmsg_oos, "GET to-recv-o") \
EM(rxrpc_skb_new_encap_rcv, "NEW encap-rcv") \
Expand All @@ -39,7 +41,6 @@
EM(rxrpc_skb_put_error_report, "PUT error-rep") \
EM(rxrpc_skb_put_input, "PUT input ") \
EM(rxrpc_skb_put_jumbo_subpacket, "PUT jumbo-sub") \
EM(rxrpc_skb_put_lose, "PUT lose ") \
EM(rxrpc_skb_put_purge, "PUT purge ") \
EM(rxrpc_skb_put_rotate, "PUT rotate ") \
EM(rxrpc_skb_put_unknown, "PUT unknown ") \
Expand Down
45 changes: 22 additions & 23 deletions net/rxrpc/input.c
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,8 @@ static void rxrpc_input_queue_data(struct rxrpc_call *call, struct sk_buff *skb,
/*
* Process a DATA packet.
*/
static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb)
static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb,
bool *_notify)
{
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
struct sk_buff *oos;
Expand All @@ -361,15 +362,15 @@ static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb)
if (test_and_set_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
seq + 1 != wtop) {
rxrpc_proto_abort("LSN", call, seq);
goto err_free;
return;
}
} else {
if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
after_eq(seq, wtop)) {
pr_warn("Packet beyond last: c=%x q=%x window=%x-%x wlimit=%x\n",
call->debug_id, seq, window, wtop, wlimit);
rxrpc_proto_abort("LSA", call, seq);
goto err_free;
return;
}
}

Expand Down Expand Up @@ -402,9 +403,11 @@ static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb)
if (after(window, wtop))
wtop = window;

rxrpc_get_skb(skb, rxrpc_skb_get_to_recvmsg);

spin_lock(&call->recvmsg_queue.lock);
rxrpc_input_queue_data(call, skb, window, wtop, rxrpc_receive_queue);
skb = NULL;
*_notify = true;

while ((oos = skb_peek(&call->rx_oos_queue))) {
struct rxrpc_skb_priv *osp = rxrpc_skb(oos);
Expand Down Expand Up @@ -456,16 +459,17 @@ static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb)
struct rxrpc_skb_priv *osp = rxrpc_skb(oos);

if (after(osp->hdr.seq, seq)) {
rxrpc_get_skb(skb, rxrpc_skb_get_to_recvmsg_oos);
__skb_queue_before(&call->rx_oos_queue, oos, skb);
goto oos_queued;
}
}

rxrpc_get_skb(skb, rxrpc_skb_get_to_recvmsg_oos);
__skb_queue_tail(&call->rx_oos_queue, skb);
oos_queued:
trace_rxrpc_receive(call, last ? rxrpc_receive_oos_last : rxrpc_receive_oos,
sp->hdr.serial, sp->hdr.seq);
skb = NULL;
}

send_ack:
Expand All @@ -483,9 +487,6 @@ static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb)
else
rxrpc_propose_delay_ACK(call, serial,
rxrpc_propose_ack_input_data);

err_free:
rxrpc_free_skb(skb, rxrpc_skb_put_input);
}

/*
Expand All @@ -498,6 +499,7 @@ static bool rxrpc_input_split_jumbo(struct rxrpc_call *call, struct sk_buff *skb
struct sk_buff *jskb;
unsigned int offset = sizeof(struct rxrpc_wire_header);
unsigned int len = skb->len - offset;
bool notify = false;

while (sp->hdr.flags & RXRPC_JUMBO_PACKET) {
if (len < RXRPC_JUMBO_SUBPKTLEN)
Expand All @@ -517,7 +519,8 @@ static bool rxrpc_input_split_jumbo(struct rxrpc_call *call, struct sk_buff *skb
jsp = rxrpc_skb(jskb);
jsp->offset = offset;
jsp->len = RXRPC_JUMBO_DATALEN;
rxrpc_input_data_one(call, jskb);
rxrpc_input_data_one(call, jskb, &notify);
rxrpc_free_skb(jskb, rxrpc_skb_put_jumbo_subpacket);

sp->hdr.flags = jhdr.flags;
sp->hdr._rsvd = ntohs(jhdr._rsvd);
Expand All @@ -529,7 +532,11 @@ static bool rxrpc_input_split_jumbo(struct rxrpc_call *call, struct sk_buff *skb

sp->offset = offset;
sp->len = len;
rxrpc_input_data_one(call, skb);
rxrpc_input_data_one(call, skb, &notify);
if (notify) {
trace_rxrpc_notify_socket(call->debug_id, sp->hdr.serial);
rxrpc_notify_socket(call);
}
return true;

protocol_error:
Expand All @@ -552,10 +559,8 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
skb->len, seq0);

state = READ_ONCE(call->state);
if (state >= RXRPC_CALL_COMPLETE) {
rxrpc_free_skb(skb, rxrpc_skb_put_input);
if (state >= RXRPC_CALL_COMPLETE)
return;
}

/* Unshare the packet so that it can be modified for in-place
* decryption.
Expand Down Expand Up @@ -605,7 +610,6 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
out:
trace_rxrpc_notify_socket(call->debug_id, serial);
rxrpc_notify_socket(call);
rxrpc_free_skb(skb, rxrpc_skb_put_input);
_leave(" [queued]");
}

Expand Down Expand Up @@ -797,7 +801,7 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb)
struct rxrpc_ackpacket ack;
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
struct rxrpc_ackinfo info;
struct sk_buff *skb_old = NULL, *skb_put = skb;
struct sk_buff *skb_old = NULL;
rxrpc_serial_t ack_serial, acked_serial;
rxrpc_seq_t first_soft_ack, hard_ack, prev_pkt;
int nr_acks, offset, ioffset;
Expand Down Expand Up @@ -963,14 +967,14 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb)
goto out;
}

rxrpc_get_skb(skb, rxrpc_skb_get_ack);
spin_lock(&call->acks_ack_lock);
skb_old = call->acks_soft_tbl;
call->acks_soft_tbl = skb;
spin_unlock(&call->acks_ack_lock);

rxrpc_input_soft_acks(call, skb->data + offset, first_soft_ack,
nr_acks, &summary);
skb_put = NULL;
} else if (call->acks_soft_tbl) {
spin_lock(&call->acks_ack_lock);
skb_old = call->acks_soft_tbl;
Expand All @@ -986,7 +990,6 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb)

rxrpc_congestion_management(call, skb, &summary, acked_serial);
out:
rxrpc_free_skb(skb_put, rxrpc_skb_put_input);
rxrpc_free_skb(skb_old, rxrpc_skb_put_ack);
}

Expand Down Expand Up @@ -1037,11 +1040,11 @@ void rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
switch (sp->hdr.type) {
case RXRPC_PACKET_TYPE_DATA:
rxrpc_input_data(call, skb);
goto no_free;
break;

case RXRPC_PACKET_TYPE_ACK:
rxrpc_input_ack(call, skb);
goto no_free;
break;

case RXRPC_PACKET_TYPE_BUSY:
/* Just ignore BUSY packets from the server; the retry and
Expand All @@ -1061,10 +1064,6 @@ void rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
default:
break;
}

rxrpc_free_skb(skb, rxrpc_skb_put_input);
no_free:
_leave("");
}

/*
Expand Down
Loading

0 comments on commit 2d1faf7

Please sign in to comment.