Skip to content

Commit

Permalink
rxrpc: Add a timeout for detecting lost ACKs/lost DATA
Browse files Browse the repository at this point in the history
Add an extra timeout that is set/updated when we send a DATA packet that
has the request-ack flag set.  This allows us to detect if we don't get an
ACK in response to the latest flagged packet.

The ACK packet is adjudged to have been lost if it doesn't turn up within
2*RTT of the transmission.

If the timeout occurs, we schedule the sending of a PING ACK to find out
the state of the other side.  If a new DATA packet is ready to go sooner,
we cancel the sending of the ping and set the request-ack flag on that
instead.

If we get back a PING-RESPONSE ACK that indicates a lower tx_top than what
we had at the time of the ping transmission, we adjudge all the DATA
packets sent between the response tx_top and the ping-time tx_top to have
been lost and retransmit immediately.

Rather than sending a PING ACK, we could just pick a DATA packet and
speculatively retransmit that with request-ack set.  It should result in
either a REQUESTED ACK or a DUPLICATE ACK which we can then use in lieu the
a PING-RESPONSE ACK mentioned above.

Signed-off-by: David Howells <dhowells@redhat.com>
  • Loading branch information
David Howells committed Nov 24, 2017
1 parent beb8e5e commit bd1fdf8
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 12 deletions.
11 changes: 9 additions & 2 deletions include/trace/events/rxrpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ enum rxrpc_timer_trace {
rxrpc_timer_exp_ack,
rxrpc_timer_exp_hard,
rxrpc_timer_exp_idle,
rxrpc_timer_exp_lost_ack,
rxrpc_timer_exp_normal,
rxrpc_timer_exp_ping,
rxrpc_timer_exp_resend,
Expand All @@ -151,6 +152,7 @@ enum rxrpc_timer_trace {
rxrpc_timer_set_for_ack,
rxrpc_timer_set_for_hard,
rxrpc_timer_set_for_idle,
rxrpc_timer_set_for_lost_ack,
rxrpc_timer_set_for_normal,
rxrpc_timer_set_for_ping,
rxrpc_timer_set_for_resend,
Expand Down Expand Up @@ -309,6 +311,7 @@ enum rxrpc_congest_change {
EM(rxrpc_timer_exp_ack, "ExpAck") \
EM(rxrpc_timer_exp_hard, "ExpHrd") \
EM(rxrpc_timer_exp_idle, "ExpIdl") \
EM(rxrpc_timer_exp_lost_ack, "ExpLoA") \
EM(rxrpc_timer_exp_normal, "ExpNml") \
EM(rxrpc_timer_exp_ping, "ExpPng") \
EM(rxrpc_timer_exp_resend, "ExpRsn") \
Expand All @@ -318,6 +321,7 @@ enum rxrpc_congest_change {
EM(rxrpc_timer_set_for_ack, "SetAck") \
EM(rxrpc_timer_set_for_hard, "SetHrd") \
EM(rxrpc_timer_set_for_idle, "SetIdl") \
EM(rxrpc_timer_set_for_lost_ack, "SetLoA") \
EM(rxrpc_timer_set_for_normal, "SetNml") \
EM(rxrpc_timer_set_for_ping, "SetPng") \
EM(rxrpc_timer_set_for_resend, "SetRTx") \
Expand Down Expand Up @@ -961,6 +965,7 @@ TRACE_EVENT(rxrpc_timer,
__field(enum rxrpc_timer_trace, why )
__field(long, now )
__field(long, ack_at )
__field(long, ack_lost_at )
__field(long, resend_at )
__field(long, ping_at )
__field(long, expect_rx_by )
Expand All @@ -974,17 +979,19 @@ TRACE_EVENT(rxrpc_timer,
__entry->why = why;
__entry->now = now;
__entry->ack_at = call->ack_at;
__entry->ack_lost_at = call->ack_lost_at;
__entry->resend_at = call->resend_at;
__entry->expect_rx_by = call->expect_rx_by;
__entry->expect_req_by = call->expect_req_by;
__entry->expect_term_by = call->expect_term_by;
__entry->timer = call->timer.expires;
),

TP_printk("c=%p %s a=%ld r=%ld xr=%ld xq=%ld xt=%ld t=%ld",
TP_printk("c=%p %s a=%ld la=%ld r=%ld xr=%ld xq=%ld xt=%ld t=%ld",
__entry->call,
__print_symbolic(__entry->why, rxrpc_timer_traces),
__entry->ack_at - __entry->now,
__entry->ack_lost_at - __entry->now,
__entry->resend_at - __entry->now,
__entry->expect_rx_by - __entry->now,
__entry->expect_req_by - __entry->now,
Expand Down Expand Up @@ -1105,7 +1112,7 @@ TRACE_EVENT(rxrpc_congest,
memcpy(&__entry->sum, summary, sizeof(__entry->sum));
),

TP_printk("c=%p %08x %s %08x %s cw=%u ss=%u nr=%u,%u nw=%u,%u r=%u b=%u u=%u d=%u l=%x%s%s%s",
TP_printk("c=%p r=%08x %s q=%08x %s cw=%u ss=%u nr=%u,%u nw=%u,%u r=%u b=%u u=%u d=%u l=%x%s%s%s",
__entry->call,
__entry->ack_serial,
__print_symbolic(__entry->sum.ack_reason, rxrpc_ack_names),
Expand Down
6 changes: 5 additions & 1 deletion net/rxrpc/ar-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ enum rxrpc_call_event {
RXRPC_CALL_EV_RESEND, /* Tx resend required */
RXRPC_CALL_EV_PING, /* Ping send required */
RXRPC_CALL_EV_EXPIRED, /* Expiry occurred */
RXRPC_CALL_EV_ACK_LOST, /* ACK may be lost, send ping */
};

/*
Expand Down Expand Up @@ -515,6 +516,7 @@ struct rxrpc_call {
struct rxrpc_sock __rcu *socket; /* socket responsible */
struct mutex user_mutex; /* User access mutex */
unsigned long ack_at; /* When deferred ACK needs to happen */
unsigned long ack_lost_at; /* When ACK is figured as lost */
unsigned long resend_at; /* When next resend needs to happen */
unsigned long ping_at; /* When next to send a ping */
unsigned long expect_rx_by; /* When we expect to get a packet by */
Expand Down Expand Up @@ -624,6 +626,8 @@ struct rxrpc_call {
ktime_t acks_latest_ts; /* Timestamp of latest ACK received */
rxrpc_serial_t acks_latest; /* serial number of latest ACK received */
rxrpc_seq_t acks_lowest_nak; /* Lowest NACK in the buffer (or ==tx_hard_ack) */
rxrpc_seq_t acks_lost_top; /* tx_top at the time lost-ack ping sent */
rxrpc_serial_t acks_lost_ping; /* Serial number of probe ACK */
};

/*
Expand Down Expand Up @@ -1011,7 +1015,7 @@ static inline struct rxrpc_net *rxrpc_net(struct net *net)
/*
* output.c
*/
int rxrpc_send_ack_packet(struct rxrpc_call *, bool);
int rxrpc_send_ack_packet(struct rxrpc_call *, bool, rxrpc_serial_t *);
int rxrpc_send_abort_packet(struct rxrpc_call *);
int rxrpc_send_data_packet(struct rxrpc_call *, struct sk_buff *, bool);
void rxrpc_reject_packets(struct rxrpc_local *);
Expand Down
26 changes: 22 additions & 4 deletions net/rxrpc/call_event.c
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ static void rxrpc_resend(struct rxrpc_call *call, unsigned long now_j)
goto out;
rxrpc_propose_ACK(call, RXRPC_ACK_PING, 0, 0, true, false,
rxrpc_propose_ack_ping_for_lost_ack);
rxrpc_send_ack_packet(call, true);
rxrpc_send_ack_packet(call, true, NULL);
goto out;
}

Expand Down Expand Up @@ -310,6 +310,7 @@ void rxrpc_process_call(struct work_struct *work)
{
struct rxrpc_call *call =
container_of(work, struct rxrpc_call, processor);
rxrpc_serial_t *send_ack;
unsigned long now, next, t;

rxrpc_see_call(call);
Expand Down Expand Up @@ -358,6 +359,13 @@ void rxrpc_process_call(struct work_struct *work)
set_bit(RXRPC_CALL_EV_ACK, &call->events);
}

t = READ_ONCE(call->ack_lost_at);
if (time_after_eq(now, t)) {
trace_rxrpc_timer(call, rxrpc_timer_exp_lost_ack, now);
cmpxchg(&call->ack_lost_at, t, now + MAX_JIFFY_OFFSET);
set_bit(RXRPC_CALL_EV_ACK_LOST, &call->events);
}

t = READ_ONCE(call->ping_at);
if (time_after_eq(now, t)) {
trace_rxrpc_timer(call, rxrpc_timer_exp_ping, now);
Expand All @@ -379,15 +387,24 @@ void rxrpc_process_call(struct work_struct *work)
goto recheck_state;
}

if (test_and_clear_bit(RXRPC_CALL_EV_ACK, &call->events)) {
send_ack = NULL;
if (test_and_clear_bit(RXRPC_CALL_EV_ACK_LOST, &call->events)) {
call->acks_lost_top = call->tx_top;
rxrpc_propose_ACK(call, RXRPC_ACK_PING, 0, 0, true, false,
rxrpc_propose_ack_ping_for_lost_ack);
send_ack = &call->acks_lost_ping;
}

if (test_and_clear_bit(RXRPC_CALL_EV_ACK, &call->events) ||
send_ack) {
if (call->ackr_reason) {
rxrpc_send_ack_packet(call, false);
rxrpc_send_ack_packet(call, false, send_ack);
goto recheck_state;
}
}

if (test_and_clear_bit(RXRPC_CALL_EV_PING, &call->events)) {
rxrpc_send_ack_packet(call, true);
rxrpc_send_ack_packet(call, true, NULL);
goto recheck_state;
}

Expand All @@ -404,6 +421,7 @@ void rxrpc_process_call(struct work_struct *work)
set(call->expect_req_by);
set(call->expect_term_by);
set(call->ack_at);
set(call->ack_lost_at);
set(call->resend_at);
set(call->ping_at);

Expand Down
1 change: 1 addition & 0 deletions net/rxrpc/call_object.c
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ static void rxrpc_start_call_timer(struct rxrpc_call *call)
unsigned long j = now + MAX_JIFFY_OFFSET;

call->ack_at = j;
call->ack_lost_at = j;
call->resend_at = j;
call->ping_at = j;
call->expect_rx_by = j;
Expand Down
40 changes: 40 additions & 0 deletions net/rxrpc/input.c
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,43 @@ static void rxrpc_input_requested_ack(struct rxrpc_call *call,
orig_serial, ack_serial, sent_at, resp_time);
}

/*
* Process the response to a ping that we sent to find out if we lost an ACK.
*
* If we got back a ping response that indicates a lower tx_top than what we
* had at the time of the ping transmission, we adjudge all the DATA packets
* sent between the response tx_top and the ping-time tx_top to have been lost.
*/
static void rxrpc_input_check_for_lost_ack(struct rxrpc_call *call)
{
rxrpc_seq_t top, bottom, seq;
bool resend = false;

spin_lock_bh(&call->lock);

bottom = call->tx_hard_ack + 1;
top = call->acks_lost_top;
if (before(bottom, top)) {
for (seq = bottom; before_eq(seq, top); seq++) {
int ix = seq & RXRPC_RXTX_BUFF_MASK;
u8 annotation = call->rxtx_annotations[ix];
u8 anno_type = annotation & RXRPC_TX_ANNO_MASK;

if (anno_type != RXRPC_TX_ANNO_UNACK)
continue;
annotation &= ~RXRPC_TX_ANNO_MASK;
annotation |= RXRPC_TX_ANNO_RETRANS;
call->rxtx_annotations[ix] = annotation;
resend = true;
}
}

spin_unlock_bh(&call->lock);

if (resend && !test_and_set_bit(RXRPC_CALL_EV_RESEND, &call->events))
rxrpc_queue_call(call);
}

/*
* Process a ping response.
*/
Expand All @@ -645,6 +682,9 @@ static void rxrpc_input_ping_response(struct rxrpc_call *call,
smp_rmb();
ping_serial = call->ping_serial;

if (orig_serial == call->acks_lost_ping)
rxrpc_input_check_for_lost_ack(call);

if (!test_bit(RXRPC_CALL_PINGING, &call->flags) ||
before(orig_serial, ping_serial))
return;
Expand Down
20 changes: 18 additions & 2 deletions net/rxrpc/output.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ static size_t rxrpc_fill_out_ack(struct rxrpc_connection *conn,
/*
* Send an ACK call packet.
*/
int rxrpc_send_ack_packet(struct rxrpc_call *call, bool ping)
int rxrpc_send_ack_packet(struct rxrpc_call *call, bool ping,
rxrpc_serial_t *_serial)
{
struct rxrpc_connection *conn = NULL;
struct rxrpc_ack_buffer *pkt;
Expand Down Expand Up @@ -165,6 +166,8 @@ int rxrpc_send_ack_packet(struct rxrpc_call *call, bool ping)
ntohl(pkt->ack.firstPacket),
ntohl(pkt->ack.serial),
pkt->ack.reason, pkt->ack.nAcks);
if (_serial)
*_serial = serial;

if (ping) {
call->ping_serial = serial;
Expand Down Expand Up @@ -323,7 +326,8 @@ int rxrpc_send_data_packet(struct rxrpc_call *call, struct sk_buff *skb,
* ACKs if a DATA packet appears to have been lost.
*/
if (!(sp->hdr.flags & RXRPC_LAST_PACKET) &&
(retrans ||
(test_and_clear_bit(RXRPC_CALL_EV_ACK_LOST, &call->events) ||
retrans ||
call->cong_mode == RXRPC_CALL_SLOW_START ||
(call->peer->rtt_usage < 3 && sp->hdr.seq & 1) ||
ktime_before(ktime_add_ms(call->peer->rtt_last_req, 1000),
Expand Down Expand Up @@ -370,6 +374,18 @@ int rxrpc_send_data_packet(struct rxrpc_call *call, struct sk_buff *skb,
if (whdr.flags & RXRPC_REQUEST_ACK) {
call->peer->rtt_last_req = now;
trace_rxrpc_rtt_tx(call, rxrpc_rtt_tx_data, serial);
if (call->peer->rtt_usage > 1) {
unsigned long nowj = jiffies, ack_lost_at;

ack_lost_at = nsecs_to_jiffies(2 * call->peer->rtt);
if (ack_lost_at < 1)
ack_lost_at = 1;

ack_lost_at += nowj;
WRITE_ONCE(call->ack_lost_at, ack_lost_at);
rxrpc_reduce_call_timer(call, ack_lost_at, nowj,
rxrpc_timer_set_for_lost_ack);
}
}
}
_leave(" = %d [%u]", ret, call->peer->maxdata);
Expand Down
4 changes: 2 additions & 2 deletions net/rxrpc/recvmsg.c
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY) {
rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, 0, serial, true, false,
rxrpc_propose_ack_terminal_ack);
rxrpc_send_ack_packet(call, false);
rxrpc_send_ack_packet(call, false, NULL);
}
#endif

Expand Down Expand Up @@ -222,7 +222,7 @@ static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
true, true,
rxrpc_propose_ack_rotate_rx);
if (call->ackr_reason && call->ackr_reason != RXRPC_ACK_DELAY)
rxrpc_send_ack_packet(call, false);
rxrpc_send_ack_packet(call, false, NULL);
}
}

Expand Down
2 changes: 1 addition & 1 deletion net/rxrpc/sendmsg.c
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
do {
/* Check to see if there's a ping ACK to reply to. */
if (call->ackr_reason == RXRPC_ACK_PING_RESPONSE)
rxrpc_send_ack_packet(call, false);
rxrpc_send_ack_packet(call, false, NULL);

if (!skb) {
size_t size, chunk, max, space;
Expand Down

0 comments on commit bd1fdf8

Please sign in to comment.