Skip to content

Commit

Permalink
tipc: simplify interaction between subscription and topology connection
Browse files Browse the repository at this point in the history
The message transmission and reception in the topology server is more
generic than is currently necessary. By basing the funtionality on the
fact that we only send items of type struct tipc_event and always
receive items of struct tipc_subcr we can make several simplifications,
and also get rid of some unnecessary dynamic memory allocations.

Acked-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
  • Loading branch information
Jon Maloy authored and David S. Miller committed Feb 16, 2018
1 parent df79d04 commit 414574a
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 149 deletions.
10 changes: 6 additions & 4 deletions net/tipc/name_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -810,14 +810,15 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 ref,
*/
void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status)
{
struct tipc_net *tn = net_generic(s->net, tipc_net_id);
struct tipc_server *srv = s->server;
struct tipc_net *tn = tipc_net(srv->net);
u32 type = tipc_subscrp_convert_seq_type(s->evt.s.seq.type, s->swap);
int index = hash(type);
struct name_seq *seq;
struct tipc_name_seq ns;

spin_lock_bh(&tn->nametbl_lock);
seq = nametbl_find_seq(s->net, type);
seq = nametbl_find_seq(srv->net, type);
if (!seq)
seq = tipc_nameseq_create(type, &tn->nametbl->seq_hlist[index]);
if (seq) {
Expand All @@ -837,12 +838,13 @@ void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status)
*/
void tipc_nametbl_unsubscribe(struct tipc_subscription *s)
{
struct tipc_net *tn = net_generic(s->net, tipc_net_id);
struct tipc_server *srv = s->server;
struct tipc_net *tn = tipc_net(srv->net);
struct name_seq *seq;
u32 type = tipc_subscrp_convert_seq_type(s->evt.s.seq.type, s->swap);

spin_lock_bh(&tn->nametbl_lock);
seq = nametbl_find_seq(s->net, type);
seq = nametbl_find_seq(srv->net, type);
if (seq != NULL) {
spin_lock_bh(&seq->lock);
list_del_init(&s->nameseq_list);
Expand Down
170 changes: 58 additions & 112 deletions net/tipc/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ struct tipc_conn {

/* An entry waiting to be sent */
struct outqueue_entry {
u32 evt;
bool inactive;
struct tipc_event evt;
struct list_head list;
struct kvec iov;
};

static void tipc_recv_work(struct work_struct *work);
Expand Down Expand Up @@ -154,6 +154,9 @@ static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid)
return con;
}

/* sock_data_ready - interrupt callback indicating the socket has data to read
* The queued job is launched in tipc_recv_from_sock()
*/
static void sock_data_ready(struct sock *sk)
{
struct tipc_conn *con;
Expand All @@ -168,6 +171,10 @@ static void sock_data_ready(struct sock *sk)
read_unlock_bh(&sk->sk_callback_lock);
}

/* sock_write_space - interrupt callback after a sendmsg EAGAIN
* Indicates that there now is more is space in the send buffer
* The queued job is launched in tipc_send_to_sock()
*/
static void sock_write_space(struct sock *sk)
{
struct tipc_conn *con;
Expand Down Expand Up @@ -273,10 +280,10 @@ static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s)
return con;
}

int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con,
void *buf, size_t len)
static int tipc_con_rcv_sub(struct tipc_server *srv,
struct tipc_conn *con,
struct tipc_subscr *s)
{
struct tipc_subscr *s = (struct tipc_subscr *)buf;
struct tipc_subscription *sub;
bool status;
int swap;
Expand All @@ -292,7 +299,7 @@ int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con,
return 0;
}
status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap));
sub = tipc_subscrp_subscribe(net, s, conid, swap, status);
sub = tipc_subscrp_subscribe(srv, s, con->conid, swap, status);
if (!sub)
return -1;

Expand All @@ -304,43 +311,27 @@ int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con,

static int tipc_receive_from_sock(struct tipc_conn *con)
{
struct tipc_server *s = con->server;
struct tipc_server *srv = con->server;
struct sock *sk = con->sock->sk;
struct msghdr msg = {};
struct tipc_subscr s;
struct kvec iov;
void *buf;
int ret;

buf = kmem_cache_alloc(s->rcvbuf_cache, GFP_ATOMIC);
if (!buf) {
ret = -ENOMEM;
goto out_close;
}

iov.iov_base = buf;
iov.iov_len = s->max_rcvbuf_size;
iov.iov_base = &s;
iov.iov_len = sizeof(s);
msg.msg_name = NULL;
iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &iov, 1, iov.iov_len);
ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT);
if (ret <= 0) {
kmem_cache_free(s->rcvbuf_cache, buf);
goto out_close;
if (ret == -EWOULDBLOCK)
return -EWOULDBLOCK;
if (ret > 0) {
read_lock_bh(&sk->sk_callback_lock);
ret = tipc_con_rcv_sub(srv, con, &s);
read_unlock_bh(&sk->sk_callback_lock);
}

read_lock_bh(&sk->sk_callback_lock);
ret = tipc_con_rcv_sub(s->net, con->conid, con, buf, ret);
read_unlock_bh(&sk->sk_callback_lock);
kmem_cache_free(s->rcvbuf_cache, buf);
if (ret < 0)
tipc_conn_terminate(s, con->conid);
return ret;

out_close:
if (ret != -EWOULDBLOCK)
tipc_close_conn(con);
else if (ret == 0)
/* Don't return success if we really got EOF */
ret = -EAGAIN;

return ret;
}
Expand Down Expand Up @@ -442,84 +433,47 @@ static int tipc_open_listening_sock(struct tipc_server *s)
return 0;
}

static struct outqueue_entry *tipc_alloc_entry(void *data, int len)
{
struct outqueue_entry *entry;
void *buf;

entry = kmalloc(sizeof(struct outqueue_entry), GFP_ATOMIC);
if (!entry)
return NULL;

buf = kmemdup(data, len, GFP_ATOMIC);
if (!buf) {
kfree(entry);
return NULL;
}

entry->iov.iov_base = buf;
entry->iov.iov_len = len;

return entry;
}

static void tipc_free_entry(struct outqueue_entry *e)
{
kfree(e->iov.iov_base);
kfree(e);
}

static void tipc_clean_outqueues(struct tipc_conn *con)
{
struct outqueue_entry *e, *safe;

spin_lock_bh(&con->outqueue_lock);
list_for_each_entry_safe(e, safe, &con->outqueue, list) {
list_del(&e->list);
tipc_free_entry(e);
kfree(e);
}
spin_unlock_bh(&con->outqueue_lock);
}

int tipc_conn_sendmsg(struct tipc_server *s, int conid,
u32 evt, void *data, size_t len)
/* tipc_conn_queue_evt - interrupt level call from a subscription instance
* The queued job is launched in tipc_send_to_sock()
*/
void tipc_conn_queue_evt(struct tipc_server *s, int conid,
u32 event, struct tipc_event *evt)
{
struct outqueue_entry *e;
struct tipc_conn *con;

con = tipc_conn_lookup(s, conid);
if (!con)
return -EINVAL;
return;

if (!connected(con)) {
conn_put(con);
return 0;
}
if (!connected(con))
goto err;

e = tipc_alloc_entry(data, len);
if (!e) {
conn_put(con);
return -ENOMEM;
}
e->evt = evt;
e = kmalloc(sizeof(*e), GFP_ATOMIC);
if (!e)
goto err;
e->inactive = (event == TIPC_SUBSCR_TIMEOUT);
memcpy(&e->evt, evt, sizeof(*evt));
spin_lock_bh(&con->outqueue_lock);
list_add_tail(&e->list, &con->outqueue);
spin_unlock_bh(&con->outqueue_lock);

if (!queue_work(s->send_wq, &con->swork))
conn_put(con);
return 0;
}

void tipc_conn_terminate(struct tipc_server *s, int conid)
{
struct tipc_conn *con;

con = tipc_conn_lookup(s, conid);
if (con) {
tipc_close_conn(con);
conn_put(con);
}
if (queue_work(s->send_wq, &con->swork))
return;
err:
conn_put(con);
}

bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
Expand All @@ -542,7 +496,7 @@ bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,

*conid = con->conid;
con->sock = NULL;
rc = tipc_con_rcv_sub(net, *conid, con, &sub, sizeof(sub));
rc = tipc_con_rcv_sub(tipc_topsrv(net), con, &sub);
if (rc < 0)
tipc_close_conn(con);
return !rc;
Expand Down Expand Up @@ -587,34 +541,36 @@ static void tipc_send_to_sock(struct tipc_conn *con)
struct outqueue_entry *e;
struct tipc_event *evt;
struct msghdr msg;
struct kvec iov;
int count = 0;
int ret;

spin_lock_bh(&con->outqueue_lock);

while (!list_empty(queue)) {
e = list_first_entry(queue, struct outqueue_entry, list);

evt = &e->evt;
spin_unlock_bh(&con->outqueue_lock);

if (e->evt == TIPC_SUBSCR_TIMEOUT) {
evt = (struct tipc_event *)e->iov.iov_base;
if (e->inactive)
tipc_con_delete_sub(con, &evt->s);
}

memset(&msg, 0, sizeof(msg));
msg.msg_flags = MSG_DONTWAIT;
iov.iov_base = evt;
iov.iov_len = sizeof(*evt);
msg.msg_name = NULL;

if (con->sock) {
ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1,
e->iov.iov_len);
ret = kernel_sendmsg(con->sock, &msg, &iov,
1, sizeof(*evt));
if (ret == -EWOULDBLOCK || ret == 0) {
cond_resched();
goto out;
} else if (ret < 0) {
goto send_err;
goto err;
}
} else {
evt = e->iov.iov_base;
tipc_send_kern_top_evt(srv->net, evt);
}

Expand All @@ -625,13 +581,12 @@ static void tipc_send_to_sock(struct tipc_conn *con)
}
spin_lock_bh(&con->outqueue_lock);
list_del(&e->list);
tipc_free_entry(e);
kfree(e);
}
spin_unlock_bh(&con->outqueue_lock);
out:
return;

send_err:
err:
tipc_close_conn(con);
}

Expand Down Expand Up @@ -695,22 +650,14 @@ int tipc_server_start(struct tipc_server *s)
idr_init(&s->conn_idr);
s->idr_in_use = 0;

s->rcvbuf_cache = kmem_cache_create(s->name, s->max_rcvbuf_size,
0, SLAB_HWCACHE_ALIGN, NULL);
if (!s->rcvbuf_cache)
return -ENOMEM;

ret = tipc_work_start(s);
if (ret < 0) {
kmem_cache_destroy(s->rcvbuf_cache);
if (ret < 0)
return ret;
}

ret = tipc_open_listening_sock(s);
if (ret < 0) {
if (ret < 0)
tipc_work_stop(s);
kmem_cache_destroy(s->rcvbuf_cache);
return ret;
}

return ret;
}

Expand All @@ -731,6 +678,5 @@ void tipc_server_stop(struct tipc_server *s)
spin_unlock_bh(&s->idr_lock);

tipc_work_stop(s);
kmem_cache_destroy(s->rcvbuf_cache);
idr_destroy(&s->conn_idr);
}
12 changes: 3 additions & 9 deletions net/tipc/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#ifndef _TIPC_SERVER_H
#define _TIPC_SERVER_H

#include "core.h"
#include <linux/idr.h>
#include <linux/tipc.h>
#include <net/net_namespace.h>
Expand Down Expand Up @@ -68,27 +69,20 @@ struct tipc_server {
spinlock_t idr_lock;
int idr_in_use;
struct net *net;
struct kmem_cache *rcvbuf_cache;
struct workqueue_struct *rcv_wq;
struct workqueue_struct *send_wq;
int max_rcvbuf_size;
struct sockaddr_tipc *saddr;
char name[TIPC_SERVER_NAME_LEN];
};

int tipc_conn_sendmsg(struct tipc_server *s, int conid,
u32 evt, void *data, size_t len);
void tipc_conn_queue_evt(struct tipc_server *s, int conid,
u32 event, struct tipc_event *evt);

bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
u32 upper, u32 filter, int *conid);
void tipc_topsrv_kern_unsubscr(struct net *net, int conid);

/**
* tipc_conn_terminate - terminate connection with server
*
* Note: Must call it in process context since it might sleep
*/
void tipc_conn_terminate(struct tipc_server *s, int conid);
int tipc_server_start(struct tipc_server *s);

void tipc_server_stop(struct tipc_server *s);
Expand Down
Loading

0 comments on commit 414574a

Please sign in to comment.