Skip to content

Commit

Permalink
ceph: reset osd after relevant messages timed out
Browse files Browse the repository at this point in the history
This simplifies the process of timing out messages. We
keep lru of current messages that are in flight. If a
timeout has passed, we reset the osd connection, so that
messages will be retransmitted.  This is a failsafe in case
we hit some sort of problem sending out message to the OSD.
Normally, we'll get notification via an updated osdmap if
there are problems.

If a request is older than the keepalive timeout, send a
keepalive to ensure we detect any breaks in the TCP connection.

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
  • Loading branch information
Yehuda Sadeh authored and Sage Weil committed Mar 4, 2010
1 parent e9964c1 commit 422d2cb
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 57 deletions.
153 changes: 98 additions & 55 deletions fs/ceph/osd_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#define OSD_OPREPLY_FRONT_LEN 512

const static struct ceph_connection_operations osd_con_ops;
static int __kick_requests(struct ceph_osd_client *osdc,
struct ceph_osd *kickosd);

static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);

Expand Down Expand Up @@ -339,6 +341,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
osd->o_con.ops = &osd_con_ops;
osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;

INIT_LIST_HEAD(&osd->o_keepalive_item);
return osd;
}

Expand Down Expand Up @@ -461,6 +464,16 @@ static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
return NULL;
}

static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
{
schedule_delayed_work(&osdc->timeout_work,
osdc->client->mount_args->osd_keepalive_timeout * HZ);
}

static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
{
cancel_delayed_work(&osdc->timeout_work);
}

/*
* Register request, assign tid. If this is the first request, set up
Expand All @@ -472,21 +485,16 @@ static void register_request(struct ceph_osd_client *osdc,
mutex_lock(&osdc->request_mutex);
req->r_tid = ++osdc->last_tid;
req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
INIT_LIST_HEAD(&req->r_req_lru_item);

dout("register_request %p tid %lld\n", req, req->r_tid);
__insert_request(osdc, req);
ceph_osdc_get_request(req);
osdc->num_requests++;

req->r_timeout_stamp =
jiffies + osdc->client->mount_args->osd_timeout*HZ;

if (osdc->num_requests == 1) {
osdc->timeout_tid = req->r_tid;
dout(" timeout on tid %llu at %lu\n", req->r_tid,
req->r_timeout_stamp);
schedule_delayed_work(&osdc->timeout_work,
round_jiffies_relative(req->r_timeout_stamp - jiffies));
dout(" first request, scheduling timeout\n");
__schedule_osd_timeout(osdc);
}
mutex_unlock(&osdc->request_mutex);
}
Expand All @@ -513,21 +521,10 @@ static void __unregister_request(struct ceph_osd_client *osdc,

ceph_osdc_put_request(req);

if (req->r_tid == osdc->timeout_tid) {
if (osdc->num_requests == 0) {
dout("no requests, canceling timeout\n");
osdc->timeout_tid = 0;
cancel_delayed_work(&osdc->timeout_work);
} else {
req = rb_entry(rb_first(&osdc->requests),
struct ceph_osd_request, r_node);
osdc->timeout_tid = req->r_tid;
dout("rescheduled timeout on tid %llu at %lu\n",
req->r_tid, req->r_timeout_stamp);
schedule_delayed_work(&osdc->timeout_work,
round_jiffies_relative(req->r_timeout_stamp -
jiffies));
}
list_del_init(&req->r_req_lru_item);
if (osdc->num_requests == 0) {
dout(" no requests, canceling timeout\n");
__cancel_osd_timeout(osdc);
}
}

Expand All @@ -540,6 +537,7 @@ static void __cancel_request(struct ceph_osd_request *req)
ceph_con_revoke(&req->r_osd->o_con, req->r_request);
req->r_sent = 0;
}
list_del_init(&req->r_req_lru_item);
}

/*
Expand Down Expand Up @@ -635,7 +633,8 @@ static int __send_request(struct ceph_osd_client *osdc,
reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */
reqhead->reassert_version = req->r_reassert_version;

req->r_timeout_stamp = jiffies+osdc->client->mount_args->osd_timeout*HZ;
req->r_sent_stamp = jiffies;
list_move_tail(&osdc->req_lru, &req->r_req_lru_item);

ceph_msg_get(req->r_request); /* send consumes a ref */
ceph_con_send(&req->r_osd->o_con, req->r_request);
Expand All @@ -656,11 +655,14 @@ static void handle_timeout(struct work_struct *work)
{
struct ceph_osd_client *osdc =
container_of(work, struct ceph_osd_client, timeout_work.work);
struct ceph_osd_request *req;
struct ceph_osd_request *req, *last_req = NULL;
struct ceph_osd *osd;
unsigned long timeout = osdc->client->mount_args->osd_timeout * HZ;
unsigned long next_timeout = timeout + jiffies;
unsigned long keepalive =
osdc->client->mount_args->osd_keepalive_timeout * HZ;
unsigned long last_sent = 0;
struct rb_node *p;
struct list_head slow_osds;

dout("timeout\n");
down_read(&osdc->map_sem);
Expand All @@ -683,25 +685,56 @@ static void handle_timeout(struct work_struct *work)
continue;
}
}
for (p = rb_first(&osdc->osds); p; p = rb_next(p)) {
osd = rb_entry(p, struct ceph_osd, o_node);
if (list_empty(&osd->o_requests))
continue;
req = list_first_entry(&osd->o_requests,
struct ceph_osd_request, r_osd_item);
if (time_before(jiffies, req->r_timeout_stamp))
continue;

dout(" tid %llu (at least) timed out on osd%d\n",
/*
* reset osds that appear to be _really_ unresponsive. this
* is a failsafe measure.. we really shouldn't be getting to
* this point if the system is working properly. the monitors
* should mark the osd as failed and we should find out about
* it from an updated osd map.
*/
while (!list_empty(&osdc->req_lru)) {
req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
r_req_lru_item);

if (time_before(jiffies, req->r_sent_stamp + timeout))
break;

BUG_ON(req == last_req && req->r_sent_stamp == last_sent);
last_req = req;
last_sent = req->r_sent_stamp;

osd = req->r_osd;
BUG_ON(!osd);
pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
req->r_tid, osd->o_osd);
__kick_requests(osdc, osd);
}

/*
* ping osds that are a bit slow. this ensures that if there
* is a break in the TCP connection we will notice, and reopen
* a connection with that osd (from the fault callback).
*/
INIT_LIST_HEAD(&slow_osds);
list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
if (time_before(jiffies, req->r_sent_stamp + keepalive))
break;

osd = req->r_osd;
BUG_ON(!osd);
dout(" tid %llu is slow, will send keepalive on osd%d\n",
req->r_tid, osd->o_osd);
req->r_timeout_stamp = next_timeout;
list_move_tail(&osd->o_keepalive_item, &slow_osds);
}
while (!list_empty(&slow_osds)) {
osd = list_entry(slow_osds.next, struct ceph_osd,
o_keepalive_item);
list_del_init(&osd->o_keepalive_item);
ceph_con_keepalive(&osd->o_con);
}

if (osdc->timeout_tid)
schedule_delayed_work(&osdc->timeout_work,
round_jiffies_relative(timeout));

__schedule_osd_timeout(osdc);
mutex_unlock(&osdc->request_mutex);

up_read(&osdc->map_sem);
Expand Down Expand Up @@ -819,18 +852,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
}


/*
* Resubmit osd requests whose osd or osd address has changed. Request
* a new osd map if osds are down, or we are otherwise unable to determine
* how to direct a request.
*
* Close connections to down osds.
*
* If @who is specified, resubmit requests for that specific osd.
*
* Caller should hold map_sem for read and request_mutex.
*/
static void kick_requests(struct ceph_osd_client *osdc,
static int __kick_requests(struct ceph_osd_client *osdc,
struct ceph_osd *kickosd)
{
struct ceph_osd_request *req;
Expand All @@ -839,7 +861,6 @@ static void kick_requests(struct ceph_osd_client *osdc,
int err;

dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
mutex_lock(&osdc->request_mutex);
if (kickosd) {
__reset_osd(osdc, kickosd);
} else {
Expand Down Expand Up @@ -900,14 +921,36 @@ static void kick_requests(struct ceph_osd_client *osdc,
req->r_resend = true;
}
}

return needmap;
}

/*
* Resubmit osd requests whose osd or osd address has changed. Request
* a new osd map if osds are down, or we are otherwise unable to determine
* how to direct a request.
*
* Close connections to down osds.
*
* If @who is specified, resubmit requests for that specific osd.
*
* Caller should hold map_sem for read and request_mutex.
*/
static void kick_requests(struct ceph_osd_client *osdc,
struct ceph_osd *kickosd)
{
int needmap;

mutex_lock(&osdc->request_mutex);
needmap = __kick_requests(osdc, kickosd);
mutex_unlock(&osdc->request_mutex);

if (needmap) {
dout("%d requests for down osds, need new map\n", needmap);
ceph_monc_request_next_osdmap(&osdc->client->monc);
}
}

}
/*
* Process updated osd map.
*
Expand Down Expand Up @@ -1164,11 +1207,11 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
init_completion(&osdc->map_waiters);
osdc->last_requested_map = 0;
mutex_init(&osdc->request_mutex);
osdc->timeout_tid = 0;
osdc->last_tid = 0;
osdc->osds = RB_ROOT;
INIT_LIST_HEAD(&osdc->osd_lru);
osdc->requests = RB_ROOT;
INIT_LIST_HEAD(&osdc->req_lru);
osdc->num_requests = 0;
INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
Expand Down
6 changes: 5 additions & 1 deletion fs/ceph/osd_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,15 @@ struct ceph_osd {
void *o_authorizer_buf, *o_authorizer_reply_buf;
size_t o_authorizer_buf_len, o_authorizer_reply_buf_len;
unsigned long lru_ttl;
int o_marked_for_keepalive;
struct list_head o_keepalive_item;
};

/* an in-flight request */
struct ceph_osd_request {
u64 r_tid; /* unique for this client */
struct rb_node r_node;
struct list_head r_req_lru_item;
struct list_head r_osd_item;
struct ceph_osd *r_osd;
struct ceph_pg r_pgid;
Expand All @@ -67,7 +70,7 @@ struct ceph_osd_request {

char r_oid[40]; /* object name */
int r_oid_len;
unsigned long r_timeout_stamp;
unsigned long r_sent_stamp;
bool r_resend; /* msg send failed, needs retry */

struct ceph_file_layout r_file_layout;
Expand All @@ -92,6 +95,7 @@ struct ceph_osd_client {
u64 timeout_tid; /* tid of timeout triggering rq */
u64 last_tid; /* tid of last request */
struct rb_root requests; /* pending requests */
struct list_head req_lru; /* pending requests lru */
int num_requests;
struct delayed_work timeout_work;
struct delayed_work osds_timeout_work;
Expand Down
8 changes: 7 additions & 1 deletion fs/ceph/super.c
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ enum {
Opt_wsize,
Opt_rsize,
Opt_osdtimeout,
Opt_osdkeepalivetimeout,
Opt_mount_timeout,
Opt_osd_idle_ttl,
Opt_caps_wanted_delay_min,
Expand Down Expand Up @@ -322,6 +323,7 @@ static match_table_t arg_tokens = {
{Opt_wsize, "wsize=%d"},
{Opt_rsize, "rsize=%d"},
{Opt_osdtimeout, "osdtimeout=%d"},
{Opt_osdkeepalivetimeout, "osdkeepalive=%d"},
{Opt_mount_timeout, "mount_timeout=%d"},
{Opt_osd_idle_ttl, "osd_idle_ttl=%d"},
{Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"},
Expand Down Expand Up @@ -367,7 +369,8 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options,
/* start with defaults */
args->sb_flags = flags;
args->flags = CEPH_OPT_DEFAULT;
args->osd_timeout = 5; /* seconds */
args->osd_timeout = CEPH_OSD_TIMEOUT_DEFAULT;
args->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT;
args->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; /* seconds */
args->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT; /* seconds */
args->caps_wanted_delay_min = CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT;
Expand Down Expand Up @@ -468,6 +471,9 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options,
case Opt_osdtimeout:
args->osd_timeout = intval;
break;
case Opt_osdkeepalivetimeout:
args->osd_keepalive_timeout = intval;
break;
case Opt_mount_timeout:
args->mount_timeout = intval;
break;
Expand Down
3 changes: 3 additions & 0 deletions fs/ceph/super.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ struct ceph_mount_args {
int max_readdir; /* max readdir size */
int congestion_kb; /* max readdir size */
int osd_timeout;
int osd_keepalive_timeout;
char *snapdir_name; /* default ".snap" */
char *name;
char *secret;
Expand All @@ -72,6 +73,8 @@ struct ceph_mount_args {
* defaults
*/
#define CEPH_MOUNT_TIMEOUT_DEFAULT 60
#define CEPH_OSD_TIMEOUT_DEFAULT 60 /* seconds */
#define CEPH_OSD_KEEPALIVE_DEFAULT 5
#define CEPH_OSD_IDLE_TTL_DEFAULT 60
#define CEPH_MOUNT_RSIZE_DEFAULT (512*1024) /* readahead */

Expand Down

0 comments on commit 422d2cb

Please sign in to comment.