io_uring/io-wq: reduce frequency of acct->lock acquisitions
authorJens Axboe <axboe@kernel.dk>
Wed, 9 Aug 2023 18:59:40 +0000 (12:59 -0600)
committerJens Axboe <axboe@kernel.dk>
Fri, 11 Aug 2023 16:36:17 +0000 (10:36 -0600)
When we check if we have work to run, we grab the acct lock, check,
drop it, and then return the result. If we do have work to run, then
running the work will again grab acct->lock and get the work item.

This causes us to grab acct->lock more frequently than we need to.
If we have work to do, have io_acct_run_queue() return with the acct
lock still acquired. io_worker_handle_work() is then always invoked
with the acct lock already held.

In a simple test cases that stats files (IORING_OP_STATX always hits
io-wq), we see a nice reduction in locking overhead with this change:

19.32%   -12.55%  [kernel.kallsyms]      [k] __cmpwait_case_32
20.90%   -12.07%  [kernel.kallsyms]      [k] queued_spin_lock_slowpath

Reviewed-by: Hao Xu <howeyxu@tencent.com>
Signed-off-by: Jens Axboe <axboe@kernel.dk>
io_uring/io-wq.c

index 3e7025b..18a049f 100644 (file)
@@ -232,17 +232,25 @@ static void io_worker_exit(struct io_worker *worker)
        do_exit(0);
 }
 
-static inline bool io_acct_run_queue(struct io_wq_acct *acct)
+static inline bool __io_acct_run_queue(struct io_wq_acct *acct)
 {
-       bool ret = false;
+       return !test_bit(IO_ACCT_STALLED_BIT, &acct->flags) &&
+               !wq_list_empty(&acct->work_list);
+}
 
+/*
+ * If there's work to do, returns true with acct->lock acquired. If not,
+ * returns false with no lock held.
+ */
+static inline bool io_acct_run_queue(struct io_wq_acct *acct)
+       __acquires(&acct->lock)
+{
        raw_spin_lock(&acct->lock);
-       if (!wq_list_empty(&acct->work_list) &&
-           !test_bit(IO_ACCT_STALLED_BIT, &acct->flags))
-               ret = true;
-       raw_spin_unlock(&acct->lock);
+       if (__io_acct_run_queue(acct))
+               return true;
 
-       return ret;
+       raw_spin_unlock(&acct->lock);
+       return false;
 }
 
 /*
@@ -397,6 +405,7 @@ static void io_wq_dec_running(struct io_worker *worker)
        if (!io_acct_run_queue(acct))
                return;
 
+       raw_spin_unlock(&acct->lock);
        atomic_inc(&acct->nr_running);
        atomic_inc(&wq->worker_refs);
        io_queue_worker_create(worker, acct, create_worker_cb);
@@ -521,9 +530,13 @@ static void io_assign_current_work(struct io_worker *worker,
        raw_spin_unlock(&worker->lock);
 }
 
-static void io_worker_handle_work(struct io_worker *worker)
+/*
+ * Called with acct->lock held, drops it before returning
+ */
+static void io_worker_handle_work(struct io_wq_acct *acct,
+                                 struct io_worker *worker)
+       __releases(&acct->lock)
 {
-       struct io_wq_acct *acct = io_wq_get_acct(worker);
        struct io_wq *wq = worker->wq;
        bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);
 
@@ -537,7 +550,6 @@ static void io_worker_handle_work(struct io_worker *worker)
                 * can't make progress, any work completion or insertion will
                 * clear the stalled flag.
                 */
-               raw_spin_lock(&acct->lock);
                work = io_get_next_work(acct, worker);
                raw_spin_unlock(&acct->lock);
                if (work) {
@@ -591,6 +603,10 @@ static void io_worker_handle_work(struct io_worker *worker)
                                        wake_up(&wq->hash->wait);
                        }
                } while (work);
+
+               if (!__io_acct_run_queue(acct))
+                       break;
+               raw_spin_lock(&acct->lock);
        } while (1);
 }
 
@@ -611,8 +627,13 @@ static int io_wq_worker(void *data)
                long ret;
 
                set_current_state(TASK_INTERRUPTIBLE);
+
+               /*
+                * If we have work to do, io_acct_run_queue() returns with
+                * the acct->lock held. If not, it will drop it.
+                */
                while (io_acct_run_queue(acct))
-                       io_worker_handle_work(worker);
+                       io_worker_handle_work(acct, worker);
 
                raw_spin_lock(&wq->lock);
                /*
@@ -645,8 +666,8 @@ static int io_wq_worker(void *data)
                }
        }
 
-       if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
-               io_worker_handle_work(worker);
+       if (test_bit(IO_WQ_BIT_EXIT, &wq->state) && io_acct_run_queue(acct))
+               io_worker_handle_work(acct, worker);
 
        io_worker_exit(worker);
        return 0;