libceph: collapse all data items into one
It turns out that only one of the data item types is ever used at
any one time in a single message (currently).
    - A page array is used by the osd client (on behalf of the file
      system) and by rbd.  Only one osd op (and therefore at most
      one data item) is ever used at a time by rbd.  And the only
      time the file system sends two, the second op contains no
      data.
    - A bio is only used by the rbd client (and again, only one
      data item per message)
    - A page list is used by the file system and by rbd for outgoing
      data, but only one op (and one data item) at a time.

We can therefore collapse all three of our data item fields into a
single field "data", and depend on the messenger code to properly
handle it based on its type.

This allows us to eliminate quite a bit of duplicated code.
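
A rough sketch of the result (illustrative only, with placeholder arguments;
the calls shown are the same ones touched by the patch below, not new
interfaces):

	/* a sender attaches at most one data item to the message */
	ceph_msg_data_set_pages(msg, pages, length, alignment);

	/* the messenger keys off the single field's type */
	if (ceph_msg_has_data(msg))
		ceph_msg_data_cursor_init(&msg->data, data_len);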

This is related to:
    http://tracker.ceph.com/issues/4429

Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
Alex Elder authored and Sage Weil committed May 2, 2013
1 parent 686be20 commit 4c59b4a
Showing 2 changed files with 40 additions and 95 deletions.
include/linux/ceph/messenger.h (2 additions, 10 deletions)
@@ -64,11 +64,7 @@ struct ceph_messenger {
 	u32 required_features;
 };
 
-#define ceph_msg_has_pages(m)		((m)->p.type == CEPH_MSG_DATA_PAGES)
-#define ceph_msg_has_pagelist(m)	((m)->l.type == CEPH_MSG_DATA_PAGELIST)
-#ifdef CONFIG_BLOCK
-#define ceph_msg_has_bio(m)		((m)->b.type == CEPH_MSG_DATA_BIO)
-#endif /* CONFIG_BLOCK */
+#define ceph_msg_has_data(m)		((m)->data.type != CEPH_MSG_DATA_NONE)
 
 enum ceph_msg_data_type {
 	CEPH_MSG_DATA_NONE,	/* message contains no data payload */
@@ -145,11 +141,7 @@ struct ceph_msg {
 	struct ceph_buffer *middle;
 
 	/* data payload */
-	struct ceph_msg_data	p;	/* pages */
-	struct ceph_msg_data	l;	/* pagelist */
-#ifdef CONFIG_BLOCK
-	struct ceph_msg_data	b;	/* bio */
-#endif /* CONFIG_BLOCK */
+	struct ceph_msg_data	data;
 
 	struct ceph_connection *con;
 	struct list_head list_head;	/* links for connection lists */
net/ceph/messenger.c (38 additions, 85 deletions)
@@ -1085,22 +1085,15 @@ static void prepare_message_data(struct ceph_msg *msg,
 
 	/* initialize page iterator */
 	msg_pos->page = 0;
-	if (ceph_msg_has_pages(msg))
-		msg_pos->page_pos = msg->p.alignment;
+	if (ceph_msg_has_data(msg))
+		msg_pos->page_pos = msg->data.alignment;
 	else
 		msg_pos->page_pos = 0;
 	msg_pos->data_pos = 0;
 
-	/* Initialize data cursors */
+	/* Initialize data cursor */
 
-#ifdef CONFIG_BLOCK
-	if (ceph_msg_has_bio(msg))
-		ceph_msg_data_cursor_init(&msg->b, data_len);
-#endif /* CONFIG_BLOCK */
-	if (ceph_msg_has_pages(msg))
-		ceph_msg_data_cursor_init(&msg->p, data_len);
-	if (ceph_msg_has_pagelist(msg))
-		ceph_msg_data_cursor_init(&msg->l, data_len);
+	ceph_msg_data_cursor_init(&msg->data, data_len);
 
 	msg_pos->did_page_crc = false;
 }
@@ -1166,10 +1159,10 @@ static void prepare_write_message(struct ceph_connection *con)
 		m->needs_out_seq = false;
 	}
 
-	dout("prepare_write_message %p seq %lld type %d len %d+%d+%d (%zd)\n",
+	dout("prepare_write_message %p seq %lld type %d len %d+%d+%d\n",
 	     m, con->out_seq, le16_to_cpu(m->hdr.type),
 	     le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
-	     le32_to_cpu(m->hdr.data_len), m->p.length);
+	     le32_to_cpu(m->hdr.data_len));
 	BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
 
 	/* tag + hdr + front + middle */
@@ -1411,14 +1404,7 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
 
 	msg_pos->data_pos += sent;
 	msg_pos->page_pos += sent;
-	if (ceph_msg_has_pages(msg))
-		need_crc = ceph_msg_data_advance(&msg->p, sent);
-	else if (ceph_msg_has_pagelist(msg))
-		need_crc = ceph_msg_data_advance(&msg->l, sent);
-#ifdef CONFIG_BLOCK
-	else if (ceph_msg_has_bio(msg))
-		need_crc = ceph_msg_data_advance(&msg->b, sent);
-#endif /* CONFIG_BLOCK */
+	need_crc = ceph_msg_data_advance(&msg->data, sent);
 	BUG_ON(need_crc && sent != len);
 
 	if (sent < len)
@@ -1441,12 +1427,8 @@ static void in_msg_pos_next(struct ceph_connection *con, size_t len,
 
 	msg_pos->data_pos += received;
 	msg_pos->page_pos += received;
-	if (ceph_msg_has_pages(msg))
-		(void) ceph_msg_data_advance(&msg->p, received);
-#ifdef CONFIG_BLOCK
-	else if (ceph_msg_has_bio(msg))
-		(void) ceph_msg_data_advance(&msg->b, received);
-#endif /* CONFIG_BLOCK */
+	(void) ceph_msg_data_advance(&msg->data, received);
+
 	if (received < len)
 		return;
 
@@ -1486,6 +1468,9 @@ static int write_partial_message_data(struct ceph_connection *con)
 	dout("%s %p msg %p page %d offset %d\n", __func__,
 	     con, msg, msg_pos->page, msg_pos->page_pos);
 
+	if (WARN_ON(!ceph_msg_has_data(msg)))
+		return -EINVAL;
+
 	/*
 	 * Iterate through each page that contains data to be
 	 * written, and send as much as possible for each.
@@ -1500,23 +1485,8 @@ static int write_partial_message_data(struct ceph_connection *con)
 		size_t length;
 		bool last_piece;
 
-		if (ceph_msg_has_pages(msg)) {
-			page = ceph_msg_data_next(&msg->p, &page_offset,
-							&length, &last_piece);
-		} else if (ceph_msg_has_pagelist(msg)) {
-			page = ceph_msg_data_next(&msg->l, &page_offset,
-							&length, &last_piece);
-#ifdef CONFIG_BLOCK
-		} else if (ceph_msg_has_bio(msg)) {
-			page = ceph_msg_data_next(&msg->b, &page_offset,
-							&length, &last_piece);
-#endif
-		} else {
-			WARN(1, "con %p data_len %u but no outbound data\n",
-				con, data_len);
-			ret = -EINVAL;
-			goto out;
-		}
+		page = ceph_msg_data_next(&msg->data, &page_offset, &length,
+							&last_piece);
 		if (do_datacrc && !msg_pos->did_page_crc) {
 			u32 crc = le32_to_cpu(msg->footer.data_crc);
 
@@ -2197,28 +2167,20 @@ static int read_partial_msg_data(struct ceph_connection *con)
 	int ret;
 
 	BUG_ON(!msg);
+	if (WARN_ON(!ceph_msg_has_data(msg)))
+		return -EIO;
 
 	data_len = le32_to_cpu(con->in_hdr.data_len);
 	while (msg_pos->data_pos < data_len) {
-		if (ceph_msg_has_pages(msg)) {
-			page = ceph_msg_data_next(&msg->p, &page_offset,
-							&length, NULL);
-#ifdef CONFIG_BLOCK
-		} else if (ceph_msg_has_bio(msg)) {
-			page = ceph_msg_data_next(&msg->b, &page_offset,
-							&length, NULL);
-#endif
-		} else {
-			BUG_ON(1);
-		}
+		page = ceph_msg_data_next(&msg->data, &page_offset, &length,
+							NULL);
 		ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
 		if (ret <= 0)
 			return ret;
 
 		if (do_datacrc)
 			con->in_data_crc = ceph_crc32c_page(con->in_data_crc,
 								page, page_offset, ret);
-
 		in_msg_pos_next(con, length, ret);
 	}
 
@@ -3043,12 +3005,12 @@ void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
 {
 	BUG_ON(!pages);
 	BUG_ON(!length);
-	BUG_ON(msg->p.type != CEPH_MSG_DATA_NONE);
+	BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE);
 
-	msg->p.type = CEPH_MSG_DATA_PAGES;
-	msg->p.pages = pages;
-	msg->p.length = length;
-	msg->p.alignment = alignment & ~PAGE_MASK;
+	msg->data.type = CEPH_MSG_DATA_PAGES;
+	msg->data.pages = pages;
+	msg->data.length = length;
+	msg->data.alignment = alignment & ~PAGE_MASK;
 }
 EXPORT_SYMBOL(ceph_msg_data_set_pages);
 
@@ -3057,20 +3019,20 @@ void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
 {
 	BUG_ON(!pagelist);
 	BUG_ON(!pagelist->length);
-	BUG_ON(msg->l.type != CEPH_MSG_DATA_NONE);
+	BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE);
 
-	msg->l.type = CEPH_MSG_DATA_PAGELIST;
-	msg->l.pagelist = pagelist;
+	msg->data.type = CEPH_MSG_DATA_PAGELIST;
+	msg->data.pagelist = pagelist;
 }
 EXPORT_SYMBOL(ceph_msg_data_set_pagelist);
 
 void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio)
 {
 	BUG_ON(!bio);
-	BUG_ON(msg->b.type != CEPH_MSG_DATA_NONE);
+	BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE);
 
-	msg->b.type = CEPH_MSG_DATA_BIO;
-	msg->b.bio = bio;
+	msg->data.type = CEPH_MSG_DATA_BIO;
+	msg->data.bio = bio;
 }
 EXPORT_SYMBOL(ceph_msg_data_set_bio);
 
@@ -3094,9 +3056,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
 	INIT_LIST_HEAD(&m->list_head);
 	kref_init(&m->kref);
 
-	ceph_msg_data_init(&m->p);
-	ceph_msg_data_init(&m->l);
-	ceph_msg_data_init(&m->b);
+	ceph_msg_data_init(&m->data);
 
 	/* front */
 	m->front_max = front_len;
@@ -3251,20 +3211,13 @@ void ceph_msg_last_put(struct kref *kref)
 		ceph_buffer_put(m->middle);
 		m->middle = NULL;
 	}
-	if (ceph_msg_has_pages(m)) {
-		m->p.length = 0;
-		m->p.pages = NULL;
-		m->p.type = CEPH_OSD_DATA_TYPE_NONE;
-	}
-	if (ceph_msg_has_pagelist(m)) {
-		ceph_pagelist_release(m->l.pagelist);
-		kfree(m->l.pagelist);
-		m->l.pagelist = NULL;
-		m->l.type = CEPH_OSD_DATA_TYPE_NONE;
-	}
-	if (ceph_msg_has_bio(m)) {
-		m->b.bio = NULL;
-		m->b.type = CEPH_OSD_DATA_TYPE_NONE;
+	if (ceph_msg_has_data(m)) {
+		if (m->data.type == CEPH_MSG_DATA_PAGELIST) {
+			ceph_pagelist_release(m->data.pagelist);
+			kfree(m->data.pagelist);
+		}
+		memset(&m->data, 0, sizeof m->data);
+		ceph_msg_data_init(&m->data);
 	}
 
 	if (m->pool)
@@ -3277,7 +3230,7 @@ EXPORT_SYMBOL(ceph_msg_last_put);
 void ceph_msg_dump(struct ceph_msg *msg)
 {
 	pr_debug("msg_dump %p (front_max %d length %zd)\n", msg,
-		msg->front_max, msg->p.length);
+		msg->front_max, msg->data.length);
 	print_hex_dump(KERN_DEBUG, "header: ",
 			DUMP_PREFIX_OFFSET, 16, 1,
 			&msg->hdr, sizeof(msg->hdr), true);
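
Note, following directly from the BUG_ON() checks in the setters above:
all three setters now guard the same field, so a message can carry at most
one data item of any type. A hypothetical caller such as

	ceph_msg_data_set_pages(msg, pages, length, alignment);
	ceph_msg_data_set_bio(msg, bio);	/* trips BUG_ON: data.type is no longer NONE */

would previously have stored the two items in separate fields, but now
hits BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE), consistent with the
commit message's observation that only one data item is ever used per
message.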
