Skip to content

Commit

Permalink
---
Browse files Browse the repository at this point in the history
yaml
---
r: 211598
b: refs/heads/master
c: 68b4476
h: refs/heads/master
v: v3
  • Loading branch information
Yehuda Sadeh authored and Sage Weil committed Oct 20, 2010
1 parent 7aa013b commit 77ea49f
Show file tree
Hide file tree
Showing 7 changed files with 437 additions and 102 deletions.
2 changes: 1 addition & 1 deletion [refs]
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
---
refs/heads/master: 3499e8a5d4dbb083324efd942e2c4fb7eb65f27c
refs/heads/master: 68b4476b0bc13fef18266b4140309a30e86739d2
219 changes: 187 additions & 32 deletions trunk/fs/ceph/messenger.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include <linux/slab.h>
#include <linux/socket.h>
#include <linux/string.h>
#include <linux/bio.h>
#include <linux/blkdev.h>
#include <net/tcp.h>

#include "super.h"
Expand Down Expand Up @@ -529,8 +531,11 @@ static void prepare_write_message(struct ceph_connection *con)
if (le32_to_cpu(m->hdr.data_len) > 0) {
/* initialize page iterator */
con->out_msg_pos.page = 0;
con->out_msg_pos.page_pos =
le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
if (m->pages)
con->out_msg_pos.page_pos =
le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
else
con->out_msg_pos.page_pos = 0;
con->out_msg_pos.data_pos = 0;
con->out_msg_pos.did_page_crc = 0;
con->out_more = 1; /* data + footer will follow */
Expand Down Expand Up @@ -712,6 +717,31 @@ static int write_partial_kvec(struct ceph_connection *con)
return ret; /* done! */
}

#ifdef CONFIG_BLOCK
static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg)
{
if (!bio) {
*iter = NULL;
*seg = 0;
return;
}
*iter = bio;
*seg = bio->bi_idx;
}

static void iter_bio_next(struct bio **bio_iter, int *seg)
{
if (*bio_iter == NULL)
return;

BUG_ON(*seg >= (*bio_iter)->bi_vcnt);

(*seg)++;
if (*seg == (*bio_iter)->bi_vcnt)
init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
}
#endif

/*
* Write as much message data payload as we can. If we finish, queue
* up the footer.
Expand All @@ -726,21 +756,46 @@ static int write_partial_msg_pages(struct ceph_connection *con)
size_t len;
int crc = con->msgr->nocrc;
int ret;
int total_max_write;
int in_trail = 0;
size_t trail_len = (msg->trail ? msg->trail->length : 0);

dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
con->out_msg_pos.page_pos);

while (con->out_msg_pos.page < con->out_msg->nr_pages) {
#ifdef CONFIG_BLOCK
if (msg->bio && !msg->bio_iter)
init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
#endif

while (data_len > con->out_msg_pos.data_pos) {
struct page *page = NULL;
void *kaddr = NULL;
int max_write = PAGE_SIZE;
int page_shift = 0;

total_max_write = data_len - trail_len -
con->out_msg_pos.data_pos;

/*
* if we are calculating the data crc (the default), we need
* to map the page. if our pages[] has been revoked, use the
* zero page.
*/
if (msg->pages) {

/* have we reached the trail part of the data? */
if (con->out_msg_pos.data_pos >= data_len - trail_len) {
in_trail = 1;

total_max_write = data_len - con->out_msg_pos.data_pos;

page = list_first_entry(&msg->trail->head,
struct page, lru);
if (crc)
kaddr = kmap(page);
max_write = PAGE_SIZE;
} else if (msg->pages) {
page = msg->pages[con->out_msg_pos.page];
if (crc)
kaddr = kmap(page);
Expand All @@ -749,13 +804,25 @@ static int write_partial_msg_pages(struct ceph_connection *con)
struct page, lru);
if (crc)
kaddr = kmap(page);
#ifdef CONFIG_BLOCK
} else if (msg->bio) {
struct bio_vec *bv;

bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg);
page = bv->bv_page;
page_shift = bv->bv_offset;
if (crc)
kaddr = kmap(page) + page_shift;
max_write = bv->bv_len;
#endif
} else {
page = con->msgr->zero_page;
if (crc)
kaddr = page_address(con->msgr->zero_page);
}
len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos),
(int)(data_len - con->out_msg_pos.data_pos));
len = min_t(int, max_write - con->out_msg_pos.page_pos,
total_max_write);

if (crc && !con->out_msg_pos.did_page_crc) {
void *base = kaddr + con->out_msg_pos.page_pos;
u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);
Expand All @@ -765,13 +832,14 @@ static int write_partial_msg_pages(struct ceph_connection *con)
cpu_to_le32(crc32c(tmpcrc, base, len));
con->out_msg_pos.did_page_crc = 1;
}

ret = kernel_sendpage(con->sock, page,
con->out_msg_pos.page_pos, len,
con->out_msg_pos.page_pos + page_shift,
len,
MSG_DONTWAIT | MSG_NOSIGNAL |
MSG_MORE);

if (crc && (msg->pages || msg->pagelist))
if (crc &&
(msg->pages || msg->pagelist || msg->bio || in_trail))
kunmap(page);

if (ret <= 0)
Expand All @@ -783,9 +851,16 @@ static int write_partial_msg_pages(struct ceph_connection *con)
con->out_msg_pos.page_pos = 0;
con->out_msg_pos.page++;
con->out_msg_pos.did_page_crc = 0;
if (msg->pagelist)
if (in_trail)
list_move_tail(&page->lru,
&msg->trail->head);
else if (msg->pagelist)
list_move_tail(&page->lru,
&msg->pagelist->head);
#ifdef CONFIG_BLOCK
else if (msg->bio)
iter_bio_next(&msg->bio_iter, &msg->bio_seg);
#endif
}
}

Expand Down Expand Up @@ -1305,8 +1380,7 @@ static int read_partial_message_section(struct ceph_connection *con,
struct kvec *section,
unsigned int sec_len, u32 *crc)
{
int left;
int ret;
int ret, left;

BUG_ON(!section);

Expand All @@ -1329,13 +1403,83 @@ static int read_partial_message_section(struct ceph_connection *con,
static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
struct ceph_msg_header *hdr,
int *skip);


static int read_partial_message_pages(struct ceph_connection *con,
struct page **pages,
unsigned data_len, int datacrc)
{
void *p;
int ret;
int left;

left = min((int)(data_len - con->in_msg_pos.data_pos),
(int)(PAGE_SIZE - con->in_msg_pos.page_pos));
/* (page) data */
BUG_ON(pages == NULL);
p = kmap(pages[con->in_msg_pos.page]);
ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
left);
if (ret > 0 && datacrc)
con->in_data_crc =
crc32c(con->in_data_crc,
p + con->in_msg_pos.page_pos, ret);
kunmap(pages[con->in_msg_pos.page]);
if (ret <= 0)
return ret;
con->in_msg_pos.data_pos += ret;
con->in_msg_pos.page_pos += ret;
if (con->in_msg_pos.page_pos == PAGE_SIZE) {
con->in_msg_pos.page_pos = 0;
con->in_msg_pos.page++;
}

return ret;
}

#ifdef CONFIG_BLOCK
static int read_partial_message_bio(struct ceph_connection *con,
struct bio **bio_iter, int *bio_seg,
unsigned data_len, int datacrc)
{
struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
void *p;
int ret, left;

if (IS_ERR(bv))
return PTR_ERR(bv);

left = min((int)(data_len - con->in_msg_pos.data_pos),
(int)(bv->bv_len - con->in_msg_pos.page_pos));

p = kmap(bv->bv_page) + bv->bv_offset;

ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
left);
if (ret > 0 && datacrc)
con->in_data_crc =
crc32c(con->in_data_crc,
p + con->in_msg_pos.page_pos, ret);
kunmap(bv->bv_page);
if (ret <= 0)
return ret;
con->in_msg_pos.data_pos += ret;
con->in_msg_pos.page_pos += ret;
if (con->in_msg_pos.page_pos == bv->bv_len) {
con->in_msg_pos.page_pos = 0;
iter_bio_next(bio_iter, bio_seg);
}

return ret;
}
#endif

/*
* read (part of) a message.
*/
static int read_partial_message(struct ceph_connection *con)
{
struct ceph_msg *m = con->in_msg;
void *p;
int ret;
int to, left;
unsigned front_len, middle_len, data_len, data_off;
Expand Down Expand Up @@ -1422,7 +1566,10 @@ static int read_partial_message(struct ceph_connection *con)
m->middle->vec.iov_len = 0;

con->in_msg_pos.page = 0;
con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
if (m->pages)
con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
else
con->in_msg_pos.page_pos = 0;
con->in_msg_pos.data_pos = 0;
}

Expand All @@ -1440,27 +1587,29 @@ static int read_partial_message(struct ceph_connection *con)
if (ret <= 0)
return ret;
}
#ifdef CONFIG_BLOCK
if (m->bio && !m->bio_iter)
init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg);
#endif

/* (page) data */
while (con->in_msg_pos.data_pos < data_len) {
left = min((int)(data_len - con->in_msg_pos.data_pos),
(int)(PAGE_SIZE - con->in_msg_pos.page_pos));
BUG_ON(m->pages == NULL);
p = kmap(m->pages[con->in_msg_pos.page]);
ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
left);
if (ret > 0 && datacrc)
con->in_data_crc =
crc32c(con->in_data_crc,
p + con->in_msg_pos.page_pos, ret);
kunmap(m->pages[con->in_msg_pos.page]);
if (ret <= 0)
return ret;
con->in_msg_pos.data_pos += ret;
con->in_msg_pos.page_pos += ret;
if (con->in_msg_pos.page_pos == PAGE_SIZE) {
con->in_msg_pos.page_pos = 0;
con->in_msg_pos.page++;
if (m->pages) {
ret = read_partial_message_pages(con, m->pages,
data_len, datacrc);
if (ret <= 0)
return ret;
#ifdef CONFIG_BLOCK
} else if (m->bio) {

ret = read_partial_message_bio(con,
&m->bio_iter, &m->bio_seg,
data_len, datacrc);
if (ret <= 0)
return ret;
#endif
} else {
BUG_ON(1);
}
}

Expand Down Expand Up @@ -2136,6 +2285,10 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
m->nr_pages = 0;
m->pages = NULL;
m->pagelist = NULL;
m->bio = NULL;
m->bio_iter = NULL;
m->bio_seg = 0;
m->trail = NULL;

dout("ceph_msg_new %p front %d\n", m, front_len);
return m;
Expand Down Expand Up @@ -2250,6 +2403,8 @@ void ceph_msg_last_put(struct kref *kref)
m->pagelist = NULL;
}

m->trail = NULL;

if (m->pool)
ceph_msgpool_put(m->pool, m);
else
Expand Down
4 changes: 4 additions & 0 deletions trunk/fs/ceph/messenger.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ struct ceph_msg {
struct ceph_pagelist *pagelist; /* instead of pages */
struct list_head list_head;
struct kref kref;
struct bio *bio; /* instead of pages/pagelist */
struct bio *bio_iter; /* bio iterator */
int bio_seg; /* current bio segment */
struct ceph_pagelist *trail; /* the trailing part of the data */
bool front_is_vmalloc;
bool more_to_follow;
bool needs_out_seq;
Expand Down
Loading

0 comments on commit 77ea49f

Please sign in to comment.