Skip to content

Commit

Permalink
Merge branch 'rprpc-2nd-rewrite-part-1'
Browse files Browse the repository at this point in the history
David Howells says:

====================
RxRPC: 2nd rewrite part 1

Okay, I'm in the process of rewriting the RxRPC rewrite.  The primary aim of
this second rewrite is to strictly control the number of active connections we
know about and to get rid of connections we don't need much more quickly.

On top of this, there are fixes to the protocol handling which will all occur
in later parts.

Here's the first set of patches from the second go, aimed at net-next.  These
are all fixes and cleanups preparatory to the main event.

Notable parts of this set include:

 (1) A fix for the AFS filesystem to wait for outstanding calls to complete
     before closing the RxRPC socket.

 (2) Differentiation of local and remote abort codes.  At a future point
     userspace will get to see this via control message data on recvmsg().

 (3) Absorb the rxkad module into the af_rxrpc module to prevent a dependency
     loop.

 (4) Create a null security module and unconditionalise calls into the
     security module that's in force (there will always be a security module
     applied to a connection, even if it's just the null one).
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
  • Loading branch information
David S. Miller committed Apr 11, 2016
2 parents c64a73d + e0e4d82 commit 7c3da7d
Show file tree
Hide file tree
Showing 20 changed files with 374 additions and 326 deletions.
30 changes: 24 additions & 6 deletions fs/afs/rxrpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ static void afs_async_workfn(struct work_struct *work)
call->async_workfn(call);
}

static int afs_wait_atomic_t(atomic_t *p)
{
schedule();
return 0;
}

/*
* open an RxRPC socket and bind it to be a server for callback notifications
* - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT
Expand Down Expand Up @@ -126,13 +132,16 @@ void afs_close_socket(void)
{
_enter("");

wait_on_atomic_t(&afs_outstanding_calls, afs_wait_atomic_t,
TASK_UNINTERRUPTIBLE);
_debug("no outstanding calls");

sock_release(afs_socket);

_debug("dework");
destroy_workqueue(afs_async_calls);

ASSERTCMP(atomic_read(&afs_outstanding_skbs), ==, 0);
ASSERTCMP(atomic_read(&afs_outstanding_calls), ==, 0);
_leave("");
}

Expand Down Expand Up @@ -178,8 +187,6 @@ static void afs_free_call(struct afs_call *call)
{
_debug("DONE %p{%s} [%d]",
call, call->type->name, atomic_read(&afs_outstanding_calls));
if (atomic_dec_return(&afs_outstanding_calls) == -1)
BUG();

ASSERTCMP(call->rxcall, ==, NULL);
ASSERT(!work_pending(&call->async_work));
Expand All @@ -188,6 +195,9 @@ static void afs_free_call(struct afs_call *call)

kfree(call->request);
kfree(call);

if (atomic_dec_and_test(&afs_outstanding_calls))
wake_up_atomic_t(&afs_outstanding_calls);
}

/*
Expand Down Expand Up @@ -420,9 +430,11 @@ int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp,
}

/*
* handles intercepted messages that were arriving in the socket's Rx queue
* - called with the socket receive queue lock held to ensure message ordering
* - called with softirqs disabled
* Handles intercepted messages that were arriving in the socket's Rx queue.
*
* Called from the AF_RXRPC call processor in waitqueue process context. For
* each call, it is guaranteed this will be called in order of packet to be
* delivered.
*/
static void afs_rx_interceptor(struct sock *sk, unsigned long user_call_ID,
struct sk_buff *skb)
Expand Down Expand Up @@ -513,6 +525,12 @@ static void afs_deliver_to_call(struct afs_call *call)
call->state = AFS_CALL_ABORTED;
_debug("Rcv ABORT %u -> %d", abort_code, call->error);
break;
case RXRPC_SKB_MARK_LOCAL_ABORT:
abort_code = rxrpc_kernel_get_abort_code(skb);
call->error = call->type->abort_to_error(abort_code);
call->state = AFS_CALL_ABORTED;
_debug("Loc ABORT %u -> %d", abort_code, call->error);
break;
case RXRPC_SKB_MARK_NET_ERROR:
call->error = -rxrpc_kernel_get_error_number(skb);
call->state = AFS_CALL_ERROR;
Expand Down
4 changes: 3 additions & 1 deletion include/net/af_rxrpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,20 @@
#ifndef _NET_RXRPC_H
#define _NET_RXRPC_H

#include <linux/skbuff.h>
#include <linux/rxrpc.h>

struct rxrpc_call;

/*
* the mark applied to socket buffers that may be intercepted
*/
enum {
enum rxrpc_skb_mark {
RXRPC_SKB_MARK_DATA, /* data message */
RXRPC_SKB_MARK_FINAL_ACK, /* final ACK received message */
RXRPC_SKB_MARK_BUSY, /* server busy message */
RXRPC_SKB_MARK_REMOTE_ABORT, /* remote abort message */
RXRPC_SKB_MARK_LOCAL_ABORT, /* local abort message */
RXRPC_SKB_MARK_NET_ERROR, /* network error message */
RXRPC_SKB_MARK_LOCAL_ERROR, /* local error message */
RXRPC_SKB_MARK_NEW_CALL, /* local error message */
Expand Down
2 changes: 0 additions & 2 deletions include/rxrpc/packet.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ struct rxrpc_wire_header {

} __packed;

extern const char *rxrpc_pkts[];

#define RXRPC_SUPPORTED_PACKET_TYPES ( \
(1 << RXRPC_PACKET_TYPE_DATA) | \
(1 << RXRPC_PACKET_TYPE_ACK) | \
Expand Down
2 changes: 1 addition & 1 deletion net/rxrpc/Kconfig
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ config AF_RXRPC_DEBUG


config RXKAD
tristate "RxRPC Kerberos security"
bool "RxRPC Kerberos security"
depends on AF_RXRPC
select CRYPTO
select CRYPTO_MANAGER
Expand Down
7 changes: 4 additions & 3 deletions net/rxrpc/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ af-rxrpc-y := \
ar-recvmsg.o \
ar-security.o \
ar-skbuff.o \
ar-transport.o
ar-transport.o \
insecure.o \
misc.o

af-rxrpc-$(CONFIG_PROC_FS) += ar-proc.o
af-rxrpc-$(CONFIG_RXKAD) += rxkad.o
af-rxrpc-$(CONFIG_SYSCTL) += sysctl.o

obj-$(CONFIG_AF_RXRPC) += af-rxrpc.o

obj-$(CONFIG_RXKAD) += rxkad.o
9 changes: 9 additions & 0 deletions net/rxrpc/af_rxrpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,12 @@ static int __init af_rxrpc_init(void)
goto error_work_queue;
}

ret = rxrpc_init_security();
if (ret < 0) {
printk(KERN_CRIT "RxRPC: Cannot initialise security\n");
goto error_security;
}

ret = proto_register(&rxrpc_proto, 1);
if (ret < 0) {
printk(KERN_CRIT "RxRPC: Cannot register protocol\n");
Expand Down Expand Up @@ -853,6 +859,8 @@ static int __init af_rxrpc_init(void)
proto_unregister(&rxrpc_proto);
error_proto:
destroy_workqueue(rxrpc_workqueue);
error_security:
rxrpc_exit_security();
error_work_queue:
kmem_cache_destroy(rxrpc_call_jar);
error_call_jar:
Expand Down Expand Up @@ -883,6 +891,7 @@ static void __exit af_rxrpc_exit(void)
remove_proc_entry("rxrpc_conns", init_net.proc_net);
remove_proc_entry("rxrpc_calls", init_net.proc_net);
destroy_workqueue(rxrpc_workqueue);
rxrpc_exit_security();
kmem_cache_destroy(rxrpc_call_jar);
_leave("");
}
Expand Down
4 changes: 2 additions & 2 deletions net/rxrpc/ar-accept.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,15 @@ static int rxrpc_accept_incoming_call(struct rxrpc_local *local,
goto error;
}

conn = rxrpc_incoming_connection(trans, &sp->hdr, GFP_NOIO);
conn = rxrpc_incoming_connection(trans, &sp->hdr);
rxrpc_put_transport(trans);
if (IS_ERR(conn)) {
_debug("no conn");
ret = PTR_ERR(conn);
goto error;
}

call = rxrpc_incoming_call(rx, conn, &sp->hdr, GFP_NOIO);
call = rxrpc_incoming_call(rx, conn, &sp->hdr);
rxrpc_put_connection(conn);
if (IS_ERR(call)) {
_debug("no call");
Expand Down
81 changes: 7 additions & 74 deletions net/rxrpc/ar-ack.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,74 +19,6 @@
#include <net/af_rxrpc.h>
#include "ar-internal.h"

/*
* How long to wait before scheduling ACK generation after seeing a
* packet with RXRPC_REQUEST_ACK set (in jiffies).
*/
unsigned int rxrpc_requested_ack_delay = 1;

/*
* How long to wait before scheduling an ACK with subtype DELAY (in jiffies).
*
* We use this when we've received new data packets. If those packets aren't
* all consumed within this time we will send a DELAY ACK if an ACK was not
* requested to let the sender know it doesn't need to resend.
*/
unsigned int rxrpc_soft_ack_delay = 1 * HZ;

/*
* How long to wait before scheduling an ACK with subtype IDLE (in jiffies).
*
* We use this when we've consumed some previously soft-ACK'd packets when
* further packets aren't immediately received to decide when to send an IDLE
* ACK let the other end know that it can free up its Tx buffer space.
*/
unsigned int rxrpc_idle_ack_delay = 0.5 * HZ;

/*
* Receive window size in packets. This indicates the maximum number of
* unconsumed received packets we're willing to retain in memory. Once this
* limit is hit, we should generate an EXCEEDS_WINDOW ACK and discard further
* packets.
*/
unsigned int rxrpc_rx_window_size = 32;

/*
* Maximum Rx MTU size. This indicates to the sender the size of jumbo packet
* made by gluing normal packets together that we're willing to handle.
*/
unsigned int rxrpc_rx_mtu = 5692;

/*
* The maximum number of fragments in a received jumbo packet that we tell the
* sender that we're willing to handle.
*/
unsigned int rxrpc_rx_jumbo_max = 4;

static const char *rxrpc_acks(u8 reason)
{
static const char *const str[] = {
"---", "REQ", "DUP", "OOS", "WIN", "MEM", "PNG", "PNR", "DLY",
"IDL", "-?-"
};

if (reason >= ARRAY_SIZE(str))
reason = ARRAY_SIZE(str) - 1;
return str[reason];
}

static const s8 rxrpc_ack_priority[] = {
[0] = 0,
[RXRPC_ACK_DELAY] = 1,
[RXRPC_ACK_REQUESTED] = 2,
[RXRPC_ACK_IDLE] = 3,
[RXRPC_ACK_PING_RESPONSE] = 4,
[RXRPC_ACK_DUPLICATE] = 5,
[RXRPC_ACK_OUT_OF_SEQUENCE] = 6,
[RXRPC_ACK_EXCEEDS_WINDOW] = 7,
[RXRPC_ACK_NOSPACE] = 8,
};

/*
* propose an ACK be sent
*/
Expand Down Expand Up @@ -426,7 +358,7 @@ static void rxrpc_rotate_tx_window(struct rxrpc_call *call, u32 hard)
int tail = call->acks_tail, old_tail;
int win = CIRC_CNT(call->acks_head, tail, call->acks_winsz);

kenter("{%u,%u},%u", call->acks_hard, win, hard);
_enter("{%u,%u},%u", call->acks_hard, win, hard);

ASSERTCMP(hard - call->acks_hard, <=, win);

Expand Down Expand Up @@ -656,7 +588,8 @@ static int rxrpc_process_rx_queue(struct rxrpc_call *call,
_proto("OOSQ DATA %%%u { #%u }", sp->hdr.serial, sp->hdr.seq);

/* secured packets must be verified and possibly decrypted */
if (rxrpc_verify_packet(call, skb, _abort_code) < 0)
if (call->conn->security->verify_packet(call, skb,
_abort_code) < 0)
goto protocol_error;

rxrpc_insert_oos_packet(call, skb);
Expand Down Expand Up @@ -901,8 +834,8 @@ void rxrpc_process_call(struct work_struct *work)

/* there's a good chance we're going to have to send a message, so set
* one up in advance */
msg.msg_name = &call->conn->trans->peer->srx.transport.sin;
msg.msg_namelen = sizeof(call->conn->trans->peer->srx.transport.sin);
msg.msg_name = &call->conn->trans->peer->srx.transport;
msg.msg_namelen = call->conn->trans->peer->srx.transport_len;
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = 0;
Expand Down Expand Up @@ -973,7 +906,7 @@ void rxrpc_process_call(struct work_struct *work)
ECONNABORTED, true) < 0)
goto no_mem;
whdr.type = RXRPC_PACKET_TYPE_ABORT;
data = htonl(call->abort_code);
data = htonl(call->local_abort);
iov[1].iov_base = &data;
iov[1].iov_len = sizeof(data);
genbit = RXRPC_CALL_EV_ABORT;
Expand Down Expand Up @@ -1036,7 +969,7 @@ void rxrpc_process_call(struct work_struct *work)
write_lock_bh(&call->state_lock);
if (call->state <= RXRPC_CALL_COMPLETE) {
call->state = RXRPC_CALL_LOCALLY_ABORTED;
call->abort_code = RX_CALL_TIMEOUT;
call->local_abort = RX_CALL_TIMEOUT;
set_bit(RXRPC_CALL_EV_ABORT, &call->events);
}
write_unlock_bh(&call->state_lock);
Expand Down
11 changes: 5 additions & 6 deletions net/rxrpc/ar-call.c
Original file line number Diff line number Diff line change
Expand Up @@ -411,18 +411,17 @@ struct rxrpc_call *rxrpc_get_client_call(struct rxrpc_sock *rx,
*/
struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
struct rxrpc_connection *conn,
struct rxrpc_host_header *hdr,
gfp_t gfp)
struct rxrpc_host_header *hdr)
{
struct rxrpc_call *call, *candidate;
struct rb_node **p, *parent;
u32 call_id;

_enter(",%d,,%x", conn->debug_id, gfp);
_enter(",%d", conn->debug_id);

ASSERT(rx != NULL);

candidate = rxrpc_alloc_call(gfp);
candidate = rxrpc_alloc_call(GFP_NOIO);
if (!candidate)
return ERR_PTR(-EBUSY);

Expand Down Expand Up @@ -682,7 +681,7 @@ void rxrpc_release_call(struct rxrpc_call *call)
call->state != RXRPC_CALL_CLIENT_FINAL_ACK) {
_debug("+++ ABORTING STATE %d +++\n", call->state);
call->state = RXRPC_CALL_LOCALLY_ABORTED;
call->abort_code = RX_CALL_DEAD;
call->local_abort = RX_CALL_DEAD;
set_bit(RXRPC_CALL_EV_ABORT, &call->events);
rxrpc_queue_call(call);
}
Expand Down Expand Up @@ -758,7 +757,7 @@ static void rxrpc_mark_call_released(struct rxrpc_call *call)
if (call->state < RXRPC_CALL_COMPLETE) {
_debug("abort call %p", call);
call->state = RXRPC_CALL_LOCALLY_ABORTED;
call->abort_code = RX_CALL_DEAD;
call->local_abort = RX_CALL_DEAD;
if (!test_and_set_bit(RXRPC_CALL_EV_ABORT, &call->events))
sched = true;
}
Expand Down
14 changes: 8 additions & 6 deletions net/rxrpc/ar-connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ static struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp)
INIT_LIST_HEAD(&conn->bundle_link);
conn->calls = RB_ROOT;
skb_queue_head_init(&conn->rx_queue);
conn->security = &rxrpc_no_security;
rwlock_init(&conn->lock);
spin_lock_init(&conn->state_lock);
atomic_set(&conn->usage, 1);
Expand Down Expand Up @@ -564,8 +565,7 @@ int rxrpc_connect_call(struct rxrpc_sock *rx,
candidate->debug_id, candidate->trans->debug_id);

rxrpc_assign_connection_id(candidate);
if (candidate->security)
candidate->security->prime_packet_security(candidate);
candidate->security->prime_packet_security(candidate);

/* leave the candidate lurking in zombie mode attached to the
* bundle until we're ready for it */
Expand Down Expand Up @@ -619,8 +619,7 @@ int rxrpc_connect_call(struct rxrpc_sock *rx,
*/
struct rxrpc_connection *
rxrpc_incoming_connection(struct rxrpc_transport *trans,
struct rxrpc_host_header *hdr,
gfp_t gfp)
struct rxrpc_host_header *hdr)
{
struct rxrpc_connection *conn, *candidate = NULL;
struct rb_node *p, **pp;
Expand Down Expand Up @@ -659,7 +658,7 @@ rxrpc_incoming_connection(struct rxrpc_transport *trans,

/* not yet present - create a candidate for a new record and then
* redo the search */
candidate = rxrpc_alloc_connection(gfp);
candidate = rxrpc_alloc_connection(GFP_NOIO);
if (!candidate) {
_leave(" = -ENOMEM");
return ERR_PTR(-ENOMEM);
Expand Down Expand Up @@ -831,7 +830,10 @@ static void rxrpc_destroy_connection(struct rxrpc_connection *conn)
ASSERT(RB_EMPTY_ROOT(&conn->calls));
rxrpc_purge_queue(&conn->rx_queue);

rxrpc_clear_conn_security(conn);
conn->security->clear(conn);
key_put(conn->key);
key_put(conn->server_key);

rxrpc_put_transport(conn->trans);
kfree(conn);
_leave("");
Expand Down
Loading

0 comments on commit 7c3da7d

Please sign in to comment.