Skip to content

Commit

Permalink
rpc: return sent and err from xs_sendpages()
Browse files Browse the repository at this point in the history
If an error is returned after the first bits of a packet have already been
successfully queued, xs_sendpages() will return a positive 'int' value
indicating success. Callers seem to treat this as -EAGAIN.

However, there are cases where its not a question of waiting for the write
queue to drain. For example, when there is an iptables rule dropping packets
to the destination, the lower level code can return -EPERM only after parts
of the packet have been successfully queued. In this case, we can end up
continuously retrying resulting in a kernel softlockup.

This patch is intended to make no changes in behavior but is in preparation for
subsequent patches that can make decisions based on both on the number of bytes
sent by xs_sendpages() and any errors that may have be returned.

Signed-off-by: Jason Baron <jbaron@akamai.com>
Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com>
  • Loading branch information
Jason Baron authored and Trond Myklebust committed Sep 25, 2014
1 parent 173b3af commit f279cd0
Showing 1 changed file with 42 additions and 39 deletions.
81 changes: 42 additions & 39 deletions net/sunrpc/xprtsock.c
Original file line number Diff line number Diff line change
Expand Up @@ -399,13 +399,13 @@ static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen,
return kernel_sendmsg(sock, &msg, NULL, 0, 0);
}

static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more, bool zerocopy)
static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more, bool zerocopy, int *sent_p)
{
ssize_t (*do_sendpage)(struct socket *sock, struct page *page,
int offset, size_t size, int flags);
struct page **ppage;
unsigned int remainder;
int err, sent = 0;
int err;

remainder = xdr->page_len - base;
base += xdr->page_base;
Expand All @@ -424,15 +424,15 @@ static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned i
err = do_sendpage(sock, *ppage, base, len, flags);
if (remainder == 0 || err != len)
break;
sent += err;
*sent_p += err;
ppage++;
base = 0;
}
if (sent == 0)
return err;
if (err > 0)
sent += err;
return sent;
if (err > 0) {
*sent_p += err;
err = 0;
}
return err;
}

/**
Expand All @@ -443,12 +443,14 @@ static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned i
* @xdr: buffer containing this request
* @base: starting position in the buffer
* @zerocopy: true if it is safe to use sendpage()
* @sent_p: return the total number of bytes successfully queued for sending
*
*/
static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base, bool zerocopy)
static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base, bool zerocopy, int *sent_p)
{
unsigned int remainder = xdr->len - base;
int err, sent = 0;
int err = 0;
int sent = 0;

if (unlikely(!sock))
return -ENOTSOCK;
Expand All @@ -465,31 +467,31 @@ static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen,
err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0);
if (remainder == 0 || err != len)
goto out;
sent += err;
*sent_p += err;
base = 0;
} else
base -= xdr->head[0].iov_len;

if (base < xdr->page_len) {
unsigned int len = xdr->page_len - base;
remainder -= len;
err = xs_send_pagedata(sock, xdr, base, remainder != 0, zerocopy);
if (remainder == 0 || err != len)
err = xs_send_pagedata(sock, xdr, base, remainder != 0, zerocopy, &sent);
*sent_p += sent;
if (remainder == 0 || sent != len)
goto out;
sent += err;
base = 0;
} else
base -= xdr->page_len;

if (base >= xdr->tail[0].iov_len)
return sent;
return 0;
err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0);
out:
if (sent == 0)
return err;
if (err > 0)
sent += err;
return sent;
if (err > 0) {
*sent_p += err;
err = 0;
}
return err;
}

static void xs_nospace_callback(struct rpc_task *task)
Expand Down Expand Up @@ -573,19 +575,20 @@ static int xs_local_send_request(struct rpc_task *task)
container_of(xprt, struct sock_xprt, xprt);
struct xdr_buf *xdr = &req->rq_snd_buf;
int status;
int sent = 0;

xs_encode_stream_record_marker(&req->rq_snd_buf);

xs_pktdump("packet data:",
req->rq_svec->iov_base, req->rq_svec->iov_len);

status = xs_sendpages(transport->sock, NULL, 0,
xdr, req->rq_bytes_sent, true);
status = xs_sendpages(transport->sock, NULL, 0, xdr, req->rq_bytes_sent,
true, &sent);
dprintk("RPC: %s(%u) = %d\n",
__func__, xdr->len - req->rq_bytes_sent, status);
if (likely(status >= 0)) {
req->rq_bytes_sent += status;
req->rq_xmit_bytes_sent += status;
if (likely(sent > 0) || status == 0) {
req->rq_bytes_sent += sent;
req->rq_xmit_bytes_sent += sent;
if (likely(req->rq_bytes_sent >= req->rq_slen)) {
req->rq_bytes_sent = 0;
return 0;
Expand Down Expand Up @@ -626,6 +629,7 @@ static int xs_udp_send_request(struct rpc_task *task)
struct rpc_xprt *xprt = req->rq_xprt;
struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
struct xdr_buf *xdr = &req->rq_snd_buf;
int sent = 0;
int status;

xs_pktdump("packet data:",
Expand All @@ -634,17 +638,15 @@ static int xs_udp_send_request(struct rpc_task *task)

if (!xprt_bound(xprt))
return -ENOTCONN;
status = xs_sendpages(transport->sock,
xs_addr(xprt),
xprt->addrlen, xdr,
req->rq_bytes_sent, true);
status = xs_sendpages(transport->sock, xs_addr(xprt), xprt->addrlen,
xdr, req->rq_bytes_sent, true, &sent);

dprintk("RPC: xs_udp_send_request(%u) = %d\n",
xdr->len - req->rq_bytes_sent, status);

if (status >= 0) {
req->rq_xmit_bytes_sent += status;
if (status >= req->rq_slen)
if (sent > 0 || status == 0) {
req->rq_xmit_bytes_sent += sent;
if (sent >= req->rq_slen)
return 0;
/* Still some bytes left; set up for a retry later. */
status = -EAGAIN;
Expand Down Expand Up @@ -713,6 +715,7 @@ static int xs_tcp_send_request(struct rpc_task *task)
struct xdr_buf *xdr = &req->rq_snd_buf;
bool zerocopy = true;
int status;
int sent;

xs_encode_stream_record_marker(&req->rq_snd_buf);

Expand All @@ -730,26 +733,26 @@ static int xs_tcp_send_request(struct rpc_task *task)
* to cope with writespace callbacks arriving _after_ we have
* called sendmsg(). */
while (1) {
status = xs_sendpages(transport->sock,
NULL, 0, xdr, req->rq_bytes_sent,
zerocopy);
sent = 0;
status = xs_sendpages(transport->sock, NULL, 0, xdr,
req->rq_bytes_sent, zerocopy, &sent);

dprintk("RPC: xs_tcp_send_request(%u) = %d\n",
xdr->len - req->rq_bytes_sent, status);

if (unlikely(status < 0))
if (unlikely(sent == 0 && status < 0))
break;

/* If we've sent the entire packet, immediately
* reset the count of bytes sent. */
req->rq_bytes_sent += status;
req->rq_xmit_bytes_sent += status;
req->rq_bytes_sent += sent;
req->rq_xmit_bytes_sent += sent;
if (likely(req->rq_bytes_sent >= req->rq_slen)) {
req->rq_bytes_sent = 0;
return 0;
}

if (status != 0)
if (sent != 0)
continue;
status = -EAGAIN;
break;
Expand Down

0 comments on commit f279cd0

Please sign in to comment.