diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 9dc239dfe1921..e401108360a2b 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -332,131 +332,15 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l)
 		tipc_sk_rcv(net, inputq);
 }
 
-static int __tipc_nl_add_bc_link_stat(struct sk_buff *skb,
-				      struct tipc_stats *stats)
-{
-	int i;
-	struct nlattr *nest;
-
-	struct nla_map {
-		__u32 key;
-		__u32 val;
-	};
-
-	struct nla_map map[] = {
-		{TIPC_NLA_STATS_RX_INFO, stats->recv_info},
-		{TIPC_NLA_STATS_RX_FRAGMENTS, stats->recv_fragments},
-		{TIPC_NLA_STATS_RX_FRAGMENTED, stats->recv_fragmented},
-		{TIPC_NLA_STATS_RX_BUNDLES, stats->recv_bundles},
-		{TIPC_NLA_STATS_RX_BUNDLED, stats->recv_bundled},
-		{TIPC_NLA_STATS_TX_INFO, stats->sent_info},
-		{TIPC_NLA_STATS_TX_FRAGMENTS, stats->sent_fragments},
-		{TIPC_NLA_STATS_TX_FRAGMENTED, stats->sent_fragmented},
-		{TIPC_NLA_STATS_TX_BUNDLES, stats->sent_bundles},
-		{TIPC_NLA_STATS_TX_BUNDLED, stats->sent_bundled},
-		{TIPC_NLA_STATS_RX_NACKS, stats->recv_nacks},
-		{TIPC_NLA_STATS_RX_DEFERRED, stats->deferred_recv},
-		{TIPC_NLA_STATS_TX_NACKS, stats->sent_nacks},
-		{TIPC_NLA_STATS_TX_ACKS, stats->sent_acks},
-		{TIPC_NLA_STATS_RETRANSMITTED, stats->retransmitted},
-		{TIPC_NLA_STATS_DUPLICATES, stats->duplicates},
-		{TIPC_NLA_STATS_LINK_CONGS, stats->link_congs},
-		{TIPC_NLA_STATS_MAX_QUEUE, stats->max_queue_sz},
-		{TIPC_NLA_STATS_AVG_QUEUE, stats->queue_sz_counts ?
-			(stats->accu_queue_sz / stats->queue_sz_counts) : 0}
-	};
-
-	nest = nla_nest_start(skb, TIPC_NLA_LINK_STATS);
-	if (!nest)
-		return -EMSGSIZE;
-
-	for (i = 0; i <  ARRAY_SIZE(map); i++)
-		if (nla_put_u32(skb, map[i].key, map[i].val))
-			goto msg_full;
-
-	nla_nest_end(skb, nest);
-
-	return 0;
-msg_full:
-	nla_nest_cancel(skb, nest);
-
-	return -EMSGSIZE;
-}
-
-int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg)
-{
-	int err;
-	void *hdr;
-	struct nlattr *attrs;
-	struct nlattr *prop;
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
-	struct tipc_link *bcl = tn->bcl;
-
-	if (!bcl)
-		return 0;
-
-	tipc_bcast_lock(net);
-
-	hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family,
-			  NLM_F_MULTI, TIPC_NL_LINK_GET);
-	if (!hdr)
-		return -EMSGSIZE;
-
-	attrs = nla_nest_start(msg->skb, TIPC_NLA_LINK);
-	if (!attrs)
-		goto msg_full;
-
-	/* The broadcast link is always up */
-	if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP))
-		goto attr_msg_full;
-
-	if (nla_put_flag(msg->skb, TIPC_NLA_LINK_BROADCAST))
-		goto attr_msg_full;
-	if (nla_put_string(msg->skb, TIPC_NLA_LINK_NAME, bcl->name))
-		goto attr_msg_full;
-	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_RX, bcl->rcv_nxt))
-		goto attr_msg_full;
-	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_TX, bcl->snd_nxt))
-		goto attr_msg_full;
-
-	prop = nla_nest_start(msg->skb, TIPC_NLA_LINK_PROP);
-	if (!prop)
-		goto attr_msg_full;
-	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bcl->window))
-		goto prop_msg_full;
-	nla_nest_end(msg->skb, prop);
-
-	err = __tipc_nl_add_bc_link_stat(msg->skb, &bcl->stats);
-	if (err)
-		goto attr_msg_full;
-
-	tipc_bcast_unlock(net);
-	nla_nest_end(msg->skb, attrs);
-	genlmsg_end(msg->skb, hdr);
-
-	return 0;
-
-prop_msg_full:
-	nla_nest_cancel(msg->skb, prop);
-attr_msg_full:
-	nla_nest_cancel(msg->skb, attrs);
-msg_full:
-	tipc_bcast_unlock(net);
-	genlmsg_cancel(msg->skb, hdr);
-
-	return -EMSGSIZE;
-}
-
 int tipc_bclink_reset_stats(struct net *net)
 {
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
-	struct tipc_link *bcl = tn->bcl;
+	struct tipc_link *l = tipc_bc_sndlink(net);
 
-	if (!bcl)
+	if (!l)
 		return -ENOPROTOOPT;
 
 	tipc_bcast_lock(net);
-	memset(&bcl->stats, 0, sizeof(bcl->stats));
+	tipc_link_reset_stats(l);
 	tipc_bcast_unlock(net);
 	return 0;
 }
@@ -530,9 +414,7 @@ int tipc_bcast_init(struct net *net)
 
 void tipc_bcast_reinit(struct net *net)
 {
-	struct tipc_bc_base *b = tipc_bc_base(net);
-
-	msg_set_prevnode(b->link->pmsg, tipc_own_addr(net));
+	tipc_link_reinit(tipc_bc_sndlink(net), tipc_own_addr(net));
 }
 
 void tipc_bcast_stop(struct net *net)
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 2855b9356a152..1944c6c00bb91 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -43,6 +43,7 @@ struct tipc_node;
 struct tipc_msg;
 struct tipc_nl_msg;
 struct tipc_node_map;
+extern const char tipc_bclink_name[];
 
 int tipc_bcast_init(struct net *net);
 void tipc_bcast_reinit(struct net *net);
diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c
index 648f2a67f3148..802ffad3200da 100644
--- a/net/tipc/bearer.c
+++ b/net/tipc/bearer.c
@@ -71,7 +71,7 @@ static const struct nla_policy tipc_nl_media_policy[TIPC_NLA_MEDIA_MAX + 1] = {
 	[TIPC_NLA_MEDIA_PROP]		= { .type = NLA_NESTED }
 };
 
-static void bearer_disable(struct net *net, struct tipc_bearer *b_ptr);
+static void bearer_disable(struct net *net, struct tipc_bearer *b);
 
 /**
  * tipc_media_find - locates specified media object by name
@@ -107,13 +107,13 @@ static struct tipc_media *media_find_id(u8 type)
 void tipc_media_addr_printf(char *buf, int len, struct tipc_media_addr *a)
 {
 	char addr_str[MAX_ADDR_STR];
-	struct tipc_media *m_ptr;
+	struct tipc_media *m;
 	int ret;
 
-	m_ptr = media_find_id(a->media_id);
+	m = media_find_id(a->media_id);
 
-	if (m_ptr && !m_ptr->addr2str(a, addr_str, sizeof(addr_str)))
-		ret = scnprintf(buf, len, "%s(%s)", m_ptr->name, addr_str);
+	if (m && !m->addr2str(a, addr_str, sizeof(addr_str)))
+		ret = scnprintf(buf, len, "%s(%s)", m->name, addr_str);
 	else {
 		u32 i;
 
@@ -175,13 +175,13 @@ static int bearer_name_validate(const char *name,
 struct tipc_bearer *tipc_bearer_find(struct net *net, const char *name)
 {
 	struct tipc_net *tn = net_generic(net, tipc_net_id);
-	struct tipc_bearer *b_ptr;
+	struct tipc_bearer *b;
 	u32 i;
 
 	for (i = 0; i < MAX_BEARERS; i++) {
-		b_ptr = rtnl_dereference(tn->bearer_list[i]);
-		if (b_ptr && (!strcmp(b_ptr->name, name)))
-			return b_ptr;
+		b = rtnl_dereference(tn->bearer_list[i]);
+		if (b && (!strcmp(b->name, name)))
+			return b;
 	}
 	return NULL;
 }
@@ -189,24 +189,24 @@ struct tipc_bearer *tipc_bearer_find(struct net *net, const char *name)
 void tipc_bearer_add_dest(struct net *net, u32 bearer_id, u32 dest)
 {
 	struct tipc_net *tn = net_generic(net, tipc_net_id);
-	struct tipc_bearer *b_ptr;
+	struct tipc_bearer *b;
 
 	rcu_read_lock();
-	b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
-	if (b_ptr)
-		tipc_disc_add_dest(b_ptr->link_req);
+	b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
+	if (b)
+		tipc_disc_add_dest(b->link_req);
 	rcu_read_unlock();
 }
 
 void tipc_bearer_remove_dest(struct net *net, u32 bearer_id, u32 dest)
 {
 	struct tipc_net *tn = net_generic(net, tipc_net_id);
-	struct tipc_bearer *b_ptr;
+	struct tipc_bearer *b;
 
 	rcu_read_lock();
-	b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
-	if (b_ptr)
-		tipc_disc_remove_dest(b_ptr->link_req);
+	b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
+	if (b)
+		tipc_disc_remove_dest(b->link_req);
 	rcu_read_unlock();
 }
 
@@ -218,8 +218,8 @@ static int tipc_enable_bearer(struct net *net, const char *name,
 			      struct nlattr *attr[])
 {
 	struct tipc_net *tn = net_generic(net, tipc_net_id);
-	struct tipc_bearer *b_ptr;
-	struct tipc_media *m_ptr;
+	struct tipc_bearer *b;
+	struct tipc_media *m;
 	struct tipc_bearer_names b_names;
 	char addr_string[16];
 	u32 bearer_id;
@@ -255,31 +255,31 @@ static int tipc_enable_bearer(struct net *net, const char *name,
 		return -EINVAL;
 	}
 
-	m_ptr = tipc_media_find(b_names.media_name);
-	if (!m_ptr) {
+	m = tipc_media_find(b_names.media_name);
+	if (!m) {
 		pr_warn("Bearer <%s> rejected, media <%s> not registered\n",
 			name, b_names.media_name);
 		return -EINVAL;
 	}
 
 	if (priority == TIPC_MEDIA_LINK_PRI)
-		priority = m_ptr->priority;
+		priority = m->priority;
 
 restart:
 	bearer_id = MAX_BEARERS;
 	with_this_prio = 1;
 	for (i = MAX_BEARERS; i-- != 0; ) {
-		b_ptr = rtnl_dereference(tn->bearer_list[i]);
-		if (!b_ptr) {
+		b = rtnl_dereference(tn->bearer_list[i]);
+		if (!b) {
 			bearer_id = i;
 			continue;
 		}
-		if (!strcmp(name, b_ptr->name)) {
+		if (!strcmp(name, b->name)) {
 			pr_warn("Bearer <%s> rejected, already enabled\n",
 				name);
 			return -EINVAL;
 		}
-		if ((b_ptr->priority == priority) &&
+		if ((b->priority == priority) &&
 		    (++with_this_prio > 2)) {
 			if (priority-- == 0) {
 				pr_warn("Bearer <%s> rejected, duplicate priority\n",
@@ -297,35 +297,35 @@ static int tipc_enable_bearer(struct net *net, const char *name,
 		return -EINVAL;
 	}
 
-	b_ptr = kzalloc(sizeof(*b_ptr), GFP_ATOMIC);
-	if (!b_ptr)
+	b = kzalloc(sizeof(*b), GFP_ATOMIC);
+	if (!b)
 		return -ENOMEM;
 
-	strcpy(b_ptr->name, name);
-	b_ptr->media = m_ptr;
-	res = m_ptr->enable_media(net, b_ptr, attr);
+	strcpy(b->name, name);
+	b->media = m;
+	res = m->enable_media(net, b, attr);
 	if (res) {
 		pr_warn("Bearer <%s> rejected, enable failure (%d)\n",
 			name, -res);
 		return -EINVAL;
 	}
 
-	b_ptr->identity = bearer_id;
-	b_ptr->tolerance = m_ptr->tolerance;
-	b_ptr->window = m_ptr->window;
-	b_ptr->domain = disc_domain;
-	b_ptr->net_plane = bearer_id + 'A';
-	b_ptr->priority = priority;
+	b->identity = bearer_id;
+	b->tolerance = m->tolerance;
+	b->window = m->window;
+	b->domain = disc_domain;
+	b->net_plane = bearer_id + 'A';
+	b->priority = priority;
 
-	res = tipc_disc_create(net, b_ptr, &b_ptr->bcast_addr);
+	res = tipc_disc_create(net, b, &b->bcast_addr);
 	if (res) {
-		bearer_disable(net, b_ptr);
+		bearer_disable(net, b);
 		pr_warn("Bearer <%s> rejected, discovery object creation failed\n",
 			name);
 		return -EINVAL;
 	}
 
-	rcu_assign_pointer(tn->bearer_list[bearer_id], b_ptr);
+	rcu_assign_pointer(tn->bearer_list[bearer_id], b);
 
 	pr_info("Enabled bearer <%s>, discovery domain %s, priority %u\n",
 		name,
@@ -336,11 +336,11 @@ static int tipc_enable_bearer(struct net *net, const char *name,
 /**
  * tipc_reset_bearer - Reset all links established over this bearer
  */
-static int tipc_reset_bearer(struct net *net, struct tipc_bearer *b_ptr)
+static int tipc_reset_bearer(struct net *net, struct tipc_bearer *b)
 {
-	pr_info("Resetting bearer <%s>\n", b_ptr->name);
-	tipc_node_delete_links(net, b_ptr->identity);
-	tipc_disc_reset(net, b_ptr);
+	pr_info("Resetting bearer <%s>\n", b->name);
+	tipc_node_delete_links(net, b->identity);
+	tipc_disc_reset(net, b);
 	return 0;
 }
 
@@ -349,26 +349,26 @@ static int tipc_reset_bearer(struct net *net, struct tipc_bearer *b_ptr)
  *
  * Note: This routine assumes caller holds RTNL lock.
  */
-static void bearer_disable(struct net *net, struct tipc_bearer *b_ptr)
+static void bearer_disable(struct net *net, struct tipc_bearer *b)
 {
 	struct tipc_net *tn = net_generic(net, tipc_net_id);
 	u32 i;
 
-	pr_info("Disabling bearer <%s>\n", b_ptr->name);
-	b_ptr->media->disable_media(b_ptr);
+	pr_info("Disabling bearer <%s>\n", b->name);
+	b->media->disable_media(b);
 
-	tipc_node_delete_links(net, b_ptr->identity);
-	RCU_INIT_POINTER(b_ptr->media_ptr, NULL);
-	if (b_ptr->link_req)
-		tipc_disc_delete(b_ptr->link_req);
+	tipc_node_delete_links(net, b->identity);
+	RCU_INIT_POINTER(b->media_ptr, NULL);
+	if (b->link_req)
+		tipc_disc_delete(b->link_req);
 
 	for (i = 0; i < MAX_BEARERS; i++) {
-		if (b_ptr == rtnl_dereference(tn->bearer_list[i])) {
+		if (b == rtnl_dereference(tn->bearer_list[i])) {
 			RCU_INIT_POINTER(tn->bearer_list[i], NULL);
 			break;
 		}
 	}
-	kfree_rcu(b_ptr, rcu);
+	kfree_rcu(b, rcu);
 }
 
 int tipc_enable_l2_media(struct net *net, struct tipc_bearer *b,
@@ -411,7 +411,7 @@ void tipc_disable_l2_media(struct tipc_bearer *b)
 /**
  * tipc_l2_send_msg - send a TIPC packet out over an L2 interface
  * @buf: the packet to be sent
- * @b_ptr: the bearer through which the packet is to be sent
+ * @b: the bearer through which the packet is to be sent
  * @dest: peer destination address
  */
 int tipc_l2_send_msg(struct net *net, struct sk_buff *skb,
@@ -532,14 +532,14 @@ void tipc_bearer_bc_xmit(struct net *net, u32 bearer_id,
 static int tipc_l2_rcv_msg(struct sk_buff *buf, struct net_device *dev,
 			   struct packet_type *pt, struct net_device *orig_dev)
 {
-	struct tipc_bearer *b_ptr;
+	struct tipc_bearer *b;
 
 	rcu_read_lock();
-	b_ptr = rcu_dereference_rtnl(dev->tipc_ptr);
-	if (likely(b_ptr)) {
+	b = rcu_dereference_rtnl(dev->tipc_ptr);
+	if (likely(b)) {
 		if (likely(buf->pkt_type <= PACKET_BROADCAST)) {
 			buf->next = NULL;
-			tipc_rcv(dev_net(dev), buf, b_ptr);
+			tipc_rcv(dev_net(dev), buf, b);
 			rcu_read_unlock();
 			return NET_RX_SUCCESS;
 		}
@@ -564,13 +564,13 @@ static int tipc_l2_device_event(struct notifier_block *nb, unsigned long evt,
 {
 	struct net_device *dev = netdev_notifier_info_to_dev(ptr);
 	struct net *net = dev_net(dev);
-	struct tipc_bearer *b_ptr;
+	struct tipc_bearer *b;
 
-	b_ptr = rtnl_dereference(dev->tipc_ptr);
-	if (!b_ptr)
+	b = rtnl_dereference(dev->tipc_ptr);
+	if (!b)
 		return NOTIFY_DONE;
 
-	b_ptr->mtu = dev->mtu;
+	b->mtu = dev->mtu;
 
 	switch (evt) {
 	case NETDEV_CHANGE:
@@ -578,16 +578,16 @@ static int tipc_l2_device_event(struct notifier_block *nb, unsigned long evt,
 			break;
 	case NETDEV_GOING_DOWN:
 	case NETDEV_CHANGEMTU:
-		tipc_reset_bearer(net, b_ptr);
+		tipc_reset_bearer(net, b);
 		break;
 	case NETDEV_CHANGEADDR:
-		b_ptr->media->raw2addr(b_ptr, &b_ptr->addr,
+		b->media->raw2addr(b, &b->addr,
 				       (char *)dev->dev_addr);
-		tipc_reset_bearer(net, b_ptr);
+		tipc_reset_bearer(net, b);
 		break;
 	case NETDEV_UNREGISTER:
 	case NETDEV_CHANGENAME:
-		bearer_disable(dev_net(dev), b_ptr);
+		bearer_disable(dev_net(dev), b);
 		break;
 	}
 	return NOTIFY_OK;
@@ -623,13 +623,13 @@ void tipc_bearer_cleanup(void)
 void tipc_bearer_stop(struct net *net)
 {
 	struct tipc_net *tn = net_generic(net, tipc_net_id);
-	struct tipc_bearer *b_ptr;
+	struct tipc_bearer *b;
 	u32 i;
 
 	for (i = 0; i < MAX_BEARERS; i++) {
-		b_ptr = rtnl_dereference(tn->bearer_list[i]);
-		if (b_ptr) {
-			bearer_disable(net, b_ptr);
+		b = rtnl_dereference(tn->bearer_list[i]);
+		if (b) {
+			bearer_disable(net, b);
 			tn->bearer_list[i] = NULL;
 		}
 	}
diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h
index 552185bc47732..e318205167740 100644
--- a/net/tipc/bearer.h
+++ b/net/tipc/bearer.h
@@ -103,11 +103,11 @@ struct tipc_bearer;
  */
 struct tipc_media {
 	int (*send_msg)(struct net *net, struct sk_buff *buf,
-			struct tipc_bearer *b_ptr,
+			struct tipc_bearer *b,
 			struct tipc_media_addr *dest);
-	int (*enable_media)(struct net *net, struct tipc_bearer *b_ptr,
+	int (*enable_media)(struct net *net, struct tipc_bearer *b,
 			    struct nlattr *attr[]);
-	void (*disable_media)(struct tipc_bearer *b_ptr);
+	void (*disable_media)(struct tipc_bearer *b);
 	int (*addr2str)(struct tipc_media_addr *addr,
 			char *strbuf,
 			int bufsz);
@@ -176,7 +176,7 @@ struct tipc_bearer_names {
  * TIPC routines available to supported media types
  */
 
-void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr);
+void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b);
 
 /*
  * Routines made available to TIPC by supported media types
diff --git a/net/tipc/core.h b/net/tipc/core.h
index 18e95a8020cd4..5504d63503df4 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -118,6 +118,11 @@ static inline int tipc_netid(struct net *net)
 	return tipc_net(net)->net_id;
 }
 
+static inline struct list_head *tipc_nodes(struct net *net)
+{
+	return &tipc_net(net)->node_list;
+}
+
 static inline u16 mod(u16 x)
 {
 	return x & 0xffffu;
diff --git a/net/tipc/discover.c b/net/tipc/discover.c
index afe8c47c4085c..f1e738e80535a 100644
--- a/net/tipc/discover.c
+++ b/net/tipc/discover.c
@@ -75,14 +75,14 @@ struct tipc_link_req {
  * tipc_disc_init_msg - initialize a link setup message
  * @net: the applicable net namespace
  * @type: message type (request or response)
- * @b_ptr: ptr to bearer issuing message
+ * @b: ptr to bearer issuing message
  */
 static void tipc_disc_init_msg(struct net *net, struct sk_buff *buf, u32 type,
-			       struct tipc_bearer *b_ptr)
+			       struct tipc_bearer *b)
 {
 	struct tipc_net *tn = net_generic(net, tipc_net_id);
 	struct tipc_msg *msg;
-	u32 dest_domain = b_ptr->domain;
+	u32 dest_domain = b->domain;
 
 	msg = buf_msg(buf);
 	tipc_msg_init(tn->own_addr, msg, LINK_CONFIG, type,
@@ -92,16 +92,16 @@ static void tipc_disc_init_msg(struct net *net, struct sk_buff *buf, u32 type,
 	msg_set_node_capabilities(msg, TIPC_NODE_CAPABILITIES);
 	msg_set_dest_domain(msg, dest_domain);
 	msg_set_bc_netid(msg, tn->net_id);
-	b_ptr->media->addr2msg(msg_media_addr(msg), &b_ptr->addr);
+	b->media->addr2msg(msg_media_addr(msg), &b->addr);
 }
 
 /**
  * disc_dupl_alert - issue node address duplication alert
- * @b_ptr: pointer to bearer detecting duplication
+ * @b: pointer to bearer detecting duplication
  * @node_addr: duplicated node address
  * @media_addr: media address advertised by duplicated node
  */
-static void disc_dupl_alert(struct tipc_bearer *b_ptr, u32 node_addr,
+static void disc_dupl_alert(struct tipc_bearer *b, u32 node_addr,
 			    struct tipc_media_addr *media_addr)
 {
 	char node_addr_str[16];
@@ -111,7 +111,7 @@ static void disc_dupl_alert(struct tipc_bearer *b_ptr, u32 node_addr,
 	tipc_media_addr_printf(media_addr_str, sizeof(media_addr_str),
 			       media_addr);
 	pr_warn("Duplicate %s using %s seen on <%s>\n", node_addr_str,
-		media_addr_str, b_ptr->name);
+		media_addr_str, b->name);
 }
 
 /**
@@ -261,13 +261,13 @@ static void disc_timeout(unsigned long data)
 /**
  * tipc_disc_create - create object to send periodic link setup requests
  * @net: the applicable net namespace
- * @b_ptr: ptr to bearer issuing requests
+ * @b: ptr to bearer issuing requests
  * @dest: destination address for request messages
  * @dest_domain: network domain to which links can be established
  *
  * Returns 0 if successful, otherwise -errno.
  */
-int tipc_disc_create(struct net *net, struct tipc_bearer *b_ptr,
+int tipc_disc_create(struct net *net, struct tipc_bearer *b,
 		     struct tipc_media_addr *dest)
 {
 	struct tipc_link_req *req;
@@ -282,17 +282,17 @@ int tipc_disc_create(struct net *net, struct tipc_bearer *b_ptr,
 		return -ENOMEM;
 	}
 
-	tipc_disc_init_msg(net, req->buf, DSC_REQ_MSG, b_ptr);
+	tipc_disc_init_msg(net, req->buf, DSC_REQ_MSG, b);
 	memcpy(&req->dest, dest, sizeof(*dest));
 	req->net = net;
-	req->bearer_id = b_ptr->identity;
-	req->domain = b_ptr->domain;
+	req->bearer_id = b->identity;
+	req->domain = b->domain;
 	req->num_nodes = 0;
 	req->timer_intv = TIPC_LINK_REQ_INIT;
 	spin_lock_init(&req->lock);
 	setup_timer(&req->timer, disc_timeout, (unsigned long)req);
 	mod_timer(&req->timer, jiffies + req->timer_intv);
-	b_ptr->link_req = req;
+	b->link_req = req;
 	skb = skb_clone(req->buf, GFP_ATOMIC);
 	if (skb)
 		tipc_bearer_xmit_skb(net, req->bearer_id, skb, &req->dest);
@@ -313,19 +313,19 @@ void tipc_disc_delete(struct tipc_link_req *req)
 /**
  * tipc_disc_reset - reset object to send periodic link setup requests
  * @net: the applicable net namespace
- * @b_ptr: ptr to bearer issuing requests
+ * @b: ptr to bearer issuing requests
  * @dest_domain: network domain to which links can be established
  */
-void tipc_disc_reset(struct net *net, struct tipc_bearer *b_ptr)
+void tipc_disc_reset(struct net *net, struct tipc_bearer *b)
 {
-	struct tipc_link_req *req = b_ptr->link_req;
+	struct tipc_link_req *req = b->link_req;
 	struct sk_buff *skb;
 
 	spin_lock_bh(&req->lock);
-	tipc_disc_init_msg(net, req->buf, DSC_REQ_MSG, b_ptr);
+	tipc_disc_init_msg(net, req->buf, DSC_REQ_MSG, b);
 	req->net = net;
-	req->bearer_id = b_ptr->identity;
-	req->domain = b_ptr->domain;
+	req->bearer_id = b->identity;
+	req->domain = b->domain;
 	req->num_nodes = 0;
 	req->timer_intv = TIPC_LINK_REQ_INIT;
 	mod_timer(&req->timer, jiffies + req->timer_intv);
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 9efbdbde2b086..b11afe71dfc18 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -45,28 +45,156 @@
 
 #include <linux/pkt_sched.h>
 
+struct tipc_stats {
+	u32 sent_info;		/* used in counting # sent packets */
+	u32 recv_info;		/* used in counting # recv'd packets */
+	u32 sent_states;
+	u32 recv_states;
+	u32 sent_probes;
+	u32 recv_probes;
+	u32 sent_nacks;
+	u32 recv_nacks;
+	u32 sent_acks;
+	u32 sent_bundled;
+	u32 sent_bundles;
+	u32 recv_bundled;
+	u32 recv_bundles;
+	u32 retransmitted;
+	u32 sent_fragmented;
+	u32 sent_fragments;
+	u32 recv_fragmented;
+	u32 recv_fragments;
+	u32 link_congs;		/* # port sends blocked by congestion */
+	u32 deferred_recv;
+	u32 duplicates;
+	u32 max_queue_sz;	/* send queue size high water mark */
+	u32 accu_queue_sz;	/* used for send queue size profiling */
+	u32 queue_sz_counts;	/* used for send queue size profiling */
+	u32 msg_length_counts;	/* used for message length profiling */
+	u32 msg_lengths_total;	/* used for message length profiling */
+	u32 msg_length_profile[7]; /* used for msg. length profiling */
+};
+
+/**
+ * struct tipc_link - TIPC link data structure
+ * @addr: network address of link's peer node
+ * @name: link name character string
+ * @media_addr: media address to use when sending messages over link
+ * @timer: link timer
+ * @net: pointer to namespace struct
+ * @refcnt: reference counter for permanent references (owner node & timer)
+ * @peer_session: link session # being used by peer end of link
+ * @peer_bearer_id: bearer id used by link's peer endpoint
+ * @bearer_id: local bearer id used by link
+ * @tolerance: minimum link continuity loss needed to reset link [in ms]
+ * @keepalive_intv: link keepalive timer interval
+ * @abort_limit: # of unacknowledged continuity probes needed to reset link
+ * @state: current state of link FSM
+ * @peer_caps: bitmap describing capabilities of peer node
+ * @silent_intv_cnt: # of timer intervals without any reception from peer
+ * @proto_msg: template for control messages generated by link
+ * @pmsg: convenience pointer to "proto_msg" field
+ * @priority: current link priority
+ * @net_plane: current link network plane ('A' through 'H')
+ * @backlog_limit: backlog queue congestion thresholds (indexed by importance)
+ * @exp_msg_count: # of tunnelled messages expected during link changeover
+ * @reset_rcv_checkpt: seq # of last acknowledged message at time of link reset
+ * @mtu: current maximum packet size for this link
+ * @advertised_mtu: advertised own mtu when link is being established
+ * @transmitq: queue for sent, non-acked messages
+ * @backlogq: queue for messages waiting to be sent
+ * @snt_nxt: next sequence number to use for outbound messages
+ * @last_retransmitted: sequence number of most recently retransmitted message
+ * @stale_count: # of identical retransmit requests made by peer
+ * @ackers: # of peers that needs to ack each packet before it can be released
+ * @acked: # last packet acked by a certain peer. Used for broadcast.
+ * @rcv_nxt: next sequence number to expect for inbound messages
+ * @deferred_queue: deferred queue saved OOS b'cast message received from node
+ * @unacked_window: # of inbound messages rx'd without ack'ing back to peer
+ * @inputq: buffer queue for messages to be delivered upwards
+ * @namedq: buffer queue for name table messages to be delivered upwards
+ * @next_out: ptr to first unsent outbound message in queue
+ * @wakeupq: linked list of wakeup msgs waiting for link congestion to abate
+ * @long_msg_seq_no: next identifier to use for outbound fragmented messages
+ * @reasm_buf: head of partially reassembled inbound message fragments
+ * @bc_rcvr: marks that this is a broadcast receiver link
+ * @stats: collects statistics regarding link activity
+ */
+struct tipc_link {
+	u32 addr;
+	char name[TIPC_MAX_LINK_NAME];
+	struct tipc_media_addr *media_addr;
+	struct net *net;
+
+	/* Management and link supervision data */
+	u32 peer_session;
+	u32 peer_bearer_id;
+	u32 bearer_id;
+	u32 tolerance;
+	unsigned long keepalive_intv;
+	u32 abort_limit;
+	u32 state;
+	u16 peer_caps;
+	bool active;
+	u32 silent_intv_cnt;
+	struct {
+		unchar hdr[INT_H_SIZE];
+		unchar body[TIPC_MAX_IF_NAME];
+	} proto_msg;
+	struct tipc_msg *pmsg;
+	u32 priority;
+	char net_plane;
+
+	/* Failover/synch */
+	u16 drop_point;
+	struct sk_buff *failover_reasm_skb;
+
+	/* Max packet negotiation */
+	u16 mtu;
+	u16 advertised_mtu;
+
+	/* Sending */
+	struct sk_buff_head transmq;
+	struct sk_buff_head backlogq;
+	struct {
+		u16 len;
+		u16 limit;
+	} backlog[5];
+	u16 snd_nxt;
+	u16 last_retransm;
+	u16 window;
+	u32 stale_count;
+
+	/* Reception */
+	u16 rcv_nxt;
+	u32 rcv_unacked;
+	struct sk_buff_head deferdq;
+	struct sk_buff_head *inputq;
+	struct sk_buff_head *namedq;
+
+	/* Congestion handling */
+	struct sk_buff_head wakeupq;
+
+	/* Fragmentation/reassembly */
+	struct sk_buff *reasm_buf;
+
+	/* Broadcast */
+	u16 ackers;
+	u16 acked;
+	struct tipc_link *bc_rcvlink;
+	struct tipc_link *bc_sndlink;
+	int nack_state;
+	bool bc_peer_is_up;
+
+	/* Statistics */
+	struct tipc_stats stats;
+};
+
 /*
  * Error message prefixes
  */
 static const char *link_co_err = "Link tunneling error, ";
 static const char *link_rst_msg = "Resetting link ";
-static const char tipc_bclink_name[] = "broadcast-link";
-
-static const struct nla_policy tipc_nl_link_policy[TIPC_NLA_LINK_MAX + 1] = {
-	[TIPC_NLA_LINK_UNSPEC]		= { .type = NLA_UNSPEC },
-	[TIPC_NLA_LINK_NAME] = {
-		.type = NLA_STRING,
-		.len = TIPC_MAX_LINK_NAME
-	},
-	[TIPC_NLA_LINK_MTU]		= { .type = NLA_U32 },
-	[TIPC_NLA_LINK_BROADCAST]	= { .type = NLA_FLAG },
-	[TIPC_NLA_LINK_UP]		= { .type = NLA_FLAG },
-	[TIPC_NLA_LINK_ACTIVE]		= { .type = NLA_FLAG },
-	[TIPC_NLA_LINK_PROP]		= { .type = NLA_NESTED },
-	[TIPC_NLA_LINK_STATS]		= { .type = NLA_NESTED },
-	[TIPC_NLA_LINK_RX]		= { .type = NLA_U32 },
-	[TIPC_NLA_LINK_TX]		= { .type = NLA_U32 }
-};
 
 /* Properties valid for media, bearar and link */
 static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = {
@@ -117,8 +245,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
 static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
 				      u16 rcvgap, int tolerance, int priority,
 				      struct sk_buff_head *xmitq);
-static void link_reset_statistics(struct tipc_link *l_ptr);
-static void link_print(struct tipc_link *l_ptr, const char *str);
+static void link_print(struct tipc_link *l, const char *str);
 static void tipc_link_build_nack_msg(struct tipc_link *l,
 				     struct sk_buff_head *xmitq);
 static void tipc_link_build_bc_init_msg(struct tipc_link *l,
@@ -183,6 +310,36 @@ void tipc_link_set_active(struct tipc_link *l, bool active)
 	l->active = active;
 }
 
+u32 tipc_link_id(struct tipc_link *l)
+{
+	return l->peer_bearer_id << 16 | l->bearer_id;
+}
+
+int tipc_link_window(struct tipc_link *l)
+{
+	return l->window;
+}
+
+int tipc_link_prio(struct tipc_link *l)
+{
+	return l->priority;
+}
+
+unsigned long tipc_link_tolerance(struct tipc_link *l)
+{
+	return l->tolerance;
+}
+
+struct sk_buff_head *tipc_link_inputq(struct tipc_link *l)
+{
+	return l->inputq;
+}
+
+char tipc_link_plane(struct tipc_link *l)
+{
+	return l->net_plane;
+}
+
 void tipc_link_add_bc_peer(struct tipc_link *snd_l,
 			   struct tipc_link *uc_l,
 			   struct sk_buff_head *xmitq)
@@ -225,11 +382,31 @@ int tipc_link_mtu(struct tipc_link *l)
 	return l->mtu;
 }
 
+u16 tipc_link_rcv_nxt(struct tipc_link *l)
+{
+	return l->rcv_nxt;
+}
+
+u16 tipc_link_acked(struct tipc_link *l)
+{
+	return l->acked;
+}
+
+char *tipc_link_name(struct tipc_link *l)
+{
+	return l->name;
+}
+
 static u32 link_own_addr(struct tipc_link *l)
 {
 	return msg_prevnode(l->pmsg);
 }
 
+void tipc_link_reinit(struct tipc_link *l, u32 addr)
+{
+	msg_set_prevnode(l->pmsg, addr);
+}
+
 /**
  * tipc_link_create - create a new link
  * @n: pointer to associated node
@@ -692,7 +869,7 @@ void tipc_link_reset(struct tipc_link *l)
 	l->stats.recv_info = 0;
 	l->stale_count = 0;
 	l->bc_peer_is_up = false;
-	link_reset_statistics(l);
+	tipc_link_reset_stats(l);
 }
 
 /**
@@ -1085,8 +1262,9 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
 /*
  * Send protocol message to the other endpoint.
  */
-void tipc_link_proto_xmit(struct tipc_link *l, u32 msg_typ, int probe_msg,
-			  u32 gap, u32 tolerance, u32 priority)
+static void tipc_link_proto_xmit(struct tipc_link *l, u32 msg_typ,
+				 int probe_msg, u32 gap, u32 tolerance,
+				 u32 priority)
 {
 	struct sk_buff *skb = NULL;
 	struct sk_buff_head xmitq;
@@ -1260,6 +1438,8 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
 		/* fall thru' */
 
 	case ACTIVATE_MSG:
+		skb_linearize(skb);
+		hdr = buf_msg(skb);
 
 		/* Complete own link name with peer's interface name */
 		if_name =  strrchr(l->name, ':') + 1;
@@ -1525,53 +1705,17 @@ void tipc_link_set_queue_limits(struct tipc_link *l, u32 win)
 	l->backlog[TIPC_SYSTEM_IMPORTANCE].limit   = max_bulk;
 }
 
-/* tipc_link_find_owner - locate owner node of link by link's name
- * @net: the applicable net namespace
- * @name: pointer to link name string
- * @bearer_id: pointer to index in 'node->links' array where the link was found.
- *
- * Returns pointer to node owning the link, or 0 if no matching link is found.
- */
-static struct tipc_node *tipc_link_find_owner(struct net *net,
-					      const char *link_name,
-					      unsigned int *bearer_id)
-{
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
-	struct tipc_link *l_ptr;
-	struct tipc_node *n_ptr;
-	struct tipc_node *found_node = NULL;
-	int i;
-
-	*bearer_id = 0;
-	rcu_read_lock();
-	list_for_each_entry_rcu(n_ptr, &tn->node_list, list) {
-		tipc_node_lock(n_ptr);
-		for (i = 0; i < MAX_BEARERS; i++) {
-			l_ptr = n_ptr->links[i].link;
-			if (l_ptr && !strcmp(l_ptr->name, link_name)) {
-				*bearer_id = i;
-				found_node = n_ptr;
-				break;
-			}
-		}
-		tipc_node_unlock(n_ptr);
-		if (found_node)
-			break;
-	}
-	rcu_read_unlock();
-
-	return found_node;
-}
-
 /**
- * link_reset_statistics - reset link statistics
- * @l_ptr: pointer to link
+ * link_reset_stats - reset link statistics
+ * @l: pointer to link
  */
-static void link_reset_statistics(struct tipc_link *l_ptr)
+void tipc_link_reset_stats(struct tipc_link *l)
 {
-	memset(&l_ptr->stats, 0, sizeof(l_ptr->stats));
-	l_ptr->stats.sent_info = l_ptr->snd_nxt;
-	l_ptr->stats.recv_info = l_ptr->rcv_nxt;
+	memset(&l->stats, 0, sizeof(l->stats));
+	if (!link_is_bc_sndlink(l)) {
+		l->stats.sent_info = l->snd_nxt;
+		l->stats.recv_info = l->rcv_nxt;
+	}
 }
 
 static void link_print(struct tipc_link *l, const char *str)
@@ -1624,84 +1768,6 @@ int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[])
 	return 0;
 }
 
-int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info)
-{
-	int err;
-	int res = 0;
-	int bearer_id;
-	char *name;
-	struct tipc_link *link;
-	struct tipc_node *node;
-	struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
-	struct net *net = sock_net(skb->sk);
-
-	if (!info->attrs[TIPC_NLA_LINK])
-		return -EINVAL;
-
-	err = nla_parse_nested(attrs, TIPC_NLA_LINK_MAX,
-			       info->attrs[TIPC_NLA_LINK],
-			       tipc_nl_link_policy);
-	if (err)
-		return err;
-
-	if (!attrs[TIPC_NLA_LINK_NAME])
-		return -EINVAL;
-
-	name = nla_data(attrs[TIPC_NLA_LINK_NAME]);
-
-	if (strcmp(name, tipc_bclink_name) == 0)
-		return tipc_nl_bc_link_set(net, attrs);
-
-	node = tipc_link_find_owner(net, name, &bearer_id);
-	if (!node)
-		return -EINVAL;
-
-	tipc_node_lock(node);
-
-	link = node->links[bearer_id].link;
-	if (!link) {
-		res = -EINVAL;
-		goto out;
-	}
-
-	if (attrs[TIPC_NLA_LINK_PROP]) {
-		struct nlattr *props[TIPC_NLA_PROP_MAX + 1];
-
-		err = tipc_nl_parse_link_prop(attrs[TIPC_NLA_LINK_PROP],
-					      props);
-		if (err) {
-			res = err;
-			goto out;
-		}
-
-		if (props[TIPC_NLA_PROP_TOL]) {
-			u32 tol;
-
-			tol = nla_get_u32(props[TIPC_NLA_PROP_TOL]);
-			link->tolerance = tol;
-			tipc_link_proto_xmit(link, STATE_MSG, 0, 0, tol, 0);
-		}
-		if (props[TIPC_NLA_PROP_PRIO]) {
-			u32 prio;
-
-			prio = nla_get_u32(props[TIPC_NLA_PROP_PRIO]);
-			link->priority = prio;
-			tipc_link_proto_xmit(link, STATE_MSG, 0, 0, 0, prio);
-		}
-		if (props[TIPC_NLA_PROP_WIN]) {
-			u32 win;
-
-			win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
-			tipc_link_set_queue_limits(link, win);
-		}
-	}
-
-out:
-	tipc_node_unlock(node);
-
-	return res;
-}
-
 static int __tipc_nl_add_stats(struct sk_buff *skb, struct tipc_stats *s)
 {
 	int i;
@@ -1768,8 +1834,8 @@ static int __tipc_nl_add_stats(struct sk_buff *skb, struct tipc_stats *s)
 }
 
 /* Caller should hold appropriate locks to protect the link */
-static int __tipc_nl_add_link(struct net *net, struct tipc_nl_msg *msg,
-			      struct tipc_link *link, int nlflags)
+int __tipc_nl_add_link(struct net *net, struct tipc_nl_msg *msg,
+		       struct tipc_link *link, int nlflags)
 {
 	int err;
 	void *hdr;
@@ -1838,198 +1904,134 @@ static int __tipc_nl_add_link(struct net *net, struct tipc_nl_msg *msg,
 	return -EMSGSIZE;
 }
 
-/* Caller should hold node lock  */
-static int __tipc_nl_add_node_links(struct net *net, struct tipc_nl_msg *msg,
-				    struct tipc_node *node, u32 *prev_link)
+static int __tipc_nl_add_bc_link_stat(struct sk_buff *skb,
+				      struct tipc_stats *stats)
 {
-	u32 i;
-	int err;
-
-	for (i = *prev_link; i < MAX_BEARERS; i++) {
-		*prev_link = i;
-
-		if (!node->links[i].link)
-			continue;
+	int i;
+	struct nlattr *nest;
 
-		err = __tipc_nl_add_link(net, msg,
-					 node->links[i].link, NLM_F_MULTI);
-		if (err)
-			return err;
-	}
-	*prev_link = 0;
+	struct nla_map {
+		__u32 key;
+		__u32 val;
+	};
 
-	return 0;
-}
+	struct nla_map map[] = {
+		{TIPC_NLA_STATS_RX_INFO, stats->recv_info},
+		{TIPC_NLA_STATS_RX_FRAGMENTS, stats->recv_fragments},
+		{TIPC_NLA_STATS_RX_FRAGMENTED, stats->recv_fragmented},
+		{TIPC_NLA_STATS_RX_BUNDLES, stats->recv_bundles},
+		{TIPC_NLA_STATS_RX_BUNDLED, stats->recv_bundled},
+		{TIPC_NLA_STATS_TX_INFO, stats->sent_info},
+		{TIPC_NLA_STATS_TX_FRAGMENTS, stats->sent_fragments},
+		{TIPC_NLA_STATS_TX_FRAGMENTED, stats->sent_fragmented},
+		{TIPC_NLA_STATS_TX_BUNDLES, stats->sent_bundles},
+		{TIPC_NLA_STATS_TX_BUNDLED, stats->sent_bundled},
+		{TIPC_NLA_STATS_RX_NACKS, stats->recv_nacks},
+		{TIPC_NLA_STATS_RX_DEFERRED, stats->deferred_recv},
+		{TIPC_NLA_STATS_TX_NACKS, stats->sent_nacks},
+		{TIPC_NLA_STATS_TX_ACKS, stats->sent_acks},
+		{TIPC_NLA_STATS_RETRANSMITTED, stats->retransmitted},
+		{TIPC_NLA_STATS_DUPLICATES, stats->duplicates},
+		{TIPC_NLA_STATS_LINK_CONGS, stats->link_congs},
+		{TIPC_NLA_STATS_MAX_QUEUE, stats->max_queue_sz},
+		{TIPC_NLA_STATS_AVG_QUEUE, stats->queue_sz_counts ?
+			(stats->accu_queue_sz / stats->queue_sz_counts) : 0}
+	};
 
-int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb)
-{
-	struct net *net = sock_net(skb->sk);
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
-	struct tipc_node *node;
-	struct tipc_nl_msg msg;
-	u32 prev_node = cb->args[0];
-	u32 prev_link = cb->args[1];
-	int done = cb->args[2];
-	int err;
+	nest = nla_nest_start(skb, TIPC_NLA_LINK_STATS);
+	if (!nest)
+		return -EMSGSIZE;
 
-	if (done)
-		return 0;
+	for (i = 0; i <  ARRAY_SIZE(map); i++)
+		if (nla_put_u32(skb, map[i].key, map[i].val))
+			goto msg_full;
 
-	msg.skb = skb;
-	msg.portid = NETLINK_CB(cb->skb).portid;
-	msg.seq = cb->nlh->nlmsg_seq;
-
-	rcu_read_lock();
-	if (prev_node) {
-		node = tipc_node_find(net, prev_node);
-		if (!node) {
-			/* We never set seq or call nl_dump_check_consistent()
-			 * this means that setting prev_seq here will cause the
-			 * consistence check to fail in the netlink callback
-			 * handler. Resulting in the last NLMSG_DONE message
-			 * having the NLM_F_DUMP_INTR flag set.
-			 */
-			cb->prev_seq = 1;
-			goto out;
-		}
-		tipc_node_put(node);
-
-		list_for_each_entry_continue_rcu(node, &tn->node_list,
-						 list) {
-			tipc_node_lock(node);
-			err = __tipc_nl_add_node_links(net, &msg, node,
-						       &prev_link);
-			tipc_node_unlock(node);
-			if (err)
-				goto out;
-
-			prev_node = node->addr;
-		}
-	} else {
-		err = tipc_nl_add_bc_link(net, &msg);
-		if (err)
-			goto out;
-
-		list_for_each_entry_rcu(node, &tn->node_list, list) {
-			tipc_node_lock(node);
-			err = __tipc_nl_add_node_links(net, &msg, node,
-						       &prev_link);
-			tipc_node_unlock(node);
-			if (err)
-				goto out;
-
-			prev_node = node->addr;
-		}
-	}
-	done = 1;
-out:
-	rcu_read_unlock();
+	nla_nest_end(skb, nest);
 
-	cb->args[0] = prev_node;
-	cb->args[1] = prev_link;
-	cb->args[2] = done;
+	return 0;
+msg_full:
+	nla_nest_cancel(skb, nest);
 
-	return skb->len;
+	return -EMSGSIZE;
 }
 
-int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info)
+int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg)
 {
-	struct net *net = genl_info_net(info);
-	struct tipc_nl_msg msg;
-	char *name;
 	int err;
+	void *hdr;
+	struct nlattr *attrs;
+	struct nlattr *prop;
+	struct tipc_net *tn = net_generic(net, tipc_net_id);
+	struct tipc_link *bcl = tn->bcl;
 
-	msg.portid = info->snd_portid;
-	msg.seq = info->snd_seq;
-
-	if (!info->attrs[TIPC_NLA_LINK_NAME])
-		return -EINVAL;
-	name = nla_data(info->attrs[TIPC_NLA_LINK_NAME]);
+	if (!bcl)
+		return 0;
 
-	msg.skb = nlmsg_new(NLMSG_GOODSIZE, GFP_KERNEL);
-	if (!msg.skb)
-		return -ENOMEM;
+	tipc_bcast_lock(net);
 
-	if (strcmp(name, tipc_bclink_name) == 0) {
-		err = tipc_nl_add_bc_link(net, &msg);
-		if (err) {
-			nlmsg_free(msg.skb);
-			return err;
-		}
-	} else {
-		int bearer_id;
-		struct tipc_node *node;
-		struct tipc_link *link;
+	hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family,
+			  NLM_F_MULTI, TIPC_NL_LINK_GET);
+	if (!hdr)
+		return -EMSGSIZE;
 
-		node = tipc_link_find_owner(net, name, &bearer_id);
-		if (!node)
-			return -EINVAL;
+	attrs = nla_nest_start(msg->skb, TIPC_NLA_LINK);
+	if (!attrs)
+		goto msg_full;
 
-		tipc_node_lock(node);
-		link = node->links[bearer_id].link;
-		if (!link) {
-			tipc_node_unlock(node);
-			nlmsg_free(msg.skb);
-			return -EINVAL;
-		}
+	/* The broadcast link is always up */
+	if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP))
+		goto attr_msg_full;
 
-		err = __tipc_nl_add_link(net, &msg, link, 0);
-		tipc_node_unlock(node);
-		if (err) {
-			nlmsg_free(msg.skb);
-			return err;
-		}
-	}
+	if (nla_put_flag(msg->skb, TIPC_NLA_LINK_BROADCAST))
+		goto attr_msg_full;
+	if (nla_put_string(msg->skb, TIPC_NLA_LINK_NAME, bcl->name))
+		goto attr_msg_full;
+	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_RX, bcl->rcv_nxt))
+		goto attr_msg_full;
+	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_TX, bcl->snd_nxt))
+		goto attr_msg_full;
 
-	return genlmsg_reply(msg.skb, info);
-}
+	prop = nla_nest_start(msg->skb, TIPC_NLA_LINK_PROP);
+	if (!prop)
+		goto attr_msg_full;
+	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bcl->window))
+		goto prop_msg_full;
+	nla_nest_end(msg->skb, prop);
 
-int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info)
-{
-	int err;
-	char *link_name;
-	unsigned int bearer_id;
-	struct tipc_link *link;
-	struct tipc_node *node;
-	struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
-	struct net *net = sock_net(skb->sk);
-
-	if (!info->attrs[TIPC_NLA_LINK])
-		return -EINVAL;
-
-	err = nla_parse_nested(attrs, TIPC_NLA_LINK_MAX,
-			       info->attrs[TIPC_NLA_LINK],
-			       tipc_nl_link_policy);
+	err = __tipc_nl_add_bc_link_stat(msg->skb, &bcl->stats);
 	if (err)
-		return err;
-
-	if (!attrs[TIPC_NLA_LINK_NAME])
-		return -EINVAL;
-
-	link_name = nla_data(attrs[TIPC_NLA_LINK_NAME]);
+		goto attr_msg_full;
 
-	if (strcmp(link_name, tipc_bclink_name) == 0) {
-		err = tipc_bclink_reset_stats(net);
-		if (err)
-			return err;
-		return 0;
-	}
+	tipc_bcast_unlock(net);
+	nla_nest_end(msg->skb, attrs);
+	genlmsg_end(msg->skb, hdr);
 
-	node = tipc_link_find_owner(net, link_name, &bearer_id);
-	if (!node)
-		return -EINVAL;
+	return 0;
 
-	tipc_node_lock(node);
+prop_msg_full:
+	nla_nest_cancel(msg->skb, prop);
+attr_msg_full:
+	nla_nest_cancel(msg->skb, attrs);
+msg_full:
+	tipc_bcast_unlock(net);
+	genlmsg_cancel(msg->skb, hdr);
 
-	link = node->links[bearer_id].link;
-	if (!link) {
-		tipc_node_unlock(node);
-		return -EINVAL;
-	}
+	return -EMSGSIZE;
+}
 
-	link_reset_statistics(link);
+void tipc_link_set_tolerance(struct tipc_link *l, u32 tol)
+{
+	l->tolerance = tol;
+	tipc_link_proto_xmit(l, STATE_MSG, 0, 0, tol, 0);
+}
 
-	tipc_node_unlock(node);
+void tipc_link_set_prio(struct tipc_link *l, u32 prio)
+{
+	l->priority = prio;
+	tipc_link_proto_xmit(l, STATE_MSG, 0, 0, 0, prio);
+}
 
-	return 0;
+void tipc_link_set_abort_limit(struct tipc_link *l, u32 limit)
+{
+	l->abort_limit = limit;
 }
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 66d859b66c84f..b2ae0f4276afd 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -45,10 +45,6 @@
 */
 #define ELINKCONG EAGAIN	/* link congestion <=> resource unavailable */
 
-/* Out-of-range value for link sequence numbers
- */
-#define INVALID_LINK_SEQ 0x10000
-
 /* Link FSM events:
  */
 enum {
@@ -75,151 +71,6 @@ enum {
  */
 #define MAX_PKT_DEFAULT 1500
 
-struct tipc_stats {
-	u32 sent_info;		/* used in counting # sent packets */
-	u32 recv_info;		/* used in counting # recv'd packets */
-	u32 sent_states;
-	u32 recv_states;
-	u32 sent_probes;
-	u32 recv_probes;
-	u32 sent_nacks;
-	u32 recv_nacks;
-	u32 sent_acks;
-	u32 sent_bundled;
-	u32 sent_bundles;
-	u32 recv_bundled;
-	u32 recv_bundles;
-	u32 retransmitted;
-	u32 sent_fragmented;
-	u32 sent_fragments;
-	u32 recv_fragmented;
-	u32 recv_fragments;
-	u32 link_congs;		/* # port sends blocked by congestion */
-	u32 deferred_recv;
-	u32 duplicates;
-	u32 max_queue_sz;	/* send queue size high water mark */
-	u32 accu_queue_sz;	/* used for send queue size profiling */
-	u32 queue_sz_counts;	/* used for send queue size profiling */
-	u32 msg_length_counts;	/* used for message length profiling */
-	u32 msg_lengths_total;	/* used for message length profiling */
-	u32 msg_length_profile[7]; /* used for msg. length profiling */
-};
-
-/**
- * struct tipc_link - TIPC link data structure
- * @addr: network address of link's peer node
- * @name: link name character string
- * @media_addr: media address to use when sending messages over link
- * @timer: link timer
- * @net: pointer to namespace struct
- * @refcnt: reference counter for permanent references (owner node & timer)
- * @peer_session: link session # being used by peer end of link
- * @peer_bearer_id: bearer id used by link's peer endpoint
- * @bearer_id: local bearer id used by link
- * @tolerance: minimum link continuity loss needed to reset link [in ms]
- * @keepalive_intv: link keepalive timer interval
- * @abort_limit: # of unacknowledged continuity probes needed to reset link
- * @state: current state of link FSM
- * @peer_caps: bitmap describing capabilities of peer node
- * @silent_intv_cnt: # of timer intervals without any reception from peer
- * @proto_msg: template for control messages generated by link
- * @pmsg: convenience pointer to "proto_msg" field
- * @priority: current link priority
- * @net_plane: current link network plane ('A' through 'H')
- * @backlog_limit: backlog queue congestion thresholds (indexed by importance)
- * @exp_msg_count: # of tunnelled messages expected during link changeover
- * @reset_rcv_checkpt: seq # of last acknowledged message at time of link reset
- * @mtu: current maximum packet size for this link
- * @advertised_mtu: advertised own mtu when link is being established
- * @transmitq: queue for sent, non-acked messages
- * @backlogq: queue for messages waiting to be sent
- * @snt_nxt: next sequence number to use for outbound messages
- * @last_retransmitted: sequence number of most recently retransmitted message
- * @stale_count: # of identical retransmit requests made by peer
- * @ackers: # of peers that needs to ack each packet before it can be released
- * @acked: # last packet acked by a certain peer. Used for broadcast.
- * @rcv_nxt: next sequence number to expect for inbound messages
- * @deferred_queue: deferred queue saved OOS b'cast message received from node
- * @unacked_window: # of inbound messages rx'd without ack'ing back to peer
- * @inputq: buffer queue for messages to be delivered upwards
- * @namedq: buffer queue for name table messages to be delivered upwards
- * @next_out: ptr to first unsent outbound message in queue
- * @wakeupq: linked list of wakeup msgs waiting for link congestion to abate
- * @long_msg_seq_no: next identifier to use for outbound fragmented messages
- * @reasm_buf: head of partially reassembled inbound message fragments
- * @bc_rcvr: marks that this is a broadcast receiver link
- * @stats: collects statistics regarding link activity
- */
-struct tipc_link {
-	u32 addr;
-	char name[TIPC_MAX_LINK_NAME];
-	struct tipc_media_addr *media_addr;
-	struct net *net;
-
-	/* Management and link supervision data */
-	u32 peer_session;
-	u32 peer_bearer_id;
-	u32 bearer_id;
-	u32 tolerance;
-	unsigned long keepalive_intv;
-	u32 abort_limit;
-	u32 state;
-	u16 peer_caps;
-	bool active;
-	u32 silent_intv_cnt;
-	struct {
-		unchar hdr[INT_H_SIZE];
-		unchar body[TIPC_MAX_IF_NAME];
-	} proto_msg;
-	struct tipc_msg *pmsg;
-	u32 priority;
-	char net_plane;
-
-	/* Failover/synch */
-	u16 drop_point;
-	struct sk_buff *failover_reasm_skb;
-
-	/* Max packet negotiation */
-	u16 mtu;
-	u16 advertised_mtu;
-
-	/* Sending */
-	struct sk_buff_head transmq;
-	struct sk_buff_head backlogq;
-	struct {
-		u16 len;
-		u16 limit;
-	} backlog[5];
-	u16 snd_nxt;
-	u16 last_retransm;
-	u16 window;
-	u32 stale_count;
-
-	/* Reception */
-	u16 rcv_nxt;
-	u32 rcv_unacked;
-	struct sk_buff_head deferdq;
-	struct sk_buff_head *inputq;
-	struct sk_buff_head *namedq;
-
-	/* Congestion handling */
-	struct sk_buff_head wakeupq;
-
-	/* Fragmentation/reassembly */
-	struct sk_buff *reasm_buf;
-
-	/* Broadcast */
-	u16 ackers;
-	u16 acked;
-	struct tipc_link *bc_rcvlink;
-	struct tipc_link *bc_sndlink;
-	int nack_state;
-	bool bc_peer_is_up;
-
-	/* Statistics */
-	struct tipc_stats stats;
-};
-
 bool tipc_link_create(struct net *net, char *if_name, int bearer_id,
 		      int tolerance, char net_plane, u32 mtu, int priority,
 		      int window, u32 session, u32 ownnode, u32 peer,
@@ -235,11 +86,11 @@ bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer,
 			 struct sk_buff_head *namedq,
 			 struct tipc_link *bc_sndlink,
 			 struct tipc_link **link);
+void tipc_link_reinit(struct tipc_link *l, u32 addr);
 void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
 			   int mtyp, struct sk_buff_head *xmitq);
 void tipc_link_build_reset_msg(struct tipc_link *l, struct sk_buff_head *xmitq);
 int tipc_link_fsm_evt(struct tipc_link *l, int evt);
-void tipc_link_reset_fragments(struct tipc_link *l_ptr);
 bool tipc_link_is_up(struct tipc_link *l);
 bool tipc_link_peer_is_down(struct tipc_link *l);
 bool tipc_link_is_reset(struct tipc_link *l);
@@ -248,15 +99,25 @@ bool tipc_link_is_synching(struct tipc_link *l);
 bool tipc_link_is_failingover(struct tipc_link *l);
 bool tipc_link_is_blocked(struct tipc_link *l);
 void tipc_link_set_active(struct tipc_link *l, bool active);
-void tipc_link_reset(struct tipc_link *l_ptr);
-int tipc_link_xmit(struct tipc_link *link,	struct sk_buff_head *list,
+void tipc_link_reset(struct tipc_link *l);
+void tipc_link_reset_stats(struct tipc_link *l);
+int tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list,
 		   struct sk_buff_head *xmitq);
+struct sk_buff_head *tipc_link_inputq(struct tipc_link *l);
+u16 tipc_link_rcv_nxt(struct tipc_link *l);
+u16 tipc_link_acked(struct tipc_link *l);
+u32 tipc_link_id(struct tipc_link *l);
+char *tipc_link_name(struct tipc_link *l);
+char tipc_link_plane(struct tipc_link *l);
+int tipc_link_prio(struct tipc_link *l);
+int tipc_link_window(struct tipc_link *l);
+unsigned long tipc_link_tolerance(struct tipc_link *l);
+void tipc_link_set_tolerance(struct tipc_link *l, u32 tol);
+void tipc_link_set_prio(struct tipc_link *l, u32 prio);
+void tipc_link_set_abort_limit(struct tipc_link *l, u32 limit);
 void tipc_link_set_queue_limits(struct tipc_link *l, u32 window);
-
-int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb);
-int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info);
-int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info);
-int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info);
+int __tipc_nl_add_link(struct net *net, struct tipc_nl_msg *msg,
+		       struct tipc_link *link, int nlflags);
 int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[]);
 int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq);
 int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index c07612bab95c0..ebe9d0ff6e9e9 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -84,31 +84,6 @@ static struct sk_buff *named_prepare_buf(struct net *net, u32 type, u32 size,
 	return buf;
 }
 
-void named_cluster_distribute(struct net *net, struct sk_buff *skb)
-{
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
-	struct sk_buff *oskb;
-	struct tipc_node *node;
-	u32 dnode;
-
-	rcu_read_lock();
-	list_for_each_entry_rcu(node, &tn->node_list, list) {
-		dnode = node->addr;
-		if (in_own_node(net, dnode))
-			continue;
-		if (!tipc_node_is_up(node))
-			continue;
-		oskb = pskb_copy(skb, GFP_ATOMIC);
-		if (!oskb)
-			break;
-		msg_set_destnode(buf_msg(oskb), dnode);
-		tipc_node_xmit_skb(net, oskb, dnode, 0);
-	}
-	rcu_read_unlock();
-
-	kfree_skb(skb);
-}
-
 /**
  * tipc_named_publish - tell other nodes about a new publication by this node
  */
@@ -226,42 +201,6 @@ void tipc_named_node_up(struct net *net, u32 dnode)
 	tipc_node_xmit(net, &head, dnode, 0);
 }
 
-static void tipc_publ_subscribe(struct net *net, struct publication *publ,
-				u32 addr)
-{
-	struct tipc_node *node;
-
-	if (in_own_node(net, addr))
-		return;
-
-	node = tipc_node_find(net, addr);
-	if (!node) {
-		pr_warn("Node subscription rejected, unknown node 0x%x\n",
-			addr);
-		return;
-	}
-
-	tipc_node_lock(node);
-	list_add_tail(&publ->nodesub_list, &node->publ_list);
-	tipc_node_unlock(node);
-	tipc_node_put(node);
-}
-
-static void tipc_publ_unsubscribe(struct net *net, struct publication *publ,
-				  u32 addr)
-{
-	struct tipc_node *node;
-
-	node = tipc_node_find(net, addr);
-	if (!node)
-		return;
-
-	tipc_node_lock(node);
-	list_del_init(&publ->nodesub_list);
-	tipc_node_unlock(node);
-	tipc_node_put(node);
-}
-
 /**
  * tipc_publ_purge - remove publication associated with a failed node
  *
@@ -277,7 +216,7 @@ static void tipc_publ_purge(struct net *net, struct publication *publ, u32 addr)
 	p = tipc_nametbl_remove_publ(net, publ->type, publ->lower,
 				     publ->node, publ->ref, publ->key);
 	if (p)
-		tipc_publ_unsubscribe(net, p, addr);
+		tipc_node_unsubscribe(net, &p->nodesub_list, addr);
 	spin_unlock_bh(&tn->nametbl_lock);
 
 	if (p != publ) {
@@ -317,7 +256,7 @@ static bool tipc_update_nametbl(struct net *net, struct distr_item *i,
 						TIPC_CLUSTER_SCOPE, node,
 						ntohl(i->ref), ntohl(i->key));
 		if (publ) {
-			tipc_publ_subscribe(net, publ, node);
+			tipc_node_subscribe(net, &publ->nodesub_list, node);
 			return true;
 		}
 	} else if (dtype == WITHDRAWAL) {
@@ -326,7 +265,7 @@ static bool tipc_update_nametbl(struct net *net, struct distr_item *i,
 						node, ntohl(i->ref),
 						ntohl(i->key));
 		if (publ) {
-			tipc_publ_unsubscribe(net, publ, node);
+			tipc_node_unsubscribe(net, &publ->nodesub_list, node);
 			kfree_rcu(publ, rcu);
 			return true;
 		}
@@ -397,6 +336,7 @@ void tipc_named_rcv(struct net *net, struct sk_buff_head *inputq)
 
 	spin_lock_bh(&tn->nametbl_lock);
 	for (skb = skb_dequeue(inputq); skb; skb = skb_dequeue(inputq)) {
+		skb_linearize(skb);
 		msg = buf_msg(skb);
 		mtype = msg_type(msg);
 		item = (struct distr_item *)msg_data(msg);
diff --git a/net/tipc/name_distr.h b/net/tipc/name_distr.h
index dd2d9fd80da2c..1264ba0af9375 100644
--- a/net/tipc/name_distr.h
+++ b/net/tipc/name_distr.h
@@ -69,7 +69,6 @@ struct distr_item {
 
 struct sk_buff *tipc_named_publish(struct net *net, struct publication *publ);
 struct sk_buff *tipc_named_withdraw(struct net *net, struct publication *publ);
-void named_cluster_distribute(struct net *net, struct sk_buff *buf);
 void tipc_named_node_up(struct net *net, u32 dnode);
 void tipc_named_rcv(struct net *net, struct sk_buff_head *msg_queue);
 void tipc_named_reinit(struct net *net);
diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index 0f47f08bf38f0..91fce70291a89 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -42,6 +42,7 @@
 #include "subscr.h"
 #include "bcast.h"
 #include "addr.h"
+#include "node.h"
 #include <net/genetlink.h>
 
 #define TIPC_NAMETBL_SIZE 1024		/* must be a power of 2 */
@@ -677,7 +678,7 @@ struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower,
 	spin_unlock_bh(&tn->nametbl_lock);
 
 	if (buf)
-		named_cluster_distribute(net, buf);
+		tipc_node_broadcast(net, buf);
 	return publ;
 }
 
@@ -709,7 +710,7 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 ref,
 	spin_unlock_bh(&tn->nametbl_lock);
 
 	if (skb) {
-		named_cluster_distribute(net, skb);
+		tipc_node_broadcast(net, skb);
 		return 1;
 	}
 	return 0;
diff --git a/net/tipc/netlink.c b/net/tipc/netlink.c
index 7f6475efc984d..8975b0135b764 100644
--- a/net/tipc/netlink.c
+++ b/net/tipc/netlink.c
@@ -101,18 +101,18 @@ static const struct genl_ops tipc_genl_v2_ops[] = {
 	},
 	{
 		.cmd	= TIPC_NL_LINK_GET,
-		.doit   = tipc_nl_link_get,
-		.dumpit	= tipc_nl_link_dump,
+		.doit   = tipc_nl_node_get_link,
+		.dumpit	= tipc_nl_node_dump_link,
 		.policy = tipc_nl_policy,
 	},
 	{
 		.cmd	= TIPC_NL_LINK_SET,
-		.doit	= tipc_nl_link_set,
+		.doit	= tipc_nl_node_set_link,
 		.policy = tipc_nl_policy,
 	},
 	{
 		.cmd	= TIPC_NL_LINK_RESET_STATS,
-		.doit   = tipc_nl_link_reset_stats,
+		.doit   = tipc_nl_node_reset_link_stats,
 		.policy = tipc_nl_policy,
 	},
 	{
diff --git a/net/tipc/netlink_compat.c b/net/tipc/netlink_compat.c
index 1eadc95e11329..2c016fdefe977 100644
--- a/net/tipc/netlink_compat.c
+++ b/net/tipc/netlink_compat.c
@@ -1023,25 +1023,25 @@ static int tipc_nl_compat_handle(struct tipc_nl_compat_msg *msg)
 		msg->req_type = TIPC_TLV_LINK_NAME;
 		msg->rep_size = ULTRA_STRING_MAX_LEN;
 		msg->rep_type = TIPC_TLV_ULTRA_STRING;
-		dump.dumpit = tipc_nl_link_dump;
+		dump.dumpit = tipc_nl_node_dump_link;
 		dump.format = tipc_nl_compat_link_stat_dump;
 		return tipc_nl_compat_dumpit(&dump, msg);
 	case TIPC_CMD_GET_LINKS:
 		msg->req_type = TIPC_TLV_NET_ADDR;
 		msg->rep_size = ULTRA_STRING_MAX_LEN;
-		dump.dumpit = tipc_nl_link_dump;
+		dump.dumpit = tipc_nl_node_dump_link;
 		dump.format = tipc_nl_compat_link_dump;
 		return tipc_nl_compat_dumpit(&dump, msg);
 	case TIPC_CMD_SET_LINK_TOL:
 	case TIPC_CMD_SET_LINK_PRI:
 	case TIPC_CMD_SET_LINK_WINDOW:
 		msg->req_type =  TIPC_TLV_LINK_CONFIG;
-		doit.doit = tipc_nl_link_set;
+		doit.doit = tipc_nl_node_set_link;
 		doit.transcode = tipc_nl_compat_link_set;
 		return tipc_nl_compat_doit(&doit, msg);
 	case TIPC_CMD_RESET_LINK_STATS:
 		msg->req_type = TIPC_TLV_LINK_NAME;
-		doit.doit = tipc_nl_link_reset_stats;
+		doit.doit = tipc_nl_node_reset_link_stats;
 		doit.transcode = tipc_nl_compat_link_reset_stats;
 		return tipc_nl_compat_doit(&doit, msg);
 	case TIPC_CMD_SHOW_NAME_TABLE:
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 20cddec0a43c7..3f7a4ed719900 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -42,6 +42,84 @@
 #include "bcast.h"
 #include "discover.h"
 
+#define INVALID_NODE_SIG	0x10000
+
+/* Flags used to take different actions according to flag type
+ * TIPC_NOTIFY_NODE_DOWN: notify node is down
+ * TIPC_NOTIFY_NODE_UP: notify node is up
+ * TIPC_DISTRIBUTE_NAME: publish or withdraw link state name type
+ */
+enum {
+	TIPC_NOTIFY_NODE_DOWN		= (1 << 3),
+	TIPC_NOTIFY_NODE_UP		= (1 << 4),
+	TIPC_NOTIFY_LINK_UP		= (1 << 6),
+	TIPC_NOTIFY_LINK_DOWN		= (1 << 7)
+};
+
+struct tipc_link_entry {
+	struct tipc_link *link;
+	spinlock_t lock; /* per link */
+	u32 mtu;
+	struct sk_buff_head inputq;
+	struct tipc_media_addr maddr;
+};
+
+struct tipc_bclink_entry {
+	struct tipc_link *link;
+	struct sk_buff_head inputq1;
+	struct sk_buff_head arrvq;
+	struct sk_buff_head inputq2;
+	struct sk_buff_head namedq;
+};
+
+/**
+ * struct tipc_node - TIPC node structure
+ * @addr: network address of node
+ * @ref: reference counter to node object
+ * @lock: rwlock governing access to structure
+ * @net: the applicable net namespace
+ * @hash: links to adjacent nodes in unsorted hash chain
+ * @inputq: pointer to input queue containing messages for msg event
+ * @namedq: pointer to name table input queue with name table messages
+ * @active_links: bearer ids of active links, used as index into links[] array
+ * @links: array containing references to all links to node
+ * @action_flags: bit mask of different types of node actions
+ * @state: connectivity state vs peer node
+ * @sync_point: sequence number where synch/failover is finished
+ * @list: links to adjacent nodes in sorted list of cluster's nodes
+ * @working_links: number of working links to node (both active and standby)
+ * @link_cnt: number of links to node
+ * @capabilities: bitmap, indicating peer node's functional capabilities
+ * @signature: node instance identifier
+ * @link_id: local and remote bearer ids of changing link, if any
+ * @publ_list: list of publications
+ * @rcu: rcu struct for tipc_node
+ */
+struct tipc_node {
+	u32 addr;
+	struct kref kref;
+	rwlock_t lock;
+	struct net *net;
+	struct hlist_node hash;
+	int active_links[2];
+	struct tipc_link_entry links[MAX_BEARERS];
+	struct tipc_bclink_entry bc_entry;
+	int action_flags;
+	struct list_head list;
+	int state;
+	u16 sync_point;
+	int link_cnt;
+	u16 working_links;
+	u16 capabilities;
+	u32 signature;
+	u32 link_id;
+	struct list_head publ_list;
+	struct list_head conn_sks;
+	unsigned long keepalive_intv;
+	struct timer_list timer;
+	struct rcu_head rcu;
+};
+
 /* Node FSM states and events:
  */
 enum {
@@ -75,6 +153,9 @@ static void node_lost_contact(struct tipc_node *n, struct sk_buff_head *inputq);
 static void tipc_node_delete(struct tipc_node *node);
 static void tipc_node_timeout(unsigned long data);
 static void tipc_node_fsm_evt(struct tipc_node *n, int evt);
+static struct tipc_node *tipc_node_find(struct net *net, u32 addr);
+static void tipc_node_put(struct tipc_node *node);
+static bool tipc_node_is_up(struct tipc_node *n);
 
 struct tipc_sock_conn {
 	u32 port;
@@ -83,12 +164,54 @@ struct tipc_sock_conn {
 	struct list_head list;
 };
 
+static const struct nla_policy tipc_nl_link_policy[TIPC_NLA_LINK_MAX + 1] = {
+	[TIPC_NLA_LINK_UNSPEC]		= { .type = NLA_UNSPEC },
+	[TIPC_NLA_LINK_NAME] = {
+		.type = NLA_STRING,
+		.len = TIPC_MAX_LINK_NAME
+	},
+	[TIPC_NLA_LINK_MTU]		= { .type = NLA_U32 },
+	[TIPC_NLA_LINK_BROADCAST]	= { .type = NLA_FLAG },
+	[TIPC_NLA_LINK_UP]		= { .type = NLA_FLAG },
+	[TIPC_NLA_LINK_ACTIVE]		= { .type = NLA_FLAG },
+	[TIPC_NLA_LINK_PROP]		= { .type = NLA_NESTED },
+	[TIPC_NLA_LINK_STATS]		= { .type = NLA_NESTED },
+	[TIPC_NLA_LINK_RX]		= { .type = NLA_U32 },
+	[TIPC_NLA_LINK_TX]		= { .type = NLA_U32 }
+};
+
 static const struct nla_policy tipc_nl_node_policy[TIPC_NLA_NODE_MAX + 1] = {
 	[TIPC_NLA_NODE_UNSPEC]		= { .type = NLA_UNSPEC },
 	[TIPC_NLA_NODE_ADDR]		= { .type = NLA_U32 },
 	[TIPC_NLA_NODE_UP]		= { .type = NLA_FLAG }
 };
 
+static struct tipc_link *node_active_link(struct tipc_node *n, int sel)
+{
+	int bearer_id = n->active_links[sel & 1];
+
+	if (unlikely(bearer_id == INVALID_BEARER_ID))
+		return NULL;
+
+	return n->links[bearer_id].link;
+}
+
+int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel)
+{
+	struct tipc_node *n;
+	int bearer_id;
+	unsigned int mtu = MAX_MSG_SIZE;
+
+	n = tipc_node_find(net, addr);
+	if (unlikely(!n))
+		return mtu;
+
+	bearer_id = n->active_links[sel & 1];
+	if (likely(bearer_id != INVALID_BEARER_ID))
+		mtu = n->links[bearer_id].mtu;
+	tipc_node_put(n);
+	return mtu;
+}
 /*
  * A trivial power-of-two bitmask technique is used for speed, since this
  * operation is done for every incoming TIPC packet. The number of hash table
@@ -107,7 +230,7 @@ static void tipc_node_kref_release(struct kref *kref)
 	tipc_node_delete(node);
 }
 
-void tipc_node_put(struct tipc_node *node)
+static void tipc_node_put(struct tipc_node *node)
 {
 	kref_put(&node->kref, tipc_node_kref_release);
 }
@@ -120,7 +243,7 @@ static void tipc_node_get(struct tipc_node *node)
 /*
  * tipc_node_find - locate specified node object, if it exists
  */
-struct tipc_node *tipc_node_find(struct net *net, u32 addr)
+static struct tipc_node *tipc_node_find(struct net *net, u32 addr)
 {
 	struct tipc_net *tn = net_generic(net, tipc_net_id);
 	struct tipc_node *node;
@@ -141,66 +264,122 @@ struct tipc_node *tipc_node_find(struct net *net, u32 addr)
 	return NULL;
 }
 
+static void tipc_node_read_lock(struct tipc_node *n)
+{
+	read_lock_bh(&n->lock);
+}
+
+static void tipc_node_read_unlock(struct tipc_node *n)
+{
+	read_unlock_bh(&n->lock);
+}
+
+static void tipc_node_write_lock(struct tipc_node *n)
+{
+	write_lock_bh(&n->lock);
+}
+
+static void tipc_node_write_unlock(struct tipc_node *n)
+{
+	struct net *net = n->net;
+	u32 addr = 0;
+	u32 flags = n->action_flags;
+	u32 link_id = 0;
+	struct list_head *publ_list;
+
+	if (likely(!flags)) {
+		write_unlock_bh(&n->lock);
+		return;
+	}
+
+	addr = n->addr;
+	link_id = n->link_id;
+	publ_list = &n->publ_list;
+
+	n->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
+			     TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP);
+
+	write_unlock_bh(&n->lock);
+
+	if (flags & TIPC_NOTIFY_NODE_DOWN)
+		tipc_publ_notify(net, publ_list, addr);
+
+	if (flags & TIPC_NOTIFY_NODE_UP)
+		tipc_named_node_up(net, addr);
+
+	if (flags & TIPC_NOTIFY_LINK_UP)
+		tipc_nametbl_publish(net, TIPC_LINK_STATE, addr, addr,
+				     TIPC_NODE_SCOPE, link_id, addr);
+
+	if (flags & TIPC_NOTIFY_LINK_DOWN)
+		tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
+				      link_id, addr);
+}
+
 struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
 {
 	struct tipc_net *tn = net_generic(net, tipc_net_id);
-	struct tipc_node *n_ptr, *temp_node;
+	struct tipc_node *n, *temp_node;
+	int i;
 
 	spin_lock_bh(&tn->node_list_lock);
-	n_ptr = tipc_node_find(net, addr);
-	if (n_ptr)
+	n = tipc_node_find(net, addr);
+	if (n)
 		goto exit;
-	n_ptr = kzalloc(sizeof(*n_ptr), GFP_ATOMIC);
-	if (!n_ptr) {
+	n = kzalloc(sizeof(*n), GFP_ATOMIC);
+	if (!n) {
 		pr_warn("Node creation failed, no memory\n");
 		goto exit;
 	}
-	n_ptr->addr = addr;
-	n_ptr->net = net;
-	n_ptr->capabilities = capabilities;
-	kref_init(&n_ptr->kref);
-	spin_lock_init(&n_ptr->lock);
-	INIT_HLIST_NODE(&n_ptr->hash);
-	INIT_LIST_HEAD(&n_ptr->list);
-	INIT_LIST_HEAD(&n_ptr->publ_list);
-	INIT_LIST_HEAD(&n_ptr->conn_sks);
-	skb_queue_head_init(&n_ptr->bc_entry.namedq);
-	skb_queue_head_init(&n_ptr->bc_entry.inputq1);
-	__skb_queue_head_init(&n_ptr->bc_entry.arrvq);
-	skb_queue_head_init(&n_ptr->bc_entry.inputq2);
-	hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]);
+	n->addr = addr;
+	n->net = net;
+	n->capabilities = capabilities;
+	kref_init(&n->kref);
+	rwlock_init(&n->lock);
+	INIT_HLIST_NODE(&n->hash);
+	INIT_LIST_HEAD(&n->list);
+	INIT_LIST_HEAD(&n->publ_list);
+	INIT_LIST_HEAD(&n->conn_sks);
+	skb_queue_head_init(&n->bc_entry.namedq);
+	skb_queue_head_init(&n->bc_entry.inputq1);
+	__skb_queue_head_init(&n->bc_entry.arrvq);
+	skb_queue_head_init(&n->bc_entry.inputq2);
+	for (i = 0; i < MAX_BEARERS; i++)
+		spin_lock_init(&n->links[i].lock);
+	hlist_add_head_rcu(&n->hash, &tn->node_htable[tipc_hashfn(addr)]);
 	list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
-		if (n_ptr->addr < temp_node->addr)
+		if (n->addr < temp_node->addr)
 			break;
 	}
-	list_add_tail_rcu(&n_ptr->list, &temp_node->list);
-	n_ptr->state = SELF_DOWN_PEER_LEAVING;
-	n_ptr->signature = INVALID_NODE_SIG;
-	n_ptr->active_links[0] = INVALID_BEARER_ID;
-	n_ptr->active_links[1] = INVALID_BEARER_ID;
-	if (!tipc_link_bc_create(net, tipc_own_addr(net), n_ptr->addr,
-				 U16_MAX, tipc_bc_sndlink(net)->window,
-				 n_ptr->capabilities,
-				 &n_ptr->bc_entry.inputq1,
-				 &n_ptr->bc_entry.namedq,
+	list_add_tail_rcu(&n->list, &temp_node->list);
+	n->state = SELF_DOWN_PEER_LEAVING;
+	n->signature = INVALID_NODE_SIG;
+	n->active_links[0] = INVALID_BEARER_ID;
+	n->active_links[1] = INVALID_BEARER_ID;
+	if (!tipc_link_bc_create(net, tipc_own_addr(net), n->addr,
+				 U16_MAX,
+				 tipc_link_window(tipc_bc_sndlink(net)),
+				 n->capabilities,
+				 &n->bc_entry.inputq1,
+				 &n->bc_entry.namedq,
 				 tipc_bc_sndlink(net),
-				 &n_ptr->bc_entry.link)) {
+				 &n->bc_entry.link)) {
 		pr_warn("Broadcast rcv link creation failed, no memory\n");
-		kfree(n_ptr);
-		n_ptr = NULL;
+		kfree(n);
+		n = NULL;
 		goto exit;
 	}
-	tipc_node_get(n_ptr);
-	setup_timer(&n_ptr->timer, tipc_node_timeout, (unsigned long)n_ptr);
-	n_ptr->keepalive_intv = U32_MAX;
+	tipc_node_get(n);
+	setup_timer(&n->timer, tipc_node_timeout, (unsigned long)n);
+	n->keepalive_intv = U32_MAX;
 exit:
 	spin_unlock_bh(&tn->node_list_lock);
-	return n_ptr;
+	return n;
 }
 
 static void tipc_node_calculate_timer(struct tipc_node *n, struct tipc_link *l)
 {
-	unsigned long tol = l->tolerance;
+	unsigned long tol = tipc_link_tolerance(l);
 	unsigned long intv = ((tol / 4) > 500) ? 500 : tol / 4;
 	unsigned long keepalive_intv = msecs_to_jiffies(intv);
 
@@ -209,7 +388,7 @@ static void tipc_node_calculate_timer(struct tipc_node *n, struct tipc_link *l)
 		n->keepalive_intv = keepalive_intv;
 
 	/* Ensure link's abort limit corresponds to current interval */
-	l->abort_limit = l->tolerance / jiffies_to_msecs(n->keepalive_intv);
+	tipc_link_set_abort_limit(l, tol / jiffies_to_msecs(n->keepalive_intv));
 }
 
 static void tipc_node_delete(struct tipc_node *node)
@@ -234,6 +413,42 @@ void tipc_node_stop(struct net *net)
 	spin_unlock_bh(&tn->node_list_lock);
 }
 
+void tipc_node_subscribe(struct net *net, struct list_head *subscr, u32 addr)
+{
+	struct tipc_node *n;
+
+	if (in_own_node(net, addr))
+		return;
+
+	n = tipc_node_find(net, addr);
+	if (!n) {
+		pr_warn("Node subscribe rejected, unknown node 0x%x\n", addr);
+		return;
+	}
+	tipc_node_write_lock(n);
+	list_add_tail(subscr, &n->publ_list);
+	tipc_node_write_unlock(n);
+	tipc_node_put(n);
+}
+
+void tipc_node_unsubscribe(struct net *net, struct list_head *subscr, u32 addr)
+{
+	struct tipc_node *n;
+
+	if (in_own_node(net, addr))
+		return;
+
+	n = tipc_node_find(net, addr);
+	if (!n) {
+		pr_warn("Node unsubscribe rejected, unknown node 0x%x\n", addr);
+		return;
+	}
+	tipc_node_write_lock(n);
+	list_del_init(subscr);
+	tipc_node_write_unlock(n);
+	tipc_node_put(n);
+}
+
 int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port)
 {
 	struct tipc_node *node;
@@ -257,9 +472,9 @@ int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port)
 	conn->port = port;
 	conn->peer_port = peer_port;
 
-	tipc_node_lock(node);
+	tipc_node_write_lock(node);
 	list_add_tail(&conn->list, &node->conn_sks);
-	tipc_node_unlock(node);
+	tipc_node_write_unlock(node);
 exit:
 	tipc_node_put(node);
 	return err;
@@ -277,14 +492,14 @@ void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port)
 	if (!node)
 		return;
 
-	tipc_node_lock(node);
+	tipc_node_write_lock(node);
 	list_for_each_entry_safe(conn, safe, &node->conn_sks, list) {
 		if (port != conn->port)
 			continue;
 		list_del(&conn->list);
 		kfree(conn);
 	}
-	tipc_node_unlock(node);
+	tipc_node_write_unlock(node);
 	tipc_node_put(node);
 }
 
@@ -301,14 +516,16 @@ static void tipc_node_timeout(unsigned long data)
 	__skb_queue_head_init(&xmitq);
 
 	for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
-		tipc_node_lock(n);
+		tipc_node_read_lock(n);
 		le = &n->links[bearer_id];
+		spin_lock_bh(&le->lock);
 		if (le->link) {
 			/* Link tolerance may change asynchronously: */
 			tipc_node_calculate_timer(n, le->link);
 			rc = tipc_link_timeout(le->link, &xmitq);
 		}
-		tipc_node_unlock(n);
+		spin_unlock_bh(&le->lock);
+		tipc_node_read_unlock(n);
 		tipc_bearer_xmit(n->net, bearer_id, &xmitq, &le->maddr);
 		if (rc & TIPC_LINK_DOWN_EVT)
 			tipc_node_link_down(n, bearer_id, false);
@@ -340,16 +557,16 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,
 
 	n->working_links++;
 	n->action_flags |= TIPC_NOTIFY_LINK_UP;
-	n->link_id = nl->peer_bearer_id << 16 | bearer_id;
+	n->link_id = tipc_link_id(nl);
 
 	/* Leave room for tunnel header when returning 'mtu' to users: */
-	n->links[bearer_id].mtu = nl->mtu - INT_H_SIZE;
+	n->links[bearer_id].mtu = tipc_link_mtu(nl) - INT_H_SIZE;
 
 	tipc_bearer_add_dest(n->net, bearer_id, n->addr);
 	tipc_bcast_inc_bearer_dst_cnt(n->net, bearer_id);
 
 	pr_debug("Established link <%s> on network plane %c\n",
-		 nl->name, nl->net_plane);
+		 tipc_link_name(nl), tipc_link_plane(nl));
 
 	/* First link? => give it both slots */
 	if (!ol) {
@@ -362,17 +579,17 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,
 	}
 
 	/* Second link => redistribute slots */
-	if (nl->priority > ol->priority) {
-		pr_debug("Old link <%s> becomes standby\n", ol->name);
+	if (tipc_link_prio(nl) > tipc_link_prio(ol)) {
+		pr_debug("Old link <%s> becomes standby\n", tipc_link_name(ol));
 		*slot0 = bearer_id;
 		*slot1 = bearer_id;
 		tipc_link_set_active(nl, true);
 		tipc_link_set_active(ol, false);
-	} else if (nl->priority == ol->priority) {
+	} else if (tipc_link_prio(nl) == tipc_link_prio(ol)) {
 		tipc_link_set_active(nl, true);
 		*slot1 = bearer_id;
 	} else {
-		pr_debug("New link <%s> is standby\n", nl->name);
+		pr_debug("New link <%s> is standby\n", tipc_link_name(nl));
 	}
 
 	/* Prepare synchronization with first link */
@@ -387,9 +604,9 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,
 static void tipc_node_link_up(struct tipc_node *n, int bearer_id,
 			      struct sk_buff_head *xmitq)
 {
-	tipc_node_lock(n);
+	tipc_node_write_lock(n);
 	__tipc_node_link_up(n, bearer_id, xmitq);
-	tipc_node_unlock(n);
+	tipc_node_write_unlock(n);
 }
 
 /**
@@ -402,7 +619,7 @@ static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id,
 	struct tipc_link_entry *le = &n->links[*bearer_id];
 	int *slot0 = &n->active_links[0];
 	int *slot1 = &n->active_links[1];
-	int i, highest = 0;
+	int i, highest = 0, prio;
 	struct tipc_link *l, *_l, *tnl;
 
 	l = n->links[*bearer_id].link;
@@ -411,12 +628,12 @@ static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id,
 
 	n->working_links--;
 	n->action_flags |= TIPC_NOTIFY_LINK_DOWN;
-	n->link_id = l->peer_bearer_id << 16 | *bearer_id;
+	n->link_id = tipc_link_id(l);
 
 	tipc_bearer_remove_dest(n->net, *bearer_id, n->addr);
 
 	pr_debug("Lost link <%s> on network plane %c\n",
-		 l->name, l->net_plane);
+		 tipc_link_name(l), tipc_link_plane(l));
 
 	/* Select new active link if any available */
 	*slot0 = INVALID_BEARER_ID;
@@ -427,10 +644,11 @@ static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id,
 			continue;
 		if (_l == l)
 			continue;
-		if (_l->priority < highest)
+		prio = tipc_link_prio(_l);
+		if (prio < highest)
 			continue;
-		if (_l->priority > highest) {
-			highest = _l->priority;
+		if (prio > highest) {
+			highest = prio;
 			*slot0 = i;
 			*slot1 = i;
 			continue;
@@ -453,17 +671,17 @@ static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id,
 	tipc_bcast_dec_bearer_dst_cnt(n->net, *bearer_id);
 
 	/* There is still a working link => initiate failover */
-	tnl = node_active_link(n, 0);
+	*bearer_id = n->active_links[0];
+	tnl = n->links[*bearer_id].link;
 	tipc_link_fsm_evt(tnl, LINK_SYNCH_END_EVT);
 	tipc_node_fsm_evt(n, NODE_SYNCH_END_EVT);
-	n->sync_point = tnl->rcv_nxt + (U16_MAX / 2 - 1);
+	n->sync_point = tipc_link_rcv_nxt(tnl) + (U16_MAX / 2 - 1);
 	tipc_link_tnl_prepare(l, tnl, FAILOVER_MSG, xmitq);
 	tipc_link_reset(l);
 	tipc_link_fsm_evt(l, LINK_RESET_EVT);
 	tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT);
 	tipc_node_fsm_evt(n, NODE_FAILOVER_BEGIN_EVT);
-	*maddr = &n->links[tnl->bearer_id].maddr;
-	*bearer_id = tnl->bearer_id;
+	*maddr = &n->links[*bearer_id].maddr;
 }
 
 static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete)
@@ -478,7 +696,7 @@ static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete)
 
 	__skb_queue_head_init(&xmitq);
 
-	tipc_node_lock(n);
+	tipc_node_write_lock(n);
 	if (!tipc_link_is_establishing(l)) {
 		__tipc_node_link_down(n, &bearer_id, &xmitq, &maddr);
 		if (delete) {
@@ -490,12 +708,12 @@ static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete)
 		/* Defuse pending tipc_node_link_up() */
 		tipc_link_fsm_evt(l, LINK_RESET_EVT);
 	}
-	tipc_node_unlock(n);
+	tipc_node_write_unlock(n);
 	tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr);
 	tipc_sk_rcv(n->net, &le->inputq);
 }
 
-bool tipc_node_is_up(struct tipc_node *n)
+static bool tipc_node_is_up(struct tipc_node *n)
 {
 	return n->active_links[0] != INVALID_BEARER_ID;
 }
@@ -523,7 +741,7 @@ void tipc_node_check_dest(struct net *net, u32 onode,
 	if (!n)
 		return;
 
-	tipc_node_lock(n);
+	tipc_node_write_lock(n);
 
 	le = &n->links[b->identity];
 
@@ -626,7 +844,7 @@ void tipc_node_check_dest(struct net *net, u32 onode,
 	}
 	memcpy(&le->maddr, maddr, sizeof(*maddr));
 exit:
-	tipc_node_unlock(n);
+	tipc_node_write_unlock(n);
 	if (reset && !tipc_link_is_reset(l))
 		tipc_node_link_down(n, b->identity, false);
 	tipc_node_put(n);
@@ -834,24 +1052,6 @@ static void tipc_node_fsm_evt(struct tipc_node *n, int evt)
 	pr_err("Illegal node fsm evt %x in state %x\n", evt, state);
 }
 
-bool tipc_node_filter_pkt(struct tipc_node *n, struct tipc_msg *hdr)
-{
-	int state = n->state;
-
-	if (likely(state == SELF_UP_PEER_UP))
-		return true;
-
-	if (state == SELF_LEAVING_PEER_DOWN)
-		return false;
-
-	if (state == SELF_DOWN_PEER_LEAVING) {
-		if (msg_peer_node_is_up(hdr))
-			return false;
-	}
-
-	return true;
-}
-
 static void node_lost_contact(struct tipc_node *n,
 			      struct sk_buff_head *inputq)
 {
@@ -913,56 +1113,18 @@ int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 addr,
 	if (bearer_id >= MAX_BEARERS)
 		goto exit;
 
-	tipc_node_lock(node);
+	tipc_node_read_lock(node);
 	link = node->links[bearer_id].link;
 	if (link) {
-		strncpy(linkname, link->name, len);
+		strncpy(linkname, tipc_link_name(link), len);
 		err = 0;
 	}
 exit:
-	tipc_node_unlock(node);
+	tipc_node_read_unlock(node);
 	tipc_node_put(node);
 	return err;
 }
 
-void tipc_node_unlock(struct tipc_node *node)
-{
-	struct net *net = node->net;
-	u32 addr = 0;
-	u32 flags = node->action_flags;
-	u32 link_id = 0;
-	struct list_head *publ_list;
-
-	if (likely(!flags)) {
-		spin_unlock_bh(&node->lock);
-		return;
-	}
-
-	addr = node->addr;
-	link_id = node->link_id;
-	publ_list = &node->publ_list;
-
-	node->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
-				TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP);
-
-	spin_unlock_bh(&node->lock);
-
-	if (flags & TIPC_NOTIFY_NODE_DOWN)
-		tipc_publ_notify(net, publ_list, addr);
-
-	if (flags & TIPC_NOTIFY_NODE_UP)
-		tipc_named_node_up(net, addr);
-
-	if (flags & TIPC_NOTIFY_LINK_UP)
-		tipc_nametbl_publish(net, TIPC_LINK_STATE, addr, addr,
-				     TIPC_NODE_SCOPE, link_id, addr);
-
-	if (flags & TIPC_NOTIFY_LINK_DOWN)
-		tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
-				      link_id, addr);
-
-}
-
 /* Caller should hold node lock for the passed node */
 static int __tipc_nl_add_node(struct tipc_nl_msg *msg, struct tipc_node *node)
 {
@@ -997,20 +1159,6 @@ static int __tipc_nl_add_node(struct tipc_nl_msg *msg, struct tipc_node *node)
 	return -EMSGSIZE;
 }
 
-static struct tipc_link *tipc_node_select_link(struct tipc_node *n, int sel,
-					       int *bearer_id,
-					       struct tipc_media_addr **maddr)
-{
-	int id = n->active_links[sel & 1];
-
-	if (unlikely(id < 0))
-		return NULL;
-
-	*bearer_id = id;
-	*maddr = &n->links[id].maddr;
-	return n->links[id].link;
-}
-
 /**
  * tipc_node_xmit() is the general link level function for message sending
  * @net: the applicable net namespace
@@ -1023,34 +1171,38 @@ static struct tipc_link *tipc_node_select_link(struct tipc_node *n, int sel,
 int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
 		   u32 dnode, int selector)
 {
-	struct tipc_link *l = NULL;
+	struct tipc_link_entry *le = NULL;
 	struct tipc_node *n;
 	struct sk_buff_head xmitq;
-	struct tipc_media_addr *maddr;
-	int bearer_id;
+	int bearer_id = -1;
 	int rc = -EHOSTUNREACH;
 
 	__skb_queue_head_init(&xmitq);
 	n = tipc_node_find(net, dnode);
 	if (likely(n)) {
-		tipc_node_lock(n);
-		l = tipc_node_select_link(n, selector, &bearer_id, &maddr);
-		if (likely(l))
-			rc = tipc_link_xmit(l, list, &xmitq);
-		tipc_node_unlock(n);
+		tipc_node_read_lock(n);
+		bearer_id = n->active_links[selector & 1];
+		if (bearer_id >= 0) {
+			le = &n->links[bearer_id];
+			spin_lock_bh(&le->lock);
+			rc = tipc_link_xmit(le->link, list, &xmitq);
+			spin_unlock_bh(&le->lock);
+		}
+		tipc_node_read_unlock(n);
+		if (likely(!skb_queue_empty(&xmitq))) {
+			tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
+			return 0;
+		}
 		if (unlikely(rc == -ENOBUFS))
 			tipc_node_link_down(n, bearer_id, false);
 		tipc_node_put(n);
+		return rc;
 	}
-	if (likely(!rc)) {
-		tipc_bearer_xmit(net, bearer_id, &xmitq, maddr);
-		return 0;
-	}
-	if (likely(in_own_node(net, dnode))) {
-		tipc_sk_rcv(net, list);
-		return 0;
-	}
-	return rc;
+
+	if (unlikely(!in_own_node(net, dnode)))
+		return rc;
+	tipc_sk_rcv(net, list);
+	return 0;
 }
 
 /* tipc_node_xmit_skb(): send single buffer to destination
@@ -1075,6 +1227,30 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
 	return 0;
 }
 
+void tipc_node_broadcast(struct net *net, struct sk_buff *skb)
+{
+	struct sk_buff *txskb;
+	struct tipc_node *n;
+	u32 dst;
+
+	rcu_read_lock();
+	list_for_each_entry_rcu(n, tipc_nodes(net), list) {
+		dst = n->addr;
+		if (in_own_node(net, dst))
+			continue;
+		if (!tipc_node_is_up(n))
+			continue;
+		txskb = pskb_copy(skb, GFP_ATOMIC);
+		if (!txskb)
+			break;
+		msg_set_destnode(buf_msg(txskb), dst);
+		tipc_node_xmit_skb(net, txskb, dst, 0);
+	}
+	rcu_read_unlock();
+
+	kfree_skb(skb);
+}
+
 /**
  * tipc_node_bc_rcv - process TIPC broadcast packet arriving from off-node
  * @net: the applicable net namespace
@@ -1116,9 +1292,9 @@ static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id
 
 	/* Broadcast ACKs are sent on a unicast link */
 	if (rc & TIPC_LINK_SND_BC_ACK) {
-		tipc_node_lock(n);
+		tipc_node_read_lock(n);
 		tipc_link_build_ack_msg(le->link, &xmitq);
-		tipc_node_unlock(n);
+		tipc_node_read_unlock(n);
 	}
 
 	if (!skb_queue_empty(&xmitq))
@@ -1151,30 +1327,30 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb,
 	u16 oseqno = msg_seqno(hdr);
 	u16 iseqno = msg_seqno(msg_get_wrapped(hdr));
 	u16 exp_pkts = msg_msgcnt(hdr);
-	u16 rcv_nxt, syncpt, dlv_nxt;
+	u16 rcv_nxt, syncpt, dlv_nxt, inputq_len;
 	int state = n->state;
 	struct tipc_link *l, *tnl, *pl = NULL;
 	struct tipc_media_addr *maddr;
-	int i, pb_id;
+	int pb_id;
 
 	l = n->links[bearer_id].link;
 	if (!l)
 		return false;
-	rcv_nxt = l->rcv_nxt;
+	rcv_nxt = tipc_link_rcv_nxt(l);
 
 
 	if (likely((state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL)))
 		return true;
 
 	/* Find parallel link, if any */
-	for (i = 0; i < MAX_BEARERS; i++) {
-		if ((i != bearer_id) && n->links[i].link) {
-			pl = n->links[i].link;
+	for (pb_id = 0; pb_id < MAX_BEARERS; pb_id++) {
+		if ((pb_id != bearer_id) && n->links[pb_id].link) {
+			pl = n->links[pb_id].link;
 			break;
 		}
 	}
 
-	/* Update node accesibility if applicable */
+	/* Check and update node accesibility if applicable */
 	if (state == SELF_UP_PEER_COMING) {
 		if (!tipc_link_is_up(l))
 			return true;
@@ -1187,8 +1363,12 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb,
 		if (msg_peer_node_is_up(hdr))
 			return false;
 		tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT);
+		return true;
 	}
 
+	if (state == SELF_LEAVING_PEER_DOWN)
+		return false;
+
 	/* Ignore duplicate packets */
 	if ((usr != LINK_PROTOCOL) && less(oseqno, rcv_nxt))
 		return true;
@@ -1197,9 +1377,9 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb,
 	if ((usr == TUNNEL_PROTOCOL) && (mtyp == FAILOVER_MSG)) {
 		syncpt = oseqno + exp_pkts - 1;
 		if (pl && tipc_link_is_up(pl)) {
-			pb_id = pl->bearer_id;
 			__tipc_node_link_down(n, &pb_id, xmitq, &maddr);
-			tipc_skb_queue_splice_tail_init(pl->inputq, l->inputq);
+			tipc_skb_queue_splice_tail_init(tipc_link_inputq(pl),
+							tipc_link_inputq(l));
 		}
 		/* If pkts arrive out of order, use lowest calculated syncpt */
 		if (less(syncpt, n->sync_point))
@@ -1232,19 +1412,18 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb,
 			tipc_link_fsm_evt(l, LINK_SYNCH_BEGIN_EVT);
 			tipc_node_fsm_evt(n, NODE_SYNCH_BEGIN_EVT);
 		}
-		if (less(syncpt, n->sync_point))
-			n->sync_point = syncpt;
 	}
 
 	/* Open tunnel link when parallel link reaches synch point */
-	if ((n->state == NODE_SYNCHING) && tipc_link_is_synching(l)) {
+	if (n->state == NODE_SYNCHING) {
 		if (tipc_link_is_synching(l)) {
 			tnl = l;
 		} else {
 			tnl = pl;
 			pl = l;
 		}
-		dlv_nxt = pl->rcv_nxt - mod(skb_queue_len(pl->inputq));
+		inputq_len = skb_queue_len(tipc_link_inputq(pl));
+		dlv_nxt = tipc_link_rcv_nxt(pl) - inputq_len;
 		if (more(dlv_nxt, n->sync_point)) {
 			tipc_link_fsm_evt(tnl, LINK_SYNCH_END_EVT);
 			tipc_node_fsm_evt(n, NODE_SYNCH_END_EVT);
@@ -1304,22 +1483,32 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
 	/* Ensure broadcast reception is in synch with peer's send state */
 	if (unlikely(usr == LINK_PROTOCOL))
 		tipc_bcast_sync_rcv(net, n->bc_entry.link, hdr);
-	else if (unlikely(n->bc_entry.link->acked != bc_ack))
+	else if (unlikely(tipc_link_acked(n->bc_entry.link) != bc_ack))
 		tipc_bcast_ack_rcv(net, n->bc_entry.link, bc_ack);
 
-	tipc_node_lock(n);
-
-	/* Is reception permitted at the moment ? */
-	if (!tipc_node_filter_pkt(n, hdr))
-		goto unlock;
-
-	/* Check and if necessary update node state */
-	if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) {
-		rc = tipc_link_rcv(le->link, skb, &xmitq);
-		skb = NULL;
+	/* Receive packet directly if conditions permit */
+	tipc_node_read_lock(n);
+	if (likely((n->state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL))) {
+		spin_lock_bh(&le->lock);
+		if (le->link) {
+			rc = tipc_link_rcv(le->link, skb, &xmitq);
+			skb = NULL;
+		}
+		spin_unlock_bh(&le->lock);
+	}
+	tipc_node_read_unlock(n);
+
+	/* Check/update node state before receiving */
+	if (unlikely(skb)) {
+		tipc_node_write_lock(n);
+		if (tipc_node_check_state(n, skb, bearer_id, &xmitq)) {
+			if (le->link) {
+				rc = tipc_link_rcv(le->link, skb, &xmitq);
+				skb = NULL;
+			}
+		}
+		tipc_node_write_unlock(n);
 	}
-unlock:
-	tipc_node_unlock(n);
 
 	if (unlikely(rc & TIPC_LINK_UP_EVT))
 		tipc_node_link_up(n, bearer_id, &xmitq);
@@ -1384,15 +1573,15 @@ int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb)
 				continue;
 		}
 
-		tipc_node_lock(node);
+		tipc_node_read_lock(node);
 		err = __tipc_nl_add_node(&msg, node);
 		if (err) {
 			last_addr = node->addr;
-			tipc_node_unlock(node);
+			tipc_node_read_unlock(node);
 			goto out;
 		}
 
-		tipc_node_unlock(node);
+		tipc_node_read_unlock(node);
 	}
 	done = 1;
 out:
@@ -1402,3 +1591,314 @@ int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb)
 
 	return skb->len;
 }
+
+/* tipc_node_find_by_name - locate owner node of link by link's name
+ * @net: the applicable net namespace
+ * @name: pointer to link name string
+ * @bearer_id: pointer to index in 'node->links' array where the link was found.
+ *
+ * Returns pointer to node owning the link, or 0 if no matching link is found.
+ */
+static struct tipc_node *tipc_node_find_by_name(struct net *net,
+						const char *link_name,
+						unsigned int *bearer_id)
+{
+	struct tipc_net *tn = net_generic(net, tipc_net_id);
+	struct tipc_link *l;
+	struct tipc_node *n;
+	struct tipc_node *found_node = NULL;
+	int i;
+
+	*bearer_id = 0;
+	rcu_read_lock();
+	list_for_each_entry_rcu(n, &tn->node_list, list) {
+		tipc_node_read_lock(n);
+		for (i = 0; i < MAX_BEARERS; i++) {
+			l = n->links[i].link;
+			if (l && !strcmp(tipc_link_name(l), link_name)) {
+				*bearer_id = i;
+				found_node = n;
+				break;
+			}
+		}
+		tipc_node_read_unlock(n);
+		if (found_node)
+			break;
+	}
+	rcu_read_unlock();
+
+	return found_node;
+}
+
+int tipc_nl_node_set_link(struct sk_buff *skb, struct genl_info *info)
+{
+	int err;
+	int res = 0;
+	int bearer_id;
+	char *name;
+	struct tipc_link *link;
+	struct tipc_node *node;
+	struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
+	struct net *net = sock_net(skb->sk);
+
+	if (!info->attrs[TIPC_NLA_LINK])
+		return -EINVAL;
+
+	err = nla_parse_nested(attrs, TIPC_NLA_LINK_MAX,
+			       info->attrs[TIPC_NLA_LINK],
+			       tipc_nl_link_policy);
+	if (err)
+		return err;
+
+	if (!attrs[TIPC_NLA_LINK_NAME])
+		return -EINVAL;
+
+	name = nla_data(attrs[TIPC_NLA_LINK_NAME]);
+
+	if (strcmp(name, tipc_bclink_name) == 0)
+		return tipc_nl_bc_link_set(net, attrs);
+
+	node = tipc_node_find_by_name(net, name, &bearer_id);
+	if (!node)
+		return -EINVAL;
+
+	tipc_node_read_lock(node);
+
+	link = node->links[bearer_id].link;
+	if (!link) {
+		res = -EINVAL;
+		goto out;
+	}
+
+	if (attrs[TIPC_NLA_LINK_PROP]) {
+		struct nlattr *props[TIPC_NLA_PROP_MAX + 1];
+
+		err = tipc_nl_parse_link_prop(attrs[TIPC_NLA_LINK_PROP],
+					      props);
+		if (err) {
+			res = err;
+			goto out;
+		}
+
+		if (props[TIPC_NLA_PROP_TOL]) {
+			u32 tol;
+
+			tol = nla_get_u32(props[TIPC_NLA_PROP_TOL]);
+			tipc_link_set_tolerance(link, tol);
+		}
+		if (props[TIPC_NLA_PROP_PRIO]) {
+			u32 prio;
+
+			prio = nla_get_u32(props[TIPC_NLA_PROP_PRIO]);
+			tipc_link_set_prio(link, prio);
+		}
+		if (props[TIPC_NLA_PROP_WIN]) {
+			u32 win;
+
+			win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
+			tipc_link_set_queue_limits(link, win);
+		}
+	}
+
+out:
+	tipc_node_read_unlock(node);
+
+	return res;
+}
+
+int tipc_nl_node_get_link(struct sk_buff *skb, struct genl_info *info)
+{
+	struct net *net = genl_info_net(info);
+	struct tipc_nl_msg msg;
+	char *name;
+	int err;
+
+	msg.portid = info->snd_portid;
+	msg.seq = info->snd_seq;
+
+	if (!info->attrs[TIPC_NLA_LINK_NAME])
+		return -EINVAL;
+	name = nla_data(info->attrs[TIPC_NLA_LINK_NAME]);
+
+	msg.skb = nlmsg_new(NLMSG_GOODSIZE, GFP_KERNEL);
+	if (!msg.skb)
+		return -ENOMEM;
+
+	if (strcmp(name, tipc_bclink_name) == 0) {
+		err = tipc_nl_add_bc_link(net, &msg);
+		if (err) {
+			nlmsg_free(msg.skb);
+			return err;
+		}
+	} else {
+		int bearer_id;
+		struct tipc_node *node;
+		struct tipc_link *link;
+
+		node = tipc_node_find_by_name(net, name, &bearer_id);
+		if (!node)
+			return -EINVAL;
+
+		tipc_node_read_lock(node);
+		link = node->links[bearer_id].link;
+		if (!link) {
+			tipc_node_read_unlock(node);
+			nlmsg_free(msg.skb);
+			return -EINVAL;
+		}
+
+		err = __tipc_nl_add_link(net, &msg, link, 0);
+		tipc_node_read_unlock(node);
+		if (err) {
+			nlmsg_free(msg.skb);
+			return err;
+		}
+	}
+
+	return genlmsg_reply(msg.skb, info);
+}
+
+int tipc_nl_node_reset_link_stats(struct sk_buff *skb, struct genl_info *info)
+{
+	int err;
+	char *link_name;
+	unsigned int bearer_id;
+	struct tipc_link *link;
+	struct tipc_node *node;
+	struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
+	struct net *net = sock_net(skb->sk);
+	struct tipc_link_entry *le;
+
+	if (!info->attrs[TIPC_NLA_LINK])
+		return -EINVAL;
+
+	err = nla_parse_nested(attrs, TIPC_NLA_LINK_MAX,
+			       info->attrs[TIPC_NLA_LINK],
+			       tipc_nl_link_policy);
+	if (err)
+		return err;
+
+	if (!attrs[TIPC_NLA_LINK_NAME])
+		return -EINVAL;
+
+	link_name = nla_data(attrs[TIPC_NLA_LINK_NAME]);
+
+	if (strcmp(link_name, tipc_bclink_name) == 0) {
+		err = tipc_bclink_reset_stats(net);
+		if (err)
+			return err;
+		return 0;
+	}
+
+	node = tipc_node_find_by_name(net, link_name, &bearer_id);
+	if (!node)
+		return -EINVAL;
+
+	le = &node->links[bearer_id];
+	tipc_node_read_lock(node);
+	spin_lock_bh(&le->lock);
+	link = node->links[bearer_id].link;
+	if (!link) {
+		spin_unlock_bh(&le->lock);
+		tipc_node_read_unlock(node);
+		return -EINVAL;
+	}
+	tipc_link_reset_stats(link);
+	spin_unlock_bh(&le->lock);
+	tipc_node_read_unlock(node);
+	return 0;
+}
+
+/* Caller should hold node lock  */
+static int __tipc_nl_add_node_links(struct net *net, struct tipc_nl_msg *msg,
+				    struct tipc_node *node, u32 *prev_link)
+{
+	u32 i;
+	int err;
+
+	for (i = *prev_link; i < MAX_BEARERS; i++) {
+		*prev_link = i;
+
+		if (!node->links[i].link)
+			continue;
+
+		err = __tipc_nl_add_link(net, msg,
+					 node->links[i].link, NLM_F_MULTI);
+		if (err)
+			return err;
+	}
+	*prev_link = 0;
+
+	return 0;
+}
+
+int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb)
+{
+	struct net *net = sock_net(skb->sk);
+	struct tipc_net *tn = net_generic(net, tipc_net_id);
+	struct tipc_node *node;
+	struct tipc_nl_msg msg;
+	u32 prev_node = cb->args[0];
+	u32 prev_link = cb->args[1];
+	int done = cb->args[2];
+	int err;
+
+	if (done)
+		return 0;
+
+	msg.skb = skb;
+	msg.portid = NETLINK_CB(cb->skb).portid;
+	msg.seq = cb->nlh->nlmsg_seq;
+
+	rcu_read_lock();
+	if (prev_node) {
+		node = tipc_node_find(net, prev_node);
+		if (!node) {
+			/* We never set seq or call nl_dump_check_consistent()
+			 * this means that setting prev_seq here will cause the
+			 * consistence check to fail in the netlink callback
+			 * handler. Resulting in the last NLMSG_DONE message
+			 * having the NLM_F_DUMP_INTR flag set.
+			 */
+			cb->prev_seq = 1;
+			goto out;
+		}
+		tipc_node_put(node);
+
+		list_for_each_entry_continue_rcu(node, &tn->node_list,
+						 list) {
+			tipc_node_read_lock(node);
+			err = __tipc_nl_add_node_links(net, &msg, node,
+						       &prev_link);
+			tipc_node_read_unlock(node);
+			if (err)
+				goto out;
+
+			prev_node = node->addr;
+		}
+	} else {
+		err = tipc_nl_add_bc_link(net, &msg);
+		if (err)
+			goto out;
+
+		list_for_each_entry_rcu(node, &tn->node_list, list) {
+			tipc_node_read_lock(node);
+			err = __tipc_nl_add_node_links(net, &msg, node,
+						       &prev_link);
+			tipc_node_read_unlock(node);
+			if (err)
+				goto out;
+
+			prev_node = node->addr;
+		}
+	}
+	done = 1;
+out:
+	rcu_read_unlock();
+
+	cb->args[0] = prev_node;
+	cb->args[1] = prev_link;
+	cb->args[2] = done;
+
+	return skb->len;
+}
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 6734562d3c6e5..f39d9d06e8bb0 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -42,23 +42,6 @@
 #include "bearer.h"
 #include "msg.h"
 
-/* Out-of-range value for node signature */
-#define INVALID_NODE_SIG	0x10000
-
-#define INVALID_BEARER_ID -1
-
-/* Flags used to take different actions according to flag type
- * TIPC_NOTIFY_NODE_DOWN: notify node is down
- * TIPC_NOTIFY_NODE_UP: notify node is up
- * TIPC_DISTRIBUTE_NAME: publish or withdraw link state name type
- */
-enum {
-	TIPC_NOTIFY_NODE_DOWN		= (1 << 3),
-	TIPC_NOTIFY_NODE_UP		= (1 << 4),
-	TIPC_NOTIFY_LINK_UP		= (1 << 6),
-	TIPC_NOTIFY_LINK_DOWN		= (1 << 7)
-};
-
 /* Optional capabilities supported by this code version
  */
 enum {
@@ -66,72 +49,8 @@ enum {
 };
 
 #define TIPC_NODE_CAPABILITIES TIPC_BCAST_SYNCH
+#define INVALID_BEARER_ID -1
 
-struct tipc_link_entry {
-	struct tipc_link *link;
-	u32 mtu;
-	struct sk_buff_head inputq;
-	struct tipc_media_addr maddr;
-};
-
-struct tipc_bclink_entry {
-	struct tipc_link *link;
-	struct sk_buff_head inputq1;
-	struct sk_buff_head arrvq;
-	struct sk_buff_head inputq2;
-	struct sk_buff_head namedq;
-};
-
-/**
- * struct tipc_node - TIPC node structure
- * @addr: network address of node
- * @ref: reference counter to node object
- * @lock: spinlock governing access to structure
- * @net: the applicable net namespace
- * @hash: links to adjacent nodes in unsorted hash chain
- * @inputq: pointer to input queue containing messages for msg event
- * @namedq: pointer to name table input queue with name table messages
- * @active_links: bearer ids of active links, used as index into links[] array
- * @links: array containing references to all links to node
- * @action_flags: bit mask of different types of node actions
- * @state: connectivity state vs peer node
- * @sync_point: sequence number where synch/failover is finished
- * @list: links to adjacent nodes in sorted list of cluster's nodes
- * @working_links: number of working links to node (both active and standby)
- * @link_cnt: number of links to node
- * @capabilities: bitmap, indicating peer node's functional capabilities
- * @signature: node instance identifier
- * @link_id: local and remote bearer ids of changing link, if any
- * @publ_list: list of publications
- * @rcu: rcu struct for tipc_node
- */
-struct tipc_node {
-	u32 addr;
-	struct kref kref;
-	spinlock_t lock;
-	struct net *net;
-	struct hlist_node hash;
-	int active_links[2];
-	struct tipc_link_entry links[MAX_BEARERS];
-	struct tipc_bclink_entry bc_entry;
-	int action_flags;
-	struct list_head list;
-	int state;
-	u16 sync_point;
-	int link_cnt;
-	u16 working_links;
-	u16 capabilities;
-	u32 signature;
-	u32 link_id;
-	struct list_head publ_list;
-	struct list_head conn_sks;
-	unsigned long keepalive_intv;
-	struct timer_list timer;
-	struct rcu_head rcu;
-};
-
-struct tipc_node *tipc_node_find(struct net *net, u32 addr);
-void tipc_node_put(struct tipc_node *node);
 void tipc_node_stop(struct net *net);
 void tipc_node_check_dest(struct net *net, u32 onode,
 			  struct tipc_bearer *bearer,
@@ -139,50 +58,22 @@ void tipc_node_check_dest(struct net *net, u32 onode,
 			  struct tipc_media_addr *maddr,
 			  bool *respond, bool *dupl_addr);
 void tipc_node_delete_links(struct net *net, int bearer_id);
-void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr);
-void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr);
-bool tipc_node_is_up(struct tipc_node *n);
 int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 node,
 			   char *linkname, size_t len);
-void tipc_node_unlock(struct tipc_node *node);
 int tipc_node_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
 		   int selector);
 int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest,
 		       u32 selector);
+void tipc_node_subscribe(struct net *net, struct list_head *subscr, u32 addr);
+void tipc_node_unsubscribe(struct net *net, struct list_head *subscr, u32 addr);
+void tipc_node_broadcast(struct net *net, struct sk_buff *skb);
 int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port);
 void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port);
+int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel);
 int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb);
-
-static inline void tipc_node_lock(struct tipc_node *node)
-{
-	spin_lock_bh(&node->lock);
-}
-
-static inline struct tipc_link *node_active_link(struct tipc_node *n, int sel)
-{
-	int bearer_id = n->active_links[sel & 1];
-
-	if (unlikely(bearer_id == INVALID_BEARER_ID))
-		return NULL;
-
-	return n->links[bearer_id].link;
-}
-
-static inline unsigned int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel)
-{
-	struct tipc_node *n;
-	int bearer_id;
-	unsigned int mtu = MAX_MSG_SIZE;
-
-	n = tipc_node_find(net, addr);
-	if (unlikely(!n))
-		return mtu;
-
-	bearer_id = n->active_links[sel & 1];
-	if (likely(bearer_id != INVALID_BEARER_ID))
-		mtu = n->links[bearer_id].mtu;
-	tipc_node_put(n);
-	return mtu;
-}
+int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb);
+int tipc_nl_node_reset_link_stats(struct sk_buff *skb, struct genl_info *info);
+int tipc_nl_node_get_link(struct sk_buff *skb, struct genl_info *info);
+int tipc_nl_node_set_link(struct sk_buff *skb, struct genl_info *info);
 
 #endif
diff --git a/net/tipc/udp_media.c b/net/tipc/udp_media.c
index ad2719ad4c1ba..816914ef228da 100644
--- a/net/tipc/udp_media.c
+++ b/net/tipc/udp_media.c
@@ -48,7 +48,6 @@
 #include <linux/tipc_netlink.h>
 #include "core.h"
 #include "bearer.h"
-#include "msg.h"
 
 /* IANA assigned UDP port */
 #define UDP_PORT_DEFAULT	6118
@@ -221,10 +220,6 @@ static int tipc_udp_recv(struct sock *sk, struct sk_buff *skb)
 {
 	struct udp_bearer *ub;
 	struct tipc_bearer *b;
-	int usr = msg_user(buf_msg(skb));
-
-	if ((usr == LINK_PROTOCOL) || (usr == NAME_DISTRIBUTOR))
-		skb_linearize(skb);
 
 	ub = rcu_dereference_sk_user_data(sk);
 	if (!ub) {