drbd: allow to dequeue batches of work at a time
authorLars Ellenberg <lars.ellenberg@linbit.com>
Wed, 19 Oct 2011 09:50:57 +0000 (11:50 +0200)
committerPhilipp Reisner <philipp.reisner@linbit.com>
Thu, 8 Nov 2012 15:58:34 +0000 (16:58 +0100)
cherry-picked and adapted from drbd 9 devel branch

In 8.4, we still use drbd_queue_work_front(),
so in normal operation, we can not dequeue batches,
but only single items.

Still, followup commits will wake the worker
without explicitly queueing a work item,
so up() is replaced by a simple wake_up().

Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
drivers/block/drbd/drbd_int.h
drivers/block/drbd/drbd_main.c
drivers/block/drbd/drbd_worker.c

index d7ca76c..e84c7b6 100644 (file)
@@ -735,8 +735,8 @@ enum bm_flag {
 
 struct drbd_work_queue {
        struct list_head q;
-       struct semaphore s; /* producers up it, worker down()s it */
        spinlock_t q_lock;  /* to protect the list. */
+       wait_queue_head_t q_wait;
 };
 
 struct drbd_socket {
@@ -1832,9 +1832,8 @@ drbd_queue_work_front(struct drbd_work_queue *q, struct drbd_work *w)
        unsigned long flags;
        spin_lock_irqsave(&q->q_lock, flags);
        list_add(&w->list, &q->q);
-       up(&q->s); /* within the spinlock,
-                     see comment near end of drbd_worker() */
        spin_unlock_irqrestore(&q->q_lock, flags);
+       wake_up(&q->q_wait);
 }
 
 static inline void
@@ -1843,9 +1842,8 @@ drbd_queue_work(struct drbd_work_queue *q, struct drbd_work *w)
        unsigned long flags;
        spin_lock_irqsave(&q->q_lock, flags);
        list_add_tail(&w->list, &q->q);
-       up(&q->s); /* within the spinlock,
-                     see comment near end of drbd_worker() */
        spin_unlock_irqrestore(&q->q_lock, flags);
+       wake_up(&q->q_wait);
 }
 
 static inline void wake_asender(struct drbd_tconn *tconn)
index bfe6975..f379d33 100644 (file)
@@ -2535,9 +2535,9 @@ out:
 
 static void drbd_init_workqueue(struct drbd_work_queue* wq)
 {
-       sema_init(&wq->s, 0);
        spin_lock_init(&wq->q_lock);
        INIT_LIST_HEAD(&wq->q);
+       init_waitqueue_head(&wq->q_wait);
 }
 
 struct drbd_tconn *conn_get_by_name(const char *name)
index d7573f4..fb2e6c8 100644 (file)
@@ -1673,6 +1673,23 @@ void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
        mutex_unlock(mdev->state_mutex);
 }
 
+bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
+{
+       spin_lock_irq(&queue->q_lock);
+       list_splice_init(&queue->q, work_list);
+       spin_unlock_irq(&queue->q_lock);
+       return !list_empty(work_list);
+}
+
+bool dequeue_work_item(struct drbd_work_queue *queue, struct list_head *work_list)
+{
+       spin_lock_irq(&queue->q_lock);
+       if (!list_empty(&queue->q))
+               list_move(queue->q.next, work_list);
+       spin_unlock_irq(&queue->q_lock);
+       return !list_empty(work_list);
+}
+
 int drbd_worker(struct drbd_thread *thi)
 {
        struct drbd_tconn *tconn = thi->tconn;
@@ -1680,15 +1697,21 @@ int drbd_worker(struct drbd_thread *thi)
        struct drbd_conf *mdev;
        struct net_conf *nc;
        LIST_HEAD(work_list);
-       int vnr, intr = 0;
+       int vnr;
        int cork;
 
        while (get_t_state(thi) == RUNNING) {
                drbd_thread_current_set_cpu(thi);
 
-               if (down_trylock(&tconn->data.work.s)) {
-                       mutex_lock(&tconn->data.mutex);
+               /* as long as we use drbd_queue_work_front(),
+                * we may only dequeue single work items here, not batches. */
+               if (list_empty(&work_list))
+                       dequeue_work_item(&tconn->data.work, &work_list);
 
+               /* Still nothing to do? Poke TCP, just in case,
+                * then wait for new work (or signal). */
+               if (list_empty(&work_list)) {
+                       mutex_lock(&tconn->data.mutex);
                        rcu_read_lock();
                        nc = rcu_dereference(tconn->net_conf);
                        cork = nc ? nc->tcp_cork : 0;
@@ -1698,15 +1721,16 @@ int drbd_worker(struct drbd_thread *thi)
                                drbd_tcp_uncork(tconn->data.socket);
                        mutex_unlock(&tconn->data.mutex);
 
-                       intr = down_interruptible(&tconn->data.work.s);
+                       wait_event_interruptible(tconn->data.work.q_wait,
+                               dequeue_work_item(&tconn->data.work, &work_list));
 
                        mutex_lock(&tconn->data.mutex);
-                       if (tconn->data.socket  && cork)
+                       if (tconn->data.socket && cork)
                                drbd_tcp_cork(tconn->data.socket);
                        mutex_unlock(&tconn->data.mutex);
                }
 
-               if (intr) {
+               if (signal_pending(current)) {
                        flush_signals(current);
                        if (get_t_state(thi) == RUNNING) {
                                conn_warn(tconn, "Worker got an unexpected signal\n");
@@ -1717,59 +1741,25 @@ int drbd_worker(struct drbd_thread *thi)
 
                if (get_t_state(thi) != RUNNING)
                        break;
-               /* With this break, we have done a down() but not consumed
-                  the entry from the list. The cleanup code takes care of
-                  this...   */
-
-               w = NULL;
-               spin_lock_irq(&tconn->data.work.q_lock);
-               if (list_empty(&tconn->data.work.q)) {
-                       /* something terribly wrong in our logic.
-                        * we were able to down() the semaphore,
-                        * but the list is empty... doh.
-                        *
-                        * what is the best thing to do now?
-                        * try again from scratch, restarting the receiver,
-                        * asender, whatnot? could break even more ugly,
-                        * e.g. when we are primary, but no good local data.
-                        *
-                        * I'll try to get away just starting over this loop.
-                        */
-                       conn_warn(tconn, "Work list unexpectedly empty\n");
-                       spin_unlock_irq(&tconn->data.work.q_lock);
-                       continue;
-               }
-               w = list_entry(tconn->data.work.q.next, struct drbd_work, list);
-               list_del_init(&w->list);
-               spin_unlock_irq(&tconn->data.work.q_lock);
 
-               if (w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS)) {
-                       /* dev_warn(DEV, "worker: a callback failed! \n"); */
+               while (!list_empty(&work_list)) {
+                       w = list_first_entry(&work_list, struct drbd_work, list);
+                       list_del_init(&w->list);
+                       if (w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS) == 0)
+                               continue;
                        if (tconn->cstate >= C_WF_REPORT_PARAMS)
                                conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
                }
        }
 
-       spin_lock_irq(&tconn->data.work.q_lock);
-       while (!list_empty(&tconn->data.work.q)) {
-               list_splice_init(&tconn->data.work.q, &work_list);
-               spin_unlock_irq(&tconn->data.work.q_lock);
-
+       do {
                while (!list_empty(&work_list)) {
-                       w = list_entry(work_list.next, struct drbd_work, list);
+                       w = list_first_entry(&work_list, struct drbd_work, list);
                        list_del_init(&w->list);
                        w->cb(w, 1);
                }
-
-               spin_lock_irq(&tconn->data.work.q_lock);
-       }
-       sema_init(&tconn->data.work.s, 0);
-       /* DANGEROUS race: if someone did queue his work within the spinlock,
-        * but up() ed outside the spinlock, we could get an up() on the
-        * semaphore without corresponding list entry.
-        * So don't do that.
-        */
-       spin_unlock_irq(&tconn->data.work.q_lock);
+               dequeue_work_batch(&tconn->data.work, &work_list);
+       } while (!list_empty(&work_list));
 
        rcu_read_lock();
        idr_for_each_entry(&tconn->volumes, mdev, vnr) {