io_uring: reduce scheduling due to tw
authorPavel Begunkov <asml.silence@gmail.com>
Thu, 6 Apr 2023 13:20:12 +0000 (14:20 +0100)
committerJens Axboe <axboe@kernel.dk>
Thu, 6 Apr 2023 22:23:28 +0000 (16:23 -0600)
Every task_work will try to wake the task to be executed, which causes
excessive scheduling and additional overhead. For some tw it's
justified, but others won't do much but post a single CQE.

When a task waits for multiple cqes, every such task_work will wake it
up. Instead, the task may give a hint about how many cqes it waits for,
io_req_local_work_add() will compare against it and skip wake ups
if #cqes + #tw is not enough to satisfy the waiting condition. Task_work
that uses the optimisation should be simple enough and never post more
than one CQE. It's also ignored for non DEFER_TASKRUN rings.

Signed-off-by: Pavel Begunkov <asml.silence@gmail.com>
Link: https://lore.kernel.org/r/d2b77e99d1e86624d8a69f7037d764b739dcd225.1680782017.git.asml.silence@gmail.com
Signed-off-by: Jens Axboe <axboe@kernel.dk>
include/linux/io_uring_types.h
io_uring/io_uring.c
io_uring/io_uring.h
io_uring/notif.c
io_uring/notif.h
io_uring/rw.c

index 4a6ce03..fa621a5 100644 (file)
@@ -296,7 +296,7 @@ struct io_ring_ctx {
                spinlock_t              completion_lock;
 
                bool                    poll_multi_queue;
-               bool                    cq_waiting;
+               atomic_t                cq_wait_nr;
 
                /*
                 * ->iopoll_list is protected by the ctx->uring_lock for
@@ -566,6 +566,7 @@ struct io_kiocb {
        atomic_t                        refs;
        atomic_t                        poll_refs;
        struct io_task_work             io_task_work;
+       unsigned                        nr_tw;
        /* for polled requests, i.e. IORING_OP_POLL_ADD and async armed poll */
        union {
                struct hlist_node       hash_node;
index 786ecfa..8a327a8 100644 (file)
@@ -1300,35 +1300,59 @@ static __cold void io_fallback_tw(struct io_uring_task *tctx)
        }
 }
 
-static void io_req_local_work_add(struct io_kiocb *req)
+static void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
 {
        struct io_ring_ctx *ctx = req->ctx;
+       unsigned nr_wait, nr_tw, nr_tw_prev;
        struct llist_node *first;
 
+       if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
+               flags &= ~IOU_F_TWQ_LAZY_WAKE;
+
        first = READ_ONCE(ctx->work_llist.first);
        do {
+               nr_tw_prev = 0;
+               if (first) {
+                       struct io_kiocb *first_req = container_of(first,
+                                                       struct io_kiocb,
+                                                       io_task_work.node);
+                       /*
+                        * Might be executed at any moment, rely on
+                        * SLAB_TYPESAFE_BY_RCU to keep it alive.
+                        */
+                       nr_tw_prev = READ_ONCE(first_req->nr_tw);
+               }
+               nr_tw = nr_tw_prev + 1;
+               /* Large enough to fail the nr_wait comparison below */
+               if (!(flags & IOU_F_TWQ_LAZY_WAKE))
+                       nr_tw = -1U;
+
+               req->nr_tw = nr_tw;
                req->io_task_work.node.next = first;
        } while (!try_cmpxchg(&ctx->work_llist.first, &first,
                              &req->io_task_work.node));
 
-       if (first)
-               return;
-
-       /* needed for the following wake up */
-       smp_mb__after_atomic();
-
-       if (unlikely(atomic_read(&req->task->io_uring->in_cancel))) {
-               io_move_task_work_from_local(ctx);
-               return;
+       if (!first) {
+               if (unlikely(atomic_read(&req->task->io_uring->in_cancel))) {
+                       io_move_task_work_from_local(ctx);
+                       return;
+               }
+               if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
+                       atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
+               if (ctx->has_evfd)
+                       io_eventfd_signal(ctx);
        }
 
-       if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
-               atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
-       if (ctx->has_evfd)
-               io_eventfd_signal(ctx);
-
-       if (READ_ONCE(ctx->cq_waiting))
-               wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
+       nr_wait = atomic_read(&ctx->cq_wait_nr);
+       /* no one is waiting */
+       if (!nr_wait)
+               return;
+       /* either not enough or the previous add has already woken it up */
+       if (nr_wait > nr_tw || nr_tw_prev >= nr_wait)
+               return;
+       /* pairs with set_current_state() in io_cqring_wait() */
+       smp_mb__after_atomic();
+       wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
 }
 
 void __io_req_task_work_add(struct io_kiocb *req, unsigned flags)
@@ -1339,7 +1363,7 @@ void __io_req_task_work_add(struct io_kiocb *req, unsigned flags)
        if (!(flags & IOU_F_TWQ_FORCE_NORMAL) &&
            (ctx->flags & IORING_SETUP_DEFER_TASKRUN)) {
                rcu_read_lock();
-               io_req_local_work_add(req);
+               io_req_local_work_add(req, flags);
                rcu_read_unlock();
                return;
        }
@@ -2625,7 +2649,9 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
                unsigned long check_cq;
 
                if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
-                       WRITE_ONCE(ctx->cq_waiting, 1);
+                       int nr_wait = (int) iowq.cq_tail - READ_ONCE(ctx->rings->cq.tail);
+
+                       atomic_set(&ctx->cq_wait_nr, nr_wait);
                        set_current_state(TASK_INTERRUPTIBLE);
                } else {
                        prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
@@ -2634,7 +2660,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
 
                ret = io_cqring_wait_schedule(ctx, &iowq);
                __set_current_state(TASK_RUNNING);
-               WRITE_ONCE(ctx->cq_waiting, 0);
+               atomic_set(&ctx->cq_wait_nr, 0);
 
                if (ret < 0)
                        break;
@@ -4517,7 +4543,7 @@ static int __init io_uring_init(void)
        io_uring_optable_init();
 
        req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC |
-                               SLAB_ACCOUNT);
+                               SLAB_ACCOUNT | SLAB_TYPESAFE_BY_RCU);
        return 0;
 };
 __initcall(io_uring_init);
index cb4309a..ef449e4 100644 (file)
 enum {
        /* don't use deferred task_work */
        IOU_F_TWQ_FORCE_NORMAL                  = 1,
+
+       /*
+        * A hint to not wake right away but delay until there are enough of
+        * tw's queued to match the number of CQEs the task is waiting for.
+        *
+        * Must not be used wirh requests generating more than one CQE.
+        * It's also ignored unless IORING_SETUP_DEFER_TASKRUN is set.
+        */
+       IOU_F_TWQ_LAZY_WAKE                     = 2,
 };
 
 enum {
index 172105e..e1846a2 100644 (file)
@@ -31,7 +31,7 @@ static void io_tx_ubuf_callback(struct sk_buff *skb, struct ubuf_info *uarg,
        struct io_kiocb *notif = cmd_to_io_kiocb(nd);
 
        if (refcount_dec_and_test(&uarg->refcnt))
-               io_req_task_work_add(notif);
+               __io_req_task_work_add(notif, IOU_F_TWQ_LAZY_WAKE);
 }
 
 static void io_tx_ubuf_callback_ext(struct sk_buff *skb, struct ubuf_info *uarg,
index c88c800..6dd1b30 100644 (file)
@@ -33,7 +33,7 @@ static inline void io_notif_flush(struct io_kiocb *notif)
 
        /* drop slot's master ref */
        if (refcount_dec_and_test(&nd->uarg.refcnt))
-               io_req_task_work_add(notif);
+               __io_req_task_work_add(notif, IOU_F_TWQ_LAZY_WAKE);
 }
 
 static inline int io_notif_account_mem(struct io_kiocb *notif, unsigned len)
index f148686..6c7d265 100644 (file)
@@ -304,7 +304,7 @@ static void io_complete_rw(struct kiocb *kiocb, long res)
                return;
        io_req_set_res(req, io_fixup_rw_res(req, res), 0);
        req->io_task_work.func = io_req_rw_complete;
-       io_req_task_work_add(req);
+       __io_req_task_work_add(req, IOU_F_TWQ_LAZY_WAKE);
 }
 
 static void io_complete_rw_iopoll(struct kiocb *kiocb, long res)