Skip to content

Commit

Permalink
ceph: use ceph_pagelist for mds reconnect message; change encoding (p…
Browse files Browse the repository at this point in the history
…rotocol change)

Use the ceph_pagelist to encode the MDS reconnect message.  We change the
message encoding (protocol change!) at the same time to make our life
easier (we don't know how many snaprealms we have when we start encoding).

An empty message implies the session is closed/does not exist.

Signed-off-by: Sage Weil <sage@newdream.net>
  • Loading branch information
Sage Weil committed Dec 23, 2009
1 parent 58bb3b3 commit 93cea5b
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 101 deletions.
2 changes: 1 addition & 1 deletion fs/ceph/ceph_fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
#define CEPH_MDS_PROTOCOL 9 /* cluster internal */
#define CEPH_MON_PROTOCOL 5 /* cluster internal */
#define CEPH_OSDC_PROTOCOL 22 /* server/client */
#define CEPH_MDSC_PROTOCOL 30 /* server/client */
#define CEPH_MDSC_PROTOCOL 31 /* server/client */
#define CEPH_MONC_PROTOCOL 15 /* server/client */


Expand Down
156 changes: 56 additions & 100 deletions fs/ceph/mds_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "messenger.h"
#include "decode.h"
#include "auth.h"
#include "pagelist.h"

/*
* A cluster of MDS (metadata server) daemons is responsible for
Expand Down Expand Up @@ -1971,20 +1972,12 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
/*
* Encode information about a cap for a reconnect with the MDS.
*/
struct encode_caps_data {
void **pp;
void *end;
int *num_caps;
};

static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
void *arg)
{
struct ceph_mds_cap_reconnect *rec;
struct ceph_mds_cap_reconnect rec;
struct ceph_inode_info *ci;
struct encode_caps_data *data = (struct encode_caps_data *)arg;
void *p = *(data->pp);
void *end = data->end;
struct ceph_pagelist *pagelist = arg;
char *path;
int pathlen, err;
u64 pathbase;
Expand All @@ -1995,8 +1988,9 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
inode, ceph_vinop(inode), cap, cap->cap_id,
ceph_cap_string(cap->issued));
ceph_decode_need(&p, end, sizeof(u64), needmore);
ceph_encode_64(&p, ceph_ino(inode));
err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
if (err)
return err;

dentry = d_find_alias(inode);
if (dentry) {
Expand All @@ -2009,33 +2003,29 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
path = NULL;
pathlen = 0;
}
ceph_decode_need(&p, end, pathlen+4, needmore);
ceph_encode_string(&p, end, path, pathlen);
err = ceph_pagelist_encode_string(pagelist, path, pathlen);
if (err)
goto out;

ceph_decode_need(&p, end, sizeof(*rec), needmore);
rec = p;
p += sizeof(*rec);
BUG_ON(p > end);
spin_lock(&inode->i_lock);
cap->seq = 0; /* reset cap seq */
cap->issue_seq = 0; /* and issue_seq */
rec->cap_id = cpu_to_le64(cap->cap_id);
rec->pathbase = cpu_to_le64(pathbase);
rec->wanted = cpu_to_le32(__ceph_caps_wanted(ci));
rec->issued = cpu_to_le32(cap->issued);
rec->size = cpu_to_le64(inode->i_size);
ceph_encode_timespec(&rec->mtime, &inode->i_mtime);
ceph_encode_timespec(&rec->atime, &inode->i_atime);
rec->snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
rec.cap_id = cpu_to_le64(cap->cap_id);
rec.pathbase = cpu_to_le64(pathbase);
rec.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
rec.issued = cpu_to_le32(cap->issued);
rec.size = cpu_to_le64(inode->i_size);
ceph_encode_timespec(&rec.mtime, &inode->i_mtime);
ceph_encode_timespec(&rec.atime, &inode->i_atime);
rec.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
spin_unlock(&inode->i_lock);

err = ceph_pagelist_append(pagelist, &rec, sizeof(rec));

out:
kfree(path);
dput(dentry);
(*data->num_caps)++;
*(data->pp) = p;
return 0;
needmore:
return -ENOSPC;
return err;
}


Expand All @@ -2053,19 +2043,26 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
*/
static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
{
struct ceph_mds_session *session;
struct ceph_mds_session *session = NULL;
struct ceph_msg *reply;
int newlen, len = 4 + 1;
void *p, *end;
int err;
int num_caps, num_realms = 0;
int got;
u64 next_snap_ino = 0;
__le32 *pnum_caps, *pnum_realms;
struct encode_caps_data iter_args;
struct ceph_pagelist *pagelist;

pr_info("reconnect to recovering mds%d\n", mds);

pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
if (!pagelist)
goto fail_nopagelist;
ceph_pagelist_init(pagelist);

reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, 0, 0, NULL);
if (IS_ERR(reply)) {
err = PTR_ERR(reply);
goto fail_nomsg;
}

/* find session */
session = __ceph_lookup_mds_session(mdsc, mds);
mutex_unlock(&mdsc->mutex); /* drop lock for duration */
Expand All @@ -2081,92 +2078,55 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)

/* replay unsafe requests */
replay_unsafe_requests(mdsc, session);

/* estimate needed space */
len += session->s_nr_caps *
(100+sizeof(struct ceph_mds_cap_reconnect));
pr_info("estimating i need %d bytes for %d caps\n",
len, session->s_nr_caps);
} else {
dout("no session for mds%d, will send short reconnect\n",
mds);
}

down_read(&mdsc->snap_rwsem);

retry:
/* build reply */
reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, len, 0, 0, NULL);
if (IS_ERR(reply)) {
err = PTR_ERR(reply);
pr_err("send_mds_reconnect ENOMEM on %d for mds%d\n",
len, mds);
goto out;
}
p = reply->front.iov_base;
end = p + len;

if (!session) {
ceph_encode_8(&p, 1); /* session was closed */
ceph_encode_32(&p, 0);
if (!session)
goto send;
}
dout("session %p state %s\n", session,
session_state_name(session->s_state));

/* traverse this session's caps */
ceph_encode_8(&p, 0);
pnum_caps = p;
ceph_encode_32(&p, session->s_nr_caps);
num_caps = 0;

iter_args.pp = &p;
iter_args.end = end;
iter_args.num_caps = &num_caps;
err = iterate_session_caps(session, encode_caps_cb, &iter_args);
if (err == -ENOSPC)
goto needmore;
err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
if (err)
goto fail;
err = iterate_session_caps(session, encode_caps_cb, pagelist);
if (err < 0)
goto out;
*pnum_caps = cpu_to_le32(num_caps);

/*
* snaprealms. we provide mds with the ino, seq (version), and
* parent for all of our realms. If the mds has any newer info,
* it will tell us.
*/
next_snap_ino = 0;
/* save some space for the snaprealm count */
pnum_realms = p;
ceph_decode_need(&p, end, sizeof(*pnum_realms), needmore);
p += sizeof(*pnum_realms);
num_realms = 0;
while (1) {
struct ceph_snap_realm *realm;
struct ceph_mds_snaprealm_reconnect *sr_rec;
struct ceph_mds_snaprealm_reconnect sr_rec;
got = radix_tree_gang_lookup(&mdsc->snap_realms,
(void **)&realm, next_snap_ino, 1);
if (!got)
break;

dout(" adding snap realm %llx seq %lld parent %llx\n",
realm->ino, realm->seq, realm->parent_ino);
ceph_decode_need(&p, end, sizeof(*sr_rec), needmore);
sr_rec = p;
sr_rec->ino = cpu_to_le64(realm->ino);
sr_rec->seq = cpu_to_le64(realm->seq);
sr_rec->parent = cpu_to_le64(realm->parent_ino);
p += sizeof(*sr_rec);
num_realms++;
sr_rec.ino = cpu_to_le64(realm->ino);
sr_rec.seq = cpu_to_le64(realm->seq);
sr_rec.parent = cpu_to_le64(realm->parent_ino);
err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
if (err)
goto fail;
next_snap_ino = realm->ino + 1;
}
*pnum_realms = cpu_to_le32(num_realms);

send:
reply->front.iov_len = p - reply->front.iov_base;
reply->hdr.front_len = cpu_to_le32(reply->front.iov_len);
dout("final len was %u (guessed %d)\n",
(unsigned)reply->front.iov_len, len);
reply->pagelist = pagelist;
reply->hdr.data_len = cpu_to_le32(pagelist->length);
reply->nr_pages = calc_pages_for(0, pagelist->length);
ceph_con_send(&session->s_con, reply);

if (session) {
Expand All @@ -2183,18 +2143,14 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
mutex_lock(&mdsc->mutex);
return;

needmore:
/*
* we need a larger buffer. this doesn't very accurately
* factor in snap realms, but it's safe.
*/
num_caps += num_realms;
newlen = len * ((100 * (session->s_nr_caps+3)) / (num_caps + 1)) / 100;
pr_info("i guessed %d, and did %d of %d caps, retrying with %d\n",
len, num_caps, session->s_nr_caps, newlen);
len = newlen;
fail:
ceph_msg_put(reply);
goto retry;
fail_nomsg:
ceph_pagelist_release(pagelist);
kfree(pagelist);
fail_nopagelist:
pr_err("ENOMEM preparing reconnect for mds%d\n", mds);
goto out;
}


Expand Down

0 comments on commit 93cea5b

Please sign in to comment.