Skip to content

Commit

Permalink
---
Browse files Browse the repository at this point in the history
yaml
---
r: 188668
b: refs/heads/master
c: 350b1c3
h: refs/heads/master
v: v3
  • Loading branch information
Sage Weil committed Dec 23, 2009
1 parent 10c7ffa commit 4a9f1a8
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 11 deletions.
2 changes: 1 addition & 1 deletion [refs]
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
---
refs/heads/master: ec302645f4a9bd9ec757c30d185557e1c0972c1a
refs/heads/master: 350b1c32ea58d29e25d63fc25e92dd48f9339546
29 changes: 29 additions & 0 deletions trunk/fs/ceph/messenger.c
Original file line number Diff line number Diff line change
Expand Up @@ -1975,6 +1975,35 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
mutex_unlock(&con->mutex);
}

/*
* Revoke a page vector that we may be reading data into
*/
void ceph_con_revoke_pages(struct ceph_connection *con, struct page **pages)
{
mutex_lock(&con->mutex);
if (con->in_msg && con->in_msg->pages == pages) {
unsigned data_len = le32_to_cpu(con->in_hdr.data_len);

/* skip rest of message */
dout("con_revoke_pages %p msg %p pages %p revoked\n", con,
con->in_msg, pages);
if (con->in_msg_pos.data_pos < data_len)
con->in_base_pos = con->in_msg_pos.data_pos - data_len;
else
con->in_base_pos = con->in_base_pos -
sizeof(struct ceph_msg_header) -
sizeof(struct ceph_msg_footer);
con->in_msg->pages = NULL;
ceph_msg_put(con->in_msg);
con->in_msg = NULL;
con->in_tag = CEPH_MSGR_TAG_READY;
} else {
dout("con_revoke_pages %p msg %p pages %p no-op\n",
con, con->in_msg, pages);
}
mutex_unlock(&con->mutex);
}

/*
* Queue a keepalive byte to ensure the tcp connection is alive.
*/
Expand Down
2 changes: 2 additions & 0 deletions trunk/fs/ceph/messenger.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ extern void ceph_con_open(struct ceph_connection *con,
extern void ceph_con_close(struct ceph_connection *con);
extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg);
extern void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg);
extern void ceph_con_revoke_pages(struct ceph_connection *con,
struct page **pages);
extern void ceph_con_keepalive(struct ceph_connection *con);
extern struct ceph_connection *ceph_con_get(struct ceph_connection *con);
extern void ceph_con_put(struct ceph_connection *con);
Expand Down
42 changes: 33 additions & 9 deletions trunk/fs/ceph/osd_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ void ceph_osdc_release_request(struct kref *kref)
ceph_msg_put(req->r_request);
if (req->r_reply)
ceph_msg_put(req->r_reply);
if (req->r_con_filling_pages) {
dout("release_request revoking pages %p from con %p\n",
req->r_pages, req->r_con_filling_pages);
ceph_con_revoke_pages(req->r_con_filling_pages,
req->r_pages);
ceph_con_put(req->r_con_filling_pages);
}
if (req->r_own_pages)
ceph_release_page_vector(req->r_pages,
req->r_num_pages);
Expand Down Expand Up @@ -687,7 +694,8 @@ static void handle_timeout(struct work_struct *work)
* handle osd op reply. either call the callback if it is specified,
* or do the completion to wake up the waiting thread.
*/
static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
struct ceph_connection *con)
{
struct ceph_osd_reply_head *rhead = msg->front.iov_base;
struct ceph_osd_request *req;
Expand Down Expand Up @@ -715,6 +723,16 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
ceph_osdc_get_request(req);
flags = le32_to_cpu(rhead->flags);

/*
* if this connection filled our pages, drop our reference now, to
* avoid a (safe but slower) revoke later.
*/
if (req->r_con_filling_pages == con && req->r_pages == msg->pages) {
dout(" got pages, dropping con_filling_pages ref %p\n", con);
req->r_con_filling_pages = NULL;
ceph_con_put(con);
}

if (req->r_reply) {
/*
* once we see the message has been received, we don't
Expand Down Expand Up @@ -1007,14 +1025,20 @@ static int prepare_pages(struct ceph_connection *con, struct ceph_msg *m,
}
dout("prepare_pages tid %llu has %d pages, want %d\n",
tid, req->r_num_pages, want);
if (likely(req->r_num_pages >= want && !req->r_prepared_pages)) {
m->pages = req->r_pages;
m->nr_pages = req->r_num_pages;
req->r_reply = m; /* only for duration of read over socket */
ceph_msg_get(m);
req->r_prepared_pages = 1;
ret = 0; /* success */
if (unlikely(req->r_num_pages < want))
goto out;

if (req->r_con_filling_pages) {
dout("revoking pages %p from old con %p\n", req->r_pages,
req->r_con_filling_pages);
ceph_con_revoke_pages(req->r_con_filling_pages, req->r_pages);
ceph_con_put(req->r_con_filling_pages);
}
req->r_con_filling_pages = ceph_con_get(con);
req->r_reply = ceph_msg_get(m); /* for duration of read over socket */
m->pages = req->r_pages;
m->nr_pages = req->r_num_pages;
ret = 0; /* success */
out:
mutex_unlock(&osdc->request_mutex);
return ret;
Expand Down Expand Up @@ -1269,7 +1293,7 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
ceph_osdc_handle_map(osdc, msg);
break;
case CEPH_MSG_OSD_OPREPLY:
handle_reply(osdc, msg);
handle_reply(osdc, msg, con);
break;

default:
Expand Down
4 changes: 3 additions & 1 deletion trunk/fs/ceph/osd_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ struct ceph_osd_request {
struct list_head r_osd_item;
struct ceph_osd *r_osd;

struct ceph_connection *r_con_filling_pages;

struct ceph_msg *r_request, *r_reply;
int r_result;
int r_flags; /* any additional flags for the osd */
u32 r_sent; /* >0 if r_request is sending/sent */
int r_prepared_pages, r_got_reply;
int r_got_reply;
int r_num_prealloc_reply;

struct ceph_osd_client *r_osdc;
Expand Down

0 comments on commit 4a9f1a8

Please sign in to comment.