Skip to content

Commit

Permalink
ceph: messenger and osdc changes for rbd
Browse files Browse the repository at this point in the history
Allow the messenger to send/receive data in a bio.  This is added
so that we wouldn't need to copy the data into pages or some other buffer
when doing IO for an rbd block device.

We can now have trailing variable sized data for osd
ops.  Also osd ops encoding is more modular.

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
  • Loading branch information
Yehuda Sadeh authored and Sage Weil committed Oct 20, 2010
1 parent 3499e8a commit 68b4476
Show file tree
Hide file tree
Showing 6 changed files with 436 additions and 101 deletions.
219 changes: 187 additions & 32 deletions fs/ceph/messenger.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include <linux/slab.h>
#include <linux/socket.h>
#include <linux/string.h>
#include <linux/bio.h>
#include <linux/blkdev.h>
#include <net/tcp.h>

#include "super.h"
Expand Down Expand Up @@ -529,8 +531,11 @@ static void prepare_write_message(struct ceph_connection *con)
if (le32_to_cpu(m->hdr.data_len) > 0) {
/* initialize page iterator */
con->out_msg_pos.page = 0;
con->out_msg_pos.page_pos =
le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
if (m->pages)
con->out_msg_pos.page_pos =
le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
else
con->out_msg_pos.page_pos = 0;
con->out_msg_pos.data_pos = 0;
con->out_msg_pos.did_page_crc = 0;
con->out_more = 1; /* data + footer will follow */
Expand Down Expand Up @@ -712,6 +717,31 @@ static int write_partial_kvec(struct ceph_connection *con)
return ret; /* done! */
}

#ifdef CONFIG_BLOCK
static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg)
{
if (!bio) {
*iter = NULL;
*seg = 0;
return;
}
*iter = bio;
*seg = bio->bi_idx;
}

static void iter_bio_next(struct bio **bio_iter, int *seg)
{
if (*bio_iter == NULL)
return;

BUG_ON(*seg >= (*bio_iter)->bi_vcnt);

(*seg)++;
if (*seg == (*bio_iter)->bi_vcnt)
init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
}
#endif

/*
* Write as much message data payload as we can. If we finish, queue
* up the footer.
Expand All @@ -726,21 +756,46 @@ static int write_partial_msg_pages(struct ceph_connection *con)
size_t len;
int crc = con->msgr->nocrc;
int ret;
int total_max_write;
int in_trail = 0;
size_t trail_len = (msg->trail ? msg->trail->length : 0);

dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
con->out_msg_pos.page_pos);

while (con->out_msg_pos.page < con->out_msg->nr_pages) {
#ifdef CONFIG_BLOCK
if (msg->bio && !msg->bio_iter)
init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
#endif

while (data_len > con->out_msg_pos.data_pos) {
struct page *page = NULL;
void *kaddr = NULL;
int max_write = PAGE_SIZE;
int page_shift = 0;

total_max_write = data_len - trail_len -
con->out_msg_pos.data_pos;

/*
* if we are calculating the data crc (the default), we need
* to map the page. if our pages[] has been revoked, use the
* zero page.
*/
if (msg->pages) {

/* have we reached the trail part of the data? */
if (con->out_msg_pos.data_pos >= data_len - trail_len) {
in_trail = 1;

total_max_write = data_len - con->out_msg_pos.data_pos;

page = list_first_entry(&msg->trail->head,
struct page, lru);
if (crc)
kaddr = kmap(page);
max_write = PAGE_SIZE;
} else if (msg->pages) {
page = msg->pages[con->out_msg_pos.page];
if (crc)
kaddr = kmap(page);
Expand All @@ -749,13 +804,25 @@ static int write_partial_msg_pages(struct ceph_connection *con)
struct page, lru);
if (crc)
kaddr = kmap(page);
#ifdef CONFIG_BLOCK
} else if (msg->bio) {
struct bio_vec *bv;

bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg);
page = bv->bv_page;
page_shift = bv->bv_offset;
if (crc)
kaddr = kmap(page) + page_shift;
max_write = bv->bv_len;
#endif
} else {
page = con->msgr->zero_page;
if (crc)
kaddr = page_address(con->msgr->zero_page);
}
len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos),
(int)(data_len - con->out_msg_pos.data_pos));
len = min_t(int, max_write - con->out_msg_pos.page_pos,
total_max_write);

if (crc && !con->out_msg_pos.did_page_crc) {
void *base = kaddr + con->out_msg_pos.page_pos;
u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);
Expand All @@ -765,13 +832,14 @@ static int write_partial_msg_pages(struct ceph_connection *con)
cpu_to_le32(crc32c(tmpcrc, base, len));
con->out_msg_pos.did_page_crc = 1;
}

ret = kernel_sendpage(con->sock, page,
con->out_msg_pos.page_pos, len,
con->out_msg_pos.page_pos + page_shift,
len,
MSG_DONTWAIT | MSG_NOSIGNAL |
MSG_MORE);

if (crc && (msg->pages || msg->pagelist))
if (crc &&
(msg->pages || msg->pagelist || msg->bio || in_trail))
kunmap(page);

if (ret <= 0)
Expand All @@ -783,9 +851,16 @@ static int write_partial_msg_pages(struct ceph_connection *con)
con->out_msg_pos.page_pos = 0;
con->out_msg_pos.page++;
con->out_msg_pos.did_page_crc = 0;
if (msg->pagelist)
if (in_trail)
list_move_tail(&page->lru,
&msg->trail->head);
else if (msg->pagelist)
list_move_tail(&page->lru,
&msg->pagelist->head);
#ifdef CONFIG_BLOCK
else if (msg->bio)
iter_bio_next(&msg->bio_iter, &msg->bio_seg);
#endif
}
}

Expand Down Expand Up @@ -1305,8 +1380,7 @@ static int read_partial_message_section(struct ceph_connection *con,
struct kvec *section,
unsigned int sec_len, u32 *crc)
{
int left;
int ret;
int ret, left;

BUG_ON(!section);

Expand All @@ -1329,13 +1403,83 @@ static int read_partial_message_section(struct ceph_connection *con,
static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
struct ceph_msg_header *hdr,
int *skip);


static int read_partial_message_pages(struct ceph_connection *con,
struct page **pages,
unsigned data_len, int datacrc)
{
void *p;
int ret;
int left;

left = min((int)(data_len - con->in_msg_pos.data_pos),
(int)(PAGE_SIZE - con->in_msg_pos.page_pos));
/* (page) data */
BUG_ON(pages == NULL);
p = kmap(pages[con->in_msg_pos.page]);
ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
left);
if (ret > 0 && datacrc)
con->in_data_crc =
crc32c(con->in_data_crc,
p + con->in_msg_pos.page_pos, ret);
kunmap(pages[con->in_msg_pos.page]);
if (ret <= 0)
return ret;
con->in_msg_pos.data_pos += ret;
con->in_msg_pos.page_pos += ret;
if (con->in_msg_pos.page_pos == PAGE_SIZE) {
con->in_msg_pos.page_pos = 0;
con->in_msg_pos.page++;
}

return ret;
}

#ifdef CONFIG_BLOCK
static int read_partial_message_bio(struct ceph_connection *con,
struct bio **bio_iter, int *bio_seg,
unsigned data_len, int datacrc)
{
struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
void *p;
int ret, left;

if (IS_ERR(bv))
return PTR_ERR(bv);

left = min((int)(data_len - con->in_msg_pos.data_pos),
(int)(bv->bv_len - con->in_msg_pos.page_pos));

p = kmap(bv->bv_page) + bv->bv_offset;

ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
left);
if (ret > 0 && datacrc)
con->in_data_crc =
crc32c(con->in_data_crc,
p + con->in_msg_pos.page_pos, ret);
kunmap(bv->bv_page);
if (ret <= 0)
return ret;
con->in_msg_pos.data_pos += ret;
con->in_msg_pos.page_pos += ret;
if (con->in_msg_pos.page_pos == bv->bv_len) {
con->in_msg_pos.page_pos = 0;
iter_bio_next(bio_iter, bio_seg);
}

return ret;
}
#endif

/*
* read (part of) a message.
*/
static int read_partial_message(struct ceph_connection *con)
{
struct ceph_msg *m = con->in_msg;
void *p;
int ret;
int to, left;
unsigned front_len, middle_len, data_len, data_off;
Expand Down Expand Up @@ -1422,7 +1566,10 @@ static int read_partial_message(struct ceph_connection *con)
m->middle->vec.iov_len = 0;

con->in_msg_pos.page = 0;
con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
if (m->pages)
con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
else
con->in_msg_pos.page_pos = 0;
con->in_msg_pos.data_pos = 0;
}

Expand All @@ -1440,27 +1587,29 @@ static int read_partial_message(struct ceph_connection *con)
if (ret <= 0)
return ret;
}
#ifdef CONFIG_BLOCK
if (m->bio && !m->bio_iter)
init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg);
#endif

/* (page) data */
while (con->in_msg_pos.data_pos < data_len) {
left = min((int)(data_len - con->in_msg_pos.data_pos),
(int)(PAGE_SIZE - con->in_msg_pos.page_pos));
BUG_ON(m->pages == NULL);
p = kmap(m->pages[con->in_msg_pos.page]);
ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
left);
if (ret > 0 && datacrc)
con->in_data_crc =
crc32c(con->in_data_crc,
p + con->in_msg_pos.page_pos, ret);
kunmap(m->pages[con->in_msg_pos.page]);
if (ret <= 0)
return ret;
con->in_msg_pos.data_pos += ret;
con->in_msg_pos.page_pos += ret;
if (con->in_msg_pos.page_pos == PAGE_SIZE) {
con->in_msg_pos.page_pos = 0;
con->in_msg_pos.page++;
if (m->pages) {
ret = read_partial_message_pages(con, m->pages,
data_len, datacrc);
if (ret <= 0)
return ret;
#ifdef CONFIG_BLOCK
} else if (m->bio) {

ret = read_partial_message_bio(con,
&m->bio_iter, &m->bio_seg,
data_len, datacrc);
if (ret <= 0)
return ret;
#endif
} else {
BUG_ON(1);
}
}

Expand Down Expand Up @@ -2136,6 +2285,10 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
m->nr_pages = 0;
m->pages = NULL;
m->pagelist = NULL;
m->bio = NULL;
m->bio_iter = NULL;
m->bio_seg = 0;
m->trail = NULL;

dout("ceph_msg_new %p front %d\n", m, front_len);
return m;
Expand Down Expand Up @@ -2250,6 +2403,8 @@ void ceph_msg_last_put(struct kref *kref)
m->pagelist = NULL;
}

m->trail = NULL;

if (m->pool)
ceph_msgpool_put(m->pool, m);
else
Expand Down
4 changes: 4 additions & 0 deletions fs/ceph/messenger.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ struct ceph_msg {
struct ceph_pagelist *pagelist; /* instead of pages */
struct list_head list_head;
struct kref kref;
struct bio *bio; /* instead of pages/pagelist */
struct bio *bio_iter; /* bio iterator */
int bio_seg; /* current bio segment */
struct ceph_pagelist *trail; /* the trailing part of the data */
bool front_is_vmalloc;
bool more_to_follow;
bool needs_out_seq;
Expand Down
Loading

0 comments on commit 68b4476

Please sign in to comment.