Skip to content

Commit

Permalink
RDS: TCP: Enable multipath RDS for TCP
Browse files Browse the repository at this point in the history
Use RDS probe-ping to compute how many paths may be used with
the peer, and to synchronously start the multiple paths. If mprds is
supported, hash outgoing traffic to one of multiple paths in rds_sendmsg()
when multipath RDS is supported by the transport.

CC: Santosh Shilimkar <santosh.shilimkar@oracle.com>
Signed-off-by: Sowmini Varadhan <sowmini.varadhan@oracle.com>
Acked-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
  • Loading branch information
Sowmini Varadhan authored and David S. Miller committed Jul 15, 2016
1 parent ac3615e commit 5916e2c
Show file tree
Hide file tree
Showing 11 changed files with 257 additions and 30 deletions.
6 changes: 6 additions & 0 deletions net/rds/bind.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ static int rds_add_bound(struct rds_sock *rs, __be32 addr, __be16 *port)

if (*port != 0) {
rover = be16_to_cpu(*port);
if (rover == RDS_FLAG_PROBE_PORT)
return -EINVAL;
last = rover;
} else {
rover = max_t(u16, prandom_u32(), 2);
Expand All @@ -91,12 +93,16 @@ static int rds_add_bound(struct rds_sock *rs, __be32 addr, __be16 *port)
if (rover == 0)
rover++;

if (rover == RDS_FLAG_PROBE_PORT)
continue;
key = ((u64)addr << 32) | cpu_to_be16(rover);
if (rhashtable_lookup_fast(&bind_hash_table, &key, ht_parms))
continue;

rs->rs_bound_key = key;
rs->rs_bound_addr = addr;
net_get_random_once(&rs->rs_hash_initval,
sizeof(rs->rs_hash_initval));
rs->rs_bound_port = cpu_to_be16(rover);
rs->rs_bound_node.next = NULL;
rds_sock_addref(rs);
Expand Down
17 changes: 8 additions & 9 deletions net/rds/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ static struct rds_connection *__rds_conn_create(struct net *net,
struct hlist_head *head = rds_conn_bucket(laddr, faddr);
struct rds_transport *loop_trans;
unsigned long flags;
int ret;
int ret, i;

rcu_read_lock();
conn = rds_conn_lookup(net, head, laddr, faddr, trans);
Expand Down Expand Up @@ -211,6 +211,12 @@ static struct rds_connection *__rds_conn_create(struct net *net,

conn->c_trans = trans;

init_waitqueue_head(&conn->c_hs_waitq);
for (i = 0; i < RDS_MPATH_WORKERS; i++) {
__rds_conn_path_init(conn, &conn->c_path[i],
is_outgoing);
conn->c_path[i].cp_index = i;
}
ret = trans->conn_alloc(conn, gfp);
if (ret) {
kmem_cache_free(rds_conn_slab, conn);
Expand Down Expand Up @@ -263,14 +269,6 @@ static struct rds_connection *__rds_conn_create(struct net *net,
kmem_cache_free(rds_conn_slab, conn);
conn = found;
} else {
int i;

for (i = 0; i < RDS_MPATH_WORKERS; i++) {
__rds_conn_path_init(conn, &conn->c_path[i],
is_outgoing);
conn->c_path[i].cp_index = i;
}

hlist_add_head_rcu(&conn->c_hash_node, head);
rds_cong_add_conn(conn);
rds_conn_count++;
Expand Down Expand Up @@ -668,6 +666,7 @@ EXPORT_SYMBOL_GPL(rds_conn_path_drop);

void rds_conn_drop(struct rds_connection *conn)
{
WARN_ON(conn->c_trans->t_mp_capable);
rds_conn_path_drop(&conn->c_path[0]);
}
EXPORT_SYMBOL_GPL(rds_conn_drop);
Expand Down
1 change: 1 addition & 0 deletions net/rds/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ static unsigned int rds_exthdr_size[__RDS_EXTHDR_MAX] = {
[RDS_EXTHDR_VERSION] = sizeof(struct rds_ext_header_version),
[RDS_EXTHDR_RDMA] = sizeof(struct rds_ext_header_rdma),
[RDS_EXTHDR_RDMA_DEST] = sizeof(struct rds_ext_header_rdma_dest),
[RDS_EXTHDR_NPATHS] = sizeof(u16),
};


Expand Down
25 changes: 23 additions & 2 deletions net/rds/rds.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ enum {
#define RDS_RECV_REFILL 3

/* Max number of multipaths per RDS connection. Must be a power of 2 */
#define RDS_MPATH_WORKERS 1
#define RDS_MPATH_WORKERS 8
#define RDS_MPATH_HASH(rs, n) (jhash_1word((rs)->rs_bound_port, \
(rs)->rs_hash_initval) & ((n) - 1))

/* Per mpath connection state */
struct rds_conn_path {
Expand Down Expand Up @@ -131,7 +133,8 @@ struct rds_connection {
__be32 c_laddr;
__be32 c_faddr;
unsigned int c_loopback:1,
c_pad_to_32:31;
c_ping_triggered:1,
c_pad_to_32:30;
int c_npaths;
struct rds_connection *c_passive;
struct rds_transport *c_trans;
Expand All @@ -147,6 +150,7 @@ struct rds_connection {
unsigned long c_map_queued;

struct rds_conn_path c_path[RDS_MPATH_WORKERS];
wait_queue_head_t c_hs_waitq; /* handshake waitq */
};

static inline
Expand All @@ -166,6 +170,17 @@ void rds_conn_net_set(struct rds_connection *conn, struct net *net)
#define RDS_FLAG_RETRANSMITTED 0x04
#define RDS_MAX_ADV_CREDIT 255

/* RDS_FLAG_PROBE_PORT is the reserved sport used for sending a ping
* probe to exchange control information before establishing a connection.
* Currently the control information that is exchanged is the number of
* supported paths. If the peer is a legacy (older kernel revision) peer,
* it would return a pong message without additional control information
* that would then alert the sender that the peer was an older rev.
*/
#define RDS_FLAG_PROBE_PORT 1
#define RDS_HS_PROBE(sport, dport) \
((sport == RDS_FLAG_PROBE_PORT && dport == 0) || \
(sport == 0 && dport == RDS_FLAG_PROBE_PORT))
/*
* Maximum space available for extension headers.
*/
Expand Down Expand Up @@ -225,6 +240,11 @@ struct rds_ext_header_rdma_dest {
__be32 h_rdma_offset;
};

/* Extension header announcing number of paths.
* Implicit length = 2 bytes.
*/
#define RDS_EXTHDR_NPATHS 4

#define __RDS_EXTHDR_MAX 16 /* for now */

struct rds_incoming {
Expand Down Expand Up @@ -545,6 +565,7 @@ struct rds_sock {
/* Socket options - in case there will be more */
unsigned char rs_recverr,
rs_cong_monitor;
u32 rs_hash_initval;
};

static inline struct rds_sock *rds_sk_to_rs(const struct sock *sk)
Expand Down
75 changes: 75 additions & 0 deletions net/rds/recv.c
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,67 @@ static void rds_recv_incoming_exthdrs(struct rds_incoming *inc, struct rds_sock
}
}

static void rds_recv_hs_exthdrs(struct rds_header *hdr,
struct rds_connection *conn)
{
unsigned int pos = 0, type, len;
union {
struct rds_ext_header_version version;
u16 rds_npaths;
} buffer;

while (1) {
len = sizeof(buffer);
type = rds_message_next_extension(hdr, &pos, &buffer, &len);
if (type == RDS_EXTHDR_NONE)
break;
/* Process extension header here */
switch (type) {
case RDS_EXTHDR_NPATHS:
conn->c_npaths = min_t(int, RDS_MPATH_WORKERS,
buffer.rds_npaths);
break;
default:
pr_warn_ratelimited("ignoring unknown exthdr type "
"0x%x\n", type);
}
}
/* if RDS_EXTHDR_NPATHS was not found, default to a single-path */
conn->c_npaths = max_t(int, conn->c_npaths, 1);
}

/* rds_start_mprds() will synchronously start multiple paths when appropriate.
* The scheme is based on the following rules:
*
* 1. rds_sendmsg on first connect attempt sends the probe ping, with the
* sender's npaths (s_npaths)
* 2. rcvr of probe-ping knows the mprds_paths = min(s_npaths, r_npaths). It
* sends back a probe-pong with r_npaths. After that, if rcvr is the
* smaller ip addr, it starts rds_conn_path_connect_if_down on all
* mprds_paths.
* 3. sender gets woken up, and can move to rds_conn_path_connect_if_down.
* If it is the smaller ipaddr, rds_conn_path_connect_if_down can be
* called after reception of the probe-pong on all mprds_paths.
* Otherwise (sender of probe-ping is not the smaller ip addr): just call
* rds_conn_path_connect_if_down on the hashed path. (see rule 4)
* 4. when cp_index > 0, rds_connect_worker must only trigger
* a connection if laddr < faddr.
* 5. sender may end up queuing the packet on the cp. will get sent out later.
* when connection is completed.
*/
static void rds_start_mprds(struct rds_connection *conn)
{
int i;
struct rds_conn_path *cp;

if (conn->c_npaths > 1 && conn->c_laddr < conn->c_faddr) {
for (i = 1; i < conn->c_npaths; i++) {
cp = &conn->c_path[i];
rds_conn_path_connect_if_down(cp);
}
}
}

/*
* The transport must make sure that this is serialized against other
* rx and conn reset on this specific conn.
Expand Down Expand Up @@ -232,6 +293,20 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
}
rds_stats_inc(s_recv_ping);
rds_send_pong(cp, inc->i_hdr.h_sport);
/* if this is a handshake ping, start multipath if necessary */
if (RDS_HS_PROBE(inc->i_hdr.h_sport, inc->i_hdr.h_dport)) {
rds_recv_hs_exthdrs(&inc->i_hdr, cp->cp_conn);
rds_start_mprds(cp->cp_conn);
}
goto out;
}

if (inc->i_hdr.h_dport == RDS_FLAG_PROBE_PORT &&
inc->i_hdr.h_sport == 0) {
rds_recv_hs_exthdrs(&inc->i_hdr, cp->cp_conn);
/* if this is a handshake pong, start multipath if necessary */
rds_start_mprds(cp->cp_conn);
wake_up(&cp->cp_conn->c_hs_waitq);
goto out;
}

Expand Down
71 changes: 67 additions & 4 deletions net/rds/send.c
Original file line number Diff line number Diff line change
Expand Up @@ -963,6 +963,29 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
return ret;
}

static void rds_send_ping(struct rds_connection *conn);

static int rds_send_mprds_hash(struct rds_sock *rs, struct rds_connection *conn)
{
int hash;

if (conn->c_npaths == 0)
hash = RDS_MPATH_HASH(rs, RDS_MPATH_WORKERS);
else
hash = RDS_MPATH_HASH(rs, conn->c_npaths);
if (conn->c_npaths == 0 && hash != 0) {
rds_send_ping(conn);

if (conn->c_npaths == 0) {
wait_event_interruptible(conn->c_hs_waitq,
(conn->c_npaths != 0));
}
if (conn->c_npaths == 1)
hash = 0;
}
return hash;
}

int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
{
struct sock *sk = sock->sk;
Expand Down Expand Up @@ -1075,7 +1098,10 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
goto out;
}

cpath = &conn->c_path[0];
if (conn->c_trans->t_mp_capable)
cpath = &conn->c_path[rds_send_mprds_hash(rs, conn)];
else
cpath = &conn->c_path[0];

rds_conn_path_connect_if_down(cpath);

Expand Down Expand Up @@ -1135,10 +1161,16 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
}

/*
* Reply to a ping packet.
* send out a probe. Can be shared by rds_send_ping,
* rds_send_pong, rds_send_hb.
* rds_send_hb should use h_flags
* RDS_FLAG_HB_PING|RDS_FLAG_ACK_REQUIRED
* or
* RDS_FLAG_HB_PONG|RDS_FLAG_ACK_REQUIRED
*/
int
rds_send_pong(struct rds_conn_path *cp, __be16 dport)
rds_send_probe(struct rds_conn_path *cp, __be16 sport,
__be16 dport, u8 h_flags)
{
struct rds_message *rm;
unsigned long flags;
Expand Down Expand Up @@ -1166,9 +1198,18 @@ rds_send_pong(struct rds_conn_path *cp, __be16 dport)
rm->m_inc.i_conn = cp->cp_conn;
rm->m_inc.i_conn_path = cp;

rds_message_populate_header(&rm->m_inc.i_hdr, 0, dport,
rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport,
cp->cp_next_tx_seq);
rm->m_inc.i_hdr.h_flags |= h_flags;
cp->cp_next_tx_seq++;

if (RDS_HS_PROBE(sport, dport) && cp->cp_conn->c_trans->t_mp_capable) {
u16 npaths = RDS_MPATH_WORKERS;

rds_message_add_extension(&rm->m_inc.i_hdr,
RDS_EXTHDR_NPATHS, &npaths,
sizeof(npaths));
}
spin_unlock_irqrestore(&cp->cp_lock, flags);

rds_stats_inc(s_send_queued);
Expand All @@ -1185,3 +1226,25 @@ rds_send_pong(struct rds_conn_path *cp, __be16 dport)
rds_message_put(rm);
return ret;
}

int
rds_send_pong(struct rds_conn_path *cp, __be16 dport)
{
return rds_send_probe(cp, 0, dport, 0);
}

void
rds_send_ping(struct rds_connection *conn)
{
unsigned long flags;
struct rds_conn_path *cp = &conn->c_path[0];

spin_lock_irqsave(&cp->cp_lock, flags);
if (conn->c_ping_triggered) {
spin_unlock_irqrestore(&cp->cp_lock, flags);
return;
}
conn->c_ping_triggered = 1;
spin_unlock_irqrestore(&cp->cp_lock, flags);
rds_send_probe(&conn->c_path[0], RDS_FLAG_PROBE_PORT, 0, 0);
}
2 changes: 1 addition & 1 deletion net/rds/tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
#include <net/net_namespace.h>
#include <net/netns/generic.h>

#include "rds_single_path.h"
#include "rds.h"
#include "tcp.h"

Expand Down Expand Up @@ -358,6 +357,7 @@ struct rds_transport rds_tcp_transport = {
.t_name = "tcp",
.t_type = RDS_TRANS_TCP,
.t_prefer_loopback = 1,
.t_mp_capable = 1,
};

static int rds_tcp_netid;
Expand Down
7 changes: 6 additions & 1 deletion net/rds/tcp_connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
#include <linux/in.h>
#include <net/tcp.h>

#include "rds_single_path.h"
#include "rds.h"
#include "tcp.h"

Expand Down Expand Up @@ -82,6 +81,12 @@ int rds_tcp_conn_path_connect(struct rds_conn_path *cp)
struct rds_connection *conn = cp->cp_conn;
struct rds_tcp_connection *tc = cp->cp_transport_data;

/* for multipath rds,we only trigger the connection after
* the handshake probe has determined the number of paths.
*/
if (cp->cp_index > 0 && cp->cp_conn->c_npaths < 2)
return -EAGAIN;

mutex_lock(&tc->t_conn_path_lock);

if (rds_conn_path_up(cp)) {
Expand Down
Loading

0 comments on commit 5916e2c

Please sign in to comment.