Skip to content

Commit

Permalink
---
Browse files Browse the repository at this point in the history
yaml
---
r: 208863
b: refs/heads/master
c: e56fa10
h: refs/heads/master
i:
  208861: 58960af
  208859: 5c687b0
  208855: 9afc4ef
  208847: b0039a4
  208831: 44f0b28
v: v3
  • Loading branch information
Yehuda Sadeh authored and Sage Weil committed Aug 10, 2010
1 parent 57308b7 commit ab0a748
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 18 deletions.
2 changes: 1 addition & 1 deletion [refs]
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
---
refs/heads/master: 0eb6cd49f6e3ec523787d09cf08d3179be270db4
refs/heads/master: e56fa10e92e077d456cbc33b7025032887772b33
170 changes: 153 additions & 17 deletions trunk/fs/ceph/mon_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ static void ceph_monc_handle_map(struct ceph_mon_client *monc,
}

/*
* statfs
* generic requests (e.g., statfs, poolop)
*/
static struct ceph_mon_generic_request *__lookup_generic_req(
struct ceph_mon_client *monc, u64 tid)
Expand Down Expand Up @@ -442,6 +442,35 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
return m;
}

static int do_generic_request(struct ceph_mon_client *monc,
struct ceph_mon_generic_request *req)
{
int err;

/* register request */
mutex_lock(&monc->mutex);
req->tid = ++monc->last_tid;
req->request->hdr.tid = cpu_to_le64(req->tid);
__insert_generic_request(monc, req);
monc->num_generic_requests++;
ceph_con_send(monc->con, ceph_msg_get(req->request));
mutex_unlock(&monc->mutex);

err = wait_for_completion_interruptible(&req->completion);

mutex_lock(&monc->mutex);
rb_erase(&req->node, &monc->generic_request_tree);
monc->num_generic_requests--;
mutex_unlock(&monc->mutex);

if (!err)
err = req->result;
return err;
}

/*
* statfs
*/
static void handle_statfs_reply(struct ceph_mon_client *monc,
struct ceph_msg *msg)
{
Expand All @@ -468,7 +497,7 @@ static void handle_statfs_reply(struct ceph_mon_client *monc,
return;

bad:
pr_err("corrupt generic reply, no tid\n");
pr_err("corrupt generic reply, tid %llu\n", tid);
ceph_msg_dump(msg);
}

Expand All @@ -487,6 +516,7 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)

kref_init(&req->kref);
req->buf = buf;
req->buf_len = sizeof(*buf);
init_completion(&req->completion);

err = -ENOMEM;
Expand All @@ -504,33 +534,134 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
h->monhdr.session_mon_tid = 0;
h->fsid = monc->monmap->fsid;

/* register request */
mutex_lock(&monc->mutex);
req->tid = ++monc->last_tid;
req->request->hdr.tid = cpu_to_le64(req->tid);
__insert_generic_request(monc, req);
monc->num_generic_requests++;
mutex_unlock(&monc->mutex);
err = do_generic_request(monc, req);

/* send request and wait */
ceph_con_send(monc->con, ceph_msg_get(req->request));
err = wait_for_completion_interruptible(&req->completion);
out:
kref_put(&req->kref, release_generic_request);
return err;
}

/*
* pool ops
*/
static int get_poolop_reply_buf(const char *src, size_t src_len,
char *dst, size_t dst_len)
{
u32 buf_len;

if (src_len != sizeof(u32) + dst_len)
return -EINVAL;

buf_len = le32_to_cpu(*(u32 *)src);
if (buf_len != dst_len)
return -EINVAL;

memcpy(dst, src + sizeof(u32), dst_len);
return 0;
}

static void handle_poolop_reply(struct ceph_mon_client *monc,
struct ceph_msg *msg)
{
struct ceph_mon_generic_request *req;
struct ceph_mon_poolop_reply *reply = msg->front.iov_base;
u64 tid = le64_to_cpu(msg->hdr.tid);

if (msg->front.iov_len < sizeof(*reply))
goto bad;
dout("handle_poolop_reply %p tid %llu\n", msg, tid);

mutex_lock(&monc->mutex);
rb_erase(&req->node, &monc->generic_request_tree);
monc->num_generic_requests--;
req = __lookup_generic_req(monc, tid);
if (req) {
if (req->buf_len &&
get_poolop_reply_buf(msg->front.iov_base + sizeof(*reply),
msg->front.iov_len - sizeof(*reply),
req->buf, req->buf_len) < 0) {
mutex_unlock(&monc->mutex);
goto bad;
}
req->result = le32_to_cpu(reply->reply_code);
get_generic_request(req);
}
mutex_unlock(&monc->mutex);
if (req) {
complete(&req->completion);
put_generic_request(req);
}
return;

if (!err)
err = req->result;
bad:
pr_err("corrupt generic reply, tid %llu\n", tid);
ceph_msg_dump(msg);
}

/*
* Do a synchronous pool op.
*/
int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op,
u32 pool, u64 snapid,
char *buf, int len)
{
struct ceph_mon_generic_request *req;
struct ceph_mon_poolop *h;
int err;

req = kzalloc(sizeof(*req), GFP_NOFS);
if (!req)
return -ENOMEM;

kref_init(&req->kref);
req->buf = buf;
req->buf_len = len;
init_completion(&req->completion);

err = -ENOMEM;
req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS);
if (!req->request)
goto out;
req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS);
if (!req->reply)
goto out;

/* fill out request */
req->request->hdr.version = cpu_to_le16(2);
h = req->request->front.iov_base;
h->monhdr.have_version = 0;
h->monhdr.session_mon = cpu_to_le16(-1);
h->monhdr.session_mon_tid = 0;
h->fsid = monc->monmap->fsid;
h->pool = cpu_to_le32(pool);
h->op = cpu_to_le32(op);
h->auid = 0;
h->snapid = cpu_to_le64(snapid);
h->name_len = 0;

err = do_generic_request(monc, req);

out:
kref_put(&req->kref, release_generic_request);
return err;
}

int ceph_monc_create_snapid(struct ceph_mon_client *monc,
u32 pool, u64 *snapid)
{
return ceph_monc_do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
pool, 0, (char *)snapid, sizeof(*snapid));

}

int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
u32 pool, u64 snapid)
{
return ceph_monc_do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
pool, snapid, 0, 0);

}

/*
* Resend pending statfs requests.
* Resend pending generic requests.
*/
static void __resend_generic_request(struct ceph_mon_client *monc)
{
Expand Down Expand Up @@ -783,6 +914,10 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
handle_statfs_reply(monc, msg);
break;

case CEPH_MSG_POOLOP_REPLY:
handle_poolop_reply(monc, msg);
break;

case CEPH_MSG_MON_MAP:
ceph_monc_handle_map(monc, msg);
break;
Expand Down Expand Up @@ -820,6 +955,7 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
case CEPH_MSG_MON_SUBSCRIBE_ACK:
m = ceph_msg_get(monc->m_subscribe_ack);
break;
case CEPH_MSG_POOLOP_REPLY:
case CEPH_MSG_STATFS_REPLY:
return get_generic_reply(con, hdr, skip);
case CEPH_MSG_AUTH_REPLY:
Expand Down
5 changes: 5 additions & 0 deletions trunk/fs/ceph/mon_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ struct ceph_mon_generic_request {
struct rb_node node;
int result;
void *buf;
int buf_len;
struct completion completion;
struct ceph_msg *request; /* original request */
struct ceph_msg *reply; /* and reply */
Expand Down Expand Up @@ -111,6 +112,10 @@ extern int ceph_monc_open_session(struct ceph_mon_client *monc);

extern int ceph_monc_validate_auth(struct ceph_mon_client *monc);

extern int ceph_monc_create_snapid(struct ceph_mon_client *monc,
u32 pool, u64 *snapid);

extern int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
u32 pool, u64 snapid);

#endif

0 comments on commit ab0a748

Please sign in to comment.