Skip to content

Commit

Permalink
Merge branch 'RDS-zerocopy-support'
Browse files Browse the repository at this point in the history
Sowmini Varadhan says:

====================
RDS: zerocopy support

This is version 3 of the series, following up on review comments for
 http://patchwork.ozlabs.org/project/netdev/list/?series=28530

Review comments addressed
Patch 4
  - fix fragile use of skb->cb[], do not set ee_code incorrectly.
Patch 5:
  - remove needless bzero of skb->cb[], consolidate err cleanup

A brief overview of this feature follows.

This patch series provides support for MSG_ZERCOCOPY
on a PF_RDS socket based on the APIs and infrastructure added
by Commit f214f91 ("tcp: enable MSG_ZEROCOPY")

For single threaded rds-stress testing using rds-tcp with the
ixgbe driver using 1M message sizes (-a 1M -q 1M) preliminary
results show that  there is a significant reduction in latency: about
90 usec with zerocopy, compared with 200 usec without zerocopy.

This patchset modifies the above for zerocopy in the following manner.
- if the MSG_ZEROCOPY flag is specified with rds_sendmsg(), and,
- if the SO_ZEROCOPY  socket option has been set on the PF_RDS socket,
application pages sent down with rds_sendmsg are pinned. The pinning
uses the accounting infrastructure added by a91dbff ("sock: ulimit
on MSG_ZEROCOPY pages"). The message is unpinned when all references
to the message go down to 0, and the message is freed by rds_message_purge.

A multithreaded application using this infrastructure must send down
a unique 32 bit cookie as ancillary data with each sendmsg invocation.
The format of this ancillary data is described in Patch 5 of the series.
The cookie is passed up to the application on the sk_error_queue when
the message is unpinned, indicating to the application that it is now
safe to free/reuse the message buffer. The details of the completion
notification are provided in Patch 4 of this series.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
  • Loading branch information
David S. Miller committed Feb 16, 2018
2 parents ee99b2d + dfb8434 commit 80c6d2b
Show file tree
Hide file tree
Showing 11 changed files with 339 additions and 35 deletions.
3 changes: 3 additions & 0 deletions include/linux/skbuff.h
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,9 @@ struct ubuf_info {

#define skb_uarg(SKB) ((struct ubuf_info *)(skb_shinfo(SKB)->destructor_arg))

int mm_account_pinned_pages(struct mmpin *mmp, size_t size);
void mm_unaccount_pinned_pages(struct mmpin *mmp);

struct ubuf_info *sock_zerocopy_alloc(struct sock *sk, size_t size);
struct ubuf_info *sock_zerocopy_realloc(struct sock *sk, size_t size,
struct ubuf_info *uarg);
Expand Down
2 changes: 2 additions & 0 deletions include/uapi/linux/errqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ struct sock_extended_err {
#define SO_EE_ORIGIN_ICMP6 3
#define SO_EE_ORIGIN_TXSTATUS 4
#define SO_EE_ORIGIN_ZEROCOPY 5
#define SO_EE_ORIGIN_ZCOOKIE 6
#define SO_EE_ORIGIN_TIMESTAMPING SO_EE_ORIGIN_TXSTATUS

#define SO_EE_OFFENDER(ee) ((struct sockaddr*)((ee)+1))

#define SO_EE_CODE_ZEROCOPY_COPIED 1
#define SO_EE_ORIGIN_MAX_ZCOOKIES 8

/**
* struct scm_timestamping - timestamps exposed through cmsg
Expand Down
1 change: 1 addition & 0 deletions include/uapi/linux/rds.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
#define RDS_CMSG_MASKED_ATOMIC_FADD 8
#define RDS_CMSG_MASKED_ATOMIC_CSWP 9
#define RDS_CMSG_RXPATH_LATENCY 11
#define RDS_CMSG_ZCOPY_COOKIE 12

#define RDS_INFO_FIRST 10000
#define RDS_INFO_COUNTERS 10000
Expand Down
6 changes: 4 additions & 2 deletions net/core/skbuff.c
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,7 @@ struct sk_buff *skb_morph(struct sk_buff *dst, struct sk_buff *src)
}
EXPORT_SYMBOL_GPL(skb_morph);

static int mm_account_pinned_pages(struct mmpin *mmp, size_t size)
int mm_account_pinned_pages(struct mmpin *mmp, size_t size)
{
unsigned long max_pg, num_pg, new_pg, old_pg;
struct user_struct *user;
Expand Down Expand Up @@ -919,14 +919,16 @@ static int mm_account_pinned_pages(struct mmpin *mmp, size_t size)

return 0;
}
EXPORT_SYMBOL_GPL(mm_account_pinned_pages);

static void mm_unaccount_pinned_pages(struct mmpin *mmp)
void mm_unaccount_pinned_pages(struct mmpin *mmp)
{
if (mmp->user) {
atomic_long_sub(mmp->num_pg, &mmp->user->locked_vm);
free_uid(mmp->user);
}
}
EXPORT_SYMBOL_GPL(mm_unaccount_pinned_pages);

struct ubuf_info *sock_zerocopy_alloc(struct sock *sk, size_t size)
{
Expand Down
25 changes: 14 additions & 11 deletions net/core/sock.c
Original file line number Diff line number Diff line change
Expand Up @@ -1049,18 +1049,21 @@ int sock_setsockopt(struct socket *sock, int level, int optname,
break;

case SO_ZEROCOPY:
if (sk->sk_family != PF_INET && sk->sk_family != PF_INET6)
if (sk->sk_family == PF_INET || sk->sk_family == PF_INET6) {
if (sk->sk_protocol != IPPROTO_TCP)
ret = -ENOTSUPP;
else if (sk->sk_state != TCP_CLOSE)
ret = -EBUSY;
} else if (sk->sk_family != PF_RDS) {
ret = -ENOTSUPP;
else if (sk->sk_protocol != IPPROTO_TCP)
ret = -ENOTSUPP;
else if (sk->sk_state != TCP_CLOSE)
ret = -EBUSY;
else if (val < 0 || val > 1)
ret = -EINVAL;
else
sock_valbool_flag(sk, SOCK_ZEROCOPY, valbool);
break;

}
if (!ret) {
if (val < 0 || val > 1)
ret = -EINVAL;
else
sock_valbool_flag(sk, SOCK_ZEROCOPY, valbool);
break;
}
default:
ret = -ENOPROTOOPT;
break;
Expand Down
2 changes: 2 additions & 0 deletions net/rds/af_rds.c
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ static __poll_t rds_poll(struct file *file, struct socket *sock,
mask |= (EPOLLIN | EPOLLRDNORM);
if (rs->rs_snd_bytes < rds_sk_sndbuf(rs))
mask |= (EPOLLOUT | EPOLLWRNORM);
if (sk->sk_err || !skb_queue_empty(&sk->sk_error_queue))
mask |= POLLERR;
read_unlock_irqrestore(&rs->rs_recv_lock, flags);

/* clear state any time we wake a seen-congested socket */
Expand Down
132 changes: 128 additions & 4 deletions net/rds/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
#include <linux/kernel.h>
#include <linux/slab.h>
#include <linux/export.h>
#include <linux/skbuff.h>
#include <linux/list.h>
#include <linux/errqueue.h>

#include "rds.h"

Expand All @@ -53,20 +56,92 @@ void rds_message_addref(struct rds_message *rm)
}
EXPORT_SYMBOL_GPL(rds_message_addref);

static inline bool skb_zcookie_add(struct sk_buff *skb, u32 cookie)
{
struct sock_exterr_skb *serr = SKB_EXT_ERR(skb);
int ncookies;
u32 *ptr;

if (serr->ee.ee_origin != SO_EE_ORIGIN_ZCOOKIE)
return false;
ncookies = serr->ee.ee_data;
if (ncookies == SO_EE_ORIGIN_MAX_ZCOOKIES)
return false;
ptr = skb_put(skb, sizeof(u32));
*ptr = cookie;
serr->ee.ee_data = ++ncookies;
return true;
}

static void rds_rm_zerocopy_callback(struct rds_sock *rs,
struct rds_znotifier *znotif)
{
struct sock *sk = rds_rs_to_sk(rs);
struct sk_buff *skb, *tail;
struct sock_exterr_skb *serr;
unsigned long flags;
struct sk_buff_head *q;
u32 cookie = znotif->z_cookie;

q = &sk->sk_error_queue;
spin_lock_irqsave(&q->lock, flags);
tail = skb_peek_tail(q);

if (tail && skb_zcookie_add(tail, cookie)) {
spin_unlock_irqrestore(&q->lock, flags);
mm_unaccount_pinned_pages(&znotif->z_mmp);
consume_skb(rds_skb_from_znotifier(znotif));
sk->sk_error_report(sk);
return;
}

skb = rds_skb_from_znotifier(znotif);
serr = SKB_EXT_ERR(skb);
memset(&serr->ee, 0, sizeof(serr->ee));
serr->ee.ee_errno = 0;
serr->ee.ee_origin = SO_EE_ORIGIN_ZCOOKIE;
serr->ee.ee_info = 0;
WARN_ON(!skb_zcookie_add(skb, cookie));

__skb_queue_tail(q, skb);

spin_unlock_irqrestore(&q->lock, flags);
sk->sk_error_report(sk);

mm_unaccount_pinned_pages(&znotif->z_mmp);
}

/*
* This relies on dma_map_sg() not touching sg[].page during merging.
*/
static void rds_message_purge(struct rds_message *rm)
{
unsigned long i;
unsigned long i, flags;
bool zcopy = false;

if (unlikely(test_bit(RDS_MSG_PAGEVEC, &rm->m_flags)))
return;

spin_lock_irqsave(&rm->m_rs_lock, flags);
if (rm->m_rs) {
struct rds_sock *rs = rm->m_rs;

if (rm->data.op_mmp_znotifier) {
zcopy = true;
rds_rm_zerocopy_callback(rs, rm->data.op_mmp_znotifier);
rm->data.op_mmp_znotifier = NULL;
}
sock_put(rds_rs_to_sk(rs));
rm->m_rs = NULL;
}
spin_unlock_irqrestore(&rm->m_rs_lock, flags);

for (i = 0; i < rm->data.op_nents; i++) {
rdsdebug("putting data page %p\n", (void *)sg_page(&rm->data.op_sg[i]));
/* XXX will have to put_page for page refs */
__free_page(sg_page(&rm->data.op_sg[i]));
if (!zcopy)
__free_page(sg_page(&rm->data.op_sg[i]));
else
put_page(sg_page(&rm->data.op_sg[i]));
}
rm->data.op_nents = 0;

Expand Down Expand Up @@ -266,12 +341,14 @@ struct rds_message *rds_message_map_pages(unsigned long *page_addrs, unsigned in
return rm;
}

int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from)
int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from,
bool zcopy)
{
unsigned long to_copy, nbytes;
unsigned long sg_off;
struct scatterlist *sg;
int ret = 0;
int length = iov_iter_count(from);

rm->m_inc.i_hdr.h_len = cpu_to_be32(iov_iter_count(from));

Expand All @@ -281,6 +358,53 @@ int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from)
sg = rm->data.op_sg;
sg_off = 0; /* Dear gcc, sg->page will be null from kzalloc. */

if (zcopy) {
int total_copied = 0;
struct sk_buff *skb;

skb = alloc_skb(SO_EE_ORIGIN_MAX_ZCOOKIES * sizeof(u32),
GFP_KERNEL);
if (!skb)
return -ENOMEM;
rm->data.op_mmp_znotifier = RDS_ZCOPY_SKB(skb);
if (mm_account_pinned_pages(&rm->data.op_mmp_znotifier->z_mmp,
length)) {
ret = -ENOMEM;
goto err;
}
while (iov_iter_count(from)) {
struct page *pages;
size_t start;
ssize_t copied;

copied = iov_iter_get_pages(from, &pages, PAGE_SIZE,
1, &start);
if (copied < 0) {
struct mmpin *mmp;
int i;

for (i = 0; i < rm->data.op_nents; i++)
put_page(sg_page(&rm->data.op_sg[i]));
mmp = &rm->data.op_mmp_znotifier->z_mmp;
mm_unaccount_pinned_pages(mmp);
ret = -EFAULT;
goto err;
}
total_copied += copied;
iov_iter_advance(from, copied);
length -= copied;
sg_set_page(sg, pages, copied, start);
rm->data.op_nents++;
sg++;
}
WARN_ON_ONCE(length != 0);
return ret;
err:
consume_skb(skb);
rm->data.op_mmp_znotifier = NULL;
return ret;
} /* zcopy */

while (iov_iter_count(from)) {
if (!sg_page(sg)) {
ret = rds_page_remainder_alloc(sg, iov_iter_count(from),
Expand Down
17 changes: 16 additions & 1 deletion net/rds/rds.h
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,19 @@ static inline u32 rds_rdma_cookie_offset(rds_rdma_cookie_t cookie)
#define RDS_MSG_PAGEVEC 7
#define RDS_MSG_FLUSH 8

struct rds_znotifier {
struct list_head z_list;
struct mmpin z_mmp;
u32 z_cookie;
};

#define RDS_ZCOPY_SKB(__skb) ((struct rds_znotifier *)&((__skb)->cb[0]))

static inline struct sk_buff *rds_skb_from_znotifier(struct rds_znotifier *z)
{
return container_of((void *)z, struct sk_buff, cb);
}

struct rds_message {
refcount_t m_refcount;
struct list_head m_sock_item;
Expand Down Expand Up @@ -436,6 +449,7 @@ struct rds_message {
unsigned int op_count;
unsigned int op_dmasg;
unsigned int op_dmaoff;
struct rds_znotifier *op_mmp_znotifier;
struct scatterlist *op_sg;
} data;
};
Expand Down Expand Up @@ -771,7 +785,8 @@ rds_conn_connecting(struct rds_connection *conn)
/* message.c */
struct rds_message *rds_message_alloc(unsigned int nents, gfp_t gfp);
struct scatterlist *rds_message_alloc_sgs(struct rds_message *rm, int nents);
int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from);
int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from,
bool zcopy);
struct rds_message *rds_message_map_pages(unsigned long *page_addrs, unsigned int total_len);
void rds_message_populate_header(struct rds_header *hdr, __be16 sport,
__be16 dport, u64 seq);
Expand Down
2 changes: 2 additions & 0 deletions net/rds/recv.c
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,8 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,

if (msg_flags & MSG_OOB)
goto out;
if (msg_flags & MSG_ERRQUEUE)
return sock_recv_errqueue(sk, msg, size, SOL_IP, IP_RECVERR);

while (1) {
/* If there are pending notifications, do those - and nothing else */
Expand Down
Loading

0 comments on commit 80c6d2b

Please sign in to comment.