Skip to content

Commit

Permalink
ceph: drop messages on unregistered mds sessions; cleanup
Browse files Browse the repository at this point in the history
Verify the mds session is currently registered before handling
incoming messages.  Clean up message handlers to pull mds out
of session->s_mds instead of less trustworthy src field.

Clean up con_{get,put} debug output.

Signed-off-by: Sage Weil <sage@newdream.net>
  • Loading branch information
Sage Weil committed Feb 23, 2010
1 parent a636974 commit 2600d2d
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 59 deletions.
2 changes: 1 addition & 1 deletion fs/ceph/caps.c
Original file line number Diff line number Diff line change
Expand Up @@ -2600,7 +2600,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
struct inode *inode;
struct ceph_cap *cap;
struct ceph_mds_caps *h;
int mds = le64_to_cpu(msg->hdr.src.name.num);
int mds = session->s_mds;
int op;
u32 seq;
struct ceph_vino vino;
Expand Down
85 changes: 42 additions & 43 deletions fs/ceph/mds_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,15 @@ static bool __have_session(struct ceph_mds_client *mdsc, int mds)
return mdsc->sessions[mds];
}

static int __verify_registered_session(struct ceph_mds_client *mdsc,
struct ceph_mds_session *s)
{
if (s->s_mds >= mdsc->max_sessions ||
mdsc->sessions[s->s_mds] != s)
return -ENOENT;
return 0;
}

/*
* create+register a new session for given mds.
* called under mdsc->mutex.
Expand Down Expand Up @@ -382,10 +391,11 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
/*
* called under mdsc->mutex
*/
static void unregister_session(struct ceph_mds_client *mdsc,
static void __unregister_session(struct ceph_mds_client *mdsc,
struct ceph_mds_session *s)
{
dout("unregister_session mds%d %p\n", s->s_mds, s);
dout("__unregister_session mds%d %p\n", s->s_mds, s);
BUG_ON(mdsc->sessions[s->s_mds] != s);
mdsc->sessions[s->s_mds] = NULL;
ceph_con_close(&s->s_con);
ceph_put_mds_session(s);
Expand Down Expand Up @@ -1740,10 +1750,8 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */
u64 tid;
int err, result;
int mds;
int mds = session->s_mds;

if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS)
return;
if (msg->front.iov_len < sizeof(*head)) {
pr_err("mdsc_handle_reply got corrupt (short) reply\n");
ceph_msg_dump(msg);
Expand All @@ -1760,7 +1768,6 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
return;
}
dout("handle_reply %p\n", req);
mds = le64_to_cpu(msg->hdr.src.name.num);

/* correct session? */
if (!req->r_session && req->r_session != session) {
Expand Down Expand Up @@ -1884,7 +1891,9 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
/*
* handle mds notification that our request has been forwarded.
*/
static void handle_forward(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
static void handle_forward(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session,
struct ceph_msg *msg)
{
struct ceph_mds_request *req;
u64 tid;
Expand All @@ -1894,11 +1903,7 @@ static void handle_forward(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
int err = -EINVAL;
void *p = msg->front.iov_base;
void *end = p + msg->front.iov_len;
int from_mds, state;

if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS)
goto bad;
from_mds = le64_to_cpu(msg->hdr.src.name.num);
int state;

ceph_decode_need(&p, end, sizeof(u64)+2*sizeof(u32), bad);
tid = ceph_decode_64(&p);
Expand All @@ -1915,6 +1920,9 @@ static void handle_forward(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
goto out; /* dup reply? */
}

if (next_mds >= mdsc->max_sessions)
goto out;

state = mdsc->sessions[next_mds]->s_state;
if (fwd_seq <= req->r_num_fwd) {
dout("forward %llu to mds%d - old seq %d <= %d\n",
Expand Down Expand Up @@ -1945,21 +1953,19 @@ static void handle_session(struct ceph_mds_session *session,
struct ceph_mds_client *mdsc = session->s_mdsc;
u32 op;
u64 seq;
int mds;
int mds = session->s_mds;
struct ceph_mds_session_head *h = msg->front.iov_base;
int wake = 0;

if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS)
return;
mds = le64_to_cpu(msg->hdr.src.name.num);

/* decode */
if (msg->front.iov_len != sizeof(*h))
goto bad;
op = le32_to_cpu(h->op);
seq = le64_to_cpu(h->seq);

mutex_lock(&mdsc->mutex);
if (op == CEPH_SESSION_CLOSE)
__unregister_session(mdsc, session);
/* FIXME: this ttl calculation is generous */
session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
mutex_unlock(&mdsc->mutex);
Expand Down Expand Up @@ -1990,7 +1996,6 @@ static void handle_session(struct ceph_mds_session *session,
break;

case CEPH_SESSION_CLOSE:
unregister_session(mdsc, session);
remove_session_caps(session);
wake = 1; /* for good measure */
complete(&mdsc->session_close_waiters);
Expand Down Expand Up @@ -2269,7 +2274,7 @@ static void check_new_map(struct ceph_mds_client *mdsc,
/* the session never opened, just close it
* out now */
__wake_requests(mdsc, &s->s_waiting);
unregister_session(mdsc, s);
__unregister_session(mdsc, s);
} else {
/* just close it */
mutex_unlock(&mdsc->mutex);
Expand Down Expand Up @@ -2329,24 +2334,22 @@ void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
di->lease_session = NULL;
}

static void handle_lease(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
static void handle_lease(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session,
struct ceph_msg *msg)
{
struct super_block *sb = mdsc->client->sb;
struct inode *inode;
struct ceph_mds_session *session;
struct ceph_inode_info *ci;
struct dentry *parent, *dentry;
struct ceph_dentry_info *di;
int mds;
int mds = session->s_mds;
struct ceph_mds_lease *h = msg->front.iov_base;
struct ceph_vino vino;
int mask;
struct qstr dname;
int release = 0;

if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS)
return;
mds = le64_to_cpu(msg->hdr.src.name.num);
dout("handle_lease from mds%d\n", mds);

/* decode */
Expand All @@ -2360,15 +2363,6 @@ static void handle_lease(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
if (dname.len != get_unaligned_le32(h+1))
goto bad;

/* find session */
mutex_lock(&mdsc->mutex);
session = __ceph_lookup_mds_session(mdsc, mds);
mutex_unlock(&mdsc->mutex);
if (!session) {
pr_err("handle_lease got lease but no session mds%d\n", mds);
return;
}

mutex_lock(&session->s_mutex);
session->s_seq++;

Expand Down Expand Up @@ -2437,7 +2431,6 @@ static void handle_lease(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
out:
iput(inode);
mutex_unlock(&session->s_mutex);
ceph_put_mds_session(session);
return;

bad:
Expand Down Expand Up @@ -2794,7 +2787,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
for (i = 0; i < mdsc->max_sessions; i++) {
if (mdsc->sessions[i]) {
session = get_session(mdsc->sessions[i]);
unregister_session(mdsc, session);
__unregister_session(mdsc, session);
mutex_unlock(&mdsc->mutex);
mutex_lock(&session->s_mutex);
remove_session_caps(session);
Expand Down Expand Up @@ -2891,8 +2884,7 @@ static struct ceph_connection *con_get(struct ceph_connection *con)
struct ceph_mds_session *s = con->private;

if (get_session(s)) {
dout("mdsc con_get %p %d -> %d\n", s,
atomic_read(&s->s_ref) - 1, atomic_read(&s->s_ref));
dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref));
return con;
}
dout("mdsc con_get %p FAIL\n", s);
Expand All @@ -2903,9 +2895,8 @@ static void con_put(struct ceph_connection *con)
{
struct ceph_mds_session *s = con->private;

dout("mdsc con_put %p %d -> %d\n", s, atomic_read(&s->s_ref),
atomic_read(&s->s_ref) - 1);
ceph_put_mds_session(s);
dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref));
}

/*
Expand All @@ -2926,6 +2917,13 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
struct ceph_mds_client *mdsc = s->s_mdsc;
int type = le16_to_cpu(msg->hdr.type);

mutex_lock(&mdsc->mutex);
if (__verify_registered_session(mdsc, s) < 0) {
mutex_unlock(&mdsc->mutex);
goto out;
}
mutex_unlock(&mdsc->mutex);

switch (type) {
case CEPH_MSG_MDS_MAP:
ceph_mdsc_handle_map(mdsc, msg);
Expand All @@ -2937,22 +2935,23 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
handle_reply(s, msg);
break;
case CEPH_MSG_CLIENT_REQUEST_FORWARD:
handle_forward(mdsc, msg);
handle_forward(mdsc, s, msg);
break;
case CEPH_MSG_CLIENT_CAPS:
ceph_handle_caps(s, msg);
break;
case CEPH_MSG_CLIENT_SNAP:
ceph_handle_snap(mdsc, msg);
ceph_handle_snap(mdsc, s, msg);
break;
case CEPH_MSG_CLIENT_LEASE:
handle_lease(mdsc, msg);
handle_lease(mdsc, s, msg);
break;

default:
pr_err("received unknown message type %d %s\n", type,
ceph_msg_type_name(type));
}
out:
ceph_msg_put(msg);
}

Expand Down
17 changes: 2 additions & 15 deletions fs/ceph/snap.c
Original file line number Diff line number Diff line change
Expand Up @@ -713,11 +713,11 @@ static void flush_snaps(struct ceph_mds_client *mdsc)
* directory into another realm.
*/
void ceph_handle_snap(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session,
struct ceph_msg *msg)
{
struct super_block *sb = mdsc->client->sb;
struct ceph_mds_session *session;
int mds;
int mds = session->s_mds;
u64 split;
int op;
int trace_len;
Expand All @@ -730,10 +730,6 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
int i;
int locked_rwsem = 0;

if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS)
return;
mds = le64_to_cpu(msg->hdr.src.name.num);

/* decode */
if (msg->front.iov_len < sizeof(*h))
goto bad;
Expand All @@ -749,15 +745,6 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
dout("handle_snap from mds%d op %s split %llx tracelen %d\n", mds,
ceph_snap_op_name(op), split, trace_len);

/* find session */
mutex_lock(&mdsc->mutex);
session = __ceph_lookup_mds_session(mdsc, mds);
mutex_unlock(&mdsc->mutex);
if (!session) {
dout("WTF, got snap but no session for mds%d\n", mds);
return;
}

mutex_lock(&session->s_mutex);
session->s_seq++;
mutex_unlock(&session->s_mutex);
Expand Down
1 change: 1 addition & 0 deletions fs/ceph/super.h
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,7 @@ extern void ceph_put_snap_realm(struct ceph_mds_client *mdsc,
extern int ceph_update_snap_trace(struct ceph_mds_client *m,
void *p, void *e, bool deletion);
extern void ceph_handle_snap(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session,
struct ceph_msg *msg);
extern void ceph_queue_cap_snap(struct ceph_inode_info *ci,
struct ceph_snap_context *snapc);
Expand Down

0 comments on commit 2600d2d

Please sign in to comment.