Skip to content

Commit

Permalink
---
Browse files Browse the repository at this point in the history
yaml
---
r: 373354
b: refs/heads/master
c: 6aaa451
h: refs/heads/master
v: v3
  • Loading branch information
Alex Elder authored and Sage Weil committed May 2, 2013
1 parent 3a0894b commit 4f63c89
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 22 deletions.
2 changes: 1 addition & 1 deletion [refs]
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
---
refs/heads/master: 7fe1e5e57b84eab98ff352519aa66e86dac5bf61
refs/heads/master: 6aaa4511deb4b0fd776d1153dc63a89cdc024fb8
7 changes: 7 additions & 0 deletions trunk/include/linux/ceph/messenger.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
struct ceph_msg_data_cursor {
bool last_piece; /* now at last piece of data item */
union {
#ifdef CONFIG_BLOCK
struct { /* bio */
struct bio *bio; /* bio from list */
unsigned int vector_index; /* vector from bio */
unsigned int vector_offset; /* bytes from vector */
};
#endif /* CONFIG_BLOCK */
struct { /* pagelist */
struct page *page; /* page from list */
size_t offset; /* bytes from list */
Expand Down
137 changes: 116 additions & 21 deletions trunk/net/ceph/messenger.c
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,95 @@ static void iter_bio_next(struct bio **bio_iter, unsigned int *seg)
if (*seg == (*bio_iter)->bi_vcnt)
init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
}

/*
* For a bio data item, a piece is whatever remains of the next
* entry in the current bio iovec, or the first entry in the next
* bio in the list.
*/
static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data *data)
{
struct ceph_msg_data_cursor *cursor = &data->cursor;
struct bio *bio;

BUG_ON(data->type != CEPH_MSG_DATA_BIO);

bio = data->bio;
BUG_ON(!bio);
BUG_ON(!bio->bi_vcnt);
/* resid = bio->bi_size */

cursor->bio = bio;
cursor->vector_index = 0;
cursor->vector_offset = 0;
cursor->last_piece = !bio->bi_next && bio->bi_vcnt == 1;
}

static struct page *ceph_msg_data_bio_next(struct ceph_msg_data *data,
size_t *page_offset,
size_t *length)
{
struct ceph_msg_data_cursor *cursor = &data->cursor;
struct bio *bio;
struct bio_vec *bio_vec;
unsigned int index;

BUG_ON(data->type != CEPH_MSG_DATA_BIO);

bio = cursor->bio;
BUG_ON(!bio);

index = cursor->vector_index;
BUG_ON(index >= (unsigned int) bio->bi_vcnt);

bio_vec = &bio->bi_io_vec[index];
BUG_ON(cursor->vector_offset >= bio_vec->bv_len);
*page_offset = (size_t) (bio_vec->bv_offset + cursor->vector_offset);
BUG_ON(*page_offset >= PAGE_SIZE);
*length = (size_t) (bio_vec->bv_len - cursor->vector_offset);
BUG_ON(*length > PAGE_SIZE);

return bio_vec->bv_page;
}

static bool ceph_msg_data_bio_advance(struct ceph_msg_data *data, size_t bytes)
{
struct ceph_msg_data_cursor *cursor = &data->cursor;
struct bio *bio;
struct bio_vec *bio_vec;
unsigned int index;

BUG_ON(data->type != CEPH_MSG_DATA_BIO);

bio = cursor->bio;
BUG_ON(!bio);

index = cursor->vector_index;
BUG_ON(index >= (unsigned int) bio->bi_vcnt);
bio_vec = &bio->bi_io_vec[index];
BUG_ON(cursor->vector_offset + bytes > bio_vec->bv_len);

/* Advance the cursor offset */

cursor->vector_offset += bytes;
if (cursor->vector_offset < bio_vec->bv_len)
return false; /* more bytes to process in this segment */

/* Move on to the next segment, and possibly the next bio */

if (++cursor->vector_index == (unsigned int) bio->bi_vcnt) {
bio = bio->bi_next;
cursor->bio = bio;
cursor->vector_index = 0;
}
cursor->vector_offset = 0;

if (!cursor->last_piece && bio && !bio->bi_next)
if (cursor->vector_index == (unsigned int) bio->bi_vcnt - 1)
cursor->last_piece = true;

return true;
}
#endif

/*
Expand Down Expand Up @@ -843,11 +932,13 @@ static void ceph_msg_data_cursor_init(struct ceph_msg_data *data)
case CEPH_MSG_DATA_PAGELIST:
ceph_msg_data_pagelist_cursor_init(data);
break;
case CEPH_MSG_DATA_NONE:
case CEPH_MSG_DATA_PAGES:
#ifdef CONFIG_BLOCK
case CEPH_MSG_DATA_BIO:
ceph_msg_data_bio_cursor_init(data);
break;
#endif /* CONFIG_BLOCK */
case CEPH_MSG_DATA_NONE:
case CEPH_MSG_DATA_PAGES:
default:
/* BUG(); */
break;
Expand All @@ -870,11 +961,13 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
case CEPH_MSG_DATA_PAGELIST:
page = ceph_msg_data_pagelist_next(data, page_offset, length);
break;
case CEPH_MSG_DATA_NONE:
case CEPH_MSG_DATA_PAGES:
#ifdef CONFIG_BLOCK
case CEPH_MSG_DATA_BIO:
page = ceph_msg_data_bio_next(data, page_offset, length);
break;
#endif /* CONFIG_BLOCK */
case CEPH_MSG_DATA_NONE:
case CEPH_MSG_DATA_PAGES:
default:
page = NULL;
break;
Expand All @@ -900,11 +993,13 @@ static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
case CEPH_MSG_DATA_PAGELIST:
new_piece = ceph_msg_data_pagelist_advance(data, bytes);
break;
case CEPH_MSG_DATA_NONE:
case CEPH_MSG_DATA_PAGES:
#ifdef CONFIG_BLOCK
case CEPH_MSG_DATA_BIO:
new_piece = ceph_msg_data_bio_advance(data, bytes);
break;
#endif /* CONFIG_BLOCK */
case CEPH_MSG_DATA_NONE:
case CEPH_MSG_DATA_PAGES:
default:
BUG();
break;
Expand Down Expand Up @@ -933,6 +1028,10 @@ static void prepare_message_data(struct ceph_msg *msg,

/* Initialize data cursors */

#ifdef CONFIG_BLOCK
if (ceph_msg_has_bio(msg))
ceph_msg_data_cursor_init(&msg->b);
#endif /* CONFIG_BLOCK */
if (ceph_msg_has_pagelist(msg))
ceph_msg_data_cursor_init(&msg->l);
if (ceph_msg_has_trail(msg))
Expand Down Expand Up @@ -1233,6 +1332,10 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
need_crc = ceph_msg_data_advance(&msg->t, sent);
else if (ceph_msg_has_pagelist(msg))
need_crc = ceph_msg_data_advance(&msg->l, sent);
#ifdef CONFIG_BLOCK
else if (ceph_msg_has_bio(msg))
need_crc = ceph_msg_data_advance(&msg->b, sent);
#endif /* CONFIG_BLOCK */
BUG_ON(need_crc && sent != len);

if (sent < len)
Expand All @@ -1242,10 +1345,6 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
msg_pos->page_pos = 0;
msg_pos->page++;
msg_pos->did_page_crc = false;
#ifdef CONFIG_BLOCK
if (ceph_msg_has_bio(msg))
iter_bio_next(&msg->b.bio_iter, &msg->b.bio_seg);
#endif
}

static void in_msg_pos_next(struct ceph_connection *con, size_t len,
Expand Down Expand Up @@ -1323,8 +1422,6 @@ static int write_partial_message_data(struct ceph_connection *con)
struct page *page = NULL;
size_t page_offset;
size_t length;
int max_write = PAGE_SIZE;
int bio_offset = 0;
bool use_cursor = false;
bool last_piece = true; /* preserve existing behavior */

Expand All @@ -1345,21 +1442,19 @@ static int write_partial_message_data(struct ceph_connection *con)
&length, &last_piece);
#ifdef CONFIG_BLOCK
} else if (ceph_msg_has_bio(msg)) {
struct bio_vec *bv;

bv = bio_iovec_idx(msg->b.bio_iter, msg->b.bio_seg);
page = bv->bv_page;
bio_offset = bv->bv_offset;
max_write = bv->bv_len;
use_cursor = true;
page = ceph_msg_data_next(&msg->b, &page_offset,
&length, &last_piece);
#endif
} else {
page = zero_page;
}
if (!use_cursor)
length = min_t(int, max_write - msg_pos->page_pos,
if (!use_cursor) {
length = min_t(int, PAGE_SIZE - msg_pos->page_pos,
total_max_write);

page_offset = msg_pos->page_pos + bio_offset;
page_offset = msg_pos->page_pos;
}
if (do_datacrc && !msg_pos->did_page_crc) {
u32 crc = le32_to_cpu(msg->footer.data_crc);

Expand Down

0 comments on commit 4f63c89

Please sign in to comment.