Merge tag 'rxrpc-rewrite-20160908' of git://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs

David Howells says:

====================
rxrpc: Rewrite data and ack handling

This patch set constitutes the main portion of the AF_RXRPC rewrite.  It
consists of five fix/helper patches:

 (1) Fix ASSERTCMP's and ASSERTIFCMP's handling of signed values.

 (2) Update some protocol definitions slightly.

 (3) Use of an hlist for RCU purposes.

 (4) Removal of per-call sk_buff accounting (not really needed when skbs
     aren't being queued on the main queue).

 (5) Addition of a tracepoint to log incoming packets in the data_ready
     callback and to log the end of the data_ready callback.

And then there are two patches that form the main part:

 (6) Preallocation of resources for incoming calls so that in patch (7) the
     data_ready handler can be made to fully instantiate an incoming call
     and make it live.  This extends through into AFS so that AFS can
     preallocate its own incoming call resources.

     The preallocation size is capped at the listen() backlog setting - and
     that is capped at a sysctl limit which can be set between 4 and 32.

     The preallocation is (re)charged either by accepting/rejecting pending
     calls or, in the case of AFS, manually.  If insufficient preallocation
     resources exist, a BUSY packet will be transmitted.

     The advantage of using this preallocation is that, once a call is set
     up in the data_ready handler, DATA packets can be queued on it
     immediately, rather than being queued for a background work item that
     must do all the allocation and then sort out the DATA packets whilst
     further DATA packets may still be coming in, going either to the
     background thread or to the new call.

 (7) Rewrite the handling of DATA, ACK and ABORT packets.

     In the receive phase, DATA packets are now held in per-call circular
     buffers with deduplication, out-of-sequence detection and suchlike
     being done in data_ready.  Since there is only one producer and only
     one consumer, no locks need be used on the receive queue (see the
     single-producer/single-consumer sketch after this cover letter).

     Received ACK and ABORT packets are now parsed and discarded in
     data_ready to recycle resources as fast as possible.

     sk_buffs are no longer pulled, trimmed or cloned; rather, the
     offset and size of the content are tracked.  This particularly affects
     jumbo DATA packets which need insertion into the receive buffer in
     multiple places.  Annotations are kept to track which bit is which.

     Packets are no longer queued on the socket receive queue; rather,
     calls are queued.  Dummy packets to convey events therefore no longer
     need to be invented and metadata packets can be discarded as soon as
     parsed rather than being pushed onto the socket receive queue to
     indicate terminal events.

     The preallocation facility added in (6) is now used to set up incoming
     calls with very little locking required and no calls to the allocator
     in data_ready.

     Decryption and verification are now handled in recvmsg() rather than in
     a background thread.  This allows for the future possibility of
     decrypting directly into the user buffer.

     With this patch, the code is a lot simpler and most of the mass of
     call event and state wangling code in call_event.c is gone.

With this, the majority of the AF_RXRPC rewrite is complete.  However,
there are still things to be done, including:

 (*) Limit the number of active service calls to prevent an attacker from
     filling up a server's memory.

 (*) Limit the number of calls on the rebuff-with-BUSY queue.

 (*) Transmit delayed/deferred ACKs from recvmsg() if possible, rather than
     punting to the background thread.  Ideally, the background thread
     shouldn't run at all, but data_ready can't call kernel_sendmsg() and
     we can't rely on recvmsg() attending to the call in a timely fashion.

 (*) Prevent the call at the front of the socket queue from hogging
     recvmsg()'s attention if there's a sufficiently continuous supply of
     data.

 (*) Distribute ICMP errors by connection rather than by call.  Possibly
     parse the ICMP packet to try and pin down the exact connection and
     call.

 (*) Encrypt/decrypt directly between user buffers and socket buffers where
     possible.

 (*) IPv6.

 (*) Service ID upgrade.  This is a facility whereby a special flag bit is
     set in the DATA packet header when making a call that tells the server
     that it is allowed to change the service ID to an upgraded one and
     reply with an equivalent call from the upgraded service.

     This is used, for example, to override certain AFS calls so that IPv6
     addresses can be returned.

 (*) Allow userspace to preallocate call user IDs for incoming calls.
====================
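
The lock-free receive queue described in (7) is the classic
single-producer/single-consumer ring: data_ready is the only writer of the
producer index and recvmsg() the only writer of the consumer index, so
ordered loads and stores suffice on both sides.  A minimal userspace sketch
of the idea, with illustrative names and sizes rather than the kernel's
actual implementation:

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stddef.h>

    #define RING_SIZE 64u                 /* power of two so masking works */
    #define RING_MASK (RING_SIZE - 1)

    struct spsc_ring {
        void *slot[RING_SIZE];
        _Atomic unsigned int head;        /* advanced only by the producer */
        _Atomic unsigned int tail;        /* advanced only by the consumer */
    };

    /* Producer side (cf. data_ready): fails rather than blocks when full. */
    static bool ring_put(struct spsc_ring *r, void *item)
    {
        unsigned int head = atomic_load_explicit(&r->head, memory_order_relaxed);
        unsigned int tail = atomic_load_explicit(&r->tail, memory_order_acquire);

        if (head - tail >= RING_SIZE)
            return false;                 /* full */
        r->slot[head & RING_MASK] = item;
        /* Release: publish the slot before advancing the producer index. */
        atomic_store_explicit(&r->head, head + 1, memory_order_release);
        return true;
    }

    /* Consumer side (cf. recvmsg): returns NULL when empty. */
    static void *ring_get(struct spsc_ring *r)
    {
        unsigned int tail = atomic_load_explicit(&r->tail, memory_order_relaxed);
        unsigned int head = atomic_load_explicit(&r->head, memory_order_acquire);
        void *item;

        if (tail == head)
            return NULL;                  /* empty */
        item = r->slot[tail & RING_MASK];
        atomic_store_explicit(&r->tail, tail + 1, memory_order_release);
        return item;
    }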

Signed-off-by: David S. Miller <davem@davemloft.net>
David S. Miller committed Sep 10, 2016
2 parents 46dfc23 + 248f219 commit fa5f4aa
Showing 26 changed files with 2,416 additions and 3,353 deletions.
88 changes: 54 additions & 34 deletions fs/afs/rxrpc.c
@@ -18,6 +18,7 @@

struct socket *afs_socket; /* my RxRPC socket */
static struct workqueue_struct *afs_async_calls;
static struct afs_call *afs_spare_incoming_call;
static atomic_t afs_outstanding_calls;

static void afs_free_call(struct afs_call *);
@@ -26,7 +27,8 @@ static int afs_wait_for_call_to_complete(struct afs_call *);
static void afs_wake_up_async_call(struct sock *, struct rxrpc_call *, unsigned long);
static int afs_dont_wait_for_call_to_complete(struct afs_call *);
static void afs_process_async_call(struct work_struct *);
static void afs_rx_new_call(struct sock *);
static void afs_rx_new_call(struct sock *, struct rxrpc_call *, unsigned long);
static void afs_rx_discard_new_call(struct rxrpc_call *, unsigned long);
static int afs_deliver_cm_op_id(struct afs_call *);

/* synchronous call management */
@@ -53,9 +55,9 @@ static const struct afs_call_type afs_RXCMxxxx = {
.abort_to_error = afs_abort_to_error,
};

static void afs_collect_incoming_call(struct work_struct *);
static void afs_charge_preallocation(struct work_struct *);

static DECLARE_WORK(afs_collect_incoming_call_work, afs_collect_incoming_call);
static DECLARE_WORK(afs_charge_preallocation_work, afs_charge_preallocation);

static int afs_wait_atomic_t(atomic_t *p)
{
@@ -100,13 +102,15 @@ int afs_open_socket(void)
if (ret < 0)
goto error_2;

rxrpc_kernel_new_call_notification(socket, afs_rx_new_call);
rxrpc_kernel_new_call_notification(socket, afs_rx_new_call,
afs_rx_discard_new_call);

ret = kernel_listen(socket, INT_MAX);
if (ret < 0)
goto error_2;

afs_socket = socket;
afs_charge_preallocation(NULL);
_leave(" = 0");
return 0;

@@ -126,11 +130,19 @@ void afs_close_socket(void)
{
_enter("");

if (afs_spare_incoming_call) {
atomic_inc(&afs_outstanding_calls);
afs_free_call(afs_spare_incoming_call);
afs_spare_incoming_call = NULL;
}

_debug("outstanding %u", atomic_read(&afs_outstanding_calls));
wait_on_atomic_t(&afs_outstanding_calls, afs_wait_atomic_t,
TASK_UNINTERRUPTIBLE);
_debug("no outstanding calls");

flush_workqueue(afs_async_calls);
kernel_sock_shutdown(afs_socket, SHUT_RDWR);
flush_workqueue(afs_async_calls);
sock_release(afs_socket);

@@ -590,57 +602,65 @@ static void afs_process_async_call(struct work_struct *work)
_leave("");
}

static void afs_rx_attach(struct rxrpc_call *rxcall, unsigned long user_call_ID)
{
struct afs_call *call = (struct afs_call *)user_call_ID;

call->rxcall = rxcall;
}

/*
* accept the backlog of incoming calls
* Charge the incoming call preallocation.
*/
static void afs_collect_incoming_call(struct work_struct *work)
static void afs_charge_preallocation(struct work_struct *work)
{
struct rxrpc_call *rxcall;
struct afs_call *call = NULL;

_enter("");
struct afs_call *call = afs_spare_incoming_call;

do {
for (;;) {
if (!call) {
call = kzalloc(sizeof(struct afs_call), GFP_KERNEL);
if (!call) {
rxrpc_kernel_reject_call(afs_socket);
return;
}
if (!call)
break;

INIT_WORK(&call->async_work, afs_process_async_call);
call->wait_mode = &afs_async_incoming_call;
call->type = &afs_RXCMxxxx;
init_waitqueue_head(&call->waitq);
call->state = AFS_CALL_AWAIT_OP_ID;

_debug("CALL %p{%s} [%d]",
call, call->type->name,
atomic_read(&afs_outstanding_calls));
atomic_inc(&afs_outstanding_calls);
}

rxcall = rxrpc_kernel_accept_call(afs_socket,
(unsigned long)call,
afs_wake_up_async_call);
if (!IS_ERR(rxcall)) {
call->rxcall = rxcall;
call->need_attention = true;
queue_work(afs_async_calls, &call->async_work);
call = NULL;
}
} while (!call);
if (rxrpc_kernel_charge_accept(afs_socket,
afs_wake_up_async_call,
afs_rx_attach,
(unsigned long)call,
GFP_KERNEL) < 0)
break;
call = NULL;
}
afs_spare_incoming_call = call;
}

if (call)
afs_free_call(call);
/*
* Discard a preallocated call when a socket is shut down.
*/
static void afs_rx_discard_new_call(struct rxrpc_call *rxcall,
unsigned long user_call_ID)
{
struct afs_call *call = (struct afs_call *)user_call_ID;

atomic_inc(&afs_outstanding_calls);
call->rxcall = NULL;
afs_free_call(call);
}

/*
* Notification of an incoming call.
*/
static void afs_rx_new_call(struct sock *sk)
static void afs_rx_new_call(struct sock *sk, struct rxrpc_call *rxcall,
unsigned long user_call_ID)
{
queue_work(afs_wq, &afs_collect_incoming_call_work);
atomic_inc(&afs_outstanding_calls);
queue_work(afs_wq, &afs_charge_preallocation_work);
}

/*
13 changes: 8 additions & 5 deletions include/net/af_rxrpc.h
@@ -21,10 +21,14 @@ struct rxrpc_call;

typedef void (*rxrpc_notify_rx_t)(struct sock *, struct rxrpc_call *,
unsigned long);
typedef void (*rxrpc_notify_new_call_t)(struct sock *);
typedef void (*rxrpc_notify_new_call_t)(struct sock *, struct rxrpc_call *,
unsigned long);
typedef void (*rxrpc_discard_new_call_t)(struct rxrpc_call *, unsigned long);
typedef void (*rxrpc_user_attach_call_t)(struct rxrpc_call *, unsigned long);

void rxrpc_kernel_new_call_notification(struct socket *,
rxrpc_notify_new_call_t);
rxrpc_notify_new_call_t,
rxrpc_discard_new_call_t);
struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *,
struct sockaddr_rxrpc *,
struct key *,
@@ -38,10 +42,9 @@ int rxrpc_kernel_recv_data(struct socket *, struct rxrpc_call *,
void rxrpc_kernel_abort_call(struct socket *, struct rxrpc_call *,
u32, int, const char *);
void rxrpc_kernel_end_call(struct socket *, struct rxrpc_call *);
struct rxrpc_call *rxrpc_kernel_accept_call(struct socket *, unsigned long,
rxrpc_notify_rx_t);
int rxrpc_kernel_reject_call(struct socket *);
void rxrpc_kernel_get_peer(struct socket *, struct rxrpc_call *,
struct sockaddr_rxrpc *);
int rxrpc_kernel_charge_accept(struct socket *, rxrpc_notify_rx_t,
rxrpc_user_attach_call_t, unsigned long, gfp_t);

#endif /* _NET_RXRPC_H */
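
Taken together, the new entry points replace the old accept/reject pair: a
kernel service registers two notification hooks with
rxrpc_kernel_new_call_notification() and then keeps the acceptance pool
charged with rxrpc_kernel_charge_accept().  A hedged sketch of that flow,
modelled on the AFS changes above; the my_* names, workqueue and call
struct are illustrative, not part of the API:

    /* Kernel context assumed; error handling trimmed for brevity. */
    #include <linux/slab.h>
    #include <linux/workqueue.h>
    #include <net/af_rxrpc.h>

    struct my_call {
        struct rxrpc_call *rxcall;
        /* ... per-call service state ... */
    };

    static struct socket *my_socket;
    static struct workqueue_struct *my_wq;
    static void my_charge_preallocation(struct work_struct *);
    static DECLARE_WORK(my_charge_work, my_charge_preallocation);
    /* rxrpc_notify_rx_t handler for per-call data events; body elided. */
    static void my_notify_rx(struct sock *, struct rxrpc_call *, unsigned long);

    /* Attach hook: runs in data_ready when a preallocated slot is consumed. */
    static void my_rx_attach(struct rxrpc_call *rxcall, unsigned long user_call_ID)
    {
        struct my_call *call = (struct my_call *)user_call_ID;

        call->rxcall = rxcall;        /* the call is live from here on */
    }

    /* New-call hook: kick a worker to recharge the pool. */
    static void my_rx_new_call(struct sock *sk, struct rxrpc_call *rxcall,
                               unsigned long user_call_ID)
    {
        queue_work(my_wq, &my_charge_work);
    }

    /* Discard hook: reap still-charged slots when the socket shuts down. */
    static void my_rx_discard(struct rxrpc_call *rxcall, unsigned long user_call_ID)
    {
        kfree((struct my_call *)user_call_ID);
    }

    /* Keep the pool charged; rxrpc answers with BUSY once it runs dry. */
    static void my_charge_preallocation(struct work_struct *work)
    {
        struct my_call *call;

        for (;;) {
            call = kzalloc(sizeof(*call), GFP_KERNEL);
            if (!call)
                break;
            if (rxrpc_kernel_charge_accept(my_socket, my_notify_rx,
                                           my_rx_attach,
                                           (unsigned long)call,
                                           GFP_KERNEL) < 0) {
                kfree(call);
                break;
            }
        }
    }

At socket setup, rxrpc_kernel_new_call_notification(my_socket,
my_rx_new_call, my_rx_discard) is called before kernel_listen(); the
backlog passed to listen() is capped by a sysctl settable between 4 and 32,
per the cover letter above.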
15 changes: 12 additions & 3 deletions include/rxrpc/packet.h
@@ -34,8 +34,6 @@ struct rxrpc_wire_header {
#define RXRPC_CID_INC (1 << RXRPC_CIDSHIFT) /* connection ID increment */

__be32 callNumber; /* call ID (0 for connection-level packets) */
#define RXRPC_PROCESS_MAXCALLS (1<<2) /* maximum number of active calls per conn (power of 2) */

__be32 seq; /* sequence number of pkt in call stream */
__be32 serial; /* serial number of pkt sent to network */

@@ -93,10 +91,14 @@ struct rxrpc_wire_header {
struct rxrpc_jumbo_header {
uint8_t flags; /* packet flags (as per rxrpc_header) */
uint8_t pad;
__be16 _rsvd; /* reserved (used by kerberos security as cksum) */
union {
__be16 _rsvd; /* reserved */
__be16 cksum; /* kerberos security checksum */
};
};

#define RXRPC_JUMBO_DATALEN 1412 /* non-terminal jumbo packet data length */
#define RXRPC_JUMBO_SUBPKTLEN (RXRPC_JUMBO_DATALEN + sizeof(struct rxrpc_jumbo_header))

/*****************************************************************************/
/*
@@ -131,6 +133,13 @@ struct rxrpc_ackpacket {

} __packed;

/* Some ACKs refer to specific packets and some are general and can be updated. */
#define RXRPC_ACK_UPDATEABLE ((1 << RXRPC_ACK_REQUESTED) | \
(1 << RXRPC_ACK_PING_RESPONSE) | \
(1 << RXRPC_ACK_DELAY) | \
(1 << RXRPC_ACK_IDLE))


/*
* ACK packets can have a further piece of information tagged on the end
*/
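
For reference, the RXRPC_JUMBO_SUBPKTLEN constant added above is the stride
of a jumbo DATA packet: every subpacket except the last carries exactly
RXRPC_JUMBO_DATALEN bytes of data followed by a 4-byte jumbo header, so
subpacket n begins at n * RXRPC_JUMBO_SUBPKTLEN within the payload.  That
fixed stride is what lets data_ready record (offset, length) pairs instead
of pulling, trimming or cloning the skb.  A hedged userspace illustration,
with the real parsing and error handling elided:

    #include <stdint.h>
    #include <stdio.h>

    #define RXRPC_JUMBO_DATALEN   1412
    #define RXRPC_JUMBO_SUBPKTLEN (RXRPC_JUMBO_DATALEN + 4) /* + jumbo header */

    /* Print the (offset, length) of each subpacket in a jumbo payload.
     * 'len' is the payload length after the wire header; subpackets carry
     * consecutive sequence numbers starting at the header's seq. */
    static void walk_jumbo(size_t len, uint32_t first_seq)
    {
        size_t offset = 0;
        uint32_t seq = first_seq;

        while (len - offset > RXRPC_JUMBO_SUBPKTLEN) {
            printf("seq %u: offset %zu, len %u\n",
                   seq++, offset, RXRPC_JUMBO_DATALEN);
            offset += RXRPC_JUMBO_SUBPKTLEN;
        }
        /* Terminal subpacket: the remainder, with no trailing header. */
        printf("seq %u: offset %zu, len %zu\n", seq, offset, len - offset);
    }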
48 changes: 41 additions & 7 deletions include/trace/events/rxrpc.h
@@ -18,16 +18,14 @@

TRACE_EVENT(rxrpc_call,
TP_PROTO(struct rxrpc_call *call, enum rxrpc_call_trace op,
int usage, int nskb,
const void *where, const void *aux),
int usage, const void *where, const void *aux),

TP_ARGS(call, op, usage, nskb, where, aux),
TP_ARGS(call, op, usage, where, aux),

TP_STRUCT__entry(
__field(struct rxrpc_call *, call )
__field(int, op )
__field(int, usage )
__field(int, nskb )
__field(const void *, where )
__field(const void *, aux )
),
@@ -36,16 +34,14 @@ TRACE_EVENT(rxrpc_call,
__entry->call = call;
__entry->op = op;
__entry->usage = usage;
__entry->nskb = nskb;
__entry->where = where;
__entry->aux = aux;
),

TP_printk("c=%p %s u=%d s=%d p=%pSR a=%p",
TP_printk("c=%p %s u=%d sp=%pSR a=%p",
__entry->call,
rxrpc_call_traces[__entry->op],
__entry->usage,
__entry->nskb,
__entry->where,
__entry->aux)
);
@@ -84,6 +80,44 @@ TRACE_EVENT(rxrpc_skb,
__entry->where)
);

TRACE_EVENT(rxrpc_rx_packet,
TP_PROTO(struct rxrpc_skb_priv *sp),

TP_ARGS(sp),

TP_STRUCT__entry(
__field_struct(struct rxrpc_host_header, hdr )
),

TP_fast_assign(
memcpy(&__entry->hdr, &sp->hdr, sizeof(__entry->hdr));
),

TP_printk("%08x:%08x:%08x:%04x %08x %08x %02x %02x",
__entry->hdr.epoch, __entry->hdr.cid,
__entry->hdr.callNumber, __entry->hdr.serviceId,
__entry->hdr.serial, __entry->hdr.seq,
__entry->hdr.type, __entry->hdr.flags)
);

TRACE_EVENT(rxrpc_rx_done,
TP_PROTO(int result, int abort_code),

TP_ARGS(result, abort_code),

TP_STRUCT__entry(
__field(int, result )
__field(int, abort_code )
),

TP_fast_assign(
__entry->result = result;
__entry->abort_code = abort_code;
),

TP_printk("r=%d a=%d", __entry->result, __entry->abort_code)
);

TRACE_EVENT(rxrpc_abort,
TP_PROTO(const char *why, u32 cid, u32 call_id, rxrpc_seq_t seq,
int abort_code, int error),
[Diffs for the remaining 22 changed files are not shown.]
