Skip to content

Commit

Permalink
ceph: send TID of the oldest pending caps flush to MDS
Browse files Browse the repository at this point in the history
Based on this information, the MDS can trim its completed caps flush
list (which is used to detect duplicate cap flushes).

Signed-off-by: Yan, Zheng <zyan@redhat.com>
  • Loading branch information
Yan, Zheng authored and Ilya Dryomov committed Jun 25, 2015
1 parent 8310b08 commit a2971c8
Showing 1 changed file with 49 additions and 18 deletions.
67 changes: 49 additions & 18 deletions fs/ceph/caps.c
Original file line number Diff line number Diff line change
Expand Up @@ -986,8 +986,8 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
static int send_cap_msg(struct ceph_mds_session *session,
u64 ino, u64 cid, int op,
int caps, int wanted, int dirty,
u32 seq, u64 flush_tid, u32 issue_seq, u32 mseq,
u64 size, u64 max_size,
u32 seq, u64 flush_tid, u64 oldest_flush_tid,
u32 issue_seq, u32 mseq, u64 size, u64 max_size,
struct timespec *mtime, struct timespec *atime,
u64 time_warp_seq,
kuid_t uid, kgid_t gid, umode_t mode,
Expand All @@ -1001,20 +1001,23 @@ static int send_cap_msg(struct ceph_mds_session *session,
size_t extra_len;

dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s"
" seq %u/%u mseq %u follows %lld size %llu/%llu"
" seq %u/%u tid %llu/%llu mseq %u follows %lld size %llu/%llu"
" xattr_ver %llu xattr_len %d\n", ceph_cap_op_name(op),
cid, ino, ceph_cap_string(caps), ceph_cap_string(wanted),
ceph_cap_string(dirty),
seq, issue_seq, mseq, follows, size, max_size,
seq, issue_seq, flush_tid, oldest_flush_tid,
mseq, follows, size, max_size,
xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0);

/* flock buffer size + inline version + inline data size */
extra_len = 4 + 8 + 4;
/* flock buffer size + inline version + inline data size +
* osd_epoch_barrier + oldest_flush_tid */
extra_len = 4 + 8 + 4 + 4 + 8;
msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + extra_len,
GFP_NOFS, false);
if (!msg)
return -ENOMEM;

msg->hdr.version = cpu_to_le16(6);
msg->hdr.tid = cpu_to_le64(flush_tid);

fc = msg->front.iov_base;
Expand Down Expand Up @@ -1050,6 +1053,10 @@ static int send_cap_msg(struct ceph_mds_session *session,
ceph_encode_64(&p, inline_data ? 0 : CEPH_INLINE_NONE);
/* inline data size */
ceph_encode_32(&p, 0);
/* osd_epoch_barrier */
ceph_encode_32(&p, 0);
/* oldest_flush_tid */
ceph_encode_64(&p, oldest_flush_tid);

fc->xattr_version = cpu_to_le64(xattr_version);
if (xattrs_buf) {
Expand Down Expand Up @@ -1098,7 +1105,7 @@ void ceph_queue_caps_release(struct inode *inode)
*/
static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
int op, int used, int want, int retain, int flushing,
u64 flush_tid)
u64 flush_tid, u64 oldest_flush_tid)
__releases(cap->ci->i_ceph_lock)
{
struct ceph_inode_info *ci = cap->ci;
Expand Down Expand Up @@ -1187,7 +1194,8 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
spin_unlock(&ci->i_ceph_lock);

ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id,
op, keep, want, flushing, seq, flush_tid, issue_seq, mseq,
op, keep, want, flushing, seq,
flush_tid, oldest_flush_tid, issue_seq, mseq,
size, max_size, &mtime, &atime, time_warp_seq,
uid, gid, mode, xattr_version, xattr_blob,
follows, inline_data);
Expand Down Expand Up @@ -1307,8 +1315,8 @@ void __ceph_flush_snaps(struct ceph_inode_info *ci,
inode, capsnap, capsnap->follows, capsnap->flush_tid);
send_cap_msg(session, ceph_vino(inode).ino, 0,
CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0,
capsnap->dirty, 0, capsnap->flush_tid, 0, mseq,
capsnap->size, 0,
capsnap->dirty, 0, capsnap->flush_tid, 0,
0, mseq, capsnap->size, 0,
&capsnap->mtime, &capsnap->atime,
capsnap->time_warp_seq,
capsnap->uid, capsnap->gid, capsnap->mode,
Expand Down Expand Up @@ -1438,6 +1446,17 @@ static void __add_cap_flushing_to_mdsc(struct ceph_mds_client *mdsc,
rb_insert_color(&cf->g_node, &mdsc->cap_flush_tree);
}

/*
 * Return the TID of the oldest cap flush still pending in the mdsc's
 * global cap_flush_tree (its leftmost rbtree node), or 0 when no cap
 * flushes are pending.
 */
static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc)
{
	struct ceph_cap_flush *cf;
	struct rb_node *node;

	node = rb_first(&mdsc->cap_flush_tree);
	if (!node)
		return 0;

	cf = rb_entry(node, struct ceph_cap_flush, g_node);
	return cf->tid;
}

/*
* Add dirty inode to the flushing list. Assigned a seq number so we
* can wait for caps to flush without starving.
Expand All @@ -1446,7 +1465,7 @@ static void __add_cap_flushing_to_mdsc(struct ceph_mds_client *mdsc,
*/
static int __mark_caps_flushing(struct inode *inode,
struct ceph_mds_session *session,
u64 *flush_tid)
u64 *flush_tid, u64 *oldest_flush_tid)
{
struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
struct ceph_inode_info *ci = ceph_inode(inode);
Expand All @@ -1473,6 +1492,7 @@ static int __mark_caps_flushing(struct inode *inode,

cf->tid = ++mdsc->last_cap_flush_tid;
__add_cap_flushing_to_mdsc(mdsc, cf);
*oldest_flush_tid = __get_oldest_flush_tid(mdsc);

if (list_empty(&ci->i_flushing_item)) {
list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing);
Expand Down Expand Up @@ -1533,7 +1553,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
struct ceph_mds_client *mdsc = fsc->mdsc;
struct inode *inode = &ci->vfs_inode;
struct ceph_cap *cap;
u64 flush_tid;
u64 flush_tid, oldest_flush_tid;
int file_wanted, used, cap_used;
int took_snap_rwsem = 0; /* true if mdsc->snap_rwsem held */
int issued, implemented, want, retain, revoking, flushing = 0;
Expand Down Expand Up @@ -1754,18 +1774,23 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,

if (cap == ci->i_auth_cap && ci->i_dirty_caps) {
flushing = __mark_caps_flushing(inode, session,
&flush_tid);
&flush_tid,
&oldest_flush_tid);
} else {
flushing = 0;
flush_tid = 0;
spin_lock(&mdsc->cap_dirty_lock);
oldest_flush_tid = __get_oldest_flush_tid(mdsc);
spin_unlock(&mdsc->cap_dirty_lock);
}

mds = cap->mds; /* remember mds, so we don't repeat */
sent++;

/* __send_cap drops i_ceph_lock */
delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, cap_used,
want, retain, flushing, flush_tid);
want, retain, flushing,
flush_tid, oldest_flush_tid);
goto retry; /* retake i_ceph_lock and restart our cap scan. */
}

Expand Down Expand Up @@ -1800,7 +1825,7 @@ static int try_flush_caps(struct inode *inode, u64 *ptid)
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_mds_session *session = NULL;
int flushing = 0;
u64 flush_tid = 0;
u64 flush_tid = 0, oldest_flush_tid = 0;

retry:
spin_lock(&ci->i_ceph_lock);
Expand All @@ -1825,12 +1850,13 @@ static int try_flush_caps(struct inode *inode, u64 *ptid)
if (cap->session->s_state < CEPH_MDS_SESSION_OPEN)
goto out;

flushing = __mark_caps_flushing(inode, session, &flush_tid);
flushing = __mark_caps_flushing(inode, session, &flush_tid,
&oldest_flush_tid);

/* __send_cap drops i_ceph_lock */
delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, used, want,
(cap->issued | cap->implemented),
flushing, flush_tid);
flushing, flush_tid, oldest_flush_tid);

if (delayed) {
spin_lock(&ci->i_ceph_lock);
Expand Down Expand Up @@ -2083,6 +2109,11 @@ static int __kick_flushing_caps(struct ceph_mds_client *mdsc,
struct rb_node *n;
int delayed = 0;
u64 first_tid = 0;
u64 oldest_flush_tid;

spin_lock(&mdsc->cap_dirty_lock);
oldest_flush_tid = __get_oldest_flush_tid(mdsc);
spin_unlock(&mdsc->cap_dirty_lock);

while (true) {
spin_lock(&ci->i_ceph_lock);
Expand Down Expand Up @@ -2113,7 +2144,7 @@ static int __kick_flushing_caps(struct ceph_mds_client *mdsc,
__ceph_caps_used(ci),
__ceph_caps_wanted(ci),
cap->issued | cap->implemented,
cf->caps, cf->tid);
cf->caps, cf->tid, oldest_flush_tid);
}
return delayed;
}
Expand Down

0 comments on commit a2971c8

Please sign in to comment.