Skip to content

Commit

Permalink
rds: deliver zerocopy completion notification with data
Browse files Browse the repository at this point in the history
This commit is an optimization over commit 01883ed
("rds: support for zcopy completion notification") for PF_RDS sockets.

RDS applications are predominantly request-response transactions, so
it is more efficient to reduce the number of system calls and have
zerocopy completion notification delivered as ancillary data on the
POLLIN channel.

Cookies are passed up as ancillary data (at level SOL_RDS) in a
struct rds_zcopy_cookies when the returned value of recvmsg() is
greater than, or equal to, 0. A max of RDS_MAX_ZCOOKIES may be passed
with each message.

This commit removes support for zerocopy completion notification on
MSG_ERRQUEUE for PF_RDS sockets.

Signed-off-by: Sowmini Varadhan <sowmini.varadhan@oracle.com>
Acked-by: Willem de Bruijn <willemb@google.com>
Acked-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
  • Loading branch information
Sowmini Varadhan authored and David S. Miller committed Feb 27, 2018
1 parent 67490e3 commit 401910d
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 27 deletions.
2 changes: 0 additions & 2 deletions include/uapi/linux/errqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@ struct sock_extended_err {
#define SO_EE_ORIGIN_ICMP6 3
#define SO_EE_ORIGIN_TXSTATUS 4
#define SO_EE_ORIGIN_ZEROCOPY 5
#define SO_EE_ORIGIN_ZCOOKIE 6
#define SO_EE_ORIGIN_TIMESTAMPING SO_EE_ORIGIN_TXSTATUS

#define SO_EE_OFFENDER(ee) ((struct sockaddr*)((ee)+1))

#define SO_EE_CODE_ZEROCOPY_COPIED 1
#define SO_EE_ORIGIN_MAX_ZCOOKIES 8

/**
* struct scm_timestamping - timestamps exposed through cmsg
Expand Down
7 changes: 7 additions & 0 deletions include/uapi/linux/rds.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
#define RDS_CMSG_MASKED_ATOMIC_CSWP 9
#define RDS_CMSG_RXPATH_LATENCY 11
#define RDS_CMSG_ZCOPY_COOKIE 12
#define RDS_CMSG_ZCOPY_COMPLETION 13

#define RDS_INFO_FIRST 10000
#define RDS_INFO_COUNTERS 10000
Expand Down Expand Up @@ -317,6 +318,12 @@ struct rds_rdma_notify {
#define RDS_RDMA_DROPPED 3
#define RDS_RDMA_OTHER_ERROR 4

#define RDS_MAX_ZCOOKIES 8
struct rds_zcopy_cookies {
__u32 num;
__u32 cookies[RDS_MAX_ZCOOKIES];
};

/*
* Common set of flags for all RDMA related structs
*/
Expand Down
7 changes: 5 additions & 2 deletions net/rds/af_rds.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ static int rds_release(struct socket *sock)
rds_send_drop_to(rs, NULL);
rds_rdma_drop_keys(rs);
rds_notify_queue_get(rs, NULL);
__skb_queue_purge(&rs->rs_zcookie_queue);

spin_lock_bh(&rds_sock_lock);
list_del_init(&rs->rs_item);
Expand Down Expand Up @@ -144,7 +145,7 @@ static int rds_getname(struct socket *sock, struct sockaddr *uaddr,
* - to signal that a previously congested destination may have become
* uncongested
* - A notification has been queued to the socket (this can be a congestion
* update, or a RDMA completion).
* update, or a RDMA completion, or a MSG_ZEROCOPY completion).
*
* EPOLLOUT is asserted if there is room on the send queue. This does not mean
* however, that the next sendmsg() call will succeed. If the application tries
Expand Down Expand Up @@ -178,7 +179,8 @@ static __poll_t rds_poll(struct file *file, struct socket *sock,
spin_unlock(&rs->rs_lock);
}
if (!list_empty(&rs->rs_recv_queue) ||
!list_empty(&rs->rs_notify_queue))
!list_empty(&rs->rs_notify_queue) ||
!skb_queue_empty(&rs->rs_zcookie_queue))
mask |= (EPOLLIN | EPOLLRDNORM);
if (rs->rs_snd_bytes < rds_sk_sndbuf(rs))
mask |= (EPOLLOUT | EPOLLWRNORM);
Expand Down Expand Up @@ -513,6 +515,7 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
INIT_LIST_HEAD(&rs->rs_recv_queue);
INIT_LIST_HEAD(&rs->rs_notify_queue);
INIT_LIST_HEAD(&rs->rs_cong_list);
skb_queue_head_init(&rs->rs_zcookie_queue);
spin_lock_init(&rs->rs_rdma_lock);
rs->rs_rdma_keys = RB_ROOT;
rs->rs_rx_traces = 0;
Expand Down
38 changes: 16 additions & 22 deletions net/rds/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,55 +58,46 @@ EXPORT_SYMBOL_GPL(rds_message_addref);

static inline bool skb_zcookie_add(struct sk_buff *skb, u32 cookie)
{
struct sock_exterr_skb *serr = SKB_EXT_ERR(skb);
int ncookies;
u32 *ptr;
struct rds_zcopy_cookies *ck = (struct rds_zcopy_cookies *)skb->cb;
int ncookies = ck->num;

if (serr->ee.ee_origin != SO_EE_ORIGIN_ZCOOKIE)
if (ncookies == RDS_MAX_ZCOOKIES)
return false;
ncookies = serr->ee.ee_data;
if (ncookies == SO_EE_ORIGIN_MAX_ZCOOKIES)
return false;
ptr = skb_put(skb, sizeof(u32));
*ptr = cookie;
serr->ee.ee_data = ++ncookies;
ck->cookies[ncookies] = cookie;
ck->num = ++ncookies;
return true;
}

static void rds_rm_zerocopy_callback(struct rds_sock *rs,
struct rds_znotifier *znotif)
{
struct sock *sk = rds_rs_to_sk(rs);
struct sk_buff *skb, *tail;
struct sock_exterr_skb *serr;
unsigned long flags;
struct sk_buff_head *q;
u32 cookie = znotif->z_cookie;
struct rds_zcopy_cookies *ck;

q = &sk->sk_error_queue;
q = &rs->rs_zcookie_queue;
spin_lock_irqsave(&q->lock, flags);
tail = skb_peek_tail(q);

if (tail && skb_zcookie_add(tail, cookie)) {
spin_unlock_irqrestore(&q->lock, flags);
mm_unaccount_pinned_pages(&znotif->z_mmp);
consume_skb(rds_skb_from_znotifier(znotif));
sk->sk_error_report(sk);
/* caller invokes rds_wake_sk_sleep() */
return;
}

skb = rds_skb_from_znotifier(znotif);
serr = SKB_EXT_ERR(skb);
memset(&serr->ee, 0, sizeof(serr->ee));
serr->ee.ee_errno = 0;
serr->ee.ee_origin = SO_EE_ORIGIN_ZCOOKIE;
serr->ee.ee_info = 0;
ck = (struct rds_zcopy_cookies *)skb->cb;
memset(ck, 0, sizeof(*ck));
WARN_ON(!skb_zcookie_add(skb, cookie));

__skb_queue_tail(q, skb);

spin_unlock_irqrestore(&q->lock, flags);
sk->sk_error_report(sk);
/* caller invokes rds_wake_sk_sleep() */

mm_unaccount_pinned_pages(&znotif->z_mmp);
}
Expand All @@ -129,6 +120,7 @@ static void rds_message_purge(struct rds_message *rm)
if (rm->data.op_mmp_znotifier) {
zcopy = true;
rds_rm_zerocopy_callback(rs, rm->data.op_mmp_znotifier);
rds_wake_sk_sleep(rs);
rm->data.op_mmp_znotifier = NULL;
}
sock_put(rds_rs_to_sk(rs));
Expand Down Expand Up @@ -362,10 +354,12 @@ int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from,
int total_copied = 0;
struct sk_buff *skb;

skb = alloc_skb(SO_EE_ORIGIN_MAX_ZCOOKIES * sizeof(u32),
GFP_KERNEL);
skb = alloc_skb(0, GFP_KERNEL);
if (!skb)
return -ENOMEM;
BUILD_BUG_ON(sizeof(skb->cb) <
max_t(int, sizeof(struct rds_znotifier),
sizeof(struct rds_zcopy_cookies)));
rm->data.op_mmp_znotifier = RDS_ZCOPY_SKB(skb);
if (mm_account_pinned_pages(&rm->data.op_mmp_znotifier->z_mmp,
length)) {
Expand Down
2 changes: 2 additions & 0 deletions net/rds/rds.h
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,8 @@ struct rds_sock {
/* Socket receive path trace points*/
u8 rs_rx_traces;
u8 rs_rx_trace[RDS_MSG_RX_DGRAM_TRACE_MAX];

struct sk_buff_head rs_zcookie_queue;
};

static inline struct rds_sock *rds_sk_to_rs(const struct sock *sk)
Expand Down
31 changes: 30 additions & 1 deletion net/rds/recv.c
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,32 @@ static int rds_cmsg_recv(struct rds_incoming *inc, struct msghdr *msg,
return ret;
}

static bool rds_recvmsg_zcookie(struct rds_sock *rs, struct msghdr *msg)
{
struct sk_buff *skb;
struct sk_buff_head *q = &rs->rs_zcookie_queue;
struct rds_zcopy_cookies *done;

if (!msg->msg_control)
return false;

if (!sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY) ||
msg->msg_controllen < CMSG_SPACE(sizeof(*done)))
return false;

skb = skb_dequeue(q);
if (!skb)
return false;
done = (struct rds_zcopy_cookies *)skb->cb;
if (put_cmsg(msg, SOL_RDS, RDS_CMSG_ZCOPY_COMPLETION, sizeof(*done),
done)) {
skb_queue_head(q, skb);
return false;
}
consume_skb(skb);
return true;
}

int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
int msg_flags)
{
Expand Down Expand Up @@ -611,7 +637,9 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,

if (!rds_next_incoming(rs, &inc)) {
if (nonblock) {
ret = -EAGAIN;
bool reaped = rds_recvmsg_zcookie(rs, msg);

ret = reaped ? 0 : -EAGAIN;
break;
}

Expand Down Expand Up @@ -660,6 +688,7 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
ret = -EFAULT;
goto out;
}
rds_recvmsg_zcookie(rs, msg);

rds_stats_inc(s_recv_delivered);

Expand Down

0 comments on commit 401910d

Please sign in to comment.