Skip to content

Commit

Permalink
Merge tag 'rxrpc-rewrite-20160823-2' of git://git.kernel.org/pub/scm/…
Browse files Browse the repository at this point in the history
…linux/kernel/git/dhowells/linux-fs

David Howells says:

====================
rxrpc: Miscellaneous improvements

Here are some improvements that are part of the AF_RXRPC rewrite.  They
need to be applied on top of the just posted cleanups.

 (1) Set the connection expiry on the connection becoming idle when its
     last currently active call completes rather than each time put is
     called.

     This means that the connection isn't held open by retransmissions,
     pings and duplicate packets.  Future patches will limit the number of
     live connections that the kernel will support, so making sure that old
     connections don't overstay their welcome is necessary.

 (2) Calculate packet serial skew in the UDP data_ready callback rather
     than in the call processor on a work queue.  Deferring it like this
     causes the skew to be elevated by further packets coming in before we
     get to make the calculation.

 (3) Move retransmission of the terminal ACK or ABORT packet for a
     connection to the connection processor, using the terminal state
     cached in the rxrpc_connection struct.  This means that once last_call
     is set in a channel to the current call's ID, no more packets will be
     routed to that rxrpc_call struct.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
  • Loading branch information
David S. Miller committed Aug 24, 2016
2 parents 3a69101 + 18bfeba commit 85d2c92
Show file tree
Hide file tree
Showing 7 changed files with 226 additions and 62 deletions.
25 changes: 19 additions & 6 deletions net/rxrpc/ar-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,12 @@ struct rxrpc_connection {
u32 call_id; /* ID of current call */
u32 call_counter; /* Call ID counter */
u32 last_call; /* ID of last call */
u32 last_result; /* Result of last call (0/abort) */
u8 last_type; /* Type of last packet */
u16 last_service_id;
union {
u32 last_seq;
u32 last_abort;
};
} channels[RXRPC_MAXCALLS];
wait_queue_head_t channel_wq; /* queue to wait for channel to become available */

Expand All @@ -313,7 +318,7 @@ struct rxrpc_connection {
struct rxrpc_crypt csum_iv; /* packet checksum base */
unsigned long flags;
unsigned long events;
unsigned long put_time; /* Time at which last put */
unsigned long idle_timestamp; /* Time at which last became idle */
spinlock_t state_lock; /* state-change lock */
atomic_t usage;
enum rxrpc_conn_proto_state state : 8; /* current state of connection */
Expand All @@ -322,7 +327,7 @@ struct rxrpc_connection {
int error; /* local error incurred */
int debug_id; /* debug ID for printks */
atomic_t serial; /* packet serial number counter */
atomic_t hi_serial; /* highest serial number received */
unsigned int hi_serial; /* highest serial number received */
atomic_t avail_chans; /* number of channels available */
u8 size_align; /* data size alignment (for security) */
u8 header_size; /* rxrpc + security header size */
Expand Down Expand Up @@ -457,6 +462,7 @@ struct rxrpc_call {
rxrpc_seq_t ackr_win_top; /* top of ACK window (rx_data_eaten is bottom) */
rxrpc_seq_t ackr_prev_seq; /* previous sequence number received */
u8 ackr_reason; /* reason to ACK */
u16 ackr_skew; /* skew on packet being ACK'd */
rxrpc_serial_t ackr_serial; /* serial of packet being ACK'd */
atomic_t ackr_not_idle; /* number of packets in Rx queue */

Expand Down Expand Up @@ -499,8 +505,8 @@ int rxrpc_reject_call(struct rxrpc_sock *);
/*
* call_event.c
*/
void __rxrpc_propose_ACK(struct rxrpc_call *, u8, u32, bool);
void rxrpc_propose_ACK(struct rxrpc_call *, u8, u32, bool);
void __rxrpc_propose_ACK(struct rxrpc_call *, u8, u16, u32, bool);
void rxrpc_propose_ACK(struct rxrpc_call *, u8, u16, u32, bool);
void rxrpc_process_call(struct work_struct *);

/*
Expand Down Expand Up @@ -565,7 +571,7 @@ struct rxrpc_connection *rxrpc_find_connection_rcu(struct rxrpc_local *,
struct sk_buff *);
void __rxrpc_disconnect_call(struct rxrpc_call *);
void rxrpc_disconnect_call(struct rxrpc_call *);
void rxrpc_put_connection(struct rxrpc_connection *);
void __rxrpc_put_connection(struct rxrpc_connection *);
void __exit rxrpc_destroy_all_connections(void);

static inline bool rxrpc_conn_is_client(const struct rxrpc_connection *conn)
Expand All @@ -589,6 +595,13 @@ struct rxrpc_connection *rxrpc_get_connection_maybe(struct rxrpc_connection *con
return atomic_inc_not_zero(&conn->usage) ? conn : NULL;
}

static inline void rxrpc_put_connection(struct rxrpc_connection *conn)
{
if (conn && atomic_dec_return(&conn->usage) == 1)
__rxrpc_put_connection(conn);
}


static inline bool rxrpc_queue_conn(struct rxrpc_connection *conn)
{
if (!rxrpc_get_connection_maybe(conn))
Expand Down
18 changes: 10 additions & 8 deletions net/rxrpc/call_event.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* propose an ACK be sent
*/
void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
u32 serial, bool immediate)
u16 skew, u32 serial, bool immediate)
{
unsigned long expiry;
s8 prior = rxrpc_ack_priority[ack_reason];
Expand All @@ -44,8 +44,10 @@ void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
/* update DELAY, IDLE, REQUESTED and PING_RESPONSE ACK serial
* numbers */
if (prior == rxrpc_ack_priority[call->ackr_reason]) {
if (prior <= 4)
if (prior <= 4) {
call->ackr_skew = skew;
call->ackr_serial = serial;
}
if (immediate)
goto cancel_timer;
return;
Expand Down Expand Up @@ -103,13 +105,13 @@ void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
* propose an ACK be sent, locking the call structure
*/
void rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
u32 serial, bool immediate)
u16 skew, u32 serial, bool immediate)
{
s8 prior = rxrpc_ack_priority[ack_reason];

if (prior > rxrpc_ack_priority[call->ackr_reason]) {
spin_lock_bh(&call->lock);
__rxrpc_propose_ACK(call, ack_reason, serial, immediate);
__rxrpc_propose_ACK(call, ack_reason, skew, serial, immediate);
spin_unlock_bh(&call->lock);
}
}
Expand Down Expand Up @@ -628,7 +630,7 @@ static int rxrpc_process_rx_queue(struct rxrpc_call *call,
if (ack.reason == RXRPC_ACK_PING) {
_proto("Rx ACK %%%u PING Request", latest);
rxrpc_propose_ACK(call, RXRPC_ACK_PING_RESPONSE,
sp->hdr.serial, true);
skb->priority, sp->hdr.serial, true);
}

/* discard any out-of-order or duplicate ACKs */
Expand Down Expand Up @@ -1153,8 +1155,7 @@ void rxrpc_process_call(struct work_struct *work)
goto maybe_reschedule;

send_ACK_with_skew:
ack.maxSkew = htons(atomic_read(&call->conn->hi_serial) -
ntohl(ack.serial));
ack.maxSkew = htons(call->ackr_skew);
send_ACK:
mtu = call->conn->params.peer->if_mtu;
mtu -= call->conn->params.peer->hdrsize;
Expand Down Expand Up @@ -1244,7 +1245,8 @@ void rxrpc_process_call(struct work_struct *work)
case RXRPC_CALL_SERVER_ACK_REQUEST:
_debug("start ACK timer");
rxrpc_propose_ACK(call, RXRPC_ACK_DELAY,
call->ackr_serial, false);
call->ackr_skew, call->ackr_serial,
false);
default:
break;
}
Expand Down
113 changes: 113 additions & 0 deletions net/rxrpc/conn_event.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,113 @@
#include <net/ip.h>
#include "ar-internal.h"

/*
* Retransmit terminal ACK or ABORT of the previous call.
*/
static void rxrpc_conn_retransmit(struct rxrpc_connection *conn,
struct sk_buff *skb)
{
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
struct rxrpc_channel *chan;
struct msghdr msg;
struct kvec iov;
struct {
struct rxrpc_wire_header whdr;
union {
struct {
__be32 code;
} abort;
struct {
struct rxrpc_ackpacket ack;
struct rxrpc_ackinfo info;
};
};
} __attribute__((packed)) pkt;
size_t len;
u32 serial, mtu, call_id;

_enter("%d", conn->debug_id);

chan = &conn->channels[sp->hdr.cid & RXRPC_CHANNELMASK];

/* If the last call got moved on whilst we were waiting to run, just
* ignore this packet.
*/
call_id = READ_ONCE(chan->last_call);
/* Sync with __rxrpc_disconnect_call() */
smp_rmb();
if (call_id != sp->hdr.callNumber)
return;

msg.msg_name = &conn->params.peer->srx.transport;
msg.msg_namelen = conn->params.peer->srx.transport_len;
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = 0;

pkt.whdr.epoch = htonl(sp->hdr.epoch);
pkt.whdr.cid = htonl(sp->hdr.cid);
pkt.whdr.callNumber = htonl(sp->hdr.callNumber);
pkt.whdr.seq = 0;
pkt.whdr.type = chan->last_type;
pkt.whdr.flags = conn->out_clientflag;
pkt.whdr.userStatus = 0;
pkt.whdr.securityIndex = conn->security_ix;
pkt.whdr._rsvd = 0;
pkt.whdr.serviceId = htons(chan->last_service_id);

len = sizeof(pkt.whdr);
switch (chan->last_type) {
case RXRPC_PACKET_TYPE_ABORT:
pkt.abort.code = htonl(chan->last_abort);
len += sizeof(pkt.abort);
break;

case RXRPC_PACKET_TYPE_ACK:
mtu = conn->params.peer->if_mtu;
mtu -= conn->params.peer->hdrsize;
pkt.ack.bufferSpace = 0;
pkt.ack.maxSkew = htons(skb->priority);
pkt.ack.firstPacket = htonl(chan->last_seq);
pkt.ack.previousPacket = htonl(chan->last_seq - 1);
pkt.ack.serial = htonl(sp->hdr.serial);
pkt.ack.reason = RXRPC_ACK_DUPLICATE;
pkt.ack.nAcks = 0;
pkt.info.rxMTU = htonl(rxrpc_rx_mtu);
pkt.info.maxMTU = htonl(mtu);
pkt.info.rwind = htonl(rxrpc_rx_window_size);
pkt.info.jumbo_max = htonl(rxrpc_rx_jumbo_max);
len += sizeof(pkt.ack) + sizeof(pkt.info);
break;
}

/* Resync with __rxrpc_disconnect_call() and check that the last call
* didn't get advanced whilst we were filling out the packets.
*/
smp_rmb();
if (READ_ONCE(chan->last_call) != call_id)
return;

iov.iov_base = &pkt;
iov.iov_len = len;

serial = atomic_inc_return(&conn->serial);
pkt.whdr.serial = htonl(serial);

switch (chan->last_type) {
case RXRPC_PACKET_TYPE_ABORT:
_proto("Tx ABORT %%%u { %d } [re]", serial, conn->local_abort);
break;
case RXRPC_PACKET_TYPE_ACK:
_proto("Tx ACK %%%u [re]", serial);
break;
}

kernel_sendmsg(conn->params.local->socket, &msg, &iov, 1, len);
_leave("");
return;
}

/*
* pass a connection-level abort onto all calls on that connection
*/
Expand Down Expand Up @@ -166,6 +273,12 @@ static int rxrpc_process_event(struct rxrpc_connection *conn,
_enter("{%d},{%u,%%%u},", conn->debug_id, sp->hdr.type, sp->hdr.serial);

switch (sp->hdr.type) {
case RXRPC_PACKET_TYPE_DATA:
case RXRPC_PACKET_TYPE_ACK:
rxrpc_conn_retransmit(conn, skb);
rxrpc_free_skb(skb);
return 0;

case RXRPC_PACKET_TYPE_ABORT:
if (skb_copy_bits(skb, 0, &wtmp, sizeof(wtmp)) < 0)
return -EPROTO;
Expand Down
52 changes: 26 additions & 26 deletions net/rxrpc/conn_object.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp)
atomic_set(&conn->avail_chans, RXRPC_MAXCALLS);
conn->size_align = 4;
conn->header_size = sizeof(struct rxrpc_wire_header);
conn->idle_timestamp = jiffies;
}

_leave(" = %p{%d}", conn, conn ? conn->debug_id : 0);
Expand Down Expand Up @@ -165,7 +166,15 @@ void __rxrpc_disconnect_call(struct rxrpc_call *call)
/* Save the result of the call so that we can repeat it if necessary
* through the channel, whilst disposing of the actual call record.
*/
chan->last_result = call->local_abort;
chan->last_service_id = call->service_id;
if (call->local_abort) {
chan->last_abort = call->local_abort;
chan->last_type = RXRPC_PACKET_TYPE_ABORT;
} else {
chan->last_seq = call->rx_data_eaten;
chan->last_type = RXRPC_PACKET_TYPE_ACK;
}
/* Sync with rxrpc_conn_retransmit(). */
smp_wmb();
chan->last_call = chan->call_id;
chan->call_id = chan->call_counter;
Expand All @@ -191,29 +200,16 @@ void rxrpc_disconnect_call(struct rxrpc_call *call)
spin_unlock(&conn->channel_lock);

call->conn = NULL;
conn->idle_timestamp = jiffies;
rxrpc_put_connection(conn);
}

/*
* release a virtual connection
*/
void rxrpc_put_connection(struct rxrpc_connection *conn)
void __rxrpc_put_connection(struct rxrpc_connection *conn)
{
if (!conn)
return;

_enter("%p{u=%d,d=%d}",
conn, atomic_read(&conn->usage), conn->debug_id);

ASSERTCMP(atomic_read(&conn->usage), >, 1);

conn->put_time = ktime_get_seconds();
if (atomic_dec_return(&conn->usage) == 1) {
_debug("zombie");
rxrpc_queue_delayed_work(&rxrpc_connection_reap, 0);
}

_leave("");
rxrpc_queue_delayed_work(&rxrpc_connection_reap, 0);
}

/*
Expand Down Expand Up @@ -248,14 +244,14 @@ static void rxrpc_destroy_connection(struct rcu_head *rcu)
static void rxrpc_connection_reaper(struct work_struct *work)
{
struct rxrpc_connection *conn, *_p;
unsigned long reap_older_than, earliest, put_time, now;
unsigned long reap_older_than, earliest, idle_timestamp, now;

LIST_HEAD(graveyard);

_enter("");

now = ktime_get_seconds();
reap_older_than = now - rxrpc_connection_expiry;
now = jiffies;
reap_older_than = now - rxrpc_connection_expiry * HZ;
earliest = ULONG_MAX;

write_lock(&rxrpc_connection_lock);
Expand All @@ -264,10 +260,14 @@ static void rxrpc_connection_reaper(struct work_struct *work)
if (likely(atomic_read(&conn->usage) > 1))
continue;

put_time = READ_ONCE(conn->put_time);
if (time_after(put_time, reap_older_than)) {
if (time_before(put_time, earliest))
earliest = put_time;
idle_timestamp = READ_ONCE(conn->idle_timestamp);
_debug("reap CONN %d { u=%d,t=%ld }",
conn->debug_id, atomic_read(&conn->usage),
(long)reap_older_than - (long)idle_timestamp);

if (time_after(idle_timestamp, reap_older_than)) {
if (time_before(idle_timestamp, earliest))
earliest = idle_timestamp;
continue;
}

Expand All @@ -288,9 +288,9 @@ static void rxrpc_connection_reaper(struct work_struct *work)

if (earliest != ULONG_MAX) {
_debug("reschedule reaper %ld", (long) earliest - now);
ASSERTCMP(earliest, >, now);
ASSERT(time_after(earliest, now));
rxrpc_queue_delayed_work(&rxrpc_connection_reap,
(earliest - now) * HZ);
earliest - now);
}

while (!list_empty(&graveyard)) {
Expand Down
Loading

0 comments on commit 85d2c92

Please sign in to comment.