Skip to content

Commit

Permalink
ceph: convert inline data to normal data before data write
Browse files Browse the repository at this point in the history
Before any data write, convert inline data to normal data and set
i_inline_version to CEPH_INLINE_NONE. The OSD request that saves
inline data to an object contains 3 operations (CMPXATTR, WRITE and
SETXATTR). It compares an xattr named 'inline_version' to prevent
old data from overwriting newer data.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
  • Loading branch information
Yan, Zheng authored and Ilya Dryomov committed Dec 17, 2014
1 parent 8370124 commit 28127bd
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 3 deletions.
148 changes: 146 additions & 2 deletions fs/ceph/addr.c
Original file line number Diff line number Diff line change
Expand Up @@ -1313,6 +1313,19 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
size_t len;
int want, got, ret;

if (ci->i_inline_version != CEPH_INLINE_NONE) {
struct page *locked_page = NULL;
if (off == 0) {
lock_page(page);
locked_page = page;
}
ret = ceph_uninline_data(vma->vm_file, locked_page);
if (locked_page)
unlock_page(locked_page);
if (ret < 0)
return VM_FAULT_SIGBUS;
}

if (off + PAGE_CACHE_SIZE <= size)
len = PAGE_CACHE_SIZE;
else
Expand Down Expand Up @@ -1361,11 +1374,13 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
ret = VM_FAULT_SIGBUS;
}
out:
if (ret != VM_FAULT_LOCKED) {
if (ret != VM_FAULT_LOCKED)
unlock_page(page);
} else {
if (ret == VM_FAULT_LOCKED ||
ci->i_inline_version != CEPH_INLINE_NONE) {
int dirty;
spin_lock(&ci->i_ceph_lock);
ci->i_inline_version = CEPH_INLINE_NONE;
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
spin_unlock(&ci->i_ceph_lock);
if (dirty)
Expand Down Expand Up @@ -1422,6 +1437,135 @@ void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
}
}

int ceph_uninline_data(struct file *filp, struct page *locked_page)
{
struct inode *inode = file_inode(filp);
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_osd_request *req;
struct page *page = NULL;
u64 len, inline_version;
int err = 0;
bool from_pagecache = false;

spin_lock(&ci->i_ceph_lock);
inline_version = ci->i_inline_version;
spin_unlock(&ci->i_ceph_lock);

dout("uninline_data %p %llx.%llx inline_version %llu\n",
inode, ceph_vinop(inode), inline_version);

if (inline_version == 1 || /* initial version, no data */
inline_version == CEPH_INLINE_NONE)
goto out;

if (locked_page) {
page = locked_page;
WARN_ON(!PageUptodate(page));
} else if (ceph_caps_issued(ci) &
(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) {
page = find_get_page(inode->i_mapping, 0);
if (page) {
if (PageUptodate(page)) {
from_pagecache = true;
lock_page(page);
} else {
page_cache_release(page);
page = NULL;
}
}
}

if (page) {
len = i_size_read(inode);
if (len > PAGE_CACHE_SIZE)
len = PAGE_CACHE_SIZE;
} else {
page = __page_cache_alloc(GFP_NOFS);
if (!page) {
err = -ENOMEM;
goto out;
}
err = __ceph_do_getattr(inode, page,
CEPH_STAT_CAP_INLINE_DATA, true);
if (err < 0) {
/* no inline data */
if (err == -ENODATA)
err = 0;
goto out;
}
len = err;
}

req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
ceph_vino(inode), 0, &len, 0, 1,
CEPH_OSD_OP_CREATE,
CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
ci->i_snap_realm->cached_context,
0, 0, false);
if (IS_ERR(req)) {
err = PTR_ERR(req);
goto out;
}

ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime);
err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
if (!err)
err = ceph_osdc_wait_request(&fsc->client->osdc, req);
ceph_osdc_put_request(req);
if (err < 0)
goto out;

req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
ceph_vino(inode), 0, &len, 1, 3,
CEPH_OSD_OP_WRITE,
CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
ci->i_snap_realm->cached_context,
ci->i_truncate_seq, ci->i_truncate_size,
false);
if (IS_ERR(req)) {
err = PTR_ERR(req);
goto out;
}

osd_req_op_extent_osd_data_pages(req, 1, &page, len, 0, false, false);

err = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
"inline_version", &inline_version,
sizeof(inline_version),
CEPH_OSD_CMPXATTR_OP_GT,
CEPH_OSD_CMPXATTR_MODE_U64);
if (err)
goto out_put;

err = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
"inline_version", &inline_version,
sizeof(inline_version), 0, 0);
if (err)
goto out_put;

ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime);
err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
if (!err)
err = ceph_osdc_wait_request(&fsc->client->osdc, req);
out_put:
ceph_osdc_put_request(req);
if (err == -ECANCELED)
err = 0;
out:
if (page && page != locked_page) {
if (from_pagecache) {
unlock_page(page);
page_cache_release(page);
} else
__free_pages(page, 0);
}

dout("uninline_data %p %llx.%llx inline_version %llu = %d\n",
inode, ceph_vinop(inode), inline_version, err);
return err;
}

static struct vm_operations_struct ceph_vmops = {
.fault = ceph_filemap_fault,
.page_mkwrite = ceph_page_mkwrite,
Expand Down
14 changes: 14 additions & 0 deletions fs/ceph/file.c
Original file line number Diff line number Diff line change
Expand Up @@ -963,6 +963,12 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
if (err)
goto out;

if (ci->i_inline_version != CEPH_INLINE_NONE) {
err = ceph_uninline_data(file, NULL);
if (err < 0)
goto out;
}

retry_snap:
if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) {
err = -ENOSPC;
Expand Down Expand Up @@ -1024,6 +1030,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
if (written >= 0) {
int dirty;
spin_lock(&ci->i_ceph_lock);
ci->i_inline_version = CEPH_INLINE_NONE;
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
spin_unlock(&ci->i_ceph_lock);
if (dirty)
Expand Down Expand Up @@ -1269,6 +1276,12 @@ static long ceph_fallocate(struct file *file, int mode,
goto unlock;
}

if (ci->i_inline_version != CEPH_INLINE_NONE) {
ret = ceph_uninline_data(file, NULL);
if (ret < 0)
goto unlock;
}

size = i_size_read(inode);
if (!(mode & FALLOC_FL_KEEP_SIZE))
endoff = offset + length;
Expand All @@ -1295,6 +1308,7 @@ static long ceph_fallocate(struct file *file, int mode,

if (!ret) {
spin_lock(&ci->i_ceph_lock);
ci->i_inline_version = CEPH_INLINE_NONE;
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
spin_unlock(&ci->i_ceph_lock);
if (dirty)
Expand Down
2 changes: 1 addition & 1 deletion fs/ceph/super.h
Original file line number Diff line number Diff line change
Expand Up @@ -888,7 +888,7 @@ extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
extern int ceph_release(struct inode *inode, struct file *filp);
extern void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
char *data, size_t len);

int ceph_uninline_data(struct file *filp, struct page *locked_page);
/* dir.c */
extern const struct file_operations ceph_dir_fops;
extern const struct inode_operations ceph_dir_iops;
Expand Down

0 comments on commit 28127bd

Please sign in to comment.