Skip to content

Commit

Permalink
tipc: add smart nagle feature
Browse files Browse the repository at this point in the history
We introduce a feature that works like a combination of TCP_NAGLE and
TCP_CORK, but without some of the weaknesses of those. In particular,
we will not observe long delivery delays because of delayed acks, since
the algorithm itself decides if and when acks are to be sent from the
receiving peer.

- The nagle property as such is determined by manipulating a new
  'maxnagle' field in struct tipc_sock. If certain conditions are met,
  'maxnagle' will define max size of the messages which can be bundled.
  If it is set to zero no messages are ever bundled, implying that the
  nagle property is disabled.
- A socket with the nagle property enabled enters nagle mode when more
  than 4 messages have been sent out without receiving any data message
  from the peer.
- A socket leaves nagle mode whenever it receives a data message from
  the peer.

In nagle mode, messages smaller than 'maxnagle' are accumulated in the
socket write queue. The last buffer in the queue is marked with a new
'ack_required' bit, which forces the receiving peer to send a CONN_ACK
message back to the sender upon reception.

The accumulated contents of the write queue is transmitted when one of
the following events or conditions occur.

- A CONN_ACK message is received from the peer.
- A data message is received from the peer.
- A SOCK_WAKEUP pseudo message is received from the link level.
- The write queue contains more than 64 1k blocks of data.
- The connection is being shut down.
- There is no CONN_ACK message to expect. I.e., there is currently
  no outstanding message where the 'ack_required' bit was set. As a
  consequence, the first message added after we enter nagle mode
  is always sent directly with this bit set.

This new feature gives a 50-100% improvement of throughput for small
(i.e., less than MTU size) messages, while it might add up to one RTT
to latency time when the socket is in nagle mode.

Acked-by: Ying Xue <ying.xue@windreiver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
  • Loading branch information
Jon Maloy authored and David S. Miller committed Oct 30, 2019
1 parent 6c814e8 commit c0bceb9
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 20 deletions.
1 change: 1 addition & 0 deletions include/uapi/linux/tipc.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ struct sockaddr_tipc {
#define TIPC_GROUP_JOIN 135 /* Takes struct tipc_group_req* */
#define TIPC_GROUP_LEAVE 136 /* No argument */
#define TIPC_SOCK_RECVQ_USED 137 /* Default: none (read only) */
#define TIPC_NODELAY 138 /* Default: false */

/*
* Flag values
Expand Down
53 changes: 53 additions & 0 deletions net/tipc/msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,59 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf)
return 0;
}

/**
* tipc_msg_append(): Append data to tail of an existing buffer queue
* @hdr: header to be used
* @m: the data to be appended
* @mss: max allowable size of buffer
* @dlen: size of data to be appended
* @txq: queue to appand to
* Returns the number og 1k blocks appended or errno value
*/
int tipc_msg_append(struct tipc_msg *_hdr, struct msghdr *m, int dlen,
int mss, struct sk_buff_head *txq)
{
struct sk_buff *skb, *prev;
int accounted, total, curr;
int mlen, cpy, rem = dlen;
struct tipc_msg *hdr;

skb = skb_peek_tail(txq);
accounted = skb ? msg_blocks(buf_msg(skb)) : 0;
total = accounted;

while (rem) {
if (!skb || skb->len >= mss) {
prev = skb;
skb = tipc_buf_acquire(mss, GFP_KERNEL);
if (unlikely(!skb))
return -ENOMEM;
skb_orphan(skb);
skb_trim(skb, MIN_H_SIZE);
hdr = buf_msg(skb);
skb_copy_to_linear_data(skb, _hdr, MIN_H_SIZE);
msg_set_hdr_sz(hdr, MIN_H_SIZE);
msg_set_size(hdr, MIN_H_SIZE);
__skb_queue_tail(txq, skb);
total += 1;
if (prev)
msg_set_ack_required(buf_msg(prev), 0);
msg_set_ack_required(hdr, 1);
}
hdr = buf_msg(skb);
curr = msg_blocks(hdr);
mlen = msg_size(hdr);
cpy = min_t(int, rem, mss - mlen);
if (cpy != copy_from_iter(skb->data + mlen, cpy, &m->msg_iter))
return -EFAULT;
msg_set_size(hdr, mlen + cpy);
skb_put(skb, cpy);
rem -= cpy;
total += msg_blocks(hdr) - curr;
}
return total - accounted;
}

/* tipc_msg_validate - validate basic format of received message
*
* This routine ensures a TIPC message has an acceptable header, and at least
Expand Down
12 changes: 12 additions & 0 deletions net/tipc/msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,16 @@ static inline void msg_set_src_droppable(struct tipc_msg *m, u32 d)
msg_set_bits(m, 0, 18, 1, d);
}

static inline int msg_ack_required(struct tipc_msg *m)
{
return msg_bits(m, 0, 18, 1);
}

static inline void msg_set_ack_required(struct tipc_msg *m, u32 d)
{
msg_set_bits(m, 0, 18, 1, d);
}

static inline bool msg_is_rcast(struct tipc_msg *m)
{
return msg_bits(m, 0, 18, 0x1);
Expand Down Expand Up @@ -1079,6 +1089,8 @@ int tipc_msg_fragment(struct sk_buff *skb, const struct tipc_msg *hdr,
int pktmax, struct sk_buff_head *frags);
int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
int offset, int dsz, int mtu, struct sk_buff_head *list);
int tipc_msg_append(struct tipc_msg *hdr, struct msghdr *m, int dlen,
int mss, struct sk_buff_head *txq);
bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err);
bool tipc_msg_assemble(struct sk_buff_head *list);
bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq);
Expand Down
7 changes: 5 additions & 2 deletions net/tipc/node.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ enum {
TIPC_LINK_PROTO_SEQNO = (1 << 6),
TIPC_MCAST_RBCTL = (1 << 7),
TIPC_GAP_ACK_BLOCK = (1 << 8),
TIPC_TUNNEL_ENHANCED = (1 << 9)
TIPC_TUNNEL_ENHANCED = (1 << 9),
TIPC_NAGLE = (1 << 10)
};

#define TIPC_NODE_CAPABILITIES (TIPC_SYN_BIT | \
Expand All @@ -66,7 +67,9 @@ enum {
TIPC_LINK_PROTO_SEQNO | \
TIPC_MCAST_RBCTL | \
TIPC_GAP_ACK_BLOCK | \
TIPC_TUNNEL_ENHANCED)
TIPC_TUNNEL_ENHANCED | \
TIPC_NAGLE)

#define INVALID_BEARER_ID -1

void tipc_node_stop(struct net *net);
Expand Down
117 changes: 99 additions & 18 deletions net/tipc/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ struct sockaddr_pair {
* @conn_instance: TIPC instance used when connection was established
* @published: non-zero if port has one or more associated names
* @max_pkt: maximum packet size "hint" used when building messages sent by port
* @maxnagle: maximum size of msg which can be subject to nagle
* @portid: unique port identity in TIPC socket hash table
* @phdr: preformatted message header used when sending messages
* #cong_links: list of congested links
Expand All @@ -97,6 +98,7 @@ struct tipc_sock {
u32 conn_instance;
int published;
u32 max_pkt;
u32 maxnagle;
u32 portid;
struct tipc_msg phdr;
struct list_head cong_links;
Expand All @@ -116,6 +118,10 @@ struct tipc_sock {
struct tipc_mc_method mc_method;
struct rcu_head rcu;
struct tipc_group *group;
u32 oneway;
u16 snd_backlog;
bool expect_ack;
bool nodelay;
bool group_is_open;
};

Expand All @@ -137,6 +143,7 @@ static int tipc_sk_insert(struct tipc_sock *tsk);
static void tipc_sk_remove(struct tipc_sock *tsk);
static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz);
static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
static void tipc_sk_push_backlog(struct tipc_sock *tsk);

static const struct proto_ops packet_ops;
static const struct proto_ops stream_ops;
Expand Down Expand Up @@ -227,6 +234,26 @@ static u16 tsk_inc(struct tipc_sock *tsk, int msglen)
return 1;
}

/* tsk_set_nagle - enable/disable nagle property by manipulating maxnagle
*/
static void tsk_set_nagle(struct tipc_sock *tsk)
{
struct sock *sk = &tsk->sk;

tsk->maxnagle = 0;
if (sk->sk_type != SOCK_STREAM)
return;
if (tsk->nodelay)
return;
if (!(tsk->peer_caps & TIPC_NAGLE))
return;
/* Limit node local buffer size to avoid receive queue overflow */
if (tsk->max_pkt == MAX_MSG_SIZE)
tsk->maxnagle = 1500;
else
tsk->maxnagle = tsk->max_pkt;
}

/**
* tsk_advance_rx_queue - discard first buffer in socket receive queue
*
Expand Down Expand Up @@ -446,6 +473,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,

tsk = tipc_sk(sk);
tsk->max_pkt = MAX_PKT_DEFAULT;
tsk->maxnagle = 0;
INIT_LIST_HEAD(&tsk->publications);
INIT_LIST_HEAD(&tsk->cong_links);
msg = &tsk->phdr;
Expand Down Expand Up @@ -512,8 +540,12 @@ static void __tipc_shutdown(struct socket *sock, int error)
tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt &&
!tsk_conn_cong(tsk)));

/* Remove any pending SYN message */
__skb_queue_purge(&sk->sk_write_queue);
/* Push out unsent messages or remove if pending SYN */
skb = skb_peek(&sk->sk_write_queue);
if (skb && !msg_is_syn(buf_msg(skb)))
tipc_sk_push_backlog(tsk);
else
__skb_queue_purge(&sk->sk_write_queue);

/* Reject all unreceived messages, except on an active connection
* (which disconnects locally & sends a 'FIN+' to peer).
Expand Down Expand Up @@ -1208,6 +1240,27 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
tipc_sk_rcv(net, inputq);
}

/* tipc_sk_push_backlog(): send accumulated buffers in socket write queue
* when socket is in Nagle mode
*/
static void tipc_sk_push_backlog(struct tipc_sock *tsk)
{
struct sk_buff_head *txq = &tsk->sk.sk_write_queue;
struct net *net = sock_net(&tsk->sk);
u32 dnode = tsk_peer_node(tsk);
int rc;

if (skb_queue_empty(txq) || tsk->cong_link_cnt)
return;

tsk->snt_unacked += tsk->snd_backlog;
tsk->snd_backlog = 0;
tsk->expect_ack = true;
rc = tipc_node_xmit(net, txq, dnode, tsk->portid);
if (rc == -ELINKCONG)
tsk->cong_link_cnt = 1;
}

/**
* tipc_sk_conn_proto_rcv - receive a connection mng protocol message
* @tsk: receiving socket
Expand All @@ -1221,7 +1274,7 @@ static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
u32 onode = tsk_own_node(tsk);
struct sock *sk = &tsk->sk;
int mtyp = msg_type(hdr);
bool conn_cong;
bool was_cong;

/* Ignore if connection cannot be validated: */
if (!tsk_peer_msg(tsk, hdr)) {
Expand Down Expand Up @@ -1254,11 +1307,13 @@ static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
__skb_queue_tail(xmitq, skb);
return;
} else if (mtyp == CONN_ACK) {
conn_cong = tsk_conn_cong(tsk);
was_cong = tsk_conn_cong(tsk);
tsk->expect_ack = false;
tipc_sk_push_backlog(tsk);
tsk->snt_unacked -= msg_conn_ack(hdr);
if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
tsk->snd_win = msg_adv_win(hdr);
if (conn_cong)
if (was_cong && !tsk_conn_cong(tsk))
sk->sk_write_space(sk);
} else if (mtyp != CONN_PROBE_REPLY) {
pr_warn("Received unknown CONN_PROTO msg\n");
Expand Down Expand Up @@ -1437,15 +1492,15 @@ static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
struct sock *sk = sock->sk;
DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
struct sk_buff_head *txq = &sk->sk_write_queue;
struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_msg *hdr = &tsk->phdr;
struct net *net = sock_net(sk);
struct sk_buff_head pkts;
u32 dnode = tsk_peer_node(tsk);
int maxnagle = tsk->maxnagle;
int maxpkt = tsk->max_pkt;
int send, sent = 0;
int rc = 0;

__skb_queue_head_init(&pkts);
int blocks, rc = 0;

if (unlikely(dlen > INT_MAX))
return -EMSGSIZE;
Expand All @@ -1467,21 +1522,35 @@ static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
tipc_sk_connected(sk)));
if (unlikely(rc))
break;

send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE);
rc = tipc_msg_build(hdr, m, sent, send, tsk->max_pkt, &pkts);
if (unlikely(rc != send))
break;

trace_tipc_sk_sendstream(sk, skb_peek(&pkts),
blocks = tsk->snd_backlog;
if (tsk->oneway++ >= 4 && send <= maxnagle) {
rc = tipc_msg_append(hdr, m, send, maxnagle, txq);
if (unlikely(rc < 0))
break;
blocks += rc;
if (blocks <= 64 && tsk->expect_ack) {
tsk->snd_backlog = blocks;
sent += send;
break;
}
tsk->expect_ack = true;
} else {
rc = tipc_msg_build(hdr, m, sent, send, maxpkt, txq);
if (unlikely(rc != send))
break;
blocks += tsk_inc(tsk, send + MIN_H_SIZE);
}
trace_tipc_sk_sendstream(sk, skb_peek(txq),
TIPC_DUMP_SK_SNDQ, " ");
rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
rc = tipc_node_xmit(net, txq, dnode, tsk->portid);
if (unlikely(rc == -ELINKCONG)) {
tsk->cong_link_cnt = 1;
rc = 0;
}
if (likely(!rc)) {
tsk->snt_unacked += tsk_inc(tsk, send + MIN_H_SIZE);
tsk->snt_unacked += blocks;
tsk->snd_backlog = 0;
sent += send;
}
} while (sent < dlen && !rc);
Expand Down Expand Up @@ -1528,6 +1597,7 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid, true);
tsk->peer_caps = tipc_node_get_capabilities(net, peer_node);
tsk_set_nagle(tsk);
__skb_queue_purge(&sk->sk_write_queue);
if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
return;
Expand Down Expand Up @@ -1848,6 +1918,7 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m,
bool peek = flags & MSG_PEEK;
int offset, required, copy, copied = 0;
int hlen, dlen, err, rc;
bool ack = false;
long timeout;

/* Catch invalid receive attempts */
Expand Down Expand Up @@ -1892,6 +1963,7 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m,

/* Copy data if msg ok, otherwise return error/partial data */
if (likely(!err)) {
ack = msg_ack_required(hdr);
offset = skb_cb->bytes_read;
copy = min_t(int, dlen - offset, buflen - copied);
rc = skb_copy_datagram_msg(skb, hlen + offset, m, copy);
Expand Down Expand Up @@ -1919,7 +1991,7 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m,

/* Send connection flow control advertisement when applicable */
tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
if (unlikely(tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE))
if (ack || tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
tipc_sk_send_ack(tsk);

/* Exit if all requested data or FIN/error received */
Expand Down Expand Up @@ -1990,6 +2062,7 @@ static void tipc_sk_proto_rcv(struct sock *sk,
smp_wmb();
tsk->cong_link_cnt--;
wakeup = true;
tipc_sk_push_backlog(tsk);
break;
case GROUP_PROTOCOL:
tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq);
Expand Down Expand Up @@ -2029,6 +2102,7 @@ static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)

if (unlikely(msg_mcast(hdr)))
return false;
tsk->oneway = 0;

switch (sk->sk_state) {
case TIPC_CONNECTING:
Expand Down Expand Up @@ -2074,6 +2148,8 @@ static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
return true;
return false;
case TIPC_ESTABLISHED:
if (!skb_queue_empty(&sk->sk_write_queue))
tipc_sk_push_backlog(tsk);
/* Accept only connection-based messages sent by peer */
if (likely(con_msg && !err && pport == oport && pnode == onode))
return true;
Expand Down Expand Up @@ -2959,6 +3035,7 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
case TIPC_SRC_DROPPABLE:
case TIPC_DEST_DROPPABLE:
case TIPC_CONN_TIMEOUT:
case TIPC_NODELAY:
if (ol < sizeof(value))
return -EINVAL;
if (get_user(value, (u32 __user *)ov))
Expand Down Expand Up @@ -3007,6 +3084,10 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
case TIPC_GROUP_LEAVE:
res = tipc_sk_leave(tsk);
break;
case TIPC_NODELAY:
tsk->nodelay = !!value;
tsk_set_nagle(tsk);
break;
default:
res = -EINVAL;
}
Expand Down

0 comments on commit c0bceb9

Please sign in to comment.