Skip to content

Commit

Permalink
ceph: allocate middle of message before starting to read
Browse files Browse the repository at this point in the history
Both the front and middle parts of the message are now
allocated in ceph_alloc_msg().

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
  • Loading branch information
Yehuda Sadeh authored and Sage Weil committed Jan 25, 2010
1 parent 5b1daec commit 2450418
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 80 deletions.
2 changes: 0 additions & 2 deletions fs/ceph/mds_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -2953,8 +2953,6 @@ const static struct ceph_connection_operations mds_con_ops = {
.get_authorizer = get_authorizer,
.verify_authorizer_reply = verify_authorizer_reply,
.peer_reset = peer_reset,
.alloc_msg = ceph_alloc_msg,
.alloc_middle = ceph_alloc_middle,
};


Expand Down
142 changes: 82 additions & 60 deletions fs/ceph/messenger.c
Original file line number Diff line number Diff line change
Expand Up @@ -1279,8 +1279,34 @@ static void process_ack(struct ceph_connection *con)



/*
 * Receive more of a single message section (a kvec) from the socket.
 *
 * section->iov_len records how many bytes of the section have arrived
 * so far; reading continues until it reaches sec_len.  If
 * ceph_tcp_recvmsg() returns a value <= 0, that value is returned
 * unchanged.  On the read that completes the section, the crc32c of
 * the whole section is stored in *crc.  Returns 1 once the section
 * holds sec_len bytes.
 */
static int read_partial_message_section(struct ceph_connection *con,
					struct kvec *section,
					unsigned int sec_len, u32 *crc)
{
	BUG_ON(!section);

	for (;;) {
		char *dst;
		int remaining;
		int nread;

		if (section->iov_len >= sec_len)
			break;

		BUG_ON(section->iov_base == NULL);
		remaining = sec_len - section->iov_len;
		dst = (char *)section->iov_base + section->iov_len;

		nread = ceph_tcp_recvmsg(con->sock, dst, remaining);
		if (nread <= 0)
			return nread;

		section->iov_len += nread;
		if (section->iov_len != sec_len)
			continue;

		/* section just became complete: checksum all of it */
		*crc = crc32c(0, section->iov_base, section->iov_len);
	}

	return 1;
}

static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
struct ceph_msg_header *hdr,
int *skip);
/*
* read (part of) a message.
*/
Expand All @@ -1292,6 +1318,7 @@ static int read_partial_message(struct ceph_connection *con)
int to, want, left;
unsigned front_len, middle_len, data_len, data_off;
int datacrc = con->msgr->nocrc;
int skip;

dout("read_partial_message con %p msg %p\n", con, m);

Expand All @@ -1315,7 +1342,6 @@ static int read_partial_message(struct ceph_connection *con)
}
}
}

front_len = le32_to_cpu(con->in_hdr.front_len);
if (front_len > CEPH_MSG_MAX_FRONT_LEN)
return -EIO;
Expand All @@ -1330,8 +1356,8 @@ static int read_partial_message(struct ceph_connection *con)
if (!con->in_msg) {
dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
con->in_hdr.front_len, con->in_hdr.data_len);
con->in_msg = con->ops->alloc_msg(con, &con->in_hdr);
if (!con->in_msg) {
con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip);
if (skip) {
/* skip this message */
pr_err("alloc_msg returned NULL, skipping message\n");
con->in_base_pos = -front_len - middle_len - data_len -
Expand All @@ -1342,56 +1368,28 @@ static int read_partial_message(struct ceph_connection *con)
if (IS_ERR(con->in_msg)) {
ret = PTR_ERR(con->in_msg);
con->in_msg = NULL;
con->error_msg = "out of memory for incoming message";
con->error_msg = "error allocating memory for incoming message";
return ret;
}
m = con->in_msg;
m->front.iov_len = 0; /* haven't read it yet */
if (m->middle)
m->middle->vec.iov_len = 0;
memcpy(&m->hdr, &con->in_hdr, sizeof(con->in_hdr));
}

/* front */
while (m->front.iov_len < front_len) {
BUG_ON(m->front.iov_base == NULL);
left = front_len - m->front.iov_len;
ret = ceph_tcp_recvmsg(con->sock, (char *)m->front.iov_base +
m->front.iov_len, left);
if (ret <= 0)
return ret;
m->front.iov_len += ret;
if (m->front.iov_len == front_len)
con->in_front_crc = crc32c(0, m->front.iov_base,
m->front.iov_len);
}
ret = read_partial_message_section(con, &m->front, front_len,
&con->in_front_crc);
if (ret <= 0)
return ret;

/* middle */
while (middle_len > 0 && (!m->middle ||
m->middle->vec.iov_len < middle_len)) {
if (m->middle == NULL) {
ret = -EOPNOTSUPP;
if (con->ops->alloc_middle)
ret = con->ops->alloc_middle(con, m);
if (ret < 0) {
pr_err("alloc_middle fail skipping payload\n");
con->in_base_pos = -middle_len - data_len
- sizeof(m->footer);
ceph_msg_put(con->in_msg);
con->in_msg = NULL;
con->in_tag = CEPH_MSGR_TAG_READY;
return 0;
}
m->middle->vec.iov_len = 0;
}
left = middle_len - m->middle->vec.iov_len;
ret = ceph_tcp_recvmsg(con->sock,
(char *)m->middle->vec.iov_base +
m->middle->vec.iov_len, left);
if (m->middle) {
ret = read_partial_message_section(con, &m->middle->vec, middle_len,
&con->in_middle_crc);
if (ret <= 0)
return ret;
m->middle->vec.iov_len += ret;
if (m->middle->vec.iov_len == middle_len)
con->in_middle_crc = crc32c(0, m->middle->vec.iov_base,
m->middle->vec.iov_len);
}

/* (page) data */
Expand Down Expand Up @@ -2115,32 +2113,14 @@ struct ceph_msg *ceph_msg_new(int type, int front_len,
return ERR_PTR(-ENOMEM);
}

/*
 * Generic message allocator, for incoming messages.
 *
 * Allocates a message sized for the front length announced in @hdr.
 * Returns the new message, or an ERR_PTR() on allocation failure.
 */
struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
				struct ceph_msg_header *hdr)
{
	int type = le16_to_cpu(hdr->type);
	int front_len = le32_to_cpu(hdr->front_len);
	struct ceph_msg *msg = ceph_msg_new(type, front_len, 0, 0, NULL);

	/*
	 * ceph_msg_new() reports failure with ERR_PTR(), so a plain NULL
	 * test would let an error pointer through as a valid message.
	 */
	if (!msg || IS_ERR(msg)) {
		pr_err("unable to allocate msg type %d len %d\n",
		       type, front_len);
		return msg ? msg : ERR_PTR(-ENOMEM);
	}
	return msg;
}

/*
* Allocate "middle" portion of a message, if it is needed and wasn't
* allocated by alloc_msg. This allows us to read a small fixed-size
* per-type header in the front and then gracefully fail (i.e.,
* propagate the error to the caller based on info in the front) when
* the middle is too large.
*/
int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
{
int type = le16_to_cpu(msg->hdr.type);
int middle_len = le32_to_cpu(msg->hdr.middle_len);
Expand All @@ -2156,6 +2136,48 @@ int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
return 0;
}

/*
* Generic message allocator, for incoming messages.
*/
static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
struct ceph_msg_header *hdr,
int *skip)
{
int type = le16_to_cpu(hdr->type);
int front_len = le32_to_cpu(hdr->front_len);
int middle_len = le32_to_cpu(hdr->middle_len);
struct ceph_msg *msg = NULL;
int ret;

if (con->ops->alloc_msg) {
msg = con->ops->alloc_msg(con, hdr, skip);
if (IS_ERR(msg))
return msg;

if (*skip)
return NULL;
}
if (!msg) {
*skip = 0;
msg = ceph_msg_new(type, front_len, 0, 0, NULL);
if (!msg) {
pr_err("unable to allocate msg type %d len %d\n",
type, front_len);
return ERR_PTR(-ENOMEM);
}
}

if (middle_len) {
ret = ceph_alloc_middle(con, msg);

if (ret < 0) {
ceph_msg_put(msg);
return msg;
}
}
return msg;
}


/*
* Free a generically kmalloc'd message.
Expand Down
9 changes: 2 additions & 7 deletions fs/ceph/messenger.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@ struct ceph_connection_operations {
void (*peer_reset) (struct ceph_connection *con);

struct ceph_msg * (*alloc_msg) (struct ceph_connection *con,
struct ceph_msg_header *hdr);
int (*alloc_middle) (struct ceph_connection *con,
struct ceph_msg *msg);
struct ceph_msg_header *hdr,
int *skip);
/* an incoming message has a data payload; tell me what pages I
* should read the data into. */
int (*prepare_pages) (struct ceph_connection *con, struct ceph_msg *m,
Expand Down Expand Up @@ -242,10 +241,6 @@ extern struct ceph_msg *ceph_msg_new(int type, int front_len,
struct page **pages);
extern void ceph_msg_kfree(struct ceph_msg *m);

extern struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
struct ceph_msg_header *hdr);
extern int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg);


static inline struct ceph_msg *ceph_msg_get(struct ceph_msg *msg)
{
Expand Down
25 changes: 18 additions & 7 deletions fs/ceph/mon_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -692,21 +692,33 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
* Allocate memory for incoming message
*/
static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
struct ceph_msg_header *hdr)
struct ceph_msg_header *hdr,
int *skip)
{
struct ceph_mon_client *monc = con->private;
int type = le16_to_cpu(hdr->type);
int front = le32_to_cpu(hdr->front_len);
int front_len = le32_to_cpu(hdr->front_len);
struct ceph_msg *m;

*skip = 0;
switch (type) {
case CEPH_MSG_MON_SUBSCRIBE_ACK:
return ceph_msgpool_get(&monc->msgpool_subscribe_ack, front);
m = ceph_msgpool_get(&monc->msgpool_subscribe_ack, front_len);
break;
case CEPH_MSG_STATFS_REPLY:
return ceph_msgpool_get(&monc->msgpool_statfs_reply, front);
m = ceph_msgpool_get(&monc->msgpool_statfs_reply, front_len);
break;
case CEPH_MSG_AUTH_REPLY:
return ceph_msgpool_get(&monc->msgpool_auth_reply, front);
m = ceph_msgpool_get(&monc->msgpool_auth_reply, front_len);
break;
default:
return NULL;
}
return ceph_alloc_msg(con, hdr);

if (!m)
*skip = 1;

return m;
}

/*
Expand Down Expand Up @@ -749,5 +761,4 @@ const static struct ceph_connection_operations mon_con_ops = {
.dispatch = dispatch,
.fault = mon_fault,
.alloc_msg = mon_alloc_msg,
.alloc_middle = ceph_alloc_middle,
};
17 changes: 13 additions & 4 deletions fs/ceph/osd_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1304,18 +1304,28 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
}

static struct ceph_msg *alloc_msg(struct ceph_connection *con,
struct ceph_msg_header *hdr)
struct ceph_msg_header *hdr,
int *skip)
{
struct ceph_osd *osd = con->private;
struct ceph_osd_client *osdc = osd->o_osdc;
int type = le16_to_cpu(hdr->type);
int front = le32_to_cpu(hdr->front_len);
struct ceph_msg *m;

*skip = 0;
switch (type) {
case CEPH_MSG_OSD_OPREPLY:
return ceph_msgpool_get(&osdc->msgpool_op_reply, front);
m = ceph_msgpool_get(&osdc->msgpool_op_reply, front);
break;
default:
return NULL;
}
return ceph_alloc_msg(con, hdr);

if (!m)
*skip = 1;

return m;
}

/*
Expand Down Expand Up @@ -1390,6 +1400,5 @@ const static struct ceph_connection_operations osd_con_ops = {
.verify_authorizer_reply = verify_authorizer_reply,
.alloc_msg = alloc_msg,
.fault = osd_reset,
.alloc_middle = ceph_alloc_middle,
.prepare_pages = prepare_pages,
};

0 comments on commit 2450418

Please sign in to comment.