Skip to content

Commit

Permalink
afs: Make {Y,}FS.FetchData an asynchronous operation
Browse files Browse the repository at this point in the history
Make FS.FetchData and YFS.FetchData an asynchronous operation in that the
request is queued in AF_RXRPC and then we return to the caller rather than
waiting.  Processing of the returning packets is then done inline if it's a
synchronous VFS/VM call (readdir, read_folio, sync DIO, prep for write) or
offloaded to a workqueue if asynchronous VM calls (eg. readahead, async
DIO).

This reduces the chain of workqueues invoking workqueues and cuts out some
of the overhead, driving rxrpc data extraction and netfslib read collection
from a thread that's going to block to completion anyway if possible.

The ->done() call op is also split with ->immediate_cancel() handling the
cancellation on failure to begin the call and ->done() handling the rest.
This means that the AFS async FetchData code doesn't try to terminate the
netfs subrequest twice.

Signed-off-by: David Howells <dhowells@redhat.com>
Link: https://lore.kernel.org/r/20241216204124.3752367-26-dhowells@redhat.com
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
Signed-off-by: Christian Brauner <brauner@kernel.org>
  • Loading branch information
David Howells authored and Christian Brauner committed Dec 20, 2024
1 parent 9750be9 commit eddf51f
Show file tree
Hide file tree
Showing 9 changed files with 170 additions and 37 deletions.
126 changes: 112 additions & 14 deletions fs/afs/file.c
Original file line number Diff line number Diff line change
Expand Up @@ -225,26 +225,111 @@ static void afs_fetch_data_aborted(struct afs_operation *op)
afs_fetch_data_notify(op);
}

static void afs_fetch_data_put(struct afs_operation *op)
{
op->fetch.subreq->error = afs_op_error(op);
}

const struct afs_operation_ops afs_fetch_data_operation = {
.issue_afs_rpc = afs_fs_fetch_data,
.issue_yfs_rpc = yfs_fs_fetch_data,
.success = afs_fetch_data_success,
.aborted = afs_fetch_data_aborted,
.failed = afs_fetch_data_notify,
.put = afs_fetch_data_put,
};

static void afs_issue_read_call(struct afs_operation *op)
{
op->call_responded = false;
op->call_error = 0;
op->call_abort_code = 0;
if (test_bit(AFS_SERVER_FL_IS_YFS, &op->server->flags))
yfs_fs_fetch_data(op);
else
afs_fs_fetch_data(op);
}

static void afs_end_read(struct afs_operation *op)
{
if (op->call_responded && op->server)
set_bit(AFS_SERVER_FL_RESPONDING, &op->server->flags);

if (!afs_op_error(op))
afs_fetch_data_success(op);
else if (op->cumul_error.aborted)
afs_fetch_data_aborted(op);
else
afs_fetch_data_notify(op);

afs_end_vnode_operation(op);
afs_put_operation(op);
}

/*
* Perform I/O processing on an asynchronous call. The work item carries a ref
* to the call struct that we either need to release or to pass on.
*/
static void afs_read_receive(struct afs_call *call)
{
struct afs_operation *op = call->op;
enum afs_call_state state;

_enter("");

state = READ_ONCE(call->state);
if (state == AFS_CALL_COMPLETE)
return;

while (state < AFS_CALL_COMPLETE && READ_ONCE(call->need_attention)) {
WRITE_ONCE(call->need_attention, false);
afs_deliver_to_call(call);
state = READ_ONCE(call->state);
}

if (state < AFS_CALL_COMPLETE) {
netfs_read_subreq_progress(op->fetch.subreq);
if (rxrpc_kernel_check_life(call->net->socket, call->rxcall))
return;
/* rxrpc terminated the call. */
afs_set_call_complete(call, call->error, call->abort_code);
}

op->call_abort_code = call->abort_code;
op->call_error = call->error;
op->call_responded = call->responded;
op->call = NULL;
call->op = NULL;
afs_put_call(call);

/* If the call failed, then we need to crank the server rotation
* handle and try the next.
*/
if (afs_select_fileserver(op)) {
afs_issue_read_call(op);
return;
}

afs_end_read(op);
}

void afs_fetch_data_async_rx(struct work_struct *work)
{
struct afs_call *call = container_of(work, struct afs_call, async_work);

afs_read_receive(call);
afs_put_call(call);
}

void afs_fetch_data_immediate_cancel(struct afs_call *call)
{
if (call->async) {
afs_get_call(call, afs_call_trace_wake);
if (!queue_work(afs_async_calls, &call->async_work))
afs_deferred_put_call(call);
flush_work(&call->async_work);
}
}

/*
* Fetch file data from the volume.
*/
static void afs_read_worker(struct work_struct *work)
static void afs_issue_read(struct netfs_io_subrequest *subreq)
{
struct netfs_io_subrequest *subreq = container_of(work, struct netfs_io_subrequest, work);
struct afs_operation *op;
struct afs_vnode *vnode = AFS_FS_I(subreq->rreq->inode);
struct key *key = subreq->rreq->netfs_priv;
Expand All @@ -269,13 +354,26 @@ static void afs_read_worker(struct work_struct *work)
op->ops = &afs_fetch_data_operation;

trace_netfs_sreq(subreq, netfs_sreq_trace_submit);
afs_do_sync_operation(op);
}

static void afs_issue_read(struct netfs_io_subrequest *subreq)
{
INIT_WORK(&subreq->work, afs_read_worker);
queue_work(system_long_wq, &subreq->work);
if (subreq->rreq->origin == NETFS_READAHEAD ||
subreq->rreq->iocb) {
op->flags |= AFS_OPERATION_ASYNC;

if (!afs_begin_vnode_operation(op)) {
subreq->error = afs_put_operation(op);
netfs_read_subreq_terminated(subreq);
return;
}

if (!afs_select_fileserver(op)) {
afs_end_read(op);
return;
}

afs_issue_read_call(op);
} else {
afs_do_sync_operation(op);
}
}

static int afs_init_request(struct netfs_io_request *rreq, struct file *file)
Expand Down
2 changes: 1 addition & 1 deletion fs/afs/fs_operation.c
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ bool afs_begin_vnode_operation(struct afs_operation *op)
/*
* Tidy up a filesystem cursor and unlock the vnode.
*/
static void afs_end_vnode_operation(struct afs_operation *op)
void afs_end_vnode_operation(struct afs_operation *op)
{
_enter("");

Expand Down
9 changes: 8 additions & 1 deletion fs/afs/fsclient.c
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,6 @@ static int afs_deliver_fs_fetch_data(struct afs_call *call)
ret = afs_extract_data(call, true);
subreq->transferred += count_before - call->iov_len;
call->remaining -= count_before - call->iov_len;
netfs_read_subreq_progress(subreq);
if (ret < 0)
return ret;

Expand Down Expand Up @@ -409,14 +408,18 @@ static int afs_deliver_fs_fetch_data(struct afs_call *call)
static const struct afs_call_type afs_RXFSFetchData = {
.name = "FS.FetchData",
.op = afs_FS_FetchData,
.async_rx = afs_fetch_data_async_rx,
.deliver = afs_deliver_fs_fetch_data,
.immediate_cancel = afs_fetch_data_immediate_cancel,
.destructor = afs_flat_call_destructor,
};

static const struct afs_call_type afs_RXFSFetchData64 = {
.name = "FS.FetchData64",
.op = afs_FS_FetchData64,
.async_rx = afs_fetch_data_async_rx,
.deliver = afs_deliver_fs_fetch_data,
.immediate_cancel = afs_fetch_data_immediate_cancel,
.destructor = afs_flat_call_destructor,
};

Expand All @@ -436,6 +439,9 @@ static void afs_fs_fetch_data64(struct afs_operation *op)
if (!call)
return afs_op_nomem(op);

if (op->flags & AFS_OPERATION_ASYNC)
call->async = true;

/* marshall the parameters */
bp = call->request;
bp[0] = htonl(FSFETCHDATA64);
Expand Down Expand Up @@ -1730,6 +1736,7 @@ static const struct afs_call_type afs_RXFSGetCapabilities = {
.op = afs_FS_GetCapabilities,
.deliver = afs_deliver_fs_get_capabilities,
.done = afs_fileserver_probe_result,
.immediate_cancel = afs_fileserver_probe_result,
.destructor = afs_fs_get_capabilities_destructor,
};

Expand Down
24 changes: 24 additions & 0 deletions fs/afs/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,17 @@ struct afs_call_type {
/* clean up a call */
void (*destructor)(struct afs_call *call);

/* Async receive processing function */
void (*async_rx)(struct work_struct *work);

/* Work function */
void (*work)(struct work_struct *work);

/* Call done function (gets called immediately on success or failure) */
void (*done)(struct afs_call *call);

/* Handle a call being immediately cancelled. */
void (*immediate_cancel)(struct afs_call *call);
};

/*
Expand Down Expand Up @@ -942,6 +948,7 @@ struct afs_operation {
#define AFS_OPERATION_TRIED_ALL 0x0400 /* Set if we've tried all the fileservers */
#define AFS_OPERATION_RETRY_SERVER 0x0800 /* Set if we should retry the current server */
#define AFS_OPERATION_DIR_CONFLICT 0x1000 /* Set if we detected a 3rd-party dir change */
#define AFS_OPERATION_ASYNC 0x2000 /* Set if should run asynchronously */
};

/*
Expand Down Expand Up @@ -1104,6 +1111,8 @@ extern int afs_cache_wb_key(struct afs_vnode *, struct afs_file *);
extern void afs_put_wb_key(struct afs_wb_key *);
extern int afs_open(struct inode *, struct file *);
extern int afs_release(struct inode *, struct file *);
void afs_fetch_data_async_rx(struct work_struct *work);
void afs_fetch_data_immediate_cancel(struct afs_call *call);

/*
* flock.c
Expand Down Expand Up @@ -1155,6 +1164,7 @@ extern void afs_fs_store_acl(struct afs_operation *);
extern struct afs_operation *afs_alloc_operation(struct key *, struct afs_volume *);
extern int afs_put_operation(struct afs_operation *);
extern bool afs_begin_vnode_operation(struct afs_operation *);
extern void afs_end_vnode_operation(struct afs_operation *op);
extern void afs_wait_for_operation(struct afs_operation *);
extern int afs_do_sync_operation(struct afs_operation *);

Expand Down Expand Up @@ -1326,6 +1336,7 @@ extern void afs_charge_preallocation(struct work_struct *);
extern void afs_put_call(struct afs_call *);
void afs_deferred_put_call(struct afs_call *call);
void afs_make_call(struct afs_call *call, gfp_t gfp);
void afs_deliver_to_call(struct afs_call *call);
void afs_wait_for_call_to_complete(struct afs_call *call);
extern struct afs_call *afs_alloc_flat_call(struct afs_net *,
const struct afs_call_type *,
Expand All @@ -1336,6 +1347,19 @@ extern void afs_send_simple_reply(struct afs_call *, const void *, size_t);
extern int afs_extract_data(struct afs_call *, bool);
extern int afs_protocol_error(struct afs_call *, enum afs_eproto_cause);

static inline struct afs_call *afs_get_call(struct afs_call *call,
enum afs_call_trace why)
{
int r;

__refcount_inc(&call->ref, &r);

trace_afs_call(call->debug_id, why, r + 1,
atomic_read(&call->net->nr_outstanding_calls),
__builtin_return_address(0));
return call;
}

static inline void afs_see_call(struct afs_call *call, enum afs_call_trace why)
{
int r = refcount_read(&call->ref);
Expand Down
2 changes: 1 addition & 1 deletion fs/afs/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ static int __init afs_init(void)
afs_wq = alloc_workqueue("afs", 0, 0);
if (!afs_wq)
goto error_afs_wq;
afs_async_calls = alloc_workqueue("kafsd", WQ_MEM_RECLAIM, 0);
afs_async_calls = alloc_workqueue("kafsd", WQ_MEM_RECLAIM | WQ_UNBOUND, 0);
if (!afs_async_calls)
goto error_async;
afs_lock_manager = alloc_workqueue("kafs_lockd", WQ_MEM_RECLAIM, 0);
Expand Down
25 changes: 6 additions & 19 deletions fs/afs/rxrpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ static struct afs_call *afs_alloc_call(struct afs_net *net,
call->net = net;
call->debug_id = atomic_inc_return(&rxrpc_debug_id);
refcount_set(&call->ref, 1);
INIT_WORK(&call->async_work, afs_process_async_call);
INIT_WORK(&call->async_work, type->async_rx ?: afs_process_async_call);
INIT_WORK(&call->work, call->type->work);
INIT_WORK(&call->free_work, afs_deferred_free_worker);
init_waitqueue_head(&call->waitq);
spin_lock_init(&call->state_lock);
Expand Down Expand Up @@ -235,27 +236,12 @@ void afs_deferred_put_call(struct afs_call *call)
schedule_work(&call->free_work);
}

static struct afs_call *afs_get_call(struct afs_call *call,
enum afs_call_trace why)
{
int r;

__refcount_inc(&call->ref, &r);

trace_afs_call(call->debug_id, why, r + 1,
atomic_read(&call->net->nr_outstanding_calls),
__builtin_return_address(0));
return call;
}

/*
* Queue the call for actual work.
*/
static void afs_queue_call_work(struct afs_call *call)
{
if (call->type->work) {
INIT_WORK(&call->work, call->type->work);

afs_get_call(call, afs_call_trace_work);
if (!queue_work(afs_wq, &call->work))
afs_put_call(call);
Expand Down Expand Up @@ -452,8 +438,8 @@ void afs_make_call(struct afs_call *call, gfp_t gfp)
error_kill_call:
if (call->async)
afs_see_call(call, afs_call_trace_async_kill);
if (call->type->done)
call->type->done(call);
if (call->type->immediate_cancel)
call->type->immediate_cancel(call);

/* We need to dispose of the extra ref we grabbed for an async call.
* The call, however, might be queued on afs_async_calls and we need to
Expand Down Expand Up @@ -508,7 +494,7 @@ static void afs_log_error(struct afs_call *call, s32 remote_abort)
/*
* deliver messages to a call
*/
static void afs_deliver_to_call(struct afs_call *call)
void afs_deliver_to_call(struct afs_call *call)
{
enum afs_call_state state;
size_t len;
Expand Down Expand Up @@ -809,6 +795,7 @@ static int afs_deliver_cm_op_id(struct afs_call *call)
return -ENOTSUPP;

trace_afs_cb_call(call);
call->work.func = call->type->work;

/* pass responsibility for the remainer of this message off to the
* cache manager op */
Expand Down
1 change: 1 addition & 0 deletions fs/afs/vlclient.c
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ static const struct afs_call_type afs_RXVLGetCapabilities = {
.name = "VL.GetCapabilities",
.op = afs_VL_GetCapabilities,
.deliver = afs_deliver_vl_get_capabilities,
.immediate_cancel = afs_vlserver_probe_result,
.done = afs_vlserver_probe_result,
.destructor = afs_destroy_vl_get_capabilities,
};
Expand Down
12 changes: 12 additions & 0 deletions fs/afs/write.c
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,18 @@ void afs_retry_request(struct netfs_io_request *wreq, struct netfs_io_stream *st
list_first_entry(&stream->subrequests,
struct netfs_io_subrequest, rreq_link);

switch (wreq->origin) {
case NETFS_READAHEAD:
case NETFS_READPAGE:
case NETFS_READ_GAPS:
case NETFS_READ_SINGLE:
case NETFS_READ_FOR_WRITE:
case NETFS_DIO_READ:
return;
default:
break;
}

switch (subreq->error) {
case -EACCES:
case -EPERM:
Expand Down
Loading

0 comments on commit eddf51f

Please sign in to comment.