Skip to content

Commit

Permalink
ceph: alloc message data pages and check if tid exists
Browse files Browse the repository at this point in the history
Now doing it in the same callback that is also responsible for
allocating the 'front' part of the message. If we get a message
that we haven't got a corresponding tid for, mark it for skipping.

Moving the mutex unlock/lock from the osd alloc_msg callback
to the calling function in the messenger.

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
  • Loading branch information
Yehuda Sadeh authored and Sage Weil committed Jan 25, 2010
1 parent 9d7f0f1 commit 0547a9b
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 59 deletions.
33 changes: 2 additions & 31 deletions fs/ceph/messenger.c
Original file line number Diff line number Diff line change
Expand Up @@ -2114,25 +2114,6 @@ static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
return 0;
}

static int ceph_alloc_data_section(struct ceph_connection *con, struct ceph_msg *msg)
{
int ret;
int want;
int data_len = le32_to_cpu(msg->hdr.data_len);
unsigned data_off = le16_to_cpu(msg->hdr.data_off);

want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
ret = -1;
mutex_unlock(&con->mutex);
if (con->ops->prepare_pages)
ret = con->ops->prepare_pages(con, msg, want);
mutex_lock(&con->mutex);

BUG_ON(msg->nr_pages < want);

return ret;
}

/*
* Generic message allocator, for incoming messages.
*/
Expand All @@ -2143,12 +2124,13 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
int type = le16_to_cpu(hdr->type);
int front_len = le32_to_cpu(hdr->front_len);
int middle_len = le32_to_cpu(hdr->middle_len);
int data_len = le32_to_cpu(hdr->data_len);
struct ceph_msg *msg = NULL;
int ret;

if (con->ops->alloc_msg) {
mutex_unlock(&con->mutex);
msg = con->ops->alloc_msg(con, hdr, skip);
mutex_lock(&con->mutex);
if (IS_ERR(msg))
return msg;

Expand All @@ -2175,17 +2157,6 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
}
}

if (data_len) {
ret = ceph_alloc_data_section(con, msg);

if (ret < 0) {
*skip = 1;
ceph_msg_put(msg);
return NULL;
}
}


return msg;
}

Expand Down
4 changes: 0 additions & 4 deletions fs/ceph/messenger.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,6 @@ struct ceph_connection_operations {
struct ceph_msg * (*alloc_msg) (struct ceph_connection *con,
struct ceph_msg_header *hdr,
int *skip);
/* an incoming message has a data payload; tell me what pages I
* should read the data into. */
int (*prepare_pages) (struct ceph_connection *con, struct ceph_msg *m,
int want);
};

extern const char *ceph_name_type_str(int t);
Expand Down
1 change: 1 addition & 0 deletions fs/ceph/mon_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,7 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
struct ceph_msg *m;

*skip = 0;

switch (type) {
case CEPH_MSG_MON_SUBSCRIBE_ACK:
m = ceph_msgpool_get(&monc->msgpool_subscribe_ack, front_len);
Expand Down
66 changes: 42 additions & 24 deletions fs/ceph/osd_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -998,31 +998,26 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
* find those pages.
* 0 = success, -1 failure.
*/
static int prepare_pages(struct ceph_connection *con, struct ceph_msg *m,
int want)
static int prepare_pages(struct ceph_connection *con,
struct ceph_msg_header *hdr,
struct ceph_osd_request *req,
u64 tid,
struct ceph_msg *m)
{
struct ceph_osd *osd = con->private;
struct ceph_osd_client *osdc;
struct ceph_osd_request *req;
u64 tid;
int ret = -1;
int type = le16_to_cpu(m->hdr.type);
int data_len = le32_to_cpu(hdr->data_len);
unsigned data_off = le16_to_cpu(hdr->data_off);

int want = calc_pages_for(data_off & ~PAGE_MASK, data_len);

if (!osd)
return -1;

osdc = osd->o_osdc;

dout("prepare_pages on msg %p want %d\n", m, want);
if (unlikely(type != CEPH_MSG_OSD_OPREPLY))
return -1; /* hmm! */

tid = le64_to_cpu(m->hdr.tid);
mutex_lock(&osdc->request_mutex);
req = __lookup_request(osdc, tid);
if (!req) {
dout("prepare_pages unknown tid %llu\n", tid);
goto out;
}
dout("prepare_pages tid %llu has %d pages, want %d\n",
tid, req->r_num_pages, want);
if (unlikely(req->r_num_pages < want))
Expand All @@ -1040,7 +1035,8 @@ static int prepare_pages(struct ceph_connection *con, struct ceph_msg *m,
m->nr_pages = req->r_num_pages;
ret = 0; /* success */
out:
mutex_unlock(&osdc->request_mutex);
BUG_ON(ret < 0 || m->nr_pages < want);

return ret;
}

Expand Down Expand Up @@ -1311,19 +1307,42 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
struct ceph_osd_client *osdc = osd->o_osdc;
int type = le16_to_cpu(hdr->type);
int front = le32_to_cpu(hdr->front_len);
int data_len = le32_to_cpu(hdr->data_len);
struct ceph_msg *m;
struct ceph_osd_request *req;
u64 tid;
int err;

*skip = 0;
switch (type) {
case CEPH_MSG_OSD_OPREPLY:
m = ceph_msgpool_get(&osdc->msgpool_op_reply, front);
break;
default:
if (type != CEPH_MSG_OSD_OPREPLY)
return NULL;
}

if (!m)
tid = le64_to_cpu(hdr->tid);
mutex_lock(&osdc->request_mutex);
req = __lookup_request(osdc, tid);
if (!req) {
*skip = 1;
m = NULL;
dout("prepare_pages unknown tid %llu\n", tid);
goto out;
}
m = ceph_msgpool_get(&osdc->msgpool_op_reply, front);
if (!m) {
*skip = 1;
goto out;
}

if (data_len > 0) {
err = prepare_pages(con, hdr, req, tid, m);
if (err < 0) {
*skip = 1;
ceph_msg_put(m);
m = ERR_PTR(err);
}
}

out:
mutex_unlock(&osdc->request_mutex);

return m;
}
Expand Down Expand Up @@ -1400,5 +1419,4 @@ const static struct ceph_connection_operations osd_con_ops = {
.verify_authorizer_reply = verify_authorizer_reply,
.alloc_msg = alloc_msg,
.fault = osd_reset,
.prepare_pages = prepare_pages,
};

0 comments on commit 0547a9b

Please sign in to comment.