Skip to content

Commit

Permalink
[DLM] Use workqueues for dlm lowcomms
Browse files Browse the repository at this point in the history
This patch converts the DLM TCP lowcomms to use workqueues rather than using its
own daemon functions. This simultaneously removes a lot of code and makes the
implementation more scalable on multi-processor machines.

Signed-off-by: Patrick Caulfield <pcaulfie@redhat.com>
Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>
  • Loading branch information
Patrick Caulfield authored and Steven Whitehouse committed Feb 5, 2007
1 parent 03dc6a5 commit 1d6e813
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 284 deletions.
145 changes: 64 additions & 81 deletions fs/dlm/lowcomms-sctp.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ struct nodeinfo {
struct list_head writequeue; /* outgoing writequeue_entries */
spinlock_t writequeue_lock;
int nodeid;
struct work_struct swork; /* Send workqueue */
struct work_struct lwork; /* Locking workqueue */
};

static DEFINE_IDR(nodeinfo_idr);
Expand All @@ -96,6 +98,7 @@ struct connection {
atomic_t waiting_requests;
struct cbuf cb;
int eagain_flag;
struct work_struct work; /* Send workqueue */
};

/* An entry waiting to be sent */
Expand Down Expand Up @@ -137,19 +140,23 @@ static void cbuf_eat(struct cbuf *cb, int n)
static LIST_HEAD(write_nodes);
static DEFINE_SPINLOCK(write_nodes_lock);


/* Maximum number of incoming messages to process before
* doing a schedule()
*/
#define MAX_RX_MSG_COUNT 25

/* Manage daemons */
static struct task_struct *recv_task;
static struct task_struct *send_task;
static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_wait);
/* Work queues */
static struct workqueue_struct *recv_workqueue;
static struct workqueue_struct *send_workqueue;
static struct workqueue_struct *lock_workqueue;

/* The SCTP connection */
static struct connection sctp_con;

static void process_send_sockets(struct work_struct *work);
static void process_recv_sockets(struct work_struct *work);
static void process_lock_request(struct work_struct *work);

static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
{
Expand Down Expand Up @@ -222,6 +229,8 @@ static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc)
spin_lock_init(&ni->lock);
INIT_LIST_HEAD(&ni->writequeue);
spin_lock_init(&ni->writequeue_lock);
INIT_WORK(&ni->lwork, process_lock_request);
INIT_WORK(&ni->swork, process_send_sockets);
ni->nodeid = nodeid;

if (nodeid > max_nodeid)
Expand Down Expand Up @@ -249,11 +258,8 @@ static struct nodeinfo *assoc2nodeinfo(sctp_assoc_t assoc)
/* Data or notification available on socket */
static void lowcomms_data_ready(struct sock *sk, int count_unused)
{
atomic_inc(&sctp_con.waiting_requests);
if (test_and_set_bit(CF_READ_PENDING, &sctp_con.flags))
return;

wake_up_interruptible(&lowcomms_recv_wait);
queue_work(recv_workqueue, &sctp_con.work);
}


Expand Down Expand Up @@ -361,10 +367,10 @@ static void init_failed(void)
spin_lock_bh(&write_nodes_lock);
list_add_tail(&ni->write_list, &write_nodes);
spin_unlock_bh(&write_nodes_lock);
queue_work(send_workqueue, &ni->swork);
}
}
}
wake_up_process(send_task);
}

/* Something happened to an association */
Expand Down Expand Up @@ -446,8 +452,8 @@ static void process_sctp_notification(struct msghdr *msg, char *buf)
spin_lock_bh(&write_nodes_lock);
list_add_tail(&ni->write_list, &write_nodes);
spin_unlock_bh(&write_nodes_lock);
queue_work(send_workqueue, &ni->swork);
}
wake_up_process(send_task);
}
break;

Expand Down Expand Up @@ -580,8 +586,8 @@ static int receive_from_sock(void)
spin_lock_bh(&write_nodes_lock);
list_add_tail(&ni->write_list, &write_nodes);
spin_unlock_bh(&write_nodes_lock);
queue_work(send_workqueue, &ni->swork);
}
wake_up_process(send_task);
}
}

Expand All @@ -590,6 +596,7 @@ static int receive_from_sock(void)
return 0;

cbuf_add(&sctp_con.cb, ret);
// PJC: TODO: Add to node's workqueue....can we ??
ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid),
page_address(sctp_con.rx_page),
sctp_con.cb.base, sctp_con.cb.len,
Expand Down Expand Up @@ -820,7 +827,8 @@ void dlm_lowcomms_commit_buffer(void *arg)
spin_lock_bh(&write_nodes_lock);
list_add_tail(&ni->write_list, &write_nodes);
spin_unlock_bh(&write_nodes_lock);
wake_up_process(send_task);

queue_work(send_workqueue, &ni->swork);
}
return;

Expand Down Expand Up @@ -1088,101 +1096,75 @@ int dlm_lowcomms_close(int nodeid)
return 0;
}

static int write_list_empty(void)
// PJC: The work queue function for receiving.
static void process_recv_sockets(struct work_struct *work)
{
int status;
if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) {
int ret;
int count = 0;

spin_lock_bh(&write_nodes_lock);
status = list_empty(&write_nodes);
spin_unlock_bh(&write_nodes_lock);
do {
ret = receive_from_sock();

return status;
/* Don't starve out everyone else */
if (++count >= MAX_RX_MSG_COUNT) {
cond_resched();
count = 0;
}
} while (!kthread_should_stop() && ret >=0);
}
cond_resched();
}

static int dlm_recvd(void *data)
// PJC: the work queue function for sending
static void process_send_sockets(struct work_struct *work)
{
DECLARE_WAITQUEUE(wait, current);

while (!kthread_should_stop()) {
int count = 0;

set_current_state(TASK_INTERRUPTIBLE);
add_wait_queue(&lowcomms_recv_wait, &wait);
if (!test_bit(CF_READ_PENDING, &sctp_con.flags))
schedule();
remove_wait_queue(&lowcomms_recv_wait, &wait);
set_current_state(TASK_RUNNING);

if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) {
int ret;

do {
ret = receive_from_sock();

/* Don't starve out everyone else */
if (++count >= MAX_RX_MSG_COUNT) {
cond_resched();
count = 0;
}
} while (!kthread_should_stop() && ret >=0);
}
cond_resched();
if (sctp_con.eagain_flag) {
sctp_con.eagain_flag = 0;
refill_write_queue();
}

return 0;
process_output_queue();
}

static int dlm_sendd(void *data)
// PJC: Process lock requests from a particular node.
// TODO: can we optimise this out on UP ??
static void process_lock_request(struct work_struct *work)
{
DECLARE_WAITQUEUE(wait, current);

add_wait_queue(sctp_con.sock->sk->sk_sleep, &wait);

while (!kthread_should_stop()) {
set_current_state(TASK_INTERRUPTIBLE);
if (write_list_empty())
schedule();
set_current_state(TASK_RUNNING);

if (sctp_con.eagain_flag) {
sctp_con.eagain_flag = 0;
refill_write_queue();
}
process_output_queue();
}

remove_wait_queue(sctp_con.sock->sk->sk_sleep, &wait);

return 0;
}

static void daemons_stop(void)
{
kthread_stop(recv_task);
kthread_stop(send_task);
destroy_workqueue(recv_workqueue);
destroy_workqueue(send_workqueue);
destroy_workqueue(lock_workqueue);
}

static int daemons_start(void)
{
struct task_struct *p;
int error;
recv_workqueue = create_workqueue("dlm_recv");
error = IS_ERR(recv_workqueue);
if (error) {
log_print("can't start dlm_recv %d", error);
return error;
}

p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
error = IS_ERR(p);
send_workqueue = create_singlethread_workqueue("dlm_send");
error = IS_ERR(send_workqueue);
if (error) {
log_print("can't start dlm_recvd %d", error);
log_print("can't start dlm_send %d", error);
destroy_workqueue(recv_workqueue);
return error;
}
recv_task = p;

p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
error = IS_ERR(p);
lock_workqueue = create_workqueue("dlm_rlock");
error = IS_ERR(lock_workqueue);
if (error) {
log_print("can't start dlm_sendd %d", error);
kthread_stop(recv_task);
log_print("can't start dlm_rlock %d", error);
destroy_workqueue(send_workqueue);
destroy_workqueue(recv_workqueue);
return error;
}
send_task = p;

return 0;
}
Expand All @@ -1194,6 +1176,8 @@ int dlm_lowcomms_start(void)
{
int error;

INIT_WORK(&sctp_con.work, process_recv_sockets);

error = init_sock();
if (error)
goto fail_sock;
Expand Down Expand Up @@ -1224,4 +1208,3 @@ void dlm_lowcomms_stop(void)
for (i = 0; i < dlm_local_count; i++)
kfree(dlm_local_addr[i]);
}

Loading

0 comments on commit 1d6e813

Please sign in to comment.