Skip to content

Commit

Permalink
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Browse files Browse the repository at this point in the history

Pull Ceph updates from Sage Weil:
 "There is some follow-on RBD cleanup after the last window's code drop,
  a series from Yan fixing multi-mds behavior in cephfs, and then a
  sprinkling of bug fixes all around.  Some warnings, sleeping while
  atomic, a null dereference, and cleanups"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (36 commits)
  libceph: fix invalid unsigned->signed conversion for timespec encoding
  libceph: call r_unsafe_callback when unsafe reply is received
  ceph: fix race between cap issue and revoke
  ceph: fix cap revoke race
  ceph: fix pending vmtruncate race
  ceph: avoid accessing invalid memory
  libceph: Fix NULL pointer dereference in auth client code
  ceph: Reconstruct the func ceph_reserve_caps.
  ceph: Free mdsc if alloc mdsc->mdsmap failed.
  ceph: remove sb_start/end_write in ceph_aio_write.
  ceph: avoid meaningless calling ceph_caps_revoking if sync_mode == WB_SYNC_ALL.
  ceph: fix sleeping function called from invalid context.
  ceph: move inode to proper flushing list when auth MDS changes
  rbd: fix a couple warnings
  ceph: clear migrate seq when MDS restarts
  ceph: check migrate seq before changing auth cap
  ceph: fix race between page writeback and truncate
  ceph: reset iov_len when discarding cap release messages
  ceph: fix cap release race
  libceph: fix truncate size calculation
  ...
  • Loading branch information
Linus Torvalds committed Jul 9, 2013
2 parents e3a0dd9 + 8b8cf89 commit 9a5889a
Show file tree
Hide file tree
Showing 15 changed files with 277 additions and 245 deletions.
174 changes: 90 additions & 84 deletions drivers/block/rbd.c

Large diffs are not rendered by default.

88 changes: 42 additions & 46 deletions fs/ceph/addr.c
Original file line number Diff line number Diff line change
Expand Up @@ -439,13 +439,12 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
struct ceph_inode_info *ci;
struct ceph_fs_client *fsc;
struct ceph_osd_client *osdc;
loff_t page_off = page_offset(page);
int len = PAGE_CACHE_SIZE;
loff_t i_size;
int err = 0;
struct ceph_snap_context *snapc, *oldest;
u64 snap_size = 0;
loff_t page_off = page_offset(page);
long writeback_stat;
u64 truncate_size, snap_size = 0;
u32 truncate_seq;
int err = 0, len = PAGE_CACHE_SIZE;

dout("writepage %p idx %lu\n", page, page->index);

Expand Down Expand Up @@ -475,13 +474,20 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
}
ceph_put_snap_context(oldest);

spin_lock(&ci->i_ceph_lock);
truncate_seq = ci->i_truncate_seq;
truncate_size = ci->i_truncate_size;
if (!snap_size)
snap_size = i_size_read(inode);
spin_unlock(&ci->i_ceph_lock);

/* is this a partial page at end of file? */
if (snap_size)
i_size = snap_size;
else
i_size = i_size_read(inode);
if (i_size < page_off + len)
len = i_size - page_off;
if (page_off >= snap_size) {
dout("%p page eof %llu\n", page, snap_size);
goto out;
}
if (snap_size < page_off + len)
len = snap_size - page_off;

dout("writepage %p page %p index %lu on %llu~%u snapc %p\n",
inode, page, page->index, page_off, len, snapc);
Expand All @@ -495,7 +501,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
err = ceph_osdc_writepages(osdc, ceph_vino(inode),
&ci->i_layout, snapc,
page_off, len,
ci->i_truncate_seq, ci->i_truncate_size,
truncate_seq, truncate_size,
&inode->i_mtime, &page, 1);
if (err < 0) {
dout("writepage setting page/mapping error %d %p\n", err, page);
Expand Down Expand Up @@ -632,25 +638,6 @@ static void writepages_finish(struct ceph_osd_request *req,
ceph_osdc_put_request(req);
}

/*
 * Build the OSD write request for a run of dirty pages; called from
 * ceph_writepages_start() when it prepares an async write request.
 *
 * @inode:   inode being written back; its fs client, ceph inode info and
 *           vino are looked up here.
 * @offset:  file offset of the write, passed straight through.
 * @len:     pointer to the write length, passed straight through to
 *           ceph_osdc_new_request() (presumably it may be adjusted by the
 *           callee, since the caller sizes its page vector from it
 *           afterwards -- TODO confirm).
 * @snapc:   snap context to attach to the request.
 * @num_ops: number of ops to allocate in the request.
 *
 * Returns whatever ceph_osdc_new_request() returns; the caller tests the
 * result with IS_ERR().
 *
 * NOTE(review): ci->i_truncate_seq and ci->i_truncate_size are read here
 * without any locking in this helper, whereas the other hunks of this
 * change sample them under ci->i_ceph_lock before building the request.
 */
static struct ceph_osd_request *
ceph_writepages_osd_request(struct inode *inode, u64 offset, u64 *len,
struct ceph_snap_context *snapc, int num_ops)
{
struct ceph_fs_client *fsc;
struct ceph_inode_info *ci;
struct ceph_vino vino;

fsc = ceph_inode_to_client(inode);
ci = ceph_inode(inode);
vino = ceph_vino(inode);
/* BUG_ON(vino.snap != CEPH_NOSNAP); */

/* a WRITE op flagged WRITE|ONDISK, using the inode's own layout */
return ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
vino, offset, len, num_ops, CEPH_OSD_OP_WRITE,
CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK,
snapc, ci->i_truncate_seq, ci->i_truncate_size, true);
}

/*
* initiate async writeback
*/
Expand All @@ -659,7 +646,8 @@ static int ceph_writepages_start(struct address_space *mapping,
{
struct inode *inode = mapping->host;
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_fs_client *fsc;
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_vino vino = ceph_vino(inode);
pgoff_t index, start, end;
int range_whole = 0;
int should_loop = 1;
Expand All @@ -671,22 +659,22 @@ static int ceph_writepages_start(struct address_space *mapping,
unsigned wsize = 1 << inode->i_blkbits;
struct ceph_osd_request *req = NULL;
int do_sync;
u64 snap_size;
u64 truncate_size, snap_size;
u32 truncate_seq;

/*
* Include a 'sync' in the OSD request if this is a data
* integrity write (e.g., O_SYNC write or fsync()), or if our
* cap is being revoked.
*/
do_sync = wbc->sync_mode == WB_SYNC_ALL;
if (ceph_caps_revoking(ci, CEPH_CAP_FILE_BUFFER))
if ((wbc->sync_mode == WB_SYNC_ALL) ||
ceph_caps_revoking(ci, CEPH_CAP_FILE_BUFFER))
do_sync = 1;
dout("writepages_start %p dosync=%d (mode=%s)\n",
inode, do_sync,
wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
(wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));

fsc = ceph_inode_to_client(inode);
if (fsc->mount_state == CEPH_MOUNT_SHUTDOWN) {
pr_warning("writepage_start %p on forced umount\n", inode);
return -EIO; /* we're in a forced umount, don't write! */
Expand Down Expand Up @@ -729,6 +717,14 @@ static int ceph_writepages_start(struct address_space *mapping,
snap_size = i_size_read(inode);
dout(" oldest snapc is %p seq %lld (%d snaps)\n",
snapc, snapc->seq, snapc->num_snaps);

spin_lock(&ci->i_ceph_lock);
truncate_seq = ci->i_truncate_seq;
truncate_size = ci->i_truncate_size;
if (!snap_size)
snap_size = i_size_read(inode);
spin_unlock(&ci->i_ceph_lock);

if (last_snapc && snapc != last_snapc) {
/* if we switched to a newer snapc, restart our scan at the
* start of the original file range. */
Expand All @@ -740,7 +736,6 @@ static int ceph_writepages_start(struct address_space *mapping,

while (!done && index <= end) {
int num_ops = do_sync ? 2 : 1;
struct ceph_vino vino;
unsigned i;
int first;
pgoff_t next;
Expand Down Expand Up @@ -834,17 +829,18 @@ static int ceph_writepages_start(struct address_space *mapping,
* that it will use.
*/
if (locked_pages == 0) {
size_t size;

BUG_ON(pages);

/* prepare async write request */
offset = (u64)page_offset(page);
len = wsize;
req = ceph_writepages_osd_request(inode,
offset, &len, snapc,
num_ops);

req = ceph_osdc_new_request(&fsc->client->osdc,
&ci->i_layout, vino,
offset, &len, num_ops,
CEPH_OSD_OP_WRITE,
CEPH_OSD_FLAG_WRITE |
CEPH_OSD_FLAG_ONDISK,
snapc, truncate_seq,
truncate_size, true);
if (IS_ERR(req)) {
rc = PTR_ERR(req);
unlock_page(page);
Expand All @@ -855,8 +851,8 @@ static int ceph_writepages_start(struct address_space *mapping,
req->r_inode = inode;

max_pages = calc_pages_for(0, (u64)len);
size = max_pages * sizeof (*pages);
pages = kmalloc(size, GFP_NOFS);
pages = kmalloc(max_pages * sizeof (*pages),
GFP_NOFS);
if (!pages) {
pool = fsc->wb_pagevec_pool;
pages = mempool_alloc(pool, GFP_NOFS);
Expand Down
102 changes: 63 additions & 39 deletions fs/ceph/caps.c
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,14 @@ void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta)
spin_unlock(&mdsc->caps_list_lock);
}

int ceph_reserve_caps(struct ceph_mds_client *mdsc,
void ceph_reserve_caps(struct ceph_mds_client *mdsc,
struct ceph_cap_reservation *ctx, int need)
{
int i;
struct ceph_cap *cap;
int have;
int alloc = 0;
LIST_HEAD(newcaps);
int ret = 0;

dout("reserve caps ctx=%p need=%d\n", ctx, need);

Expand All @@ -174,14 +173,15 @@ int ceph_reserve_caps(struct ceph_mds_client *mdsc,

for (i = have; i < need; i++) {
cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
if (!cap) {
ret = -ENOMEM;
goto out_alloc_count;
}
if (!cap)
break;
list_add(&cap->caps_item, &newcaps);
alloc++;
}
BUG_ON(have + alloc != need);
/* we didn't manage to reserve as much as we needed */
if (have + alloc != need)
pr_warn("reserve caps ctx=%p ENOMEM need=%d got=%d\n",
ctx, need, have + alloc);

spin_lock(&mdsc->caps_list_lock);
mdsc->caps_total_count += alloc;
Expand All @@ -197,13 +197,6 @@ int ceph_reserve_caps(struct ceph_mds_client *mdsc,
dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n",
ctx, mdsc->caps_total_count, mdsc->caps_use_count,
mdsc->caps_reserve_count, mdsc->caps_avail_count);
return 0;

out_alloc_count:
/* we didn't manage to reserve as much as we needed */
pr_warning("reserve caps ctx=%p ENOMEM need=%d got=%d\n",
ctx, need, have);
return ret;
}

int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
Expand Down Expand Up @@ -612,9 +605,11 @@ int ceph_add_cap(struct inode *inode,
__cap_delay_requeue(mdsc, ci);
}

if (flags & CEPH_CAP_FLAG_AUTH)
ci->i_auth_cap = cap;
else if (ci->i_auth_cap == cap) {
if (flags & CEPH_CAP_FLAG_AUTH) {
if (ci->i_auth_cap == NULL ||
ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0)
ci->i_auth_cap = cap;
} else if (ci->i_auth_cap == cap) {
ci->i_auth_cap = NULL;
spin_lock(&mdsc->cap_dirty_lock);
if (!list_empty(&ci->i_dirty_item)) {
Expand Down Expand Up @@ -695,6 +690,15 @@ int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)
if (implemented)
*implemented |= cap->implemented;
}
/*
* Exclude caps that were issued by a non-auth MDS but are being
* revoked by the auth MDS. The non-auth MDS should be revoking or
* exporting these caps, but its message is delayed.
*/
if (ci->i_auth_cap) {
cap = ci->i_auth_cap;
have &= ~cap->implemented | cap->issued;
}
return have;
}

Expand Down Expand Up @@ -802,22 +806,28 @@ int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int touch)
/*
* Return true if mask caps are currently being revoked by an MDS.
*/
int ceph_caps_revoking(struct ceph_inode_info *ci, int mask)
int __ceph_caps_revoking_other(struct ceph_inode_info *ci,
struct ceph_cap *ocap, int mask)
{
struct inode *inode = &ci->vfs_inode;
struct ceph_cap *cap;
struct rb_node *p;
int ret = 0;

spin_lock(&ci->i_ceph_lock);
for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
cap = rb_entry(p, struct ceph_cap, ci_node);
if (__cap_is_valid(cap) &&
(cap->implemented & ~cap->issued & mask)) {
ret = 1;
break;
}
if (cap != ocap && __cap_is_valid(cap) &&
(cap->implemented & ~cap->issued & mask))
return 1;
}
return 0;
}

int ceph_caps_revoking(struct ceph_inode_info *ci, int mask)
{
struct inode *inode = &ci->vfs_inode;
int ret;

spin_lock(&ci->i_ceph_lock);
ret = __ceph_caps_revoking_other(ci, NULL, mask);
spin_unlock(&ci->i_ceph_lock);
dout("ceph_caps_revoking %p %s = %d\n", inode,
ceph_cap_string(mask), ret);
Expand Down Expand Up @@ -1980,8 +1990,15 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
cap = ci->i_auth_cap;
dout("kick_flushing_inode_caps %p flushing %s flush_seq %lld\n", inode,
ceph_cap_string(ci->i_flushing_caps), ci->i_cap_flush_seq);

__ceph_flush_snaps(ci, &session, 1);

if (ci->i_flushing_caps) {
spin_lock(&mdsc->cap_dirty_lock);
list_move_tail(&ci->i_flushing_item,
&cap->session->s_cap_flushing);
spin_unlock(&mdsc->cap_dirty_lock);

delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
__ceph_caps_used(ci),
__ceph_caps_wanted(ci),
Expand Down Expand Up @@ -2055,7 +2072,11 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
/* finish pending truncate */
while (ci->i_truncate_pending) {
spin_unlock(&ci->i_ceph_lock);
__ceph_do_pending_vmtruncate(inode, !(need & CEPH_CAP_FILE_WR));
if (!(need & CEPH_CAP_FILE_WR))
mutex_lock(&inode->i_mutex);
__ceph_do_pending_vmtruncate(inode);
if (!(need & CEPH_CAP_FILE_WR))
mutex_unlock(&inode->i_mutex);
spin_lock(&ci->i_ceph_lock);
}

Expand Down Expand Up @@ -2473,6 +2494,11 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
} else {
dout("grant: %s -> %s\n", ceph_cap_string(cap->issued),
ceph_cap_string(newcaps));
/* is a non-auth MDS revoking the newly granted caps? */
if (cap == ci->i_auth_cap &&
__ceph_caps_revoking_other(ci, cap, newcaps))
check_caps = 2;

cap->issued = newcaps;
cap->implemented |= newcaps; /* add bits only, to
* avoid stepping on a
Expand Down Expand Up @@ -3042,21 +3068,19 @@ int ceph_encode_inode_release(void **p, struct inode *inode,
(cap->issued & unless) == 0)) {
if ((cap->issued & drop) &&
(cap->issued & unless) == 0) {
dout("encode_inode_release %p cap %p %s -> "
"%s\n", inode, cap,
int wanted = __ceph_caps_wanted(ci);
if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0)
wanted |= cap->mds_wanted;
dout("encode_inode_release %p cap %p "
"%s -> %s, wanted %s -> %s\n", inode, cap,
ceph_cap_string(cap->issued),
ceph_cap_string(cap->issued & ~drop));
ceph_cap_string(cap->issued & ~drop),
ceph_cap_string(cap->mds_wanted),
ceph_cap_string(wanted));

cap->issued &= ~drop;
cap->implemented &= ~drop;
if (ci->i_ceph_flags & CEPH_I_NODELAY) {
int wanted = __ceph_caps_wanted(ci);
dout(" wanted %s -> %s (act %s)\n",
ceph_cap_string(cap->mds_wanted),
ceph_cap_string(cap->mds_wanted &
~wanted),
ceph_cap_string(wanted));
cap->mds_wanted &= wanted;
}
cap->mds_wanted = wanted;
} else {
dout("encode_inode_release %p cap %p %s"
" (force)\n", inode, cap,
Expand Down
4 changes: 1 addition & 3 deletions fs/ceph/file.c
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,6 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov,
if (ceph_snap(inode) != CEPH_NOSNAP)
return -EROFS;

sb_start_write(inode->i_sb);
mutex_lock(&inode->i_mutex);
hold_mutex = true;

Expand Down Expand Up @@ -809,7 +808,6 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov,
out:
if (hold_mutex)
mutex_unlock(&inode->i_mutex);
sb_end_write(inode->i_sb);
current->backing_dev_info = NULL;

return written ? written : err;
Expand All @@ -824,7 +822,7 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
int ret;

mutex_lock(&inode->i_mutex);
__ceph_do_pending_vmtruncate(inode, false);
__ceph_do_pending_vmtruncate(inode);

if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) {
ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE);
Expand Down
Loading

0 comments on commit 9a5889a

Please sign in to comment.