Skip to content

Commit

Permalink
ceph: separate banner and connect during handshake into distinct stages
Browse files Browse the repository at this point in the history
We need to make sure we swab the address only once, during the banner.  So
break process_banner out of process_connect, and clean up the surrounding
code so that the banner and connect steps are distinct phases of the handshake.

Signed-off-by: Sage Weil <sage@newdream.net>
  • Loading branch information
Sage Weil committed Nov 10, 2009
1 parent 685f9a5 commit eed0ef2
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 44 deletions.
117 changes: 75 additions & 42 deletions fs/ceph/messenger.c
Original file line number Diff line number Diff line change
Expand Up @@ -564,10 +564,26 @@ static void prepare_write_keepalive(struct ceph_connection *con)
/*
* We connected to a peer and are saying hello.
*/
static void prepare_write_connect(struct ceph_messenger *msgr,
struct ceph_connection *con)
/*
 * Queue the opening bytes of the handshake for transmission: the
 * CEPH_BANNER string followed by our encoded address
 * (msgr->my_enc_addr).  They take the first two out_kvec slots.
 */
static void prepare_write_banner(struct ceph_messenger *msgr,
				 struct ceph_connection *con)
{
	int banner_len = strlen(CEPH_BANNER);

	/* slot 0: the banner string */
	con->out_kvec[0].iov_len = banner_len;
	con->out_kvec[0].iov_base = CEPH_BANNER;

	/* slot 1: our encoded address */
	con->out_kvec[1].iov_len = sizeof(msgr->my_enc_addr);
	con->out_kvec[1].iov_base = &msgr->my_enc_addr;

	/* begin at the first slot with both entries outstanding */
	con->out_kvec_cur = con->out_kvec;
	con->out_kvec_bytes = banner_len + sizeof(msgr->my_enc_addr);
	con->out_kvec_left = 2;
	con->out_more = 0;

	/* flag that we have data ready to send */
	set_bit(WRITE_PENDING, &con->state);
}

static void prepare_write_connect(struct ceph_messenger *msgr,
struct ceph_connection *con,
int after_banner)
{
unsigned global_seq = get_global_seq(con->msgr, 0);
int proto;

Expand Down Expand Up @@ -595,32 +611,14 @@ static void prepare_write_connect(struct ceph_messenger *msgr,
if (test_bit(LOSSYTX, &con->state))
con->out_connect.flags = CEPH_MSG_CONNECT_LOSSY;

con->out_kvec[0].iov_base = CEPH_BANNER;
con->out_kvec[0].iov_len = len;
con->out_kvec[1].iov_base = &msgr->my_enc_addr;
con->out_kvec[1].iov_len = sizeof(msgr->my_enc_addr);
con->out_kvec[2].iov_base = &con->out_connect;
con->out_kvec[2].iov_len = sizeof(con->out_connect);
con->out_kvec_left = 3;
con->out_kvec_bytes = len + sizeof(msgr->my_enc_addr) +
sizeof(con->out_connect);
con->out_kvec_cur = con->out_kvec;
con->out_more = 0;
set_bit(WRITE_PENDING, &con->state);
}

static void prepare_write_connect_retry(struct ceph_messenger *msgr,
struct ceph_connection *con)
{
dout("prepare_write_connect_retry %p\n", con);
con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
con->out_connect.global_seq =
cpu_to_le32(get_global_seq(con->msgr, 0));

con->out_kvec[0].iov_base = &con->out_connect;
con->out_kvec[0].iov_len = sizeof(con->out_connect);
con->out_kvec_left = 1;
con->out_kvec_bytes = sizeof(con->out_connect);
if (!after_banner) {
con->out_kvec_left = 0;
con->out_kvec_bytes = 0;
}
con->out_kvec[con->out_kvec_left].iov_base = &con->out_connect;
con->out_kvec[con->out_kvec_left].iov_len = sizeof(con->out_connect);
con->out_kvec_left++;
con->out_kvec_bytes += sizeof(con->out_connect);
con->out_kvec_cur = con->out_kvec;
con->out_more = 0;
set_bit(WRITE_PENDING, &con->state);
Expand Down Expand Up @@ -778,6 +776,12 @@ static int write_partial_skip(struct ceph_connection *con)
/*
* Prepare to read connection handshake, or an ack.
*/
/*
 * Get ready to read the peer's banner: log the connection and reset
 * con->in_base_pos to 0.
 *
 * NOTE(review): in_base_pos looks like the running position for partial
 * reads (read_partial_banner logs it as "at %d") -- confirm against
 * read_partial().
 */
static void prepare_read_banner(struct ceph_connection *con)
{
dout("prepare_read_banner %p\n", con);
con->in_base_pos = 0;
}

static void prepare_read_connect(struct ceph_connection *con)
{
dout("prepare_read_connect %p\n", con);
Expand Down Expand Up @@ -829,11 +833,11 @@ static int read_partial(struct ceph_connection *con,
/*
* Read all or part of the connect-side handshake on a new connection
*/
static int read_partial_connect(struct ceph_connection *con)
static int read_partial_banner(struct ceph_connection *con)
{
int ret, to = 0;

dout("read_partial_connect %p at %d\n", con, con->in_base_pos);
dout("read_partial_banner %p at %d\n", con, con->in_base_pos);

/* peer's banner */
ret = read_partial(con, &to, strlen(CEPH_BANNER), con->in_banner);
Expand All @@ -847,6 +851,16 @@ static int read_partial_connect(struct ceph_connection *con)
&con->peer_addr_for_me);
if (ret <= 0)
goto out;
out:
return ret;
}

static int read_partial_connect(struct ceph_connection *con)
{
int ret, to = 0;

dout("read_partial_connect %p at %d\n", con, con->in_base_pos);

ret = read_partial(con, &to, sizeof(con->in_reply), &con->in_reply);
if (ret <= 0)
goto out;
Expand All @@ -856,6 +870,7 @@ static int read_partial_connect(struct ceph_connection *con)
le32_to_cpu(con->in_reply.global_seq));
out:
return ret;

}

/*
Expand Down Expand Up @@ -976,9 +991,9 @@ int ceph_parse_ips(const char *c, const char *end,
return -EINVAL;
}

static int process_connect(struct ceph_connection *con)
static int process_banner(struct ceph_connection *con)
{
dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
dout("process_banner on %p\n", con);

if (verify_hello(con) < 0)
return -1;
Expand Down Expand Up @@ -1016,10 +1031,19 @@ static int process_connect(struct ceph_connection *con)
sizeof(con->peer_addr_for_me.in_addr));
addr_set_port(&con->msgr->inst.addr.in_addr, port);
encode_my_addr(con->msgr);
dout("process_connect learned my addr is %s\n",
dout("process_banner learned my addr is %s\n",
pr_addr(&con->msgr->inst.addr.in_addr));
}

set_bit(NEGOTIATING, &con->state);
prepare_read_connect(con);
return 0;
}

static int process_connect(struct ceph_connection *con)
{
dout("process_connect on %p tag %d\n", con, (int)con->in_tag);

switch (con->in_reply.tag) {
case CEPH_MSGR_TAG_BADPROTOVER:
dout("process_connect got BADPROTOVER my %d != their %d\n",
Expand Down Expand Up @@ -1053,7 +1077,7 @@ static int process_connect(struct ceph_connection *con)
ENTITY_NAME(con->peer_name),
pr_addr(&con->peer_addr.in_addr));
reset_connection(con);
prepare_write_connect_retry(con->msgr, con);
prepare_write_connect(con->msgr, con, 0);
prepare_read_connect(con);

/* Tell ceph about it. */
Expand All @@ -1071,7 +1095,7 @@ static int process_connect(struct ceph_connection *con)
le32_to_cpu(con->out_connect.connect_seq),
le32_to_cpu(con->in_connect.connect_seq));
con->connect_seq = le32_to_cpu(con->in_connect.connect_seq);
prepare_write_connect_retry(con->msgr, con);
prepare_write_connect(con->msgr, con, 0);
prepare_read_connect(con);
break;

Expand All @@ -1080,19 +1104,17 @@ static int process_connect(struct ceph_connection *con)
* If we sent a smaller global_seq than the peer has, try
* again with a larger value.
*/
dout("process_connect got RETRY_GLOBAL my %u, peer_gseq = %u\n",
dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n",
con->peer_global_seq,
le32_to_cpu(con->in_connect.global_seq));
get_global_seq(con->msgr,
le32_to_cpu(con->in_connect.global_seq));
prepare_write_connect_retry(con->msgr, con);
prepare_write_connect(con->msgr, con, 0);
prepare_read_connect(con);
break;

case CEPH_MSGR_TAG_READY:
clear_bit(CONNECTING, &con->state);
if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
set_bit(LOSSYRX, &con->state);
con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
con->connect_seq++;
dout("process_connect got READY gseq %d cseq %d (%d)\n",
Expand Down Expand Up @@ -1420,9 +1442,11 @@ static int try_write(struct ceph_connection *con)
if (test_and_clear_bit(STANDBY, &con->state))
con->connect_seq++;

prepare_write_connect(msgr, con);
prepare_read_connect(con);
prepare_write_banner(msgr, con);
prepare_write_connect(msgr, con, 1);
prepare_read_banner(con);
set_bit(CONNECTING, &con->state);
clear_bit(NEGOTIATING, &con->state);

con->in_tag = CEPH_MSGR_TAG_READY;
dout("try_write initiating connect on %p new state %lu\n",
Expand Down Expand Up @@ -1521,7 +1545,16 @@ static int try_read(struct ceph_connection *con)
dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
con->in_base_pos);
if (test_bit(CONNECTING, &con->state)) {
dout("try_read connecting\n");
if (!test_bit(NEGOTIATING, &con->state)) {
dout("try_read connecting\n");
ret = read_partial_banner(con);
if (ret <= 0)
goto done;
if (process_banner(con) < 0) {
ret = -1;
goto out;
}
}
ret = read_partial_connect(con);
if (ret <= 0)
goto done;
Expand Down
4 changes: 2 additions & 2 deletions fs/ceph/messenger.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ struct ceph_msg_pos {
* thread is currently opening, reading or writing data to the socket.
*/
#define LOSSYTX 0 /* we can close channel or drop messages on errors */
#define LOSSYRX 1 /* peer may reset/drop messages */
#define CONNECTING 2
#define CONNECTING 1
#define NEGOTIATING 2
#define KEEPALIVE_PENDING 3
#define WRITE_PENDING 4 /* we have data ready to send */
#define QUEUED 5 /* there is work queued on this connection */
Expand Down

0 comments on commit eed0ef2

Please sign in to comment.