Skip to content

Commit

Permalink
NFSD introduce async copy feature
Browse files Browse the repository at this point in the history
Upon receiving a request for async copy, create a new kthread.  If we
get asynchronous request, make sure to copy the needed arguments/state
from the stack before starting the copy. Then start the thread and reply
back to the client indicating copy is asynchronous.

nfsd_copy_file_range() will copy in a loop over the total number of
bytes is needed to copy. In case a failure happens in the middle, we
ignore the error and return how much we copied so far. Once done
creating a workitem for the callback workqueue and send CB_OFFLOAD with
the results.

The lifetime of the copy stateid is bound to the vfs copy. This way we
don't need to keep the nfsd_net structure for the callback.  We could
keep it around longer so that an OFFLOAD_STATUS that came late would
still get results, but clients should be able to deal without that.

We handle OFFLOAD_CANCEL by sending a signal to the copy thread and
calling kthread_stop.

A client should cancel any ongoing copies before calling DESTROY_CLIENT;
if not, we return a CLIENT_BUSY error.

If the client is destroyed for some other reason (lease expiration, or
server shutdown), we must clean up any ongoing copies ourselves.

Signed-off-by: Olga Kornievskaia <kolga@netapp.com>
[colin.king@canonical.com: fix leak in error case]
[bfields@fieldses.org: remove signalling, merge patches]
Signed-off-by: J. Bruce Fields <bfields@redhat.com>
  • Loading branch information
Olga Kornievskaia authored and J. Bruce Fields committed Sep 26, 2018
1 parent 885e2bf commit e0639dc
Show file tree
Hide file tree
Showing 7 changed files with 326 additions and 24 deletions.
8 changes: 8 additions & 0 deletions fs/nfsd/netns.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,14 @@ struct nfsd_net {

wait_queue_head_t ntf_wq;
atomic_t ntf_refcnt;

/*
* clientid and stateid data for construction of net unique COPY
* stateids.
*/
u32 s2s_cp_cl_id;
struct idr s2s_cp_stateids;
spinlock_t s2s_cp_lock;
};

/* Simple check to find out if a given net was properly initialized */
Expand Down
261 changes: 242 additions & 19 deletions fs/nfsd/nfs4proc.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include <linux/file.h>
#include <linux/falloc.h>
#include <linux/slab.h>
#include <linux/kthread.h>

#include "idmap.h"
#include "cache.h"
Expand Down Expand Up @@ -1089,45 +1090,255 @@ nfsd4_clone(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
return status;
}

void nfs4_put_copy(struct nfsd4_copy *copy)
{
if (!refcount_dec_and_test(&copy->refcount))
return;
kfree(copy);
}

static bool
check_and_set_stop_copy(struct nfsd4_copy *copy)
{
bool value;

spin_lock(&copy->cp_clp->async_lock);
value = copy->stopped;
if (!copy->stopped)
copy->stopped = true;
spin_unlock(&copy->cp_clp->async_lock);
return value;
}

static void nfsd4_stop_copy(struct nfsd4_copy *copy)
{
/* only 1 thread should stop the copy */
if (!check_and_set_stop_copy(copy))
kthread_stop(copy->copy_task);
nfs4_put_copy(copy);
}

static struct nfsd4_copy *nfsd4_get_copy(struct nfs4_client *clp)
{
struct nfsd4_copy *copy = NULL;

spin_lock(&clp->async_lock);
if (!list_empty(&clp->async_copies)) {
copy = list_first_entry(&clp->async_copies, struct nfsd4_copy,
copies);
refcount_inc(&copy->refcount);
}
spin_unlock(&clp->async_lock);
return copy;
}

void nfsd4_shutdown_copy(struct nfs4_client *clp)
{
struct nfsd4_copy *copy;

while ((copy = nfsd4_get_copy(clp)) != NULL)
nfsd4_stop_copy(copy);
}

static void nfsd4_cb_offload_release(struct nfsd4_callback *cb)
{
struct nfsd4_copy *copy = container_of(cb, struct nfsd4_copy, cp_cb);

nfs4_put_copy(copy);
}

static int nfsd4_cb_offload_done(struct nfsd4_callback *cb,
struct rpc_task *task)
{
return 1;
}

static const struct nfsd4_callback_ops nfsd4_cb_offload_ops = {
.release = nfsd4_cb_offload_release,
.done = nfsd4_cb_offload_done
};

static void nfsd4_init_copy_res(struct nfsd4_copy *copy, bool sync)
{
copy->cp_res.wr_stable_how = NFS_UNSTABLE;
copy->cp_synchronous = sync;
gen_boot_verifier(&copy->cp_res.wr_verifier, copy->cp_clp->net);
}

static ssize_t _nfsd_copy_file_range(struct nfsd4_copy *copy)
{
ssize_t bytes_copied = 0;
size_t bytes_total = copy->cp_count;
u64 src_pos = copy->cp_src_pos;
u64 dst_pos = copy->cp_dst_pos;

do {
if (kthread_should_stop())
break;
bytes_copied = nfsd_copy_file_range(copy->file_src, src_pos,
copy->file_dst, dst_pos, bytes_total);
if (bytes_copied <= 0)
break;
bytes_total -= bytes_copied;
copy->cp_res.wr_bytes_written += bytes_copied;
src_pos += bytes_copied;
dst_pos += bytes_copied;
} while (bytes_total > 0 && !copy->cp_synchronous);
return bytes_copied;
}

static __be32 nfsd4_do_copy(struct nfsd4_copy *copy, bool sync)
{
__be32 status;
ssize_t bytes;

bytes = _nfsd_copy_file_range(copy);
/* for async copy, we ignore the error, client can always retry
* to get the error
*/
if (bytes < 0 && !copy->cp_res.wr_bytes_written)
status = nfserrno(bytes);
else {
nfsd4_init_copy_res(copy, sync);
status = nfs_ok;
}

fput(copy->file_src);
fput(copy->file_dst);
return status;
}

static void dup_copy_fields(struct nfsd4_copy *src, struct nfsd4_copy *dst)
{
dst->cp_src_pos = src->cp_src_pos;
dst->cp_dst_pos = src->cp_dst_pos;
dst->cp_count = src->cp_count;
dst->cp_synchronous = src->cp_synchronous;
memcpy(&dst->cp_res, &src->cp_res, sizeof(src->cp_res));
memcpy(&dst->fh, &src->fh, sizeof(src->fh));
dst->cp_clp = src->cp_clp;
dst->file_dst = get_file(src->file_dst);
dst->file_src = get_file(src->file_src);
memcpy(&dst->cp_stateid, &src->cp_stateid, sizeof(src->cp_stateid));
}

static void cleanup_async_copy(struct nfsd4_copy *copy)
{
nfs4_free_cp_state(copy);
fput(copy->file_dst);
fput(copy->file_src);
spin_lock(&copy->cp_clp->async_lock);
list_del(&copy->copies);
spin_unlock(&copy->cp_clp->async_lock);
nfs4_put_copy(copy);
}

static int nfsd4_do_async_copy(void *data)
{
struct nfsd4_copy *copy = (struct nfsd4_copy *)data;
struct nfsd4_copy *cb_copy;

copy->nfserr = nfsd4_do_copy(copy, 0);
cb_copy = kzalloc(sizeof(struct nfsd4_copy), GFP_KERNEL);
if (!cb_copy)
goto out;
memcpy(&cb_copy->cp_res, &copy->cp_res, sizeof(copy->cp_res));
cb_copy->cp_clp = copy->cp_clp;
cb_copy->nfserr = copy->nfserr;
memcpy(&cb_copy->fh, &copy->fh, sizeof(copy->fh));
nfsd4_init_cb(&cb_copy->cp_cb, cb_copy->cp_clp,
&nfsd4_cb_offload_ops, NFSPROC4_CLNT_CB_OFFLOAD);
nfsd4_run_cb(&cb_copy->cp_cb);
out:
cleanup_async_copy(copy);
return 0;
}

static __be32
nfsd4_copy(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
union nfsd4_op_u *u)
{
struct nfsd4_copy *copy = &u->copy;
struct file *src, *dst;
__be32 status;
ssize_t bytes;
struct nfsd4_copy *async_copy = NULL;

status = nfsd4_verify_copy(rqstp, cstate, &copy->cp_src_stateid, &src,
&copy->cp_dst_stateid, &dst);
status = nfsd4_verify_copy(rqstp, cstate, &copy->cp_src_stateid,
&copy->file_src, &copy->cp_dst_stateid,
&copy->file_dst);
if (status)
goto out;

bytes = nfsd_copy_file_range(src, copy->cp_src_pos,
dst, copy->cp_dst_pos, copy->cp_count);
copy->cp_clp = cstate->clp;
memcpy(&copy->fh, &cstate->current_fh.fh_handle,
sizeof(struct knfsd_fh));
if (!copy->cp_synchronous) {
struct nfsd_net *nn = net_generic(SVC_NET(rqstp), nfsd_net_id);

if (bytes < 0)
status = nfserrno(bytes);
else {
copy->cp_res.wr_bytes_written = bytes;
copy->cp_res.wr_stable_how = NFS_UNSTABLE;
copy->cp_synchronous = 1;
gen_boot_verifier(&copy->cp_res.wr_verifier, SVC_NET(rqstp));
status = nfserrno(-ENOMEM);
async_copy = kzalloc(sizeof(struct nfsd4_copy), GFP_KERNEL);
if (!async_copy)
goto out;
if (!nfs4_init_cp_state(nn, copy)) {
kfree(async_copy);
goto out;
}
refcount_set(&async_copy->refcount, 1);
memcpy(&copy->cp_res.cb_stateid, &copy->cp_stateid,
sizeof(copy->cp_stateid));
dup_copy_fields(copy, async_copy);
async_copy->copy_task = kthread_create(nfsd4_do_async_copy,
async_copy, "%s", "copy thread");
if (IS_ERR(async_copy->copy_task))
goto out_err;
spin_lock(&async_copy->cp_clp->async_lock);
list_add(&async_copy->copies,
&async_copy->cp_clp->async_copies);
spin_unlock(&async_copy->cp_clp->async_lock);
wake_up_process(async_copy->copy_task);
status = nfs_ok;
}

fput(src);
fput(dst);
} else
status = nfsd4_do_copy(copy, 1);
out:
return status;
out_err:
cleanup_async_copy(async_copy);
goto out;
}

struct nfsd4_copy *
find_async_copy(struct nfs4_client *clp, stateid_t *stateid)
{
struct nfsd4_copy *copy;

spin_lock(&clp->async_lock);
list_for_each_entry(copy, &clp->async_copies, copies) {
if (memcmp(&copy->cp_stateid, stateid, NFS4_STATEID_SIZE))
continue;
refcount_inc(&copy->refcount);
spin_unlock(&clp->async_lock);
return copy;
}
spin_unlock(&clp->async_lock);
return NULL;
}

static __be32
nfsd4_offload_cancel(struct svc_rqst *rqstp,
struct nfsd4_compound_state *cstate,
union nfsd4_op_u *u)
{
return 0;
struct nfsd4_offload_status *os = &u->offload_status;
__be32 status = 0;
struct nfsd4_copy *copy;
struct nfs4_client *clp = cstate->clp;

copy = find_async_copy(clp, &os->stateid);
if (copy)
nfsd4_stop_copy(copy);
else
status = nfserr_bad_stateid;

return status;
}

static __be32
Expand Down Expand Up @@ -1157,7 +1368,19 @@ nfsd4_offload_status(struct svc_rqst *rqstp,
struct nfsd4_compound_state *cstate,
union nfsd4_op_u *u)
{
return nfserr_notsupp;
struct nfsd4_offload_status *os = &u->offload_status;
__be32 status = 0;
struct nfsd4_copy *copy;
struct nfs4_client *clp = cstate->clp;

copy = find_async_copy(clp, &os->stateid);
if (copy) {
os->count = copy->cp_res.wr_bytes_written;
nfs4_put_copy(copy);
} else
status = nfserr_bad_stateid;

return status;
}

static __be32
Expand Down
38 changes: 37 additions & 1 deletion fs/nfsd/nfs4state.c
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,36 @@ struct nfs4_stid *nfs4_alloc_stid(struct nfs4_client *cl, struct kmem_cache *sla
return NULL;
}

/*
* Create a unique stateid_t to represent each COPY.
*/
int nfs4_init_cp_state(struct nfsd_net *nn, struct nfsd4_copy *copy)
{
int new_id;

idr_preload(GFP_KERNEL);
spin_lock(&nn->s2s_cp_lock);
new_id = idr_alloc_cyclic(&nn->s2s_cp_stateids, copy, 0, 0, GFP_NOWAIT);
spin_unlock(&nn->s2s_cp_lock);
idr_preload_end();
if (new_id < 0)
return 0;
copy->cp_stateid.si_opaque.so_id = new_id;
copy->cp_stateid.si_opaque.so_clid.cl_boot = nn->boot_time;
copy->cp_stateid.si_opaque.so_clid.cl_id = nn->s2s_cp_cl_id;
return 1;
}

void nfs4_free_cp_state(struct nfsd4_copy *copy)
{
struct nfsd_net *nn;

nn = net_generic(copy->cp_clp->net, nfsd_net_id);
spin_lock(&nn->s2s_cp_lock);
idr_remove(&nn->s2s_cp_stateids, copy->cp_stateid.si_opaque.so_id);
spin_unlock(&nn->s2s_cp_lock);
}

static struct nfs4_ol_stateid * nfs4_alloc_open_stateid(struct nfs4_client *clp)
{
struct nfs4_stid *stid;
Expand Down Expand Up @@ -1827,6 +1857,8 @@ static struct nfs4_client *alloc_client(struct xdr_netobj name)
#ifdef CONFIG_NFSD_PNFS
INIT_LIST_HEAD(&clp->cl_lo_states);
#endif
INIT_LIST_HEAD(&clp->async_copies);
spin_lock_init(&clp->async_lock);
spin_lock_init(&clp->cl_lock);
rpc_init_wait_queue(&clp->cl_cb_waitq, "Backchannel slot table");
return clp;
Expand Down Expand Up @@ -1942,6 +1974,7 @@ __destroy_client(struct nfs4_client *clp)
}
}
nfsd4_return_all_client_layouts(clp);
nfsd4_shutdown_copy(clp);
nfsd4_shutdown_callback(clp);
if (clp->cl_cb_conn.cb_xprt)
svc_xprt_put(clp->cl_cb_conn.cb_xprt);
Expand Down Expand Up @@ -2475,7 +2508,8 @@ static bool client_has_state(struct nfs4_client *clp)
|| !list_empty(&clp->cl_lo_states)
#endif
|| !list_empty(&clp->cl_delegations)
|| !list_empty(&clp->cl_sessions);
|| !list_empty(&clp->cl_sessions)
|| !list_empty(&clp->async_copies);
}

__be32
Expand Down Expand Up @@ -7161,6 +7195,8 @@ static int nfs4_state_create_net(struct net *net)
INIT_LIST_HEAD(&nn->close_lru);
INIT_LIST_HEAD(&nn->del_recall_lru);
spin_lock_init(&nn->client_lock);
spin_lock_init(&nn->s2s_cp_lock);
idr_init(&nn->s2s_cp_stateids);

spin_lock_init(&nn->blocked_locks_lock);
INIT_LIST_HEAD(&nn->blocked_locks_lru);
Expand Down
Loading

0 comments on commit e0639dc

Please sign in to comment.