Skip to content

Commit

Permalink
Merge branch 'for-linus' of git://ceph.newdream.net/git/ceph-client
Browse files Browse the repository at this point in the history
* 'for-linus' of git://ceph.newdream.net/git/ceph-client:
  libceph: fix double-free of page vector
  ceph: fix 32-bit ino numbers
  libceph: force resend of osd requests if we skip an osdmap
  ceph: use kernel DNS resolver
  ceph: fix ceph_monc_init memory leak
  ceph: let the set_layout ioctl set single traits
  Revert "ceph: don't truncate dirty pages in invalidate work thread"
  ceph: replace leading spaces with tabs
  libceph: warn on msg allocation failures
  libceph: don't complain on msgpool alloc failures
  libceph: always preallocate mon connection
  libceph: create messenger with client
  ceph: document ioctls
  ceph: implement (optional) max read size
  ceph: rename rsize -> rasize
  ceph: make readpages fully async
  • Loading branch information
Linus Torvalds committed Oct 28, 2011
2 parents 68d99b2 + 3395734 commit 97d2eb1
Show file tree
Hide file tree
Showing 17 changed files with 483 additions and 255 deletions.
2 changes: 1 addition & 1 deletion drivers/block/rbd.c
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ static struct rbd_client *rbd_client_create(struct ceph_options *opt,
kref_init(&rbdc->kref);
INIT_LIST_HEAD(&rbdc->node);

rbdc->client = ceph_create_client(opt, rbdc);
rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
if (IS_ERR(rbdc->client))
goto out_rbdc;
opt = NULL; /* Now rbdc->client is responsible for opt */
Expand Down
193 changes: 123 additions & 70 deletions fs/ceph/addr.c
Original file line number Diff line number Diff line change
Expand Up @@ -228,102 +228,155 @@ static int ceph_readpage(struct file *filp, struct page *page)
}

/*
* Build a vector of contiguous pages from the provided page list.
* Finish an async read(ahead) op.
*/
static struct page **page_vector_from_list(struct list_head *page_list,
unsigned *nr_pages)
static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
{
struct page **pages;
struct page *page;
int next_index, contig_pages = 0;
struct inode *inode = req->r_inode;
struct ceph_osd_reply_head *replyhead;
int rc, bytes;
int i;

/* build page vector */
pages = kmalloc(sizeof(*pages) * *nr_pages, GFP_NOFS);
if (!pages)
return ERR_PTR(-ENOMEM);
/* parse reply */
replyhead = msg->front.iov_base;
WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
rc = le32_to_cpu(replyhead->result);
bytes = le32_to_cpu(msg->hdr.data_len);

BUG_ON(list_empty(page_list));
next_index = list_entry(page_list->prev, struct page, lru)->index;
list_for_each_entry_reverse(page, page_list, lru) {
if (page->index == next_index) {
dout("readpages page %d %p\n", contig_pages, page);
pages[contig_pages] = page;
contig_pages++;
next_index++;
} else {
break;
dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);

/* unlock all pages, zeroing any data we didn't read */
for (i = 0; i < req->r_num_pages; i++, bytes -= PAGE_CACHE_SIZE) {
struct page *page = req->r_pages[i];

if (bytes < (int)PAGE_CACHE_SIZE) {
/* zero (remainder of) page */
int s = bytes < 0 ? 0 : bytes;
zero_user_segment(page, s, PAGE_CACHE_SIZE);
}
dout("finish_read %p uptodate %p idx %lu\n", inode, page,
page->index);
flush_dcache_page(page);
SetPageUptodate(page);
unlock_page(page);
page_cache_release(page);
}
*nr_pages = contig_pages;
return pages;
kfree(req->r_pages);
}

/*
* Read multiple pages. Leave pages we don't read + unlock in page_list;
* the caller (VM) cleans them up.
* start an async read(ahead) operation. return nr_pages we submitted
* a read for on success, or negative error code.
*/
static int ceph_readpages(struct file *file, struct address_space *mapping,
struct list_head *page_list, unsigned nr_pages)
static int start_read(struct inode *inode, struct list_head *page_list, int max)
{
struct inode *inode = file->f_dentry->d_inode;
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_osd_client *osdc =
&ceph_inode_to_client(inode)->client->osdc;
int rc = 0;
struct page **pages;
loff_t offset;
struct ceph_inode_info *ci = ceph_inode(inode);
struct page *page = list_entry(page_list->prev, struct page, lru);
struct ceph_osd_request *req;
u64 off;
u64 len;
int i;
struct page **pages;
pgoff_t next_index;
int nr_pages = 0;
int ret;

dout("readpages %p file %p nr_pages %d\n",
inode, file, nr_pages);

pages = page_vector_from_list(page_list, &nr_pages);
if (IS_ERR(pages))
return PTR_ERR(pages);
off = page->index << PAGE_CACHE_SHIFT;

/* guess read extent */
offset = pages[0]->index << PAGE_CACHE_SHIFT;
/* count pages */
next_index = page->index;
list_for_each_entry_reverse(page, page_list, lru) {
if (page->index != next_index)
break;
nr_pages++;
next_index++;
if (max && nr_pages == max)
break;
}
len = nr_pages << PAGE_CACHE_SHIFT;
rc = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
offset, &len,
ci->i_truncate_seq, ci->i_truncate_size,
pages, nr_pages, 0);
if (rc == -ENOENT)
rc = 0;
if (rc < 0)
goto out;

for (; !list_empty(page_list) && len > 0;
rc -= PAGE_CACHE_SIZE, len -= PAGE_CACHE_SIZE) {
struct page *page =
list_entry(page_list->prev, struct page, lru);
dout("start_read %p nr_pages %d is %lld~%lld\n", inode, nr_pages,
off, len);

req = ceph_osdc_new_request(osdc, &ci->i_layout, ceph_vino(inode),
off, &len,
CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
NULL, 0,
ci->i_truncate_seq, ci->i_truncate_size,
NULL, false, 1, 0);
if (!req)
return -ENOMEM;

/* build page vector */
nr_pages = len >> PAGE_CACHE_SHIFT;
pages = kmalloc(sizeof(*pages) * nr_pages, GFP_NOFS);
ret = -ENOMEM;
if (!pages)
goto out;
for (i = 0; i < nr_pages; ++i) {
page = list_entry(page_list->prev, struct page, lru);
BUG_ON(PageLocked(page));
list_del(&page->lru);

if (rc < (int)PAGE_CACHE_SIZE) {
/* zero (remainder of) page */
int s = rc < 0 ? 0 : rc;
zero_user_segment(page, s, PAGE_CACHE_SIZE);
}

if (add_to_page_cache_lru(page, mapping, page->index,

dout("start_read %p adding %p idx %lu\n", inode, page,
page->index);
if (add_to_page_cache_lru(page, &inode->i_data, page->index,
GFP_NOFS)) {
page_cache_release(page);
dout("readpages %p add_to_page_cache failed %p\n",
dout("start_read %p add_to_page_cache failed %p\n",
inode, page);
continue;
nr_pages = i;
goto out_pages;
}
dout("readpages %p adding %p idx %lu\n", inode, page,
page->index);
flush_dcache_page(page);
SetPageUptodate(page);
unlock_page(page);
page_cache_release(page);
pages[i] = page;
}
rc = 0;
req->r_pages = pages;
req->r_num_pages = nr_pages;
req->r_callback = finish_read;
req->r_inode = inode;

dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len);
ret = ceph_osdc_start_request(osdc, req, false);
if (ret < 0)
goto out_pages;
ceph_osdc_put_request(req);
return nr_pages;

out_pages:
ceph_release_page_vector(pages, nr_pages);
out:
ceph_osdc_put_request(req);
return ret;
}


/*
 * Read multiple pages.  Leave pages we don't read + unlock in page_list;
 * the caller (VM) cleans them up.
 *
 * The work is handed to start_read() in batches until page_list is empty
 * or a submission fails.  Each batch is capped at 'max' pages, derived
 * from the rsize mount option; max == 0 (rsize smaller than one page)
 * means no cap is applied.
 *
 * Returns the result of the last start_read() call: a negative error
 * code on failure, otherwise the number of pages in the final batch
 * (0 if page_list was empty on entry).
 */
static int ceph_readpages(struct file *file, struct address_space *mapping,
			  struct list_head *page_list, unsigned nr_pages)
{
	struct inode *inode = file->f_dentry->d_inode;
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	int rc = 0;
	int max = 0;

	/* round rsize up to a whole number of pages */
	if (fsc->mount_options->rsize >= PAGE_CACHE_SIZE)
		max = (fsc->mount_options->rsize + PAGE_CACHE_SIZE - 1)
			>> PAGE_SHIFT;

	dout("readpages %p file %p nr_pages %d max %d\n", inode, file, nr_pages,
	     max);
	while (!list_empty(page_list)) {
		rc = start_read(inode, page_list, max);
		if (rc < 0)
			goto out;
		/* a successful call must consume at least one page */
		BUG_ON(rc == 0);
	}
out:
	/*
	 * No page vector to free here: each vector is allocated in
	 * start_read() and released there on error or by finish_read().
	 */
	dout("readpages %p file %p ret %d\n", inode, file, rc);
	return rc;
}

Expand Down
2 changes: 1 addition & 1 deletion fs/ceph/caps.c
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,7 @@ static int send_cap_msg(struct ceph_mds_session *session,
seq, issue_seq, mseq, follows, size, max_size,
xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0);

msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), GFP_NOFS);
msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), GFP_NOFS, false);
if (!msg)
return -ENOMEM;

Expand Down
46 changes: 1 addition & 45 deletions fs/ceph/inode.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
#include <linux/namei.h>
#include <linux/writeback.h>
#include <linux/vmalloc.h>
#include <linux/pagevec.h>

#include "super.h"
#include "mds_client.h"
Expand Down Expand Up @@ -1363,49 +1362,6 @@ void ceph_queue_invalidate(struct inode *inode)
}
}

/*
 * Walk the whole mapping and invalidate every page that is neither
 * dirty nor under writeback.  Clean pages that are mapped are
 * invalidated as well.  Pages we cannot lock without blocking are
 * skipped.
 */
static void ceph_invalidate_nondirty_pages(struct address_space *mapping)
{
	struct pagevec pvec;
	pgoff_t next = 0;
	int i;

	pagevec_init(&pvec, 0);
	for (;;) {
		if (!pagevec_lookup(&pvec, mapping, next, PAGEVEC_SIZE))
			break;

		for (i = 0; i < pagevec_count(&pvec); i++) {
			struct page *page = pvec.pages[i];
			pgoff_t idx;
			int locked = 0;

			/* only try to lock pages that are clean and idle */
			if (!PageDirty(page) && !PageWriteback(page))
				locked = trylock_page(page);

			/*
			 * The page may not be locked at this point, so
			 * reading ->index is not strictly safe.  We are
			 * not allowed to lock it, though, and rely on
			 * nobody changing ->index while we hold a
			 * reference via the pagevec.
			 */
			idx = page->index;
			next = (idx > next ? idx : next) + 1;

			if (!locked)
				continue;

			generic_error_remove_page(mapping, page);
			unlock_page(page);
		}

		pagevec_release(&pvec);
		cond_resched();
	}
}

/*
* Invalidate inode pages in a worker thread. (This can't be done
* in the message handler context.)
Expand All @@ -1429,7 +1385,7 @@ static void ceph_invalidate_work(struct work_struct *work)
orig_gen = ci->i_rdcache_gen;
spin_unlock(&inode->i_lock);

ceph_invalidate_nondirty_pages(inode->i_mapping);
truncate_inode_pages(&inode->i_data, 0);

spin_lock(&inode->i_lock);
if (orig_gen == ci->i_rdcache_gen &&
Expand Down
34 changes: 28 additions & 6 deletions fs/ceph/ioctl.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,39 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg)
struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
struct ceph_mds_request *req;
struct ceph_ioctl_layout l;
struct ceph_inode_info *ci = ceph_inode(file->f_dentry->d_inode);
struct ceph_ioctl_layout nl;
int err, i;

/* copy and validate */
if (copy_from_user(&l, arg, sizeof(l)))
return -EFAULT;

if ((l.object_size & ~PAGE_MASK) ||
(l.stripe_unit & ~PAGE_MASK) ||
!l.stripe_unit ||
(l.object_size &&
(unsigned)l.object_size % (unsigned)l.stripe_unit))
/* validate changed params against current layout */
err = ceph_do_getattr(file->f_dentry->d_inode, CEPH_STAT_CAP_LAYOUT);
if (!err) {
nl.stripe_unit = ceph_file_layout_su(ci->i_layout);
nl.stripe_count = ceph_file_layout_stripe_count(ci->i_layout);
nl.object_size = ceph_file_layout_object_size(ci->i_layout);
nl.data_pool = le32_to_cpu(ci->i_layout.fl_pg_pool);
nl.preferred_osd =
(s32)le32_to_cpu(ci->i_layout.fl_pg_preferred);
} else
return err;

if (l.stripe_count)
nl.stripe_count = l.stripe_count;
if (l.stripe_unit)
nl.stripe_unit = l.stripe_unit;
if (l.object_size)
nl.object_size = l.object_size;
if (l.data_pool)
nl.data_pool = l.data_pool;
if (l.preferred_osd)
nl.preferred_osd = l.preferred_osd;

if ((nl.object_size & ~PAGE_MASK) ||
(nl.stripe_unit & ~PAGE_MASK) ||
((unsigned)nl.object_size % (unsigned)nl.stripe_unit))
return -EINVAL;

/* make sure it's a valid data pool */
Expand Down
Loading

0 comments on commit 97d2eb1

Please sign in to comment.