Skip to content

Commit

Permalink
---
Browse files Browse the repository at this point in the history
yaml
---
r: 373351
b: refs/heads/master
c: fe38a2b
h: refs/heads/master
i:
  373349: 6642101
  373347: cd1fa17
  373343: e831ee7
v: v3
  • Loading branch information
Alex Elder authored and Sage Weil committed May 2, 2013
1 parent 4e5e9b4 commit 5ad6124
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 11 deletions.
2 changes: 1 addition & 1 deletion [refs]
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
---
refs/heads/master: 437945094fed0deb1810e8da95465c8f26bc6f80
refs/heads/master: fe38a2b67bc6b3a60da82a23e9082256a30e39d9
7 changes: 7 additions & 0 deletions trunk/include/linux/ceph/messenger.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
}
}

struct ceph_msg_data_cursor {
bool last_piece; /* now at last piece of data item */
struct page *page; /* current page in pagelist */
size_t offset; /* pagelist bytes consumed */
};

struct ceph_msg_data {
enum ceph_msg_data_type type;
union {
Expand All @@ -112,6 +118,7 @@ struct ceph_msg_data {
};
struct ceph_pagelist *pagelist;
};
struct ceph_msg_data_cursor cursor; /* pagelist only */
};

/*
Expand Down
138 changes: 128 additions & 10 deletions trunk/net/ceph/messenger.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
#include <linux/ceph/pagelist.h>
#include <linux/export.h>

#define list_entry_next(pos, member) \
list_entry(pos->member.next, typeof(*pos), member)

/*
* Ceph uses the messenger to exchange ceph_msg messages with other
* hosts in the system. The messenger provides ordered and reliable
Expand Down Expand Up @@ -738,6 +741,109 @@ static void iter_bio_next(struct bio **bio_iter, unsigned int *seg)
}
#endif

/*
* Message data is handled (sent or received) in pieces, where each
* piece resides on a single page. The network layer might not
* consume an entire piece at once. A data item's cursor keeps
* track of which piece is next to process and how much remains to
* be processed in that piece. It also tracks whether the current
* piece is the last one in the data item.
*/
static void ceph_msg_data_cursor_init(struct ceph_msg_data *data)
{
struct ceph_msg_data_cursor *cursor = &data->cursor;
struct ceph_pagelist *pagelist;
struct page *page;

if (data->type != CEPH_MSG_DATA_PAGELIST)
return;

pagelist = data->pagelist;
BUG_ON(!pagelist);
if (!pagelist->length)
return; /* pagelist can be assigned but empty */

BUG_ON(list_empty(&pagelist->head));
page = list_first_entry(&pagelist->head, struct page, lru);

cursor->page = page;
cursor->offset = 0;
cursor->last_piece = pagelist->length <= PAGE_SIZE;
}

/*
* Return the page containing the next piece to process for a given
* data item, and supply the page offset and length of that piece.
* Indicate whether this is the last piece in this data item.
*/
static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
size_t *page_offset,
size_t *length,
bool *last_piece)
{
struct ceph_msg_data_cursor *cursor = &data->cursor;
struct ceph_pagelist *pagelist;
size_t piece_end;

BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);

pagelist = data->pagelist;
BUG_ON(!pagelist);

BUG_ON(!cursor->page);
BUG_ON(cursor->offset >= pagelist->length);

*last_piece = cursor->last_piece;
if (*last_piece) {
/* pagelist offset is always 0 */
piece_end = pagelist->length & ~PAGE_MASK;
if (!piece_end)
piece_end = PAGE_SIZE;
} else {
piece_end = PAGE_SIZE;
}
*page_offset = cursor->offset & ~PAGE_MASK;
*length = piece_end - *page_offset;

return data->cursor.page;
}

/*
* Returns true if the result moves the cursor on to the next piece
* (the next page) of the pagelist.
*/
static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
{
struct ceph_msg_data_cursor *cursor = &data->cursor;
struct ceph_pagelist *pagelist;

BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);

pagelist = data->pagelist;
BUG_ON(!pagelist);
BUG_ON(!cursor->page);
BUG_ON(cursor->offset + bytes > pagelist->length);
BUG_ON((cursor->offset & ~PAGE_MASK) + bytes > PAGE_SIZE);

/* Advance the cursor offset */

cursor->offset += bytes;
/* pagelist offset is always 0 */
if (!bytes || cursor->offset & ~PAGE_MASK)
return false; /* more bytes to process in the current page */

/* Move on to the next page */

BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
cursor->page = list_entry_next(cursor->page, lru);

/* cursor offset is at page boundary; pagelist offset is always 0 */
if (pagelist->length - cursor->offset <= PAGE_SIZE)
cursor->last_piece = true;

return true;
}

static void prepare_message_data(struct ceph_msg *msg,
struct ceph_msg_pos *msg_pos)
{
Expand All @@ -755,6 +861,12 @@ static void prepare_message_data(struct ceph_msg *msg,
init_bio_iter(msg->b.bio, &msg->b.bio_iter, &msg->b.bio_seg);
#endif
msg_pos->data_pos = 0;

/* If there's a trail, initialize its cursor */

if (ceph_msg_has_trail(msg))
ceph_msg_data_cursor_init(&msg->t);

msg_pos->did_page_crc = false;
}

Expand Down Expand Up @@ -1045,17 +1157,20 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,

msg_pos->data_pos += sent;
msg_pos->page_pos += sent;
if (in_trail) {
bool need_crc;

need_crc = ceph_msg_data_advance(&msg->t, sent);
BUG_ON(need_crc && sent != len);
}
if (sent < len)
return;

BUG_ON(sent != len);
msg_pos->page_pos = 0;
msg_pos->page++;
msg_pos->did_page_crc = false;
if (in_trail) {
BUG_ON(!ceph_msg_has_trail(msg));
list_rotate_left(&msg->t.pagelist->head);
} else if (ceph_msg_has_pagelist(msg)) {
if (ceph_msg_has_pagelist(msg)) {
list_rotate_left(&msg->l.pagelist->head);
#ifdef CONFIG_BLOCK
} else if (ceph_msg_has_bio(msg)) {
Expand Down Expand Up @@ -1141,16 +1256,18 @@ static int write_partial_message_data(struct ceph_connection *con)
size_t length;
int max_write = PAGE_SIZE;
int bio_offset = 0;
bool use_cursor = false;
bool last_piece = true; /* preserve existing behavior */

in_trail = in_trail || msg_pos->data_pos >= trail_off;
if (!in_trail)
total_max_write = trail_off - msg_pos->data_pos;

if (in_trail) {
BUG_ON(!ceph_msg_has_trail(msg));
total_max_write = data_len - msg_pos->data_pos;
page = list_first_entry(&msg->t.pagelist->head,
struct page, lru);
use_cursor = true;
page = ceph_msg_data_next(&msg->t, &page_offset,
&length, &last_piece);
} else if (ceph_msg_has_pages(msg)) {
page = msg->p.pages[msg_pos->page];
} else if (ceph_msg_has_pagelist(msg)) {
Expand All @@ -1168,8 +1285,9 @@ static int write_partial_message_data(struct ceph_connection *con)
} else {
page = zero_page;
}
length = min_t(int, max_write - msg_pos->page_pos,
total_max_write);
if (!use_cursor)
length = min_t(int, max_write - msg_pos->page_pos,
total_max_write);

page_offset = msg_pos->page_pos + bio_offset;
if (do_datacrc && !msg_pos->did_page_crc) {
Expand All @@ -1180,7 +1298,7 @@ static int write_partial_message_data(struct ceph_connection *con)
msg_pos->did_page_crc = true;
}
ret = ceph_tcp_sendpage(con->sock, page, page_offset,
length, true);
length, last_piece);
if (ret <= 0)
goto out;

Expand Down

0 comments on commit 5ad6124

Please sign in to comment.