Skip to content

Commit

Permalink
---
Browse files Browse the repository at this point in the history
yaml
---
r: 46290
b: refs/heads/master
c: 1d6e813
h: refs/heads/master
v: v3
  • Loading branch information
Patrick Caulfield authored and Steven Whitehouse committed Feb 5, 2007
1 parent 2a6d902 commit 338e005
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 285 deletions.
2 changes: 1 addition & 1 deletion [refs]
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
---
refs/heads/master: 03dc6a538e42bcc8d5dfabcee208b639db85a80c
refs/heads/master: 1d6e8131cf0064ef5ab5f3411a82b800afbfadee
145 changes: 64 additions & 81 deletions trunk/fs/dlm/lowcomms-sctp.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ struct nodeinfo {
struct list_head writequeue; /* outgoing writequeue_entries */
spinlock_t writequeue_lock;
int nodeid;
struct work_struct swork; /* Send workqueue */
struct work_struct lwork; /* Locking workqueue */
};

static DEFINE_IDR(nodeinfo_idr);
Expand All @@ -96,6 +98,7 @@ struct connection {
atomic_t waiting_requests;
struct cbuf cb;
int eagain_flag;
struct work_struct work; /* Send workqueue */
};

/* An entry waiting to be sent */
Expand Down Expand Up @@ -137,19 +140,23 @@ static void cbuf_eat(struct cbuf *cb, int n)
static LIST_HEAD(write_nodes);
static DEFINE_SPINLOCK(write_nodes_lock);


/* Maximum number of incoming messages to process before
* doing a schedule()
*/
#define MAX_RX_MSG_COUNT 25

/* Manage daemons */
static struct task_struct *recv_task;
static struct task_struct *send_task;
static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_wait);
/* Work queues */
static struct workqueue_struct *recv_workqueue;
static struct workqueue_struct *send_workqueue;
static struct workqueue_struct *lock_workqueue;

/* The SCTP connection */
static struct connection sctp_con;

static void process_send_sockets(struct work_struct *work);
static void process_recv_sockets(struct work_struct *work);
static void process_lock_request(struct work_struct *work);

static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
{
Expand Down Expand Up @@ -222,6 +229,8 @@ static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc)
spin_lock_init(&ni->lock);
INIT_LIST_HEAD(&ni->writequeue);
spin_lock_init(&ni->writequeue_lock);
INIT_WORK(&ni->lwork, process_lock_request);
INIT_WORK(&ni->swork, process_send_sockets);
ni->nodeid = nodeid;

if (nodeid > max_nodeid)
Expand Down Expand Up @@ -249,11 +258,8 @@ static struct nodeinfo *assoc2nodeinfo(sctp_assoc_t assoc)
/* Data or notification available on socket */
/*
 * NOTE(review): this span fuses two versions from a diff with +/- markers
 * stripped.  The atomic_inc/wake_up_interruptible lines belong to the old
 * kthread-based receiver; queue_work() on recv_workqueue is the replacement.
 * sk_data_ready callback: runs in softirq context, so it only marks work
 * pending and defers the actual socket read.
 */
static void lowcomms_data_ready(struct sock *sk, int count_unused)
{
atomic_inc(&sctp_con.waiting_requests);
/* test_and_set_bit makes re-arming idempotent: only the first caller
 * past this point wakes the receiver */
if (test_and_set_bit(CF_READ_PENDING, &sctp_con.flags))
return;

wake_up_interruptible(&lowcomms_recv_wait);
queue_work(recv_workqueue, &sctp_con.work);
}


Expand Down Expand Up @@ -361,10 +367,10 @@ static void init_failed(void)
spin_lock_bh(&write_nodes_lock);
list_add_tail(&ni->write_list, &write_nodes);
spin_unlock_bh(&write_nodes_lock);
queue_work(send_workqueue, &ni->swork);
}
}
}
wake_up_process(send_task);
}

/* Something happened to an association */
Expand Down Expand Up @@ -446,8 +452,8 @@ static void process_sctp_notification(struct msghdr *msg, char *buf)
spin_lock_bh(&write_nodes_lock);
list_add_tail(&ni->write_list, &write_nodes);
spin_unlock_bh(&write_nodes_lock);
queue_work(send_workqueue, &ni->swork);
}
wake_up_process(send_task);
}
break;

Expand Down Expand Up @@ -580,8 +586,8 @@ static int receive_from_sock(void)
spin_lock_bh(&write_nodes_lock);
list_add_tail(&ni->write_list, &write_nodes);
spin_unlock_bh(&write_nodes_lock);
queue_work(send_workqueue, &ni->swork);
}
wake_up_process(send_task);
}
}

Expand All @@ -590,6 +596,7 @@ static int receive_from_sock(void)
return 0;

cbuf_add(&sctp_con.cb, ret);
// PJC: TODO: Add to node's workqueue....can we ??
ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid),
page_address(sctp_con.rx_page),
sctp_con.cb.base, sctp_con.cb.len,
Expand Down Expand Up @@ -820,7 +827,8 @@ void dlm_lowcomms_commit_buffer(void *arg)
spin_lock_bh(&write_nodes_lock);
list_add_tail(&ni->write_list, &write_nodes);
spin_unlock_bh(&write_nodes_lock);
wake_up_process(send_task);

queue_work(send_workqueue, &ni->swork);
}
return;

Expand Down Expand Up @@ -1088,101 +1096,75 @@ int dlm_lowcomms_close(int nodeid)
return 0;
}

/*
 * NOTE(review): fused diff region — two function definitions are interleaved
 * here.  The old helper write_list_empty() (locked list_empty() check used by
 * the removed dlm_sendd kthread) is being deleted; the new workqueue handler
 * process_recv_sockets() replaces the receive half of dlm_recvd.  The lines
 * below mix both bodies; do not read them as one valid function.
 */
static int write_list_empty(void)
// PJC: The work queue function for receiving.
/* Work handler queued by lowcomms_data_ready(): drains the SCTP socket. */
static void process_recv_sockets(struct work_struct *work)
{
int status;
/* clear the pending flag first so a new data_ready can re-queue us */
if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) {
int ret;
int count = 0;

spin_lock_bh(&write_nodes_lock);
status = list_empty(&write_nodes);
spin_unlock_bh(&write_nodes_lock);
/* keep reading until the socket has no more data (ret < 0) */
do {
ret = receive_from_sock();

return status;
/* Don't starve out everyone else */
if (++count >= MAX_RX_MSG_COUNT) {
cond_resched();
count = 0;
}
/* kthread_should_stop() here is a leftover from the kthread
 * version — presumably meaningless in workqueue context;
 * TODO(review): confirm against the final tree */
} while (!kthread_should_stop() && ret >=0);
}
cond_resched();
}

/*
 * NOTE(review): fused diff region.  The removed dlm_recvd() kthread loop
 * (waitqueue + schedule + bounded receive loop) is interleaved with the new
 * process_send_sockets() work handler, whose real body is only the
 * eagain/refill_write_queue() check plus process_output_queue().
 */
static int dlm_recvd(void *data)
// PJC: the work queue function for sending
/* Work handler queued via nodeinfo->swork whenever a node lands on write_nodes. */
static void process_send_sockets(struct work_struct *work)
{
DECLARE_WAITQUEUE(wait, current);

while (!kthread_should_stop()) {
int count = 0;

/* old kthread sleep/wake protocol on lowcomms_recv_wait (removed) */
set_current_state(TASK_INTERRUPTIBLE);
add_wait_queue(&lowcomms_recv_wait, &wait);
if (!test_bit(CF_READ_PENDING, &sctp_con.flags))
schedule();
remove_wait_queue(&lowcomms_recv_wait, &wait);
set_current_state(TASK_RUNNING);

if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) {
int ret;

do {
ret = receive_from_sock();

/* Don't starve out everyone else */
if (++count >= MAX_RX_MSG_COUNT) {
cond_resched();
count = 0;
}
} while (!kthread_should_stop() && ret >=0);
}
cond_resched();
/* a previous send hit EAGAIN: re-queue the unsent entries first */
if (sctp_con.eagain_flag) {
sctp_con.eagain_flag = 0;
refill_write_queue();
}

return 0;
process_output_queue();
}

/*
 * NOTE(review): fused diff region.  The removed dlm_sendd() kthread (which
 * slept on the socket's sk_sleep waitqueue and ran process_output_queue())
 * is interleaved with the new, currently-stub process_lock_request() work
 * handler (queued via nodeinfo->lwork on lock_workqueue).
 */
static int dlm_sendd(void *data)
// PJC: Process lock requests from a particular node.
// TODO: can we optimise this out on UP ??
static void process_lock_request(struct work_struct *work)
{
DECLARE_WAITQUEUE(wait, current);

add_wait_queue(sctp_con.sock->sk->sk_sleep, &wait);

while (!kthread_should_stop()) {
set_current_state(TASK_INTERRUPTIBLE);
/* sleep until something is queued for sending */
if (write_list_empty())
schedule();
set_current_state(TASK_RUNNING);

if (sctp_con.eagain_flag) {
sctp_con.eagain_flag = 0;
refill_write_queue();
}
process_output_queue();
}

remove_wait_queue(sctp_con.sock->sk->sk_sleep, &wait);

return 0;
}

/*
 * Tear down the lowcomms worker infrastructure.
 * NOTE(review): fused diff — the kthread_stop() calls are the removed
 * kthread shutdown; the destroy_workqueue() calls are the replacement.
 * destroy_workqueue() flushes pending work before freeing the queue.
 */
static void daemons_stop(void)
{
kthread_stop(recv_task);
kthread_stop(send_task);
destroy_workqueue(recv_workqueue);
destroy_workqueue(send_workqueue);
destroy_workqueue(lock_workqueue);
}

/*
 * Create the three lowcomms workqueues (recv, send, lock), unwinding any
 * already-created queue on failure.  Returns 0 on success, nonzero on error.
 * NOTE(review): fused diff — the kthread_run()/kthread_stop() lines belong to
 * the removed kthread implementation.  Also note the new code tests
 * IS_ERR(workqueue), but create_workqueue() returns NULL on failure, not an
 * ERR_PTR — presumably a latent bug in this commit; verify against the
 * final tree.
 */
static int daemons_start(void)
{
struct task_struct *p;
int error;
recv_workqueue = create_workqueue("dlm_recv");
error = IS_ERR(recv_workqueue);
if (error) {
log_print("can't start dlm_recv %d", error);
return error;
}

p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
error = IS_ERR(p);
/* send queue is singlethreaded: preserves write ordering per node */
send_workqueue = create_singlethread_workqueue("dlm_send");
error = IS_ERR(send_workqueue);
if (error) {
log_print("can't start dlm_recvd %d", error);
log_print("can't start dlm_send %d", error);
destroy_workqueue(recv_workqueue);
return error;
}
recv_task = p;

p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
error = IS_ERR(p);
lock_workqueue = create_workqueue("dlm_rlock");
error = IS_ERR(lock_workqueue);
if (error) {
log_print("can't start dlm_sendd %d", error);
kthread_stop(recv_task);
log_print("can't start dlm_rlock %d", error);
/* unwind the two queues created above */
destroy_workqueue(send_workqueue);
destroy_workqueue(recv_workqueue);
return error;
}
send_task = p;

return 0;
}
Expand All @@ -1194,6 +1176,8 @@ int dlm_lowcomms_start(void)
{
int error;

INIT_WORK(&sctp_con.work, process_recv_sockets);

error = init_sock();
if (error)
goto fail_sock;
Expand Down Expand Up @@ -1224,4 +1208,3 @@ void dlm_lowcomms_stop(void)
for (i = 0; i < dlm_local_count; i++)
kfree(dlm_local_addr[i]);
}

Loading

0 comments on commit 338e005

Please sign in to comment.