Skip to content

Commit

Permalink
rxrpc: Use MSG_WAITALL to tell sendmsg() to temporarily ignore signals
Browse files Browse the repository at this point in the history
Make AF_RXRPC accept MSG_WAITALL as a flag to sendmsg() to tell it to
ignore signals whilst loading up the message queue, provided progress is
being made in emptying the queue at the other side.

Progress is defined as the base of the transmit window having been
advanced within 2 RTT periods.  If the period is exceeded with no progress,
sendmsg() will return anyway, indicating how much data has been copied, if
any.

Once the supplied buffer is entirely decanted, the sendmsg() will return.

Signed-off-by: David Howells <dhowells@redhat.com>
  • Loading branch information
David Howells committed Oct 18, 2017
1 parent f4d15fb commit bc5e3a5
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 31 deletions.
12 changes: 12 additions & 0 deletions Documentation/networking/rxrpc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,18 @@ Interaction with the user of the RxRPC socket:
nominated by a socket option.


Notes on sendmsg:

(*) MSG_WAITALL can be set to tell sendmsg to ignore signals if the peer is
making progress at accepting packets within a reasonable time such that we
manage to queue up all the data for transmission. This requires the
peer to accept at least one packet per 2*RTT time period.

If this isn't set, sendmsg() will return immediately when a signal is
received, either returning
EINTR/ERESTARTSYS if nothing was consumed or returning the amount of data
consumed.


Notes on recvmsg:

(*) If there's a sequence of data messages belonging to a particular call on
Expand Down
31 changes: 26 additions & 5 deletions fs/afs/rxrpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp,
call->request_size);
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = (call->send_pages ? MSG_MORE : 0);
msg.msg_flags = MSG_WAITALL | (call->send_pages ? MSG_MORE : 0);

/* We have to change the state *before* sending the last packet as
* rxrpc might give us the reply before it returns from sending the
Expand Down Expand Up @@ -538,15 +538,26 @@ static void afs_deliver_to_call(struct afs_call *call)
*/
static int afs_wait_for_call_to_complete(struct afs_call *call)
{
signed long rtt2, timeout;
int ret;
u64 rtt;
u32 life, last_life;

DECLARE_WAITQUEUE(myself, current);

_enter("");

rtt = rxrpc_kernel_get_rtt(afs_socket, call->rxcall);
rtt2 = nsecs_to_jiffies64(rtt) * 2;
if (rtt2 < 2)
rtt2 = 2;

timeout = rtt2;
last_life = rxrpc_kernel_check_life(afs_socket, call->rxcall);

add_wait_queue(&call->waitq, &myself);
for (;;) {
set_current_state(TASK_INTERRUPTIBLE);
set_current_state(TASK_UNINTERRUPTIBLE);

/* deliver any messages that are in the queue */
if (call->state < AFS_CALL_COMPLETE && call->need_attention) {
Expand All @@ -556,10 +567,20 @@ static int afs_wait_for_call_to_complete(struct afs_call *call)
continue;
}

if (call->state == AFS_CALL_COMPLETE ||
signal_pending(current))
if (call->state == AFS_CALL_COMPLETE)
break;
schedule();

life = rxrpc_kernel_check_life(afs_socket, call->rxcall);
if (timeout == 0 &&
life == last_life && signal_pending(current))
break;

if (life != last_life) {
timeout = rtt2;
last_life = life;
}

timeout = schedule_timeout(timeout);
}

remove_wait_queue(&call->waitq, &myself);
Expand Down
107 changes: 81 additions & 26 deletions net/rxrpc/sendmsg.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,87 @@ struct rxrpc_send_params {
bool upgrade; /* If the connection is upgradeable */
};

/*
 * Sleep interruptibly until there is room in the call's Tx window.
 *
 * Returns 0 once the number of packets outstanding (tx_top - tx_hard_ack)
 * drops below the smaller of the Tx window size and the congestion allowance
 * (cong_cwnd + cong_extra), call->error if the call has reached a completed
 * state, or sock_intr_errno(*timeo) if a signal is pending or retaking
 * call->user_mutex was interrupted (in which case the mutex is not held on
 * return).
 *
 * call->user_mutex is dropped across each sleep and retaken afterwards; the
 * remaining timeout is written back through @timeo.  @rx is not used here.
 */
static int rxrpc_wait_for_tx_window_intr(struct rxrpc_sock *rx,
					 struct rxrpc_call *call,
					 long *timeo)
{
	unsigned int tx_limit;

	for (;;) {
		/* Change task state before testing the wake-up conditions. */
		set_current_state(TASK_INTERRUPTIBLE);

		tx_limit = min_t(unsigned int, call->tx_winsize,
				 call->cong_cwnd + call->cong_extra);
		if (call->tx_top - call->tx_hard_ack < tx_limit)
			return 0;

		if (call->state >= RXRPC_CALL_COMPLETE)
			return call->error;

		if (signal_pending(current))
			break;

		trace_rxrpc_transmit(call, rxrpc_transmit_wait);

		/* Don't hold the call's user mutex whilst asleep. */
		mutex_unlock(&call->user_mutex);
		*timeo = schedule_timeout(*timeo);
		if (mutex_lock_interruptible(&call->user_mutex) < 0)
			break;
	}

	/* Both interrupted paths report the same errno for the time left. */
	return sock_intr_errno(*timeo);
}

/*
 * Wait for space to appear in the Tx queue uninterruptibly, but with
 * a timeout of 2*RTT if no progress was made and a signal occurred.
 *
 * "Progress" means call->tx_hard_ack has changed since it was last sampled;
 * each time it changes, the 2*RTT timeout is rearmed.  A pending signal is
 * only honoured once the timeout has fully expired with no such change.
 *
 * Returns 0 when the number of packets outstanding (tx_top - tx_hard_ack)
 * is below the smaller of the Tx window size and the congestion allowance
 * (cong_cwnd + cong_extra), call->error if the call has reached a completed
 * state, or -EINTR if a signal is pending and the peer made no progress for
 * a whole timeout period.
 *
 * Unlike the interruptible variant, call->user_mutex is not released whilst
 * sleeping.  @rx is not used here.
 */
static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx,
					    struct rxrpc_call *call)
{
	rxrpc_seq_t tx_start, tx_win;
	signed long rtt2, timeout;
	u64 rtt;

	rtt = READ_ONCE(call->peer->rtt);
	rtt2 = nsecs_to_jiffies64(rtt) * 2;

	/* Use a floor of two jiffies rather than one: a one-jiffy timeout can
	 * expire almost at once if the next tick is imminent, which would let
	 * a signal abort the wait without the peer having had any real chance
	 * to make progress.  This also matches the floor used by the AFS
	 * call-completion wait.
	 */
	if (rtt2 < 2)
		rtt2 = 2;

	timeout = rtt2;
	tx_start = READ_ONCE(call->tx_hard_ack);

	for (;;) {
		/* Change task state before testing the wake-up conditions. */
		set_current_state(TASK_UNINTERRUPTIBLE);

		tx_win = READ_ONCE(call->tx_hard_ack);
		if (call->tx_top - tx_win <
		    min_t(unsigned int, call->tx_winsize,
			  call->cong_cwnd + call->cong_extra))
			return 0;

		if (call->state >= RXRPC_CALL_COMPLETE)
			return call->error;

		/* Only give in to a signal if the timeout has run out and the
		 * window base has not moved since it was last sampled.
		 */
		if (timeout == 0 &&
		    tx_win == tx_start && signal_pending(current))
			return -EINTR;

		/* The peer hard-ACK'd something: rearm the progress timeout. */
		if (tx_win != tx_start) {
			timeout = rtt2;
			tx_start = tx_win;
		}

		trace_rxrpc_transmit(call, rxrpc_transmit_wait);
		timeout = schedule_timeout(timeout);
	}
}

/*
* wait for space to appear in the transmit/ACK window
* - caller holds the socket locked
*/
static int rxrpc_wait_for_tx_window(struct rxrpc_sock *rx,
struct rxrpc_call *call,
long *timeo)
long *timeo,
bool waitall)
{
DECLARE_WAITQUEUE(myself, current);
int ret;
Expand All @@ -53,30 +127,10 @@ static int rxrpc_wait_for_tx_window(struct rxrpc_sock *rx,

add_wait_queue(&call->waitq, &myself);

for (;;) {
set_current_state(TASK_INTERRUPTIBLE);
ret = 0;
if (call->tx_top - call->tx_hard_ack <
min_t(unsigned int, call->tx_winsize,
call->cong_cwnd + call->cong_extra))
break;
if (call->state >= RXRPC_CALL_COMPLETE) {
ret = call->error;
break;
}
if (signal_pending(current)) {
ret = sock_intr_errno(*timeo);
break;
}

trace_rxrpc_transmit(call, rxrpc_transmit_wait);
mutex_unlock(&call->user_mutex);
*timeo = schedule_timeout(*timeo);
if (mutex_lock_interruptible(&call->user_mutex) < 0) {
ret = sock_intr_errno(*timeo);
break;
}
}
if (waitall)
ret = rxrpc_wait_for_tx_window_nonintr(rx, call);
else
ret = rxrpc_wait_for_tx_window_intr(rx, call, timeo);

remove_wait_queue(&call->waitq, &myself);
set_current_state(TASK_RUNNING);
Expand Down Expand Up @@ -254,7 +308,8 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
if (msg->msg_flags & MSG_DONTWAIT)
goto maybe_error;
ret = rxrpc_wait_for_tx_window(rx, call,
&timeo);
&timeo,
msg->msg_flags & MSG_WAITALL);
if (ret < 0)
goto maybe_error;
}
Expand Down

0 comments on commit bc5e3a5

Please sign in to comment.