drbd: Improve asender performance
authorLars Ellenberg <lars@linbit.com>
Thu, 11 Sep 2014 12:29:11 +0000 (14:29 +0200)
committerJens Axboe <axboe@fb.com>
Thu, 11 Sep 2014 14:41:29 +0000 (08:41 -0600)
Shorten receive path in the asender thread. Reduces CPU utilisation
of asender when receiving packets, and with that increases IOPs.

Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
Signed-off-by: Jens Axboe <axboe@fb.com>
drivers/block/drbd/drbd_receiver.c
drivers/block/drbd/drbd_worker.c

index 3ae769e..6960fb0 100644 (file)
@@ -5561,6 +5561,7 @@ int drbd_asender(struct drbd_thread *thi)
                 * rv <  expected: "woken" by signal during receive
                 * rv == 0       : "connection shut down by peer"
                 */
+received_more:
                if (likely(rv > 0)) {
                        received += rv;
                        buf      += rv;
@@ -5636,6 +5637,11 @@ int drbd_asender(struct drbd_thread *thi)
                        expect   = header_size;
                        cmd      = NULL;
                }
+               if (test_bit(SEND_PING, &connection->flags))
+                       continue;
+               rv = drbd_recv_short(connection->meta.socket, buf, expect-received, MSG_DONTWAIT);
+               if (rv > 0)
+                       goto received_more;
        }
 
        if (0) {
index 3b74f08..3ed2d87 100644 (file)
@@ -1994,22 +1994,13 @@ static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *
        return !list_empty(work_list);
 }
 
-static bool dequeue_work_item(struct drbd_work_queue *queue, struct list_head *work_list)
-{
-       spin_lock_irq(&queue->q_lock);
-       if (!list_empty(&queue->q))
-               list_move(queue->q.next, work_list);
-       spin_unlock_irq(&queue->q_lock);
-       return !list_empty(work_list);
-}
-
 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
 {
        DEFINE_WAIT(wait);
        struct net_conf *nc;
        int uncork, cork;
 
-       dequeue_work_item(&connection->sender_work, work_list);
+       dequeue_work_batch(&connection->sender_work, work_list);
        if (!list_empty(work_list))
                return;