Skip to content

Commit

Permalink
Merge branch 'tipc-next'
Browse files Browse the repository at this point in the history
Ying Xue says:

====================
tipc: purge signal handler infrastructure

Delaying actions so that they execute later in an asynchronous context
usually adds unnecessary code complexity and makes their behaviour
unpredictable and nondeterministic. Moreover, because the signal handler
infrastructure is stopped first when the tipc module is removed, it
introduces potential risks: even after the signal handler has been
stopped, some tipc components may still submit signal requests to the
signal handler infrastructure, which can leave some resources never
released or freed correctly.

This series therefore converts all actions that were performed
asynchronously in tasklet context, via the interface provided by the
signal handler infrastructure, so that they are executed synchronously
instead, allowing the whole signal handler infrastructure to be deleted.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
  • Loading branch information
David S. Miller committed May 5, 2014
2 parents 5b579e2 + 52ff872 commit 5a50a92
Show file tree
Hide file tree
Showing 17 changed files with 268 additions and 352 deletions.
2 changes: 1 addition & 1 deletion net/tipc/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
obj-$(CONFIG_TIPC) := tipc.o

tipc-y += addr.o bcast.o bearer.o config.o \
core.o handler.o link.o discover.o msg.o \
core.o link.o discover.o msg.o \
name_distr.o subscr.o name_table.o net.o \
netlink.o node.o node_subscr.o port.o ref.o \
socket.o log.o eth_media.o server.o
Expand Down
145 changes: 93 additions & 52 deletions net/tipc/bcast.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ struct tipc_bcbearer_pair {
* Note: The fields labelled "temporary" are incorporated into the bearer
* to avoid consuming potentially limited stack space through the use of
* large local variables within multicast routines. Concurrent access is
* prevented through use of the spinlock "bc_lock".
* prevented through use of the spinlock "bclink_lock".
*/
struct tipc_bcbearer {
struct tipc_bearer bearer;
Expand All @@ -84,28 +84,27 @@ struct tipc_bcbearer {

/**
 * struct tipc_bclink - link used for broadcast messages
 * @lock: spinlock governing access to structure
 * @link: (non-standard) broadcast link structure
 * @node: (non-standard) node structure representing b'cast link's peer node
 * @flags: represent bclink states (TIPC_BCLINK_* bits, e.g. TIPC_BCLINK_RESET;
 *         checked and cleared when the lock is dropped)
 * @bcast_nodes: map of broadcast-capable nodes
 * @retransmit_to: node that most recently requested a retransmit
 *
 * Handles sequence numbering, fragmentation, bundling, etc.
 */
struct tipc_bclink {
	spinlock_t lock;
	struct tipc_link link;
	struct tipc_node node;
	unsigned int flags;
	struct tipc_node_map bcast_nodes;
	struct tipc_node *retransmit_to;
};

static struct tipc_bcbearer bcast_bearer;
static struct tipc_bclink bcast_link;

static struct tipc_bcbearer *bcbearer = &bcast_bearer;
static struct tipc_bclink *bclink = &bcast_link;
static struct tipc_link *bcl = &bcast_link.link;

static DEFINE_SPINLOCK(bc_lock);
static struct tipc_bcbearer *bcbearer;
static struct tipc_bclink *bclink;
static struct tipc_link *bcl;

const char tipc_bclink_name[] = "broadcast-link";

Expand All @@ -115,6 +114,35 @@ static void tipc_nmap_diff(struct tipc_node_map *nm_a,
static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node);
static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node);

/* tipc_bclink_lock - take the broadcast-link spinlock (BH-disabling) */
static void tipc_bclink_lock(void)
{
	spin_lock_bh(&bclink->lock);
}

/* tipc_bclink_unlock - release the broadcast-link spinlock
 *
 * Fast path: if no deferred-action flags were raised while the lock was
 * held, just drop it.  Otherwise, if TIPC_BCLINK_RESET is set, clear it
 * and capture the node that most recently requested a retransmit; the
 * actual tipc_link_reset_all() call is made only AFTER the lock has been
 * released — presumably because it takes other locks of its own and must
 * not run under bclink->lock (TODO confirm against tipc_link_reset_all).
 */
static void tipc_bclink_unlock(void)
{
	struct tipc_node *node = NULL;

	if (likely(!bclink->flags)) {
		spin_unlock_bh(&bclink->lock);
		return;
	}

	if (bclink->flags & TIPC_BCLINK_RESET) {
		bclink->flags &= ~TIPC_BCLINK_RESET;
		node = tipc_bclink_retransmit_to();
	}
	spin_unlock_bh(&bclink->lock);

	/* Deferred reset, performed outside the broadcast-link lock */
	if (node)
		tipc_link_reset_all(node);
}

/* tipc_bclink_set_flags - raise deferred-action flag bits on the bclink
 *
 * NOTE(review): flags is modified without taking bclink->lock here, so the
 * caller presumably already holds it — verify at the call sites.
 */
void tipc_bclink_set_flags(unsigned int flags)
{
	bclink->flags |= flags;
}

static u32 bcbuf_acks(struct sk_buff *buf)
{
return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle;
Expand All @@ -132,16 +160,16 @@ static void bcbuf_decr_acks(struct sk_buff *buf)

/* tipc_bclink_add_node - add node address to the broadcast-capable node map,
 * under the broadcast-link lock
 */
void tipc_bclink_add_node(u32 addr)
{
	tipc_bclink_lock();
	tipc_nmap_add(&bclink->bcast_nodes, addr);
	tipc_bclink_unlock();
}

/* tipc_bclink_remove_node - remove node address from the broadcast-capable
 * node map, under the broadcast-link lock
 */
void tipc_bclink_remove_node(u32 addr)
{
	tipc_bclink_lock();
	tipc_nmap_remove(&bclink->bcast_nodes, addr);
	tipc_bclink_unlock();
}

static void bclink_set_last_sent(void)
Expand All @@ -167,7 +195,7 @@ static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)
/**
* tipc_bclink_retransmit_to - get most recent node to request retransmission
*
* Called with bc_lock locked
* Called with bclink_lock locked
*/
struct tipc_node *tipc_bclink_retransmit_to(void)
{
Expand All @@ -179,7 +207,7 @@ struct tipc_node *tipc_bclink_retransmit_to(void)
* @after: sequence number of last packet to *not* retransmit
* @to: sequence number of last packet to retransmit
*
* Called with bc_lock locked
* Called with bclink_lock locked
*/
static void bclink_retransmit_pkt(u32 after, u32 to)
{
Expand All @@ -196,16 +224,15 @@ static void bclink_retransmit_pkt(u32 after, u32 to)
* @n_ptr: node that sent acknowledgement info
* @acked: broadcast sequence # that has been acknowledged
*
* Node is locked, bc_lock unlocked.
* Node is locked, bclink_lock unlocked.
*/
void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
{
struct sk_buff *crs;
struct sk_buff *next;
unsigned int released = 0;

spin_lock_bh(&bc_lock);

tipc_bclink_lock();
/* Bail out if tx queue is empty (no clean up is required) */
crs = bcl->first_out;
if (!crs)
Expand Down Expand Up @@ -269,7 +296,7 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
if (unlikely(released && !list_empty(&bcl->waiting_ports)))
tipc_link_wakeup_ports(bcl, 0);
exit:
spin_unlock_bh(&bc_lock);
tipc_bclink_unlock();
}

/**
Expand Down Expand Up @@ -322,10 +349,10 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
? buf_seqno(n_ptr->bclink.deferred_head) - 1
: n_ptr->bclink.last_sent);

spin_lock_bh(&bc_lock);
tipc_bclink_lock();
tipc_bearer_send(MAX_BEARERS, buf, NULL);
bcl->stats.sent_nacks++;
spin_unlock_bh(&bc_lock);
tipc_bclink_unlock();
kfree_skb(buf);

n_ptr->bclink.oos_state++;
Expand Down Expand Up @@ -362,7 +389,7 @@ int tipc_bclink_xmit(struct sk_buff *buf)
{
int res;

spin_lock_bh(&bc_lock);
tipc_bclink_lock();

if (!bclink->bcast_nodes.count) {
res = msg_data_sz(buf_msg(buf));
Expand All @@ -377,14 +404,14 @@ int tipc_bclink_xmit(struct sk_buff *buf)
bcl->stats.accu_queue_sz += bcl->out_queue_size;
}
exit:
spin_unlock_bh(&bc_lock);
tipc_bclink_unlock();
return res;
}

/**
* bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
*
* Called with both sending node's lock and bc_lock taken.
* Called with both sending node's lock and bclink_lock taken.
*/
static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
{
Expand Down Expand Up @@ -439,12 +466,12 @@ void tipc_bclink_rcv(struct sk_buff *buf)
if (msg_destnode(msg) == tipc_own_addr) {
tipc_bclink_acknowledge(node, msg_bcast_ack(msg));
tipc_node_unlock(node);
spin_lock_bh(&bc_lock);
tipc_bclink_lock();
bcl->stats.recv_nacks++;
bclink->retransmit_to = node;
bclink_retransmit_pkt(msg_bcgap_after(msg),
msg_bcgap_to(msg));
spin_unlock_bh(&bc_lock);
tipc_bclink_unlock();
} else {
tipc_node_unlock(node);
bclink_peek_nack(msg);
Expand All @@ -462,20 +489,20 @@ void tipc_bclink_rcv(struct sk_buff *buf)
/* Deliver message to destination */

if (likely(msg_isdata(msg))) {
spin_lock_bh(&bc_lock);
tipc_bclink_lock();
bclink_accept_pkt(node, seqno);
spin_unlock_bh(&bc_lock);
tipc_bclink_unlock();
tipc_node_unlock(node);
if (likely(msg_mcast(msg)))
tipc_port_mcast_rcv(buf, NULL);
else
kfree_skb(buf);
} else if (msg_user(msg) == MSG_BUNDLER) {
spin_lock_bh(&bc_lock);
tipc_bclink_lock();
bclink_accept_pkt(node, seqno);
bcl->stats.recv_bundles++;
bcl->stats.recv_bundled += msg_msgcnt(msg);
spin_unlock_bh(&bc_lock);
tipc_bclink_unlock();
tipc_node_unlock(node);
tipc_link_bundle_rcv(buf);
} else if (msg_user(msg) == MSG_FRAGMENTER) {
Expand All @@ -485,28 +512,28 @@ void tipc_bclink_rcv(struct sk_buff *buf)
&buf);
if (ret == LINK_REASM_ERROR)
goto unlock;
spin_lock_bh(&bc_lock);
tipc_bclink_lock();
bclink_accept_pkt(node, seqno);
bcl->stats.recv_fragments++;
if (ret == LINK_REASM_COMPLETE) {
bcl->stats.recv_fragmented++;
/* Point msg to inner header */
msg = buf_msg(buf);
spin_unlock_bh(&bc_lock);
tipc_bclink_unlock();
goto receive;
}
spin_unlock_bh(&bc_lock);
tipc_bclink_unlock();
tipc_node_unlock(node);
} else if (msg_user(msg) == NAME_DISTRIBUTOR) {
spin_lock_bh(&bc_lock);
tipc_bclink_lock();
bclink_accept_pkt(node, seqno);
spin_unlock_bh(&bc_lock);
tipc_bclink_unlock();
tipc_node_unlock(node);
tipc_named_rcv(buf);
} else {
spin_lock_bh(&bc_lock);
tipc_bclink_lock();
bclink_accept_pkt(node, seqno);
spin_unlock_bh(&bc_lock);
tipc_bclink_unlock();
tipc_node_unlock(node);
kfree_skb(buf);
}
Expand Down Expand Up @@ -552,14 +579,14 @@ void tipc_bclink_rcv(struct sk_buff *buf)
} else
deferred = 0;

spin_lock_bh(&bc_lock);
tipc_bclink_lock();

if (deferred)
bcl->stats.deferred_recv++;
else
bcl->stats.duplicates++;

spin_unlock_bh(&bc_lock);
tipc_bclink_unlock();

unlock:
tipc_node_unlock(node);
Expand Down Expand Up @@ -663,7 +690,7 @@ void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action)
int b_index;
int pri;

spin_lock_bh(&bc_lock);
tipc_bclink_lock();

if (action)
tipc_nmap_add(nm_ptr, node);
Expand Down Expand Up @@ -710,7 +737,7 @@ void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action)
bp_curr++;
}

spin_unlock_bh(&bc_lock);
tipc_bclink_unlock();
}


Expand All @@ -722,7 +749,7 @@ int tipc_bclink_stats(char *buf, const u32 buf_size)
if (!bcl)
return 0;

spin_lock_bh(&bc_lock);
tipc_bclink_lock();

s = &bcl->stats;

Expand Down Expand Up @@ -751,7 +778,7 @@ int tipc_bclink_stats(char *buf, const u32 buf_size)
s->queue_sz_counts ?
(s->accu_queue_sz / s->queue_sz_counts) : 0);

spin_unlock_bh(&bc_lock);
tipc_bclink_unlock();
return ret;
}

Expand All @@ -760,9 +787,9 @@ int tipc_bclink_reset_stats(void)
if (!bcl)
return -ENOPROTOOPT;

spin_lock_bh(&bc_lock);
tipc_bclink_lock();
memset(&bcl->stats, 0, sizeof(bcl->stats));
spin_unlock_bh(&bc_lock);
tipc_bclink_unlock();
return 0;
}

Expand All @@ -773,18 +800,30 @@ int tipc_bclink_set_queue_limits(u32 limit)
if ((limit < TIPC_MIN_LINK_WIN) || (limit > TIPC_MAX_LINK_WIN))
return -EINVAL;

spin_lock_bh(&bc_lock);
tipc_bclink_lock();
tipc_link_set_queue_limits(bcl, limit);
spin_unlock_bh(&bc_lock);
tipc_bclink_unlock();
return 0;
}

void tipc_bclink_init(void)
int tipc_bclink_init(void)
{
bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC);
if (!bcbearer)
return -ENOMEM;

bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC);
if (!bclink) {
kfree(bcbearer);
return -ENOMEM;
}

bcl = &bclink->link;
bcbearer->bearer.media = &bcbearer->media;
bcbearer->media.send_msg = tipc_bcbearer_send;
sprintf(bcbearer->media.name, "tipc-broadcast");

spin_lock_init(&bclink->lock);
INIT_LIST_HEAD(&bcl->waiting_ports);
bcl->next_out_no = 1;
spin_lock_init(&bclink->node.lock);
Expand All @@ -795,17 +834,19 @@ void tipc_bclink_init(void)
rcu_assign_pointer(bearer_list[MAX_BEARERS], &bcbearer->bearer);
bcl->state = WORKING_WORKING;
strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME);
return 0;
}

/* tipc_bclink_stop - tear down the broadcast link and pseudo-bearer */
void tipc_bclink_stop(void)
{
	/* Drop any queued broadcast packets while holding the bclink lock */
	tipc_bclink_lock();
	tipc_link_purge_queues(bcl);
	tipc_bclink_unlock();

	/* Unpublish the broadcast pseudo-bearer from the RCU-protected
	 * bearer_list, then wait for all readers to finish
	 * (synchronize_net()) before freeing the structures.
	 */
	RCU_INIT_POINTER(bearer_list[BCBEARER], NULL);
	memset(bclink, 0, sizeof(*bclink));
	memset(bcbearer, 0, sizeof(*bcbearer));
	synchronize_net();
	kfree(bcbearer);
	kfree(bclink);
}

/**
Expand Down
Loading

0 comments on commit 5a50a92

Please sign in to comment.