---
r: 290040
b: refs/heads/master
c: 7a54d4a
h: refs/heads/master
v: v3
Allan Stephens authored and Paul Gortmaker committed Feb 6, 2012
1 parent 5d2cd8d commit 9cdf8f4
Showing 6 changed files with 101 additions and 166 deletions.
2 changes: 1 addition & 1 deletion [refs]
@@ -1,2 +1,2 @@
---
refs/heads/master: b98158e3b36645305363a598d91c544fa31446f1
refs/heads/master: 7a54d4a99dcbbfdf1d4550faa19b615091137953
226 changes: 78 additions & 148 deletions trunk/net/tipc/bcast.c
@@ -157,39 +157,14 @@ u32 tipc_bclink_get_last_sent(void)
return bcl->fsm_msg_cnt;
}

/**
* bclink_set_gap - set gap according to contents of current deferred pkt queue
*
* Called with 'node' locked, bc_lock unlocked
*/

static void bclink_set_gap(struct tipc_node *n_ptr)
{
struct sk_buff *buf = n_ptr->bclink.deferred_head;

n_ptr->bclink.gap_after = n_ptr->bclink.gap_to =
mod(n_ptr->bclink.last_in);
if (unlikely(buf != NULL))
n_ptr->bclink.gap_to = mod(buf_seqno(buf) - 1);
}

/**
* bclink_ack_allowed - test if ACK or NACK message can be sent at this moment
*
* This mechanism endeavours to prevent all nodes in the network from trying
* to ACK or NACK at the same time.
*
* Note: TIPC uses a different trigger to distribute ACKs than it does to
* distribute NACKs, but tries to use the same spacing (divide by 16).
*/

static int bclink_ack_allowed(u32 n)
static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)
{
return (n % TIPC_MIN_LINK_WIN) == tipc_own_tag;
node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ?
seqno : node->bclink.last_sent;
}
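
The replacement helper leans on TIPC's wraparound-safe sequence arithmetic. A minimal standalone sketch of how mod() and less_eq() plausibly behave for 16-bit sequence numbers (assumed definitions; the real ones live in the TIPC headers, not in this diff):

/* Standalone sketch, not kernel code: 16-bit wraparound sequence
 * arithmetic assumed to match TIPC's mod()/less_eq() helpers. */
#include <stdio.h>

typedef unsigned int u32;

static u32 mod(u32 x)
{
	return x & 0xffffu;                 /* wrap into 16-bit space */
}

static int less_eq(u32 left, u32 right)
{
	return mod(right - left) < 32768u;  /* right is at most half a
	                                       cycle ahead of left */
}

static void update_last_sent(u32 *last_sent, u32 seqno)
{
	/* Mirrors bclink_update_last_sent(): only ever move forward */
	if (less_eq(*last_sent, seqno))
		*last_sent = seqno;
}

int main(void)
{
	u32 last = 65534;
	update_last_sent(&last, 2);        /* 2 follows 65534 after wrap */
	printf("last_sent = %u\n", last);  /* prints 2 */
	return 0;
}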


/**
/*
* tipc_bclink_retransmit_to - get most recent node to request retransmission
*
* Called with bc_lock locked
@@ -300,44 +275,56 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
spin_unlock_bh(&bc_lock);
}

/**
* bclink_send_ack - unicast an ACK msg
/*
* tipc_bclink_update_link_state - update broadcast link state
*
* tipc_net_lock and node lock set
*/

static void bclink_send_ack(struct tipc_node *n_ptr)
void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
{
struct tipc_link *l_ptr = n_ptr->active_links[n_ptr->addr & 1];
struct sk_buff *buf;

if (l_ptr != NULL)
tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
}
/* Ignore "stale" link state info */

/**
* bclink_send_nack - broadcast a NACK msg
*
* tipc_net_lock and node lock set
*/
if (less_eq(last_sent, n_ptr->bclink.last_in))
return;

static void bclink_send_nack(struct tipc_node *n_ptr)
{
struct sk_buff *buf;
struct tipc_msg *msg;
/* Update link synchronization state; quit if in sync */

bclink_update_last_sent(n_ptr, last_sent);

if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
return;

/* Update out-of-sync state; quit if loss is still unconfirmed */

if (!less(n_ptr->bclink.gap_after, n_ptr->bclink.gap_to))
if ((++n_ptr->bclink.oos_state) == 1) {
if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2))
return;
n_ptr->bclink.oos_state++;
}

/* Don't NACK if one has been recently sent (or seen) */

if (n_ptr->bclink.oos_state & 0x1)
return;

/* Send NACK */

buf = tipc_buf_acquire(INT_H_SIZE);
if (buf) {
msg = buf_msg(buf);
struct tipc_msg *msg = buf_msg(buf);

tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG,
INT_H_SIZE, n_ptr->addr);
INT_H_SIZE, n_ptr->addr);
msg_set_non_seq(msg, 1);
msg_set_mc_netid(msg, tipc_net_id);
msg_set_bcast_ack(msg, mod(n_ptr->bclink.last_in));
msg_set_bcgap_after(msg, n_ptr->bclink.gap_after);
msg_set_bcgap_to(msg, n_ptr->bclink.gap_to);
msg_set_bcast_ack(msg, n_ptr->bclink.last_in);
msg_set_bcgap_after(msg, n_ptr->bclink.last_in);
msg_set_bcgap_to(msg, n_ptr->bclink.deferred_head
? buf_seqno(n_ptr->bclink.deferred_head) - 1
: n_ptr->bclink.last_sent);
msg_set_bcast_tag(msg, tipc_own_tag);

spin_lock_bh(&bc_lock);
@@ -346,96 +333,37 @@ static void bclink_send_nack(struct tipc_node *n_ptr)
spin_unlock_bh(&bc_lock);
buf_discard(buf);

/*
* Ensure we don't send another NACK msg to the node
* until 16 more deferred messages arrive from it
* (i.e. helps prevent all nodes from NACK'ing at same time)
*/

n_ptr->bclink.nack_sync = tipc_own_tag;
n_ptr->bclink.oos_state++;
}
}
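
The rewritten function replaces the old gap_after/gap_to bookkeeping with a single oos_state counter: odd values mean a NACK was just sent or seen, even values greater than zero mean packet loss is confirmed and a NACK may go out. The NACK itself advertises the gap as everything after last_in, up to either the first deferred packet minus one or, with nothing deferred, last_sent. A hypothetical standalone walk-through of the gating (constants and field names assumed from the diff):

/* Illustrative only: the even/odd NACK gating of
 * tipc_bclink_update_link_state(), lifted out of the kernel context. */
#include <stdio.h>

#define TIPC_MIN_LINK_WIN 50	/* assumed to match the TIPC constant */

struct bclink_state {
	unsigned int oos_state;      /* 0 in sync; odd = hold; even = NACK */
	unsigned int deferred_size;  /* out-of-sequence packets queued     */
};

static int may_send_nack(struct bclink_state *st)
{
	/* First sign of loss: wait for confirmation unless the deferred
	 * queue already proves a real gap. */
	if ((++st->oos_state) == 1) {
		if (st->deferred_size < (TIPC_MIN_LINK_WIN / 2))
			return 0;
		st->oos_state++;
	}

	/* Odd state: a NACK was sent (or seen) recently, so hold off */
	if (st->oos_state & 0x1)
		return 0;

	return 1;	/* caller sends the NACK, then bumps oos_state */
}

int main(void)
{
	struct bclink_state st = { .oos_state = 0, .deferred_size = 1 };

	printf("%d\n", may_send_nack(&st)); /* 0: loss not yet confirmed */
	printf("%d\n", may_send_nack(&st)); /* 1: second call, state now 2 */
	return 0;
}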

/**
* tipc_bclink_check_gap - send a NACK if a sequence gap exists
/*
* bclink_peek_nack - monitor retransmission requests sent by other nodes
*
* tipc_net_lock and node lock set
*/

void tipc_bclink_check_gap(struct tipc_node *n_ptr, u32 last_sent)
{
if (!n_ptr->bclink.supported ||
less_eq(last_sent, mod(n_ptr->bclink.last_in)))
return;

bclink_set_gap(n_ptr);
if (n_ptr->bclink.gap_after == n_ptr->bclink.gap_to)
n_ptr->bclink.gap_to = last_sent;
bclink_send_nack(n_ptr);
}

/**
* tipc_bclink_peek_nack - process a NACK msg meant for another node
* Delay any upcoming NACK by this node if another node has already
* requested the first message this node is going to ask for.
*
* Only tipc_net_lock set.
*/

static void tipc_bclink_peek_nack(u32 dest, u32 sender_tag, u32 gap_after, u32 gap_to)
static void bclink_peek_nack(struct tipc_msg *msg)
{
struct tipc_node *n_ptr = tipc_node_find(dest);
u32 my_after, my_to;
struct tipc_node *n_ptr = tipc_node_find(msg_destnode(msg));

if (unlikely(!n_ptr || !tipc_node_is_up(n_ptr)))
if (unlikely(!n_ptr))
return;

tipc_node_lock(n_ptr);
/*
* Modify gap to suppress unnecessary NACKs from this node
*/
my_after = n_ptr->bclink.gap_after;
my_to = n_ptr->bclink.gap_to;

if (less_eq(gap_after, my_after)) {
if (less(my_after, gap_to) && less(gap_to, my_to))
n_ptr->bclink.gap_after = gap_to;
else if (less_eq(my_to, gap_to))
n_ptr->bclink.gap_to = n_ptr->bclink.gap_after;
} else if (less_eq(gap_after, my_to)) {
if (less_eq(my_to, gap_to))
n_ptr->bclink.gap_to = gap_after;
} else {
/*
* Expand gap if missing bufs not in deferred queue:
*/
struct sk_buff *buf = n_ptr->bclink.deferred_head;
u32 prev = n_ptr->bclink.gap_to;

for (; buf; buf = buf->next) {
u32 seqno = buf_seqno(buf);
if (n_ptr->bclink.supported &&
(n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&
(n_ptr->bclink.last_in == msg_bcgap_after(msg)))
n_ptr->bclink.oos_state = 2;

if (mod(seqno - prev) != 1) {
buf = NULL;
break;
}
if (seqno == gap_after)
break;
prev = seqno;
}
if (buf == NULL)
n_ptr->bclink.gap_to = gap_after;
}
/*
* Some nodes may send a complementary NACK now:
*/
if (bclink_ack_allowed(sender_tag + 1)) {
if (n_ptr->bclink.gap_to != n_ptr->bclink.gap_after) {
bclink_send_nack(n_ptr);
bclink_set_gap(n_ptr);
}
}
tipc_node_unlock(n_ptr);
}
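
bclink_peek_nack() now only has to flag that another node already asked for the packets this node is missing; setting oos_state to 2 means the next increment makes it odd, postponing this node's own NACK. A small sketch of that deferral rule under the same field names (standalone illustration, not the kernel function):

/* Sketch of the deferral rule in bclink_peek_nack() */
struct bclink_state {
	unsigned int last_in;    /* last in-sequence packet received */
	unsigned int last_sent;  /* highest packet known to be sent  */
	unsigned int oos_state;
};

static void peek_nack(struct bclink_state *st, unsigned int bcgap_after)
{
	if (st->last_in != st->last_sent &&  /* we are missing packets  */
	    st->last_in == bcgap_after)      /* peer requested the same */
		st->oos_state = 2;           /* next ++ makes it odd:   */
	                                     /* our own NACK is delayed */
}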

/**
/*
* tipc_bclink_send_msg - broadcast a packet to all nodes in cluster
*/

@@ -505,10 +433,7 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
spin_unlock_bh(&bc_lock);
} else {
tipc_node_unlock(node);
tipc_bclink_peek_nack(msg_destnode(msg),
msg_bcast_tag(msg),
msg_bcgap_after(msg),
msg_bcgap_to(msg));
bclink_peek_nack(msg);
}
goto exit;
}
@@ -519,16 +444,28 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
next_in = mod(node->bclink.last_in + 1);

if (likely(seqno == next_in)) {
bclink_update_last_sent(node, seqno);
receive:
node->bclink.last_in = seqno;
node->bclink.oos_state = 0;

spin_lock_bh(&bc_lock);
bcl->stats.recv_info++;
node->bclink.last_in++;
bclink_set_gap(node);
if (unlikely(bclink_ack_allowed(seqno))) {
bclink_send_ack(node);

/*
* Unicast an ACK periodically, ensuring that
* all nodes in the cluster don't ACK at the same time
*/

if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) {
tipc_link_send_proto_msg(
node->active_links[node->addr & 1],
STATE_MSG, 0, 0, 0, 0, 0);
bcl->stats.sent_acks++;
}

/* Deliver message to destination */

if (likely(msg_isdata(msg))) {
spin_unlock_bh(&bc_lock);
tipc_node_unlock(node);
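
In the receive path above, the old fixed divide-by-16 trigger of bclink_ack_allowed() is replaced by spacing keyed to each node's own address, so ACKs from different nodes land on different sequence numbers instead of bursting together. A runnable sketch (window constant assumed):

/* Standalone sketch of the staggered ACK spacing: each node ACKs every
 * TIPC_MIN_LINK_WIN-th packet, offset by its own node address. */
#include <stdio.h>

#define TIPC_MIN_LINK_WIN 50	/* assumed to match the TIPC constant */

static int should_ack(unsigned int seqno, unsigned int own_addr)
{
	return ((seqno - own_addr) % TIPC_MIN_LINK_WIN) == 0;
}

int main(void)
{
	for (unsigned int seq = 1; seq <= 100; seq++) {
		if (should_ack(seq, 1))
			printf("node 1 acks seqno %u\n", seq); /* 1, 51 */
		if (should_ack(seq, 2))
			printf("node 2 acks seqno %u\n", seq); /* 2, 52 */
	}
	return 0;
}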
@@ -567,9 +504,14 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
if (unlikely(!tipc_node_is_up(node)))
goto unlock;

if (!node->bclink.deferred_head)
if (node->bclink.last_in == node->bclink.last_sent)
goto unlock;

if (!node->bclink.deferred_head) {
node->bclink.oos_state = 1;
goto unlock;
}

msg = buf_msg(node->bclink.deferred_head);
seqno = msg_seqno(msg);
next_in = mod(next_in + 1);
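
When the expected packet finally arrives, the receive path loops back through the receive: label and, as the next hunk shows, pops any directly following packets off the deferred queue while decrementing deferred_size. A schematic of that drain loop (assumed shape, wraparound omitted, standalone):

/* Schematic drain of the deferred queue once the expected seqno shows
 * up; mirrors the goto-receive loop in tipc_bclink_recv_pkt(). */
struct pkt {
	unsigned int seqno;
	struct pkt *next;
};

static void drain_deferred(struct pkt **head, unsigned int *last_in,
			   unsigned int *deferred_size)
{
	while (*head && (*head)->seqno == *last_in + 1) {
		struct pkt *p = *head;

		*head = p->next;	/* unlink the now in-sequence pkt */
		(*deferred_size)--;
		(*last_in)++;		/* deliver p here, then advance  */
		/* deliver(p); */
	}
}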
@@ -580,31 +522,19 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)

buf = node->bclink.deferred_head;
node->bclink.deferred_head = buf->next;
node->bclink.deferred_size--;
goto receive;
}

/* Handle out-of-sequence broadcast message */

if (less(next_in, seqno)) {
u32 gap_after = node->bclink.gap_after;
u32 gap_to = node->bclink.gap_to;

deferred = tipc_link_defer_pkt(&node->bclink.deferred_head,
&node->bclink.deferred_tail,
buf);
if (deferred) {
node->bclink.nack_sync++;
if (seqno == mod(gap_after + 1))
node->bclink.gap_after = seqno;
else if (less(gap_after, seqno) && less(seqno, gap_to))
node->bclink.gap_to = seqno;
}
node->bclink.deferred_size += deferred;
bclink_update_last_sent(node, seqno);
buf = NULL;
if (bclink_ack_allowed(node->bclink.nack_sync)) {
if (gap_to != gap_after)
bclink_send_nack(node);
bclink_set_gap(node);
}
} else
deferred = 0;
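
tipc_link_defer_pkt() (in link.c) keeps the deferred queue ordered by sequence number and reports whether the buffer was actually queued, which is what lets the new deferred_size counter stay accurate. A simplified, assumed model of that insert, ignoring wraparound for brevity:

/* Assumed, simplified model of tipc_link_defer_pkt(): insert a packet
 * into a seqno-ordered singly linked queue; return 1 if queued, 0 if
 * it was a duplicate. The real code works on struct sk_buff chains. */
#include <stdlib.h>

struct pkt {
	unsigned int seqno;
	struct pkt *next;
};

static int defer_pkt(struct pkt **head, struct pkt *p)
{
	struct pkt **pp = head;

	while (*pp && (*pp)->seqno < p->seqno)
		pp = &(*pp)->next;

	if (*pp && (*pp)->seqno == p->seqno) {
		free(p);		/* duplicate: drop it */
		return 0;
	}

	p->next = *pp;			/* splice in, keeping order */
	*pp = p;
	return 1;
}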

2 changes: 1 addition & 1 deletion trunk/net/tipc/bcast.h
@@ -96,7 +96,7 @@ int tipc_bclink_send_msg(struct sk_buff *buf);
void tipc_bclink_recv_pkt(struct sk_buff *buf);
u32 tipc_bclink_get_last_sent(void);
u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr);
void tipc_bclink_check_gap(struct tipc_node *n_ptr, u32 seqno);
void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent);
int tipc_bclink_stats(char *stats_buf, const u32 buf_size);
int tipc_bclink_reset_stats(void);
int tipc_bclink_set_queue_limits(u32 limit);
21 changes: 13 additions & 8 deletions trunk/net/tipc/link.c
@@ -1501,14 +1501,13 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,
tipc_node_lock(n_ptr);

tipc_addr_string_fill(addr_string, n_ptr->addr);
info("Multicast link info for %s\n", addr_string);
info("Broadcast link info for %s\n", addr_string);
info("Supportable: %d, ", n_ptr->bclink.supportable);
info("Supported: %d, ", n_ptr->bclink.supported);
info("Acked: %u\n", n_ptr->bclink.acked);
info("Last in: %u, ", n_ptr->bclink.last_in);
info("Gap after: %u, ", n_ptr->bclink.gap_after);
info("Gap to: %u\n", n_ptr->bclink.gap_to);
info("Nack sync: %u\n\n", n_ptr->bclink.nack_sync);
info("Oos state: %u, ", n_ptr->bclink.oos_state);
info("Last sent: %u\n", n_ptr->bclink.last_sent);

tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr);

@@ -1974,7 +1973,7 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,

msg_set_type(msg, msg_typ);
msg_set_net_plane(msg, l_ptr->b_ptr->net_plane);
msg_set_bcast_ack(msg, mod(l_ptr->owner->bclink.last_in));
msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
msg_set_last_bcast(msg, tipc_bclink_get_last_sent());

if (msg_typ == STATE_MSG) {
@@ -2133,8 +2132,12 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)

/* Synchronize broadcast link info, if not done previously */

if (!tipc_node_is_up(l_ptr->owner))
l_ptr->owner->bclink.last_in = msg_last_bcast(msg);
if (!tipc_node_is_up(l_ptr->owner)) {
l_ptr->owner->bclink.last_sent =
l_ptr->owner->bclink.last_in =
msg_last_bcast(msg);
l_ptr->owner->bclink.oos_state = 0;
}

l_ptr->peer_session = msg_session(msg);
l_ptr->peer_bearer_id = msg_bearer_id(msg);
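
Because a node may come up mid-stream, the hunk above seeds both last_in and last_sent from the peer's advertised msg_last_bcast() and clears oos_state, so the first broadcast packet received is judged against a sane baseline. A compressed sketch of that initialization (field names from the diff; illustration only):

/* Sketch: first contact with a peer seeds the broadcast-link view of it */
struct bclink_state {
	unsigned int last_in;    /* last in-sequence broadcast received */
	unsigned int last_sent;  /* highest broadcast known to be sent  */
	unsigned int oos_state;  /* out-of-sync escalation counter      */
};

static void sync_on_link_up(struct bclink_state *st, unsigned int last_bcast)
{
	/* Until the node is up, trust the peer's own "last sent" count */
	st->last_sent = last_bcast;
	st->last_in = last_bcast;
	st->oos_state = 0;
}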
@@ -2181,7 +2184,9 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)

/* Protocol message before retransmits, reduce loss risk */

tipc_bclink_check_gap(l_ptr->owner, msg_last_bcast(msg));
if (l_ptr->owner->bclink.supported)
tipc_bclink_update_link_state(l_ptr->owner,
msg_last_bcast(msg));

if (rec_gap || (msg_probe(msg))) {
tipc_link_send_proto_msg(l_ptr, STATE_MSG,