Skip to content

Commit

Permalink
ceph: make page alignment explicit in osd interface
Browse files Browse the repository at this point in the history
We used to infer alignment of IOs within a page based on the file offset,
which assumed they matched.  This broke with direct IO that was not aligned
to pages (e.g., 512-byte aligned IO).  We were also trusting the alignment
specified in the OSD reply, which could have been adjusted by the server.

Explicitly specify the page alignment when setting up OSD IO requests.

Signed-off-by: Sage Weil <sage@newdream.net>
  • Loading branch information
Sage Weil committed Nov 9, 2010
1 parent e98b6fe commit b7495fc
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 19 deletions.
6 changes: 3 additions & 3 deletions fs/ceph/addr.c
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ static int readpage_nounlock(struct file *filp, struct page *page)
err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
page->index << PAGE_CACHE_SHIFT, &len,
ci->i_truncate_seq, ci->i_truncate_size,
&page, 1);
&page, 1, 0);
if (err == -ENOENT)
err = 0;
if (err < 0) {
Expand Down Expand Up @@ -287,7 +287,7 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
rc = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
offset, &len,
ci->i_truncate_seq, ci->i_truncate_size,
pages, nr_pages);
pages, nr_pages, 0);
if (rc == -ENOENT)
rc = 0;
if (rc < 0)
Expand Down Expand Up @@ -782,7 +782,7 @@ static int ceph_writepages_start(struct address_space *mapping,
snapc, do_sync,
ci->i_truncate_seq,
ci->i_truncate_size,
&inode->i_mtime, true, 1);
&inode->i_mtime, true, 1, 0);
max_pages = req->r_num_pages;

alloc_page_vec(fsc, req);
Expand Down
26 changes: 21 additions & 5 deletions fs/ceph/file.c
Original file line number Diff line number Diff line change
Expand Up @@ -282,11 +282,12 @@ int ceph_release(struct inode *inode, struct file *file)
static int striped_read(struct inode *inode,
u64 off, u64 len,
struct page **pages, int num_pages,
int *checkeof)
int *checkeof, bool align_to_pages)
{
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_inode_info *ci = ceph_inode(inode);
u64 pos, this_len;
int io_align, page_align;
int page_off = off & ~PAGE_CACHE_MASK; /* first byte's offset in page */
int left, pages_left;
int read;
Expand All @@ -302,14 +303,19 @@ static int striped_read(struct inode *inode,
page_pos = pages;
pages_left = num_pages;
read = 0;
io_align = off & ~PAGE_MASK;

more:
if (align_to_pages)
page_align = (pos - io_align) & ~PAGE_MASK;
else
page_align = pos & ~PAGE_MASK;
this_len = left;
ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
&ci->i_layout, pos, &this_len,
ci->i_truncate_seq,
ci->i_truncate_size,
page_pos, pages_left);
page_pos, pages_left, page_align);
hit_stripe = this_len < left;
was_short = ret >= 0 && ret < this_len;
if (ret == -ENOENT)
Expand Down Expand Up @@ -393,7 +399,8 @@ static ssize_t ceph_sync_read(struct file *file, char __user *data,
if (ret < 0)
goto done;

ret = striped_read(inode, off, len, pages, num_pages, checkeof);
ret = striped_read(inode, off, len, pages, num_pages, checkeof,
file->f_flags & O_DIRECT);

if (ret >= 0 && (file->f_flags & O_DIRECT) == 0)
ret = ceph_copy_page_vector_to_user(pages, data, off, ret);
Expand Down Expand Up @@ -448,6 +455,7 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data,
int flags;
int do_sync = 0;
int check_caps = 0;
int page_align, io_align;
int ret;
struct timespec mtime = CURRENT_TIME;

Expand All @@ -462,6 +470,8 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data,
else
pos = *offset;

io_align = pos & ~PAGE_MASK;

ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left);
if (ret < 0)
return ret;
Expand All @@ -486,20 +496,26 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data,
*/
more:
len = left;
if (file->f_flags & O_DIRECT)
/* write from beginning of first page, regardless of
io alignment */
page_align = (pos - io_align) & ~PAGE_MASK;
else
page_align = pos & ~PAGE_MASK;
req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
ceph_vino(inode), pos, &len,
CEPH_OSD_OP_WRITE, flags,
ci->i_snap_realm->cached_context,
do_sync,
ci->i_truncate_seq, ci->i_truncate_size,
&mtime, false, 2);
&mtime, false, 2, page_align);
if (!req)
return -ENOMEM;

num_pages = calc_pages_for(pos, len);

if (file->f_flags & O_DIRECT) {
pages = ceph_get_direct_page_vector(data, num_pages, pos, len);
pages = ceph_get_direct_page_vector(data, num_pages);
if (IS_ERR(pages)) {
ret = PTR_ERR(pages);
goto out;
Expand Down
2 changes: 1 addition & 1 deletion fs/ceph/inode.c
Original file line number Diff line number Diff line change
Expand Up @@ -1752,7 +1752,7 @@ int ceph_do_getattr(struct inode *inode, int mask)
return 0;
}

dout("do_getattr inode %p mask %s\n", inode, ceph_cap_string(mask));
dout("do_getattr inode %p mask %s mode 0%o\n", inode, ceph_cap_string(mask), inode->i_mode);
if (ceph_caps_issued_mask(ceph_inode(inode), mask, 1))
return 0;

Expand Down
7 changes: 5 additions & 2 deletions include/linux/ceph/osd_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ struct ceph_osd_request {
struct ceph_file_layout r_file_layout;
struct ceph_snap_context *r_snapc; /* snap context for writes */
unsigned r_num_pages; /* size of page array (follows) */
unsigned r_page_alignment; /* io offset in first page */
struct page **r_pages; /* pages for data payload */
int r_pages_from_pool;
int r_own_pages; /* if true, i own page list */
Expand Down Expand Up @@ -194,7 +195,8 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
int do_sync, u32 truncate_seq,
u64 truncate_size,
struct timespec *mtime,
bool use_mempool, int num_reply);
bool use_mempool, int num_reply,
int page_align);

static inline void ceph_osdc_get_request(struct ceph_osd_request *req)
{
Expand All @@ -218,7 +220,8 @@ extern int ceph_osdc_readpages(struct ceph_osd_client *osdc,
struct ceph_file_layout *layout,
u64 off, u64 *plen,
u32 truncate_seq, u64 truncate_size,
struct page **pages, int nr_pages);
struct page **pages, int nr_pages,
int page_align);

extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
struct ceph_vino vino,
Expand Down
22 changes: 14 additions & 8 deletions net/ceph/osd_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
op->extent.length = objlen;
}
req->r_num_pages = calc_pages_for(off, *plen);
req->r_page_alignment = off & ~PAGE_MASK;
if (op->op == CEPH_OSD_OP_WRITE)
op->payload_len = *plen;

Expand Down Expand Up @@ -419,7 +420,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
u32 truncate_seq,
u64 truncate_size,
struct timespec *mtime,
bool use_mempool, int num_reply)
bool use_mempool, int num_reply,
int page_align)
{
struct ceph_osd_req_op ops[3];
struct ceph_osd_request *req;
Expand Down Expand Up @@ -447,6 +449,10 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
calc_layout(osdc, vino, layout, off, plen, req, ops);
req->r_file_layout = *layout; /* keep a copy */

/* in case it differs from natural alignment that calc_layout
filled in for us */
req->r_page_alignment = page_align;

ceph_osdc_build_request(req, off, plen, ops,
snapc,
mtime,
Expand Down Expand Up @@ -1489,7 +1495,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
struct ceph_vino vino, struct ceph_file_layout *layout,
u64 off, u64 *plen,
u32 truncate_seq, u64 truncate_size,
struct page **pages, int num_pages)
struct page **pages, int num_pages, int page_align)
{
struct ceph_osd_request *req;
int rc = 0;
Expand All @@ -1499,15 +1505,15 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
NULL, 0, truncate_seq, truncate_size, NULL,
false, 1);
false, 1, page_align);
if (!req)
return -ENOMEM;

/* it may be a short read due to an object boundary */
req->r_pages = pages;

dout("readpages final extent is %llu~%llu (%d pages)\n",
off, *plen, req->r_num_pages);
dout("readpages final extent is %llu~%llu (%d pages align %d)\n",
off, *plen, req->r_num_pages, page_align);

rc = ceph_osdc_start_request(osdc, req, false);
if (!rc)
Expand All @@ -1533,6 +1539,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
{
struct ceph_osd_request *req;
int rc = 0;
int page_align = off & ~PAGE_MASK;

BUG_ON(vino.snap != CEPH_NOSNAP);
req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
Expand All @@ -1541,7 +1548,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
CEPH_OSD_FLAG_WRITE,
snapc, do_sync,
truncate_seq, truncate_size, mtime,
nofail, 1);
nofail, 1, page_align);
if (!req)
return -ENOMEM;

Expand Down Expand Up @@ -1638,8 +1645,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
m = ceph_msg_get(req->r_reply);

if (data_len > 0) {
unsigned data_off = le16_to_cpu(hdr->data_off);
int want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
int want = calc_pages_for(req->r_page_alignment, data_len);

if (unlikely(req->r_num_pages < want)) {
pr_warning("tid %lld reply %d > expected %d pages\n",
Expand Down

0 comments on commit b7495fc

Please sign in to comment.