Skip to content

Commit

Permalink
---
Browse files Browse the repository at this point in the history
yaml
---
r: 346207
b: refs/heads/master
c: 8c0785a
h: refs/heads/master
i:
  346205: 944768b
  346203: a5c0b3d
  346199: b337a6e
  346191: b0f57b0
  346175: d2abf8c
v: v3
  • Loading branch information
Lars Ellenberg authored and Philipp Reisner committed Nov 8, 2012
1 parent 9926034 commit 572805e
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 56 deletions.
2 changes: 1 addition & 1 deletion [refs]
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
---
refs/heads/master: b379c41ed78e83c4443fca4dbfbc358c19e4f24c
refs/heads/master: 8c0785a5c9a0f2472aff68dc32247be01728c416
8 changes: 3 additions & 5 deletions trunk/drivers/block/drbd/drbd_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -735,8 +735,8 @@ enum bm_flag {

struct drbd_work_queue {
struct list_head q;
struct semaphore s; /* producers up it, worker down()s it */
spinlock_t q_lock; /* to protect the list. */
wait_queue_head_t q_wait;
};

struct drbd_socket {
Expand Down Expand Up @@ -1832,9 +1832,8 @@ drbd_queue_work_front(struct drbd_work_queue *q, struct drbd_work *w)
unsigned long flags;
spin_lock_irqsave(&q->q_lock, flags);
list_add(&w->list, &q->q);
up(&q->s); /* within the spinlock,
see comment near end of drbd_worker() */
spin_unlock_irqrestore(&q->q_lock, flags);
wake_up(&q->q_wait);
}

static inline void
Expand All @@ -1843,9 +1842,8 @@ drbd_queue_work(struct drbd_work_queue *q, struct drbd_work *w)
unsigned long flags;
spin_lock_irqsave(&q->q_lock, flags);
list_add_tail(&w->list, &q->q);
up(&q->s); /* within the spinlock,
see comment near end of drbd_worker() */
spin_unlock_irqrestore(&q->q_lock, flags);
wake_up(&q->q_wait);
}

static inline void wake_asender(struct drbd_tconn *tconn)
Expand Down
2 changes: 1 addition & 1 deletion trunk/drivers/block/drbd/drbd_main.c
Original file line number Diff line number Diff line change
Expand Up @@ -2535,9 +2535,9 @@ static int drbd_congested(void *congested_data, int bdi_bits)

static void drbd_init_workqueue(struct drbd_work_queue* wq)
{
sema_init(&wq->s, 0);
spin_lock_init(&wq->q_lock);
INIT_LIST_HEAD(&wq->q);
init_waitqueue_head(&wq->q_wait);
}

struct drbd_tconn *conn_get_by_name(const char *name)
Expand Down
88 changes: 39 additions & 49 deletions trunk/drivers/block/drbd/drbd_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -1673,22 +1673,45 @@ void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
mutex_unlock(mdev->state_mutex);
}

bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
{
spin_lock_irq(&queue->q_lock);
list_splice_init(&queue->q, work_list);
spin_unlock_irq(&queue->q_lock);
return !list_empty(work_list);
}

bool dequeue_work_item(struct drbd_work_queue *queue, struct list_head *work_list)
{
spin_lock_irq(&queue->q_lock);
if (!list_empty(&queue->q))
list_move(queue->q.next, work_list);
spin_unlock_irq(&queue->q_lock);
return !list_empty(work_list);
}

int drbd_worker(struct drbd_thread *thi)
{
struct drbd_tconn *tconn = thi->tconn;
struct drbd_work *w = NULL;
struct drbd_conf *mdev;
struct net_conf *nc;
LIST_HEAD(work_list);
int vnr, intr = 0;
int vnr;
int cork;

while (get_t_state(thi) == RUNNING) {
drbd_thread_current_set_cpu(thi);

if (down_trylock(&tconn->data.work.s)) {
mutex_lock(&tconn->data.mutex);
/* as long as we use drbd_queue_work_front(),
* we may only dequeue single work items here, not batches. */
if (list_empty(&work_list))
dequeue_work_item(&tconn->data.work, &work_list);

/* Still nothing to do? Poke TCP, just in case,
* then wait for new work (or signal). */
if (list_empty(&work_list)) {
mutex_lock(&tconn->data.mutex);
rcu_read_lock();
nc = rcu_dereference(tconn->net_conf);
cork = nc ? nc->tcp_cork : 0;
Expand All @@ -1698,15 +1721,16 @@ int drbd_worker(struct drbd_thread *thi)
drbd_tcp_uncork(tconn->data.socket);
mutex_unlock(&tconn->data.mutex);

intr = down_interruptible(&tconn->data.work.s);
wait_event_interruptible(tconn->data.work.q_wait,
dequeue_work_item(&tconn->data.work, &work_list));

mutex_lock(&tconn->data.mutex);
if (tconn->data.socket && cork)
if (tconn->data.socket && cork)
drbd_tcp_cork(tconn->data.socket);
mutex_unlock(&tconn->data.mutex);
}

if (intr) {
if (signal_pending(current)) {
flush_signals(current);
if (get_t_state(thi) == RUNNING) {
conn_warn(tconn, "Worker got an unexpected signal\n");
Expand All @@ -1717,59 +1741,25 @@ int drbd_worker(struct drbd_thread *thi)

if (get_t_state(thi) != RUNNING)
break;
/* With this break, we have done a down() but not consumed
the entry from the list. The cleanup code takes care of
this... */

w = NULL;
spin_lock_irq(&tconn->data.work.q_lock);
if (list_empty(&tconn->data.work.q)) {
/* something terribly wrong in our logic.
* we were able to down() the semaphore,
* but the list is empty... doh.
*
* what is the best thing to do now?
* try again from scratch, restarting the receiver,
* asender, whatnot? could break even more ugly,
* e.g. when we are primary, but no good local data.
*
* I'll try to get away just starting over this loop.
*/
conn_warn(tconn, "Work list unexpectedly empty\n");
spin_unlock_irq(&tconn->data.work.q_lock);
continue;
}
w = list_entry(tconn->data.work.q.next, struct drbd_work, list);
list_del_init(&w->list);
spin_unlock_irq(&tconn->data.work.q_lock);

if (w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS)) {
/* dev_warn(DEV, "worker: a callback failed! \n"); */
while (!list_empty(&work_list)) {
w = list_first_entry(&work_list, struct drbd_work, list);
list_del_init(&w->list);
if (w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS) == 0)
continue;
if (tconn->cstate >= C_WF_REPORT_PARAMS)
conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
}
}

spin_lock_irq(&tconn->data.work.q_lock);
while (!list_empty(&tconn->data.work.q)) {
list_splice_init(&tconn->data.work.q, &work_list);
spin_unlock_irq(&tconn->data.work.q_lock);

do {
while (!list_empty(&work_list)) {
w = list_entry(work_list.next, struct drbd_work, list);
w = list_first_entry(&work_list, struct drbd_work, list);
list_del_init(&w->list);
w->cb(w, 1);
}

spin_lock_irq(&tconn->data.work.q_lock);
}
sema_init(&tconn->data.work.s, 0);
/* DANGEROUS race: if someone did queue his work within the spinlock,
* but up() ed outside the spinlock, we could get an up() on the
* semaphore without corresponding list entry.
* So don't do that.
*/
spin_unlock_irq(&tconn->data.work.q_lock);
dequeue_work_batch(&tconn->data.work, &work_list);
} while (!list_empty(&work_list));

rcu_read_lock();
idr_for_each_entry(&tconn->volumes, mdev, vnr) {
Expand Down

0 comments on commit 572805e

Please sign in to comment.