Skip to content

Commit

Permalink
libceph: replace connection state bits with states
Browse files Browse the repository at this point in the history
Use a simple set of 6 enumerated values for the socket states (CON_STATE_*)
and use those instead of the state bits.  All of the con->state checks are
now under the protection of the con mutex, so this is safe.  It also
simplifies many of the state checks because we can check for anything other
than the expected state instead of various bits for races we can think of.

This appears to hold up well to stress testing both with and without socket
failure injection on the server side.

Signed-off-by: Sage Weil <sage@inktank.com>
  • Loading branch information
Sage Weil committed Jul 31, 2012
1 parent d7353dd commit 8dacc7d
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 74 deletions.
12 changes: 0 additions & 12 deletions include/linux/ceph/messenger.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,18 +116,6 @@ struct ceph_msg_pos {
#define SOCK_CLOSED 11 /* socket state changed to closed */
#define BACKOFF 15

/*
* ceph_connection states
*/
#define CONNECTING 1
#define NEGOTIATING 2
#define CONNECTED 5
#define STANDBY 8 /* no outgoing messages, socket closed. we keep
* the ceph_connection around to maintain shared
* state with the peer. */
#define CLOSED 10 /* we've closed the connection */
#define OPENING 13 /* open connection w/ (possibly new) peer */

/*
* A single connection with another host.
*
Expand Down
130 changes: 68 additions & 62 deletions net/ceph/messenger.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,17 @@
#define CON_SOCK_STATE_CONNECTED 3 /* -> CLOSING or -> CLOSED */
#define CON_SOCK_STATE_CLOSING 4 /* -> CLOSED */

/*
* connection states
*/
#define CON_STATE_CLOSED 1 /* -> PREOPEN */
#define CON_STATE_PREOPEN 2 /* -> CONNECTING, CLOSED */
#define CON_STATE_CONNECTING 3 /* -> NEGOTIATING, CLOSED */
#define CON_STATE_NEGOTIATING 4 /* -> OPEN, CLOSED */
#define CON_STATE_OPEN 5 /* -> STANDBY, CLOSED */
#define CON_STATE_STANDBY 6 /* -> PREOPEN, CLOSED */


/* static tag bytes (protocol control messages) */
static char tag_msg = CEPH_MSGR_TAG_MSG;
static char tag_ack = CEPH_MSGR_TAG_ACK;
Expand Down Expand Up @@ -503,11 +514,7 @@ void ceph_con_close(struct ceph_connection *con)
mutex_lock(&con->mutex);
dout("con_close %p peer %s\n", con,
ceph_pr_addr(&con->peer_addr.in_addr));
clear_bit(NEGOTIATING, &con->state);
clear_bit(CONNECTING, &con->state);
clear_bit(CONNECTED, &con->state);
clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */
set_bit(CLOSED, &con->state);
con->state = CON_STATE_CLOSED;

clear_bit(LOSSYTX, &con->flags); /* so we retry next connect */
clear_bit(KEEPALIVE_PENDING, &con->flags);
Expand All @@ -530,8 +537,9 @@ void ceph_con_open(struct ceph_connection *con,
{
mutex_lock(&con->mutex);
dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr));
set_bit(OPENING, &con->state);
WARN_ON(!test_and_clear_bit(CLOSED, &con->state));

BUG_ON(con->state != CON_STATE_CLOSED);
con->state = CON_STATE_PREOPEN;

con->peer_name.type = (__u8) entity_type;
con->peer_name.num = cpu_to_le64(entity_num);
Expand Down Expand Up @@ -571,7 +579,7 @@ void ceph_con_init(struct ceph_connection *con, void *private,
INIT_LIST_HEAD(&con->out_sent);
INIT_DELAYED_WORK(&con->work, con_work);

set_bit(CLOSED, &con->state);
con->state = CON_STATE_CLOSED;
}
EXPORT_SYMBOL(ceph_con_init);

Expand Down Expand Up @@ -809,27 +817,21 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection
if (!con->ops->get_authorizer) {
con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
con->out_connect.authorizer_len = 0;

return NULL;
}

/* Can't hold the mutex while getting authorizer */

mutex_unlock(&con->mutex);

auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry);

mutex_lock(&con->mutex);

if (IS_ERR(auth))
return auth;
if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->flags))
if (con->state != CON_STATE_NEGOTIATING)
return ERR_PTR(-EAGAIN);

con->auth_reply_buf = auth->authorizer_reply_buf;
con->auth_reply_buf_len = auth->authorizer_reply_buf_len;


return auth;
}

Expand Down Expand Up @@ -1484,7 +1486,8 @@ static int process_banner(struct ceph_connection *con)
static void fail_protocol(struct ceph_connection *con)
{
reset_connection(con);
set_bit(CLOSED, &con->state); /* in case there's queued work */
BUG_ON(con->state != CON_STATE_NEGOTIATING);
con->state = CON_STATE_CLOSED;
}

static int process_connect(struct ceph_connection *con)
Expand Down Expand Up @@ -1558,8 +1561,7 @@ static int process_connect(struct ceph_connection *con)
if (con->ops->peer_reset)
con->ops->peer_reset(con);
mutex_lock(&con->mutex);
if (test_bit(CLOSED, &con->state) ||
test_bit(OPENING, &con->state))
if (con->state != CON_STATE_NEGOTIATING)
return -EAGAIN;
break;

Expand Down Expand Up @@ -1605,8 +1607,10 @@ static int process_connect(struct ceph_connection *con)
fail_protocol(con);
return -1;
}
clear_bit(NEGOTIATING, &con->state);
set_bit(CONNECTED, &con->state);

BUG_ON(con->state != CON_STATE_NEGOTIATING);
con->state = CON_STATE_OPEN;

con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
con->connect_seq++;
con->peer_features = server_feat;
Expand Down Expand Up @@ -1994,8 +1998,9 @@ static int try_write(struct ceph_connection *con)
dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);

/* open the socket first? */
if (con->sock == NULL) {
set_bit(CONNECTING, &con->state);
if (con->state == CON_STATE_PREOPEN) {
BUG_ON(con->sock);
con->state = CON_STATE_CONNECTING;

con_out_kvec_reset(con);
prepare_write_banner(con);
Expand Down Expand Up @@ -2046,8 +2051,7 @@ static int try_write(struct ceph_connection *con)
}

do_next:
if (!test_bit(CONNECTING, &con->state) &&
!test_bit(NEGOTIATING, &con->state)) {
if (con->state == CON_STATE_OPEN) {
/* is anything else pending? */
if (!list_empty(&con->out_queue)) {
prepare_write_message(con);
Expand Down Expand Up @@ -2081,29 +2085,19 @@ static int try_read(struct ceph_connection *con)
{
int ret = -1;

if (!con->sock)
return 0;

if (test_bit(STANDBY, &con->state))
more:
dout("try_read start on %p state %lu\n", con, con->state);
if (con->state != CON_STATE_CONNECTING &&
con->state != CON_STATE_NEGOTIATING &&
con->state != CON_STATE_OPEN)
return 0;

dout("try_read start on %p\n", con);
BUG_ON(!con->sock);

more:
dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
con->in_base_pos);

/*
* process_connect and process_message drop and re-take
* con->mutex. make sure we handle a racing close or reopen.
*/
if (test_bit(CLOSED, &con->state) ||
test_bit(OPENING, &con->state)) {
ret = -EAGAIN;
goto out;
}

if (test_bit(CONNECTING, &con->state)) {
if (con->state == CON_STATE_CONNECTING) {
dout("try_read connecting\n");
ret = read_partial_banner(con);
if (ret <= 0)
Expand All @@ -2112,8 +2106,8 @@ static int try_read(struct ceph_connection *con)
if (ret < 0)
goto out;

clear_bit(CONNECTING, &con->state);
set_bit(NEGOTIATING, &con->state);
BUG_ON(con->state != CON_STATE_CONNECTING);
con->state = CON_STATE_NEGOTIATING;

/* Banner is good, exchange connection info */
ret = prepare_write_connect(con);
Expand All @@ -2125,7 +2119,7 @@ static int try_read(struct ceph_connection *con)
goto out;
}

if (test_bit(NEGOTIATING, &con->state)) {
if (con->state == CON_STATE_NEGOTIATING) {
dout("try_read negotiating\n");
ret = read_partial_connect(con);
if (ret <= 0)
Expand All @@ -2136,6 +2130,8 @@ static int try_read(struct ceph_connection *con)
goto more;
}

BUG_ON(con->state != CON_STATE_OPEN);

if (con->in_base_pos < 0) {
/*
* skipping + discarding content.
Expand Down Expand Up @@ -2169,8 +2165,8 @@ static int try_read(struct ceph_connection *con)
prepare_read_ack(con);
break;
case CEPH_MSGR_TAG_CLOSE:
clear_bit(CONNECTED, &con->state);
set_bit(CLOSED, &con->state); /* fixme */
con_close_socket(con);
con->state = CON_STATE_CLOSED;
goto out;
default:
goto bad_tag;
Expand Down Expand Up @@ -2246,14 +2242,21 @@ static void con_work(struct work_struct *work)
mutex_lock(&con->mutex);
restart:
if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) {
if (test_and_clear_bit(CONNECTED, &con->state))
con->error_msg = "socket closed";
else if (test_and_clear_bit(NEGOTIATING, &con->state))
con->error_msg = "negotiation failed";
else if (test_and_clear_bit(CONNECTING, &con->state))
switch (con->state) {
case CON_STATE_CONNECTING:
con->error_msg = "connection failed";
else
break;
case CON_STATE_NEGOTIATING:
con->error_msg = "negotiation failed";
break;
case CON_STATE_OPEN:
con->error_msg = "socket closed";
break;
default:
dout("unrecognized con state %d\n", (int)con->state);
con->error_msg = "unrecognized con state";
BUG();
}
goto fault;
}

Expand All @@ -2271,17 +2274,16 @@ static void con_work(struct work_struct *work)
}
}

if (test_bit(STANDBY, &con->state)) {
if (con->state == CON_STATE_STANDBY) {
dout("con_work %p STANDBY\n", con);
goto done;
}
if (test_bit(CLOSED, &con->state)) {
if (con->state == CON_STATE_CLOSED) {
dout("con_work %p CLOSED\n", con);
BUG_ON(con->sock);
goto done;
}
if (test_and_clear_bit(OPENING, &con->state)) {
/* reopen w/ new peer */
if (con->state == CON_STATE_PREOPEN) {
dout("con_work OPENING\n");
BUG_ON(con->sock);
}
Expand Down Expand Up @@ -2328,13 +2330,15 @@ static void ceph_fault(struct ceph_connection *con)
dout("fault %p state %lu to peer %s\n",
con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));

if (test_bit(CLOSED, &con->state))
goto out_unlock;
BUG_ON(con->state != CON_STATE_CONNECTING &&
con->state != CON_STATE_NEGOTIATING &&
con->state != CON_STATE_OPEN);

con_close_socket(con);

if (test_bit(LOSSYTX, &con->flags)) {
dout("fault on LOSSYTX channel\n");
dout("fault on LOSSYTX channel, marking CLOSED\n");
con->state = CON_STATE_CLOSED;
goto out_unlock;
}

Expand All @@ -2355,9 +2359,10 @@ static void ceph_fault(struct ceph_connection *con)
!test_bit(KEEPALIVE_PENDING, &con->flags)) {
dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
clear_bit(WRITE_PENDING, &con->flags);
set_bit(STANDBY, &con->state);
con->state = CON_STATE_STANDBY;
} else {
/* retry after a delay. */
con->state = CON_STATE_PREOPEN;
if (con->delay == 0)
con->delay = BASE_DELAY_INTERVAL;
else if (con->delay < MAX_DELAY_INTERVAL)
Expand Down Expand Up @@ -2431,8 +2436,9 @@ EXPORT_SYMBOL(ceph_messenger_init);
static void clear_standby(struct ceph_connection *con)
{
/* come back from STANDBY? */
if (test_and_clear_bit(STANDBY, &con->state)) {
if (con->state == CON_STATE_STANDBY) {
dout("clear_standby %p and ++connect_seq\n", con);
con->state = CON_STATE_PREOPEN;
con->connect_seq++;
WARN_ON(test_bit(WRITE_PENDING, &con->flags));
WARN_ON(test_bit(KEEPALIVE_PENDING, &con->flags));
Expand All @@ -2451,7 +2457,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)

mutex_lock(&con->mutex);

if (test_bit(CLOSED, &con->state)) {
if (con->state == CON_STATE_CLOSED) {
dout("con_send %p closed, dropping %p\n", con, msg);
ceph_msg_put(msg);
mutex_unlock(&con->mutex);
Expand Down

0 comments on commit 8dacc7d

Please sign in to comment.