Skip to content

Commit

Permalink
---
Browse files Browse the repository at this point in the history
yaml
---
r: 320367
b: refs/heads/master
c: 8dacc7d
h: refs/heads/master
i:
  320365: 7c3871b
  320363: 829750f
  320359: 6fd7286
  320351: 1414831
v: v3
  • Loading branch information
Sage Weil committed Jul 31, 2012
1 parent ac6483f commit 82cd70a
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 75 deletions.
2 changes: 1 addition & 1 deletion [refs]
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
---
refs/heads/master: d7353dd5aaf22ed611fbcd0d4a4a12fb30659290
refs/heads/master: 8dacc7da69a491c515851e68de6036f21b5663ce
12 changes: 0 additions & 12 deletions trunk/include/linux/ceph/messenger.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,18 +116,6 @@ struct ceph_msg_pos {
#define SOCK_CLOSED 11 /* socket state changed to closed */
#define BACKOFF 15

/*
* ceph_connection states
*/
#define CONNECTING 1
#define NEGOTIATING 2
#define CONNECTED 5
#define STANDBY 8 /* no outgoing messages, socket closed. we keep
* the ceph_connection around to maintain shared
* state with the peer. */
#define CLOSED 10 /* we've closed the connection */
#define OPENING 13 /* open connection w/ (possibly new) peer */

/*
* A single connection with another host.
*
Expand Down
130 changes: 68 additions & 62 deletions trunk/net/ceph/messenger.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,17 @@
#define CON_SOCK_STATE_CONNECTED 3 /* -> CLOSING or -> CLOSED */
#define CON_SOCK_STATE_CLOSING 4 /* -> CLOSED */

/*
* connection states
*/
#define CON_STATE_CLOSED 1 /* -> PREOPEN */
#define CON_STATE_PREOPEN 2 /* -> CONNECTING, CLOSED */
#define CON_STATE_CONNECTING 3 /* -> NEGOTIATING, CLOSED */
#define CON_STATE_NEGOTIATING 4 /* -> OPEN, CLOSED */
#define CON_STATE_OPEN 5 /* -> STANDBY, CLOSED */
#define CON_STATE_STANDBY 6 /* -> PREOPEN, CLOSED */


/* static tag bytes (protocol control messages) */
static char tag_msg = CEPH_MSGR_TAG_MSG;
static char tag_ack = CEPH_MSGR_TAG_ACK;
Expand Down Expand Up @@ -503,11 +514,7 @@ void ceph_con_close(struct ceph_connection *con)
mutex_lock(&con->mutex);
dout("con_close %p peer %s\n", con,
ceph_pr_addr(&con->peer_addr.in_addr));
clear_bit(NEGOTIATING, &con->state);
clear_bit(CONNECTING, &con->state);
clear_bit(CONNECTED, &con->state);
clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */
set_bit(CLOSED, &con->state);
con->state = CON_STATE_CLOSED;

clear_bit(LOSSYTX, &con->flags); /* so we retry next connect */
clear_bit(KEEPALIVE_PENDING, &con->flags);
Expand All @@ -530,8 +537,9 @@ void ceph_con_open(struct ceph_connection *con,
{
mutex_lock(&con->mutex);
dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr));
set_bit(OPENING, &con->state);
WARN_ON(!test_and_clear_bit(CLOSED, &con->state));

BUG_ON(con->state != CON_STATE_CLOSED);
con->state = CON_STATE_PREOPEN;

con->peer_name.type = (__u8) entity_type;
con->peer_name.num = cpu_to_le64(entity_num);
Expand Down Expand Up @@ -571,7 +579,7 @@ void ceph_con_init(struct ceph_connection *con, void *private,
INIT_LIST_HEAD(&con->out_sent);
INIT_DELAYED_WORK(&con->work, con_work);

set_bit(CLOSED, &con->state);
con->state = CON_STATE_CLOSED;
}
EXPORT_SYMBOL(ceph_con_init);

Expand Down Expand Up @@ -809,27 +817,21 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection
if (!con->ops->get_authorizer) {
con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
con->out_connect.authorizer_len = 0;

return NULL;
}

/* Can't hold the mutex while getting authorizer */

mutex_unlock(&con->mutex);

auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry);

mutex_lock(&con->mutex);

if (IS_ERR(auth))
return auth;
if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->flags))
if (con->state != CON_STATE_NEGOTIATING)
return ERR_PTR(-EAGAIN);

con->auth_reply_buf = auth->authorizer_reply_buf;
con->auth_reply_buf_len = auth->authorizer_reply_buf_len;


return auth;
}

Expand Down Expand Up @@ -1484,7 +1486,8 @@ static int process_banner(struct ceph_connection *con)
static void fail_protocol(struct ceph_connection *con)
{
reset_connection(con);
set_bit(CLOSED, &con->state); /* in case there's queued work */
BUG_ON(con->state != CON_STATE_NEGOTIATING);
con->state = CON_STATE_CLOSED;
}

static int process_connect(struct ceph_connection *con)
Expand Down Expand Up @@ -1558,8 +1561,7 @@ static int process_connect(struct ceph_connection *con)
if (con->ops->peer_reset)
con->ops->peer_reset(con);
mutex_lock(&con->mutex);
if (test_bit(CLOSED, &con->state) ||
test_bit(OPENING, &con->state))
if (con->state != CON_STATE_NEGOTIATING)
return -EAGAIN;
break;

Expand Down Expand Up @@ -1605,8 +1607,10 @@ static int process_connect(struct ceph_connection *con)
fail_protocol(con);
return -1;
}
clear_bit(NEGOTIATING, &con->state);
set_bit(CONNECTED, &con->state);

BUG_ON(con->state != CON_STATE_NEGOTIATING);
con->state = CON_STATE_OPEN;

con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
con->connect_seq++;
con->peer_features = server_feat;
Expand Down Expand Up @@ -1994,8 +1998,9 @@ static int try_write(struct ceph_connection *con)
dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);

/* open the socket first? */
if (con->sock == NULL) {
set_bit(CONNECTING, &con->state);
if (con->state == CON_STATE_PREOPEN) {
BUG_ON(con->sock);
con->state = CON_STATE_CONNECTING;

con_out_kvec_reset(con);
prepare_write_banner(con);
Expand Down Expand Up @@ -2046,8 +2051,7 @@ static int try_write(struct ceph_connection *con)
}

do_next:
if (!test_bit(CONNECTING, &con->state) &&
!test_bit(NEGOTIATING, &con->state)) {
if (con->state == CON_STATE_OPEN) {
/* is anything else pending? */
if (!list_empty(&con->out_queue)) {
prepare_write_message(con);
Expand Down Expand Up @@ -2081,29 +2085,19 @@ static int try_read(struct ceph_connection *con)
{
int ret = -1;

if (!con->sock)
return 0;

if (test_bit(STANDBY, &con->state))
more:
dout("try_read start on %p state %lu\n", con, con->state);
if (con->state != CON_STATE_CONNECTING &&
con->state != CON_STATE_NEGOTIATING &&
con->state != CON_STATE_OPEN)
return 0;

dout("try_read start on %p\n", con);
BUG_ON(!con->sock);

more:
dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
con->in_base_pos);

/*
* process_connect and process_message drop and re-take
* con->mutex. make sure we handle a racing close or reopen.
*/
if (test_bit(CLOSED, &con->state) ||
test_bit(OPENING, &con->state)) {
ret = -EAGAIN;
goto out;
}

if (test_bit(CONNECTING, &con->state)) {
if (con->state == CON_STATE_CONNECTING) {
dout("try_read connecting\n");
ret = read_partial_banner(con);
if (ret <= 0)
Expand All @@ -2112,8 +2106,8 @@ static int try_read(struct ceph_connection *con)
if (ret < 0)
goto out;

clear_bit(CONNECTING, &con->state);
set_bit(NEGOTIATING, &con->state);
BUG_ON(con->state != CON_STATE_CONNECTING);
con->state = CON_STATE_NEGOTIATING;

/* Banner is good, exchange connection info */
ret = prepare_write_connect(con);
Expand All @@ -2125,7 +2119,7 @@ static int try_read(struct ceph_connection *con)
goto out;
}

if (test_bit(NEGOTIATING, &con->state)) {
if (con->state == CON_STATE_NEGOTIATING) {
dout("try_read negotiating\n");
ret = read_partial_connect(con);
if (ret <= 0)
Expand All @@ -2136,6 +2130,8 @@ static int try_read(struct ceph_connection *con)
goto more;
}

BUG_ON(con->state != CON_STATE_OPEN);

if (con->in_base_pos < 0) {
/*
* skipping + discarding content.
Expand Down Expand Up @@ -2169,8 +2165,8 @@ static int try_read(struct ceph_connection *con)
prepare_read_ack(con);
break;
case CEPH_MSGR_TAG_CLOSE:
clear_bit(CONNECTED, &con->state);
set_bit(CLOSED, &con->state); /* fixme */
con_close_socket(con);
con->state = CON_STATE_CLOSED;
goto out;
default:
goto bad_tag;
Expand Down Expand Up @@ -2246,14 +2242,21 @@ static void con_work(struct work_struct *work)
mutex_lock(&con->mutex);
restart:
if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) {
if (test_and_clear_bit(CONNECTED, &con->state))
con->error_msg = "socket closed";
else if (test_and_clear_bit(NEGOTIATING, &con->state))
con->error_msg = "negotiation failed";
else if (test_and_clear_bit(CONNECTING, &con->state))
switch (con->state) {
case CON_STATE_CONNECTING:
con->error_msg = "connection failed";
else
break;
case CON_STATE_NEGOTIATING:
con->error_msg = "negotiation failed";
break;
case CON_STATE_OPEN:
con->error_msg = "socket closed";
break;
default:
dout("unrecognized con state %d\n", (int)con->state);
con->error_msg = "unrecognized con state";
BUG();
}
goto fault;
}

Expand All @@ -2271,17 +2274,16 @@ static void con_work(struct work_struct *work)
}
}

if (test_bit(STANDBY, &con->state)) {
if (con->state == CON_STATE_STANDBY) {
dout("con_work %p STANDBY\n", con);
goto done;
}
if (test_bit(CLOSED, &con->state)) {
if (con->state == CON_STATE_CLOSED) {
dout("con_work %p CLOSED\n", con);
BUG_ON(con->sock);
goto done;
}
if (test_and_clear_bit(OPENING, &con->state)) {
/* reopen w/ new peer */
if (con->state == CON_STATE_PREOPEN) {
dout("con_work OPENING\n");
BUG_ON(con->sock);
}
Expand Down Expand Up @@ -2328,13 +2330,15 @@ static void ceph_fault(struct ceph_connection *con)
dout("fault %p state %lu to peer %s\n",
con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));

if (test_bit(CLOSED, &con->state))
goto out_unlock;
BUG_ON(con->state != CON_STATE_CONNECTING &&
con->state != CON_STATE_NEGOTIATING &&
con->state != CON_STATE_OPEN);

con_close_socket(con);

if (test_bit(LOSSYTX, &con->flags)) {
dout("fault on LOSSYTX channel\n");
dout("fault on LOSSYTX channel, marking CLOSED\n");
con->state = CON_STATE_CLOSED;
goto out_unlock;
}

Expand All @@ -2355,9 +2359,10 @@ static void ceph_fault(struct ceph_connection *con)
!test_bit(KEEPALIVE_PENDING, &con->flags)) {
dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
clear_bit(WRITE_PENDING, &con->flags);
set_bit(STANDBY, &con->state);
con->state = CON_STATE_STANDBY;
} else {
/* retry after a delay. */
con->state = CON_STATE_PREOPEN;
if (con->delay == 0)
con->delay = BASE_DELAY_INTERVAL;
else if (con->delay < MAX_DELAY_INTERVAL)
Expand Down Expand Up @@ -2431,8 +2436,9 @@ EXPORT_SYMBOL(ceph_messenger_init);
static void clear_standby(struct ceph_connection *con)
{
/* come back from STANDBY? */
if (test_and_clear_bit(STANDBY, &con->state)) {
if (con->state == CON_STATE_STANDBY) {
dout("clear_standby %p and ++connect_seq\n", con);
con->state = CON_STATE_PREOPEN;
con->connect_seq++;
WARN_ON(test_bit(WRITE_PENDING, &con->flags));
WARN_ON(test_bit(KEEPALIVE_PENDING, &con->flags));
Expand All @@ -2451,7 +2457,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)

mutex_lock(&con->mutex);

if (test_bit(CLOSED, &con->state)) {
if (con->state == CON_STATE_CLOSED) {
dout("con_send %p closed, dropping %p\n", con, msg);
ceph_msg_put(msg);
mutex_unlock(&con->mutex);
Expand Down

0 comments on commit 82cd70a

Please sign in to comment.