Skip to content

Commit

Permalink
[PATCH] RPC: separate TCP and UDP transport connection logic
Browse files Browse the repository at this point in the history
 Create separate connection worker functions for managing UDP and TCP
 transport sockets.  This eliminates several dependencies on "xprt->stream".

 Test-plan:
 Destructive testing (unplugging the network temporarily).  Connectathon with
 v2, v3, and v4.

 Version: Thu, 11 Aug 2005 16:08:18 -0400

 Signed-off-by: Chuck Lever <cel@netapp.com>
 Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
  • Loading branch information
Chuck Lever authored and Trond Myklebust committed Sep 23, 2005
1 parent c7b2cae commit b0d93ad
Showing 1 changed file with 91 additions and 73 deletions.
164 changes: 91 additions & 73 deletions net/sunrpc/xprtsock.c
Original file line number Diff line number Diff line change
Expand Up @@ -836,102 +836,118 @@ static int xs_bindresvport(struct rpc_xprt *xprt, struct socket *sock)
return err;
}

static struct socket *xs_create(struct rpc_xprt *xprt, int proto, int resvport)
/**
* xs_udp_connect_worker - set up a UDP socket
* @args: RPC transport to connect
*
* Invoked by a work queue tasklet.
*/
static void xs_udp_connect_worker(void *args)
{
struct socket *sock;
int type, err;

dprintk("RPC: xs_create(%s %d)\n",
(proto == IPPROTO_UDP)? "udp" : "tcp", proto);
struct rpc_xprt *xprt = (struct rpc_xprt *) args;
struct socket *sock = xprt->sock;
int err, status = -EIO;

type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;
if (xprt->shutdown || xprt->addr.sin_port == 0)
goto out;

if ((err = sock_create_kern(PF_INET, type, proto, &sock)) < 0) {
dprintk("RPC: can't create socket (%d).\n", -err);
return NULL;
}
dprintk("RPC: xs_udp_connect_worker for xprt %p\n", xprt);

/* If the caller has the capability, bind to a reserved port */
if (resvport && xs_bindresvport(xprt, sock) < 0)
goto failed;
/* Start by resetting any existing state */
xs_close(xprt);

return sock;
if ((err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock)) < 0) {
dprintk("RPC: can't create UDP transport socket (%d).\n", -err);
goto out;
}

failed:
sock_release(sock);
return NULL;
}
if (xprt->resvport && xs_bindresvport(xprt, sock) < 0) {
sock_release(sock);
goto out;
}

static void xs_bind(struct rpc_xprt *xprt, struct socket *sock)
{
struct sock *sk = sock->sk;
if (!xprt->inet) {
struct sock *sk = sock->sk;

if (xprt->inet)
return;
write_lock_bh(&sk->sk_callback_lock);

write_lock_bh(&sk->sk_callback_lock);
sk->sk_user_data = xprt;
xprt->old_data_ready = sk->sk_data_ready;
xprt->old_state_change = sk->sk_state_change;
xprt->old_write_space = sk->sk_write_space;
if (xprt->prot == IPPROTO_UDP) {
sk->sk_user_data = xprt;
xprt->old_data_ready = sk->sk_data_ready;
xprt->old_state_change = sk->sk_state_change;
xprt->old_write_space = sk->sk_write_space;
sk->sk_data_ready = xs_udp_data_ready;
sk->sk_write_space = xs_udp_write_space;
sk->sk_no_check = UDP_CSUM_NORCV;

xprt_set_connected(xprt);
} else {
tcp_sk(sk)->nonagle = 1; /* disable Nagle's algorithm */
sk->sk_data_ready = xs_tcp_data_ready;
sk->sk_state_change = xs_tcp_state_change;
sk->sk_write_space = xs_tcp_write_space;
xprt_clear_connected(xprt);
}

/* Reset to new socket */
xprt->sock = sock;
xprt->inet = sk;
write_unlock_bh(&sk->sk_callback_lock);
/* Reset to new socket */
xprt->sock = sock;
xprt->inet = sk;

return;
write_unlock_bh(&sk->sk_callback_lock);
}
xs_set_buffer_size(xprt);
status = 0;
out:
xprt_wake_pending_tasks(xprt, status);
xprt_clear_connecting(xprt);
}

/**
* xs_connect_worker - try to connect a socket to a remote endpoint
* xs_tcp_connect_worker - connect a TCP socket to a remote endpoint
* @args: RPC transport to connect
*
* Invoked by a work queue tasklet.
*/
static void xs_connect_worker(void *args)
static void xs_tcp_connect_worker(void *args)
{
struct rpc_xprt *xprt = (struct rpc_xprt *)args;
struct socket *sock = xprt->sock;
int status = -EIO;
int err, status = -EIO;

if (xprt->shutdown || xprt->addr.sin_port == 0)
goto out;

dprintk("RPC: xs_connect_worker xprt %p\n", xprt);
dprintk("RPC: xs_tcp_connect_worker for xprt %p\n", xprt);

/*
* Start by resetting any existing state
*/
/* Start by resetting any existing socket state */
xs_close(xprt);
sock = xs_create(xprt, xprt->prot, xprt->resvport);
if (sock == NULL) {
/* couldn't create socket or bind to reserved port;
* this is likely a permanent error, so cause an abort */

if ((err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) {
dprintk("RPC: can't create TCP transport socket (%d).\n", -err);
goto out;
}
xs_bind(xprt, sock);
xs_set_buffer_size(xprt);

status = 0;
if (!xprt->stream)
if (xprt->resvport && xs_bindresvport(xprt, sock) < 0) {
sock_release(sock);
goto out;
}

/*
* Tell the socket layer to start connecting...
*/
if (!xprt->inet) {
struct sock *sk = sock->sk;

write_lock_bh(&sk->sk_callback_lock);

sk->sk_user_data = xprt;
xprt->old_data_ready = sk->sk_data_ready;
xprt->old_state_change = sk->sk_state_change;
xprt->old_write_space = sk->sk_write_space;
sk->sk_data_ready = xs_tcp_data_ready;
sk->sk_state_change = xs_tcp_state_change;
sk->sk_write_space = xs_tcp_write_space;
tcp_sk(sk)->nonagle = 1;

xprt_clear_connected(xprt);

/* Reset to new socket */
xprt->sock = sock;
xprt->inet = sk;

write_unlock_bh(&sk->sk_callback_lock);
}

/* Tell the socket layer to start connecting... */
status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
sizeof(xprt->addr), O_NONBLOCK);
dprintk("RPC: %p connect status %d connected %d sock state %d\n",
Expand Down Expand Up @@ -959,18 +975,20 @@ static void xs_connect(struct rpc_task *task)
{
struct rpc_xprt *xprt = task->tk_xprt;

if (!xprt_test_and_set_connecting(xprt)) {
if (xprt->sock != NULL) {
dprintk("RPC: xs_connect delayed xprt %p\n", xprt);
schedule_delayed_work(&xprt->connect_worker,
if (xprt_test_and_set_connecting(xprt))
return;

if (xprt->sock != NULL) {
dprintk("RPC: xs_connect delayed xprt %p\n", xprt);
schedule_delayed_work(&xprt->connect_worker,
RPC_REESTABLISH_TIMEOUT);
} else {
dprintk("RPC: xs_connect scheduled xprt %p\n", xprt);
schedule_work(&xprt->connect_worker);
/* flush_scheduled_work can sleep... */
if (!RPC_IS_ASYNC(task))
flush_scheduled_work();
}
} else {
dprintk("RPC: xs_connect scheduled xprt %p\n", xprt);
schedule_work(&xprt->connect_worker);

/* flush_scheduled_work can sleep... */
if (!RPC_IS_ASYNC(task))
flush_scheduled_work();
}
}

Expand Down Expand Up @@ -1013,7 +1031,7 @@ int xs_setup_udp(struct rpc_xprt *xprt, struct rpc_timeout *to)
/* XXX: header size can vary due to auth type, IPv6, etc. */
xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);

INIT_WORK(&xprt->connect_worker, xs_connect_worker, xprt);
INIT_WORK(&xprt->connect_worker, xs_udp_connect_worker, xprt);

xprt->ops = &xs_ops;

Expand Down Expand Up @@ -1052,7 +1070,7 @@ int xs_setup_tcp(struct rpc_xprt *xprt, struct rpc_timeout *to)
xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0;
xprt->max_payload = (1U << 31) - 1;

INIT_WORK(&xprt->connect_worker, xs_connect_worker, xprt);
INIT_WORK(&xprt->connect_worker, xs_tcp_connect_worker, xprt);

xprt->ops = &xs_ops;

Expand Down

0 comments on commit b0d93ad

Please sign in to comment.