Skip to content

Commit

Permalink
tipc: add ability to order and receive topology events in driver
Browse files Browse the repository at this point in the history
As preparation for introducing communication groups, we add the ability
to issue topology subscriptions and receive topology events from kernel
space. This will make it possible for group member sockets to keep track
of other group members.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
  • Loading branch information
Jon Maloy authored and David S. Miller committed Oct 13, 2017
1 parent 2d0d21c commit 14c0449
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 40 deletions.
5 changes: 5 additions & 0 deletions net/tipc/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ static inline struct list_head *tipc_nodes(struct net *net)
return &tipc_net(net)->node_list;
}

static inline struct tipc_server *tipc_topsrv(struct net *net)
{
return tipc_net(net)->topsrv;
}

static inline unsigned int tipc_hashfn(u32 addr)
{
return addr & (NODE_HTABLE_SIZE - 1);
Expand Down
1 change: 1 addition & 0 deletions net/tipc/msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ struct plist;
#define MSG_FRAGMENTER 12
#define LINK_CONFIG 13
#define SOCK_WAKEUP 14 /* pseudo user */
#define TOP_SRV 15 /* pseudo user */

/*
* Message header sizes
Expand Down
121 changes: 94 additions & 27 deletions net/tipc/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
#include "server.h"
#include "core.h"
#include "socket.h"
#include "addr.h"
#include "msg.h"
#include <net/sock.h>
#include <linux/module.h>

Expand Down Expand Up @@ -105,13 +107,11 @@ static void tipc_conn_kref_release(struct kref *kref)
kernel_bind(sock, (struct sockaddr *)saddr, sizeof(*saddr));
sock_release(sock);
con->sock = NULL;

spin_lock_bh(&s->idr_lock);
idr_remove(&s->conn_idr, con->conid);
s->idr_in_use--;
spin_unlock_bh(&s->idr_lock);
}

spin_lock_bh(&s->idr_lock);
idr_remove(&s->conn_idr, con->conid);
s->idr_in_use--;
spin_unlock_bh(&s->idr_lock);
tipc_clean_outqueues(con);
kfree(con);
}
Expand Down Expand Up @@ -197,7 +197,8 @@ static void tipc_close_conn(struct tipc_conn *con)
struct tipc_server *s = con->server;

if (test_and_clear_bit(CF_CONNECTED, &con->flags)) {
tipc_unregister_callbacks(con);
if (con->sock)
tipc_unregister_callbacks(con);

if (con->conid)
s->tipc_conn_release(con->conid, con->usr_data);
Expand All @@ -207,8 +208,8 @@ static void tipc_close_conn(struct tipc_conn *con)
* are harmless for us here as we have already deleted this
* connection from server connection list.
*/
kernel_sock_shutdown(con->sock, SHUT_RDWR);

if (con->sock)
kernel_sock_shutdown(con->sock, SHUT_RDWR);
conn_put(con);
}
}
Expand Down Expand Up @@ -487,38 +488,104 @@ void tipc_conn_terminate(struct tipc_server *s, int conid)
}
}

bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type,
u32 lower, u32 upper, int *conid)
{
struct tipc_subscriber *scbr;
struct tipc_subscr sub;
struct tipc_server *s;
struct tipc_conn *con;

sub.seq.type = type;
sub.seq.lower = lower;
sub.seq.upper = upper;
sub.timeout = TIPC_WAIT_FOREVER;
sub.filter = TIPC_SUB_PORTS;
*(u32 *)&sub.usr_handle = port;

con = tipc_alloc_conn(tipc_topsrv(net));
if (!con)
return false;

*conid = con->conid;
s = con->server;
scbr = s->tipc_conn_new(*conid);
if (!scbr) {
tipc_close_conn(con);
return false;
}

con->usr_data = scbr;
con->sock = NULL;
s->tipc_conn_recvmsg(net, *conid, NULL, scbr, &sub, sizeof(sub));
return true;
}

void tipc_topsrv_kern_unsubscr(struct net *net, int conid)
{
struct tipc_conn *con;

con = tipc_conn_lookup(tipc_topsrv(net), conid);
if (!con)
return;
tipc_close_conn(con);
conn_put(con);
}

static void tipc_send_kern_top_evt(struct net *net, struct tipc_event *evt)
{
u32 port = *(u32 *)&evt->s.usr_handle;
u32 self = tipc_own_addr(net);
struct sk_buff_head evtq;
struct sk_buff *skb;

skb = tipc_msg_create(TOP_SRV, 0, INT_H_SIZE, sizeof(*evt),
self, self, port, port, 0);
if (!skb)
return;
msg_set_dest_droppable(buf_msg(skb), true);
memcpy(msg_data(buf_msg(skb)), evt, sizeof(*evt));
skb_queue_head_init(&evtq);
__skb_queue_tail(&evtq, skb);
tipc_sk_rcv(net, &evtq);
}

static void tipc_send_to_sock(struct tipc_conn *con)
{
int count = 0;
struct tipc_server *s = con->server;
struct outqueue_entry *e;
struct tipc_event *evt;
struct msghdr msg;
int count = 0;
int ret;

spin_lock_bh(&con->outqueue_lock);
while (test_bit(CF_CONNECTED, &con->flags)) {
e = list_entry(con->outqueue.next, struct outqueue_entry,
list);
e = list_entry(con->outqueue.next, struct outqueue_entry, list);
if ((struct list_head *) e == &con->outqueue)
break;
spin_unlock_bh(&con->outqueue_lock);

memset(&msg, 0, sizeof(msg));
msg.msg_flags = MSG_DONTWAIT;
spin_unlock_bh(&con->outqueue_lock);

if (s->type == SOCK_DGRAM || s->type == SOCK_RDM) {
msg.msg_name = &e->dest;
msg.msg_namelen = sizeof(struct sockaddr_tipc);
}
ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1,
e->iov.iov_len);
if (ret == -EWOULDBLOCK || ret == 0) {
cond_resched();
goto out;
} else if (ret < 0) {
goto send_err;
if (con->sock) {
memset(&msg, 0, sizeof(msg));
msg.msg_flags = MSG_DONTWAIT;
if (s->type == SOCK_DGRAM || s->type == SOCK_RDM) {
msg.msg_name = &e->dest;
msg.msg_namelen = sizeof(struct sockaddr_tipc);
}
ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1,
e->iov.iov_len);
if (ret == -EWOULDBLOCK || ret == 0) {
cond_resched();
goto out;
} else if (ret < 0) {
goto send_err;
}
} else {
evt = e->iov.iov_base;
tipc_send_kern_top_evt(s->net, evt);
}

/* Don't starve users filling buffers */
if (++count >= MAX_SEND_MSG_COUNT) {
cond_resched();
Expand Down
5 changes: 4 additions & 1 deletion net/tipc/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,16 @@ struct tipc_server {
int tipc_conn_sendmsg(struct tipc_server *s, int conid,
struct sockaddr_tipc *addr, void *data, size_t len);

bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type,
u32 lower, u32 upper, int *conid);
void tipc_topsrv_kern_unsubscr(struct net *net, int conid);

/**
* tipc_conn_terminate - terminate connection with server
*
* Note: Must call it in process context since it might sleep
*/
void tipc_conn_terminate(struct tipc_server *s, int conid);

int tipc_server_start(struct tipc_server *s);

void tipc_server_stop(struct tipc_server *s);
Expand Down
32 changes: 20 additions & 12 deletions net/tipc/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,10 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
kfree_skb(skb);
}

static void tipc_sk_top_evt(struct tipc_sock *tsk, struct tipc_event *evt)
{
}

/**
* tipc_sendmsg - send message in connectionless manner
* @sock: socket structure
Expand Down Expand Up @@ -1671,20 +1675,24 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
struct tipc_msg *hdr = buf_msg(skb);
unsigned int limit = rcvbuf_limit(sk, skb);
int err = TIPC_OK;
int usr = msg_user(hdr);
u32 onode;

if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
tipc_sk_proto_rcv(tsk, skb, xmitq);
return false;
}

if (unlikely(usr == SOCK_WAKEUP)) {
onode = msg_orignode(hdr);
if (unlikely(!msg_isdata(hdr))) {
switch (msg_user(hdr)) {
case CONN_MANAGER:
tipc_sk_proto_rcv(tsk, skb, xmitq);
return false;
case SOCK_WAKEUP:
u32_del(&tsk->cong_links, msg_orignode(hdr));
tsk->cong_link_cnt--;
sk->sk_write_space(sk);
break;
case TOP_SRV:
tipc_sk_top_evt(tsk, (void *)msg_data(hdr));
break;
default:
break;
}
kfree_skb(skb);
u32_del(&tsk->cong_links, onode);
tsk->cong_link_cnt--;
sk->sk_write_space(sk);
return false;
}

Expand Down

0 comments on commit 14c0449

Please sign in to comment.