Skip to content

Commit

Permalink
---
Browse files Browse the repository at this point in the history
yaml
---
r: 377716
b: refs/heads/master
c: 5d68987
h: refs/heads/master
v: v3
  • Loading branch information
Mike Christie authored and David Teigland committed Jun 14, 2013
1 parent 6d02a15 commit 7b5a1f9
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 30 deletions.
2 changes: 1 addition & 1 deletion [refs]
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
---
refs/heads/master: 98e1b60ecc441625c91013e88f14cbd1b3c1fa08
refs/heads/master: 5d6898714fe2ce485e95ac74479ed40ebd8d5748
104 changes: 75 additions & 29 deletions trunk/fs/dlm/lowcomms.c
Original file line number Diff line number Diff line change
Expand Up @@ -607,15 +607,56 @@ static void sctp_init_failed(void)
mutex_unlock(&connections_lock);
}

static void retry_failed_sctp_send(struct connection *recv_con,
struct sctp_send_failed *sn_send_failed,
char *buf)
{
int len = sn_send_failed->ssf_length - sizeof(struct sctp_send_failed);
struct dlm_mhandle *mh;
struct connection *con;
char *retry_buf;
int nodeid = sn_send_failed->ssf_info.sinfo_ppid;

log_print("Retry sending %d bytes to node id %d", len, nodeid);

con = nodeid2con(nodeid, 0);
if (!con) {
log_print("Could not look up con for nodeid %d\n",
nodeid);
return;
}

mh = dlm_lowcomms_get_buffer(nodeid, len, GFP_NOFS, &retry_buf);
if (!mh) {
log_print("Could not allocate buf for retry.");
return;
}
memcpy(retry_buf, buf + sizeof(struct sctp_send_failed), len);
dlm_lowcomms_commit_buffer(mh);

/*
* If we got a assoc changed event before the send failed event then
* we only need to retry the send.
*/
if (con->sctp_assoc) {
if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
queue_work(send_workqueue, &con->swork);
} else
sctp_init_failed_foreach(con);
}

/* Something happened to an association */
static void process_sctp_notification(struct connection *con,
struct msghdr *msg, char *buf)
{
union sctp_notification *sn = (union sctp_notification *)buf;

if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) {
switch (sn->sn_header.sn_type) {
case SCTP_SEND_FAILED:
retry_failed_sctp_send(con, &sn->sn_send_failed, buf);
break;
case SCTP_ASSOC_CHANGE:
switch (sn->sn_assoc_change.sac_state) {

case SCTP_COMM_UP:
case SCTP_RESTART:
{
Expand Down Expand Up @@ -713,14 +754,10 @@ static void process_sctp_notification(struct connection *con,
}
break;

/* We don't know which INIT failed, so clear the PENDING flags
* on them all. if assoc_id is zero then it will then try
* again */

case SCTP_CANT_STR_ASSOC:
{
/* Will retry init when we get the send failed notification */
log_print("Can't start SCTP association - retrying");
sctp_init_failed();
}
break;

Expand All @@ -729,6 +766,8 @@ static void process_sctp_notification(struct connection *con,
(int)sn->sn_assoc_change.sac_assoc_id,
sn->sn_assoc_change.sac_state);
}
default:
; /* fall through */
}
}

Expand Down Expand Up @@ -988,6 +1027,24 @@ static void free_entry(struct writequeue_entry *e)
kfree(e);
}

/*
* writequeue_entry_complete - try to delete and free write queue entry
* @e: write queue entry to try to delete
* @completed: bytes completed
*
* writequeue_lock must be held.
*/
static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
{
e->offset += completed;
e->len -= completed;

if (e->len == 0 && e->users == 0) {
list_del(&e->list);
free_entry(e);
}
}

/* Initiate an SCTP association.
This is a special case of send_to_sock() in that we don't yet have a
peeled-off socket for this association, so we use the listening socket
Expand All @@ -1007,16 +1064,14 @@ static void sctp_init_assoc(struct connection *con)
int addrlen;
struct kvec iov[1];

mutex_lock(&con->sock_mutex);
if (test_and_set_bit(CF_INIT_PENDING, &con->flags))
return;

if (con->retries++ > MAX_CONNECT_RETRIES)
return;
goto unlock;

if (nodeid_to_addr(con->nodeid, NULL, (struct sockaddr *)&rem_addr,
con->try_new_addr)) {
log_print("no address for nodeid %d", con->nodeid);
return;
goto unlock;
}
base_con = nodeid2con(0, 0);
BUG_ON(base_con == NULL);
Expand All @@ -1034,17 +1089,17 @@ static void sctp_init_assoc(struct connection *con)
if (list_empty(&con->writequeue)) {
spin_unlock(&con->writequeue_lock);
log_print("writequeue empty for nodeid %d", con->nodeid);
return;
goto unlock;
}

e = list_first_entry(&con->writequeue, struct writequeue_entry, list);
len = e->len;
offset = e->offset;
spin_unlock(&con->writequeue_lock);

/* Send the first block off the write queue */
iov[0].iov_base = page_address(e->page)+offset;
iov[0].iov_len = len;
spin_unlock(&con->writequeue_lock);

if (rem_addr.ss_family == AF_INET) {
struct sockaddr_in *sin = (struct sockaddr_in *)&rem_addr;
Expand All @@ -1060,7 +1115,7 @@ static void sctp_init_assoc(struct connection *con)
cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
sinfo = CMSG_DATA(cmsg);
memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
sinfo->sinfo_ppid = cpu_to_le32(dlm_our_nodeid());
sinfo->sinfo_ppid = cpu_to_le32(con->nodeid);
outmessage.msg_controllen = cmsg->cmsg_len;
sinfo->sinfo_flags |= SCTP_ADDR_OVER;

Expand All @@ -1075,15 +1130,12 @@ static void sctp_init_assoc(struct connection *con)
}
else {
spin_lock(&con->writequeue_lock);
e->offset += ret;
e->len -= ret;

if (e->len == 0 && e->users == 0) {
list_del(&e->list);
free_entry(e);
}
writequeue_entry_complete(e, ret);
spin_unlock(&con->writequeue_lock);
}

unlock:
mutex_unlock(&con->sock_mutex);
}

/* Connect a new socket to its peer */
Expand Down Expand Up @@ -1533,13 +1585,7 @@ static void send_to_sock(struct connection *con)
}

spin_lock(&con->writequeue_lock);
e->offset += ret;
e->len -= ret;

if (e->len == 0 && e->users == 0) {
list_del(&e->list);
free_entry(e);
}
writequeue_entry_complete(e, ret);
}
spin_unlock(&con->writequeue_lock);
out:
Expand Down

0 comments on commit 7b5a1f9

Please sign in to comment.