io_uring: don't export io_put_task()
[platform/kernel/linux-starfive.git] / io_uring / io_uring.c
index 79d9438..f49d003 100644 (file)
@@ -316,6 +316,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
        xa_init_flags(&ctx->personalities, XA_FLAGS_ALLOC1);
        mutex_init(&ctx->uring_lock);
        init_waitqueue_head(&ctx->cq_wait);
+       init_waitqueue_head(&ctx->poll_wq);
        spin_lock_init(&ctx->completion_lock);
        spin_lock_init(&ctx->timeout_lock);
        INIT_WQ_LIST(&ctx->iopoll_list);
@@ -572,6 +573,8 @@ static void io_eventfd_flush_signal(struct io_ring_ctx *ctx)
 
 void __io_commit_cqring_flush(struct io_ring_ctx *ctx)
 {
+       if (ctx->poll_activated)
+               io_poll_wq_wake(ctx);
        if (ctx->off_timeout_used)
                io_flush_timeouts(ctx);
        if (ctx->drain_active) {
@@ -618,6 +621,25 @@ static inline void __io_cq_unlock_post(struct io_ring_ctx *ctx)
        io_cqring_wake(ctx);
 }
 
+static inline void __io_cq_unlock_post_flush(struct io_ring_ctx *ctx)
+       __releases(ctx->completion_lock)
+{
+       io_commit_cqring(ctx);
+       __io_cq_unlock(ctx);
+       io_commit_cqring_flush(ctx);
+
+       /*
+        * As ->task_complete implies that the ring is single tasked, cq_wait
+        * may only be waited on by the current in io_cqring_wait(), but since
+        * it will re-check the wakeup conditions once we return we can safely
+        * skip waking it up.
+        */
+       if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN)) {
+               smp_mb();
+               __io_cqring_wake(ctx);
+       }
+}
+
 void io_cq_unlock_post(struct io_ring_ctx *ctx)
        __releases(ctx->completion_lock)
 {
@@ -693,7 +715,7 @@ static void io_cqring_overflow_flush(struct io_ring_ctx *ctx)
                io_cqring_do_overflow_flush(ctx);
 }
 
-void __io_put_task(struct task_struct *task, int nr)
+static void __io_put_task(struct task_struct *task, int nr)
 {
        struct io_uring_task *tctx = task->io_uring;
 
@@ -703,6 +725,15 @@ void __io_put_task(struct task_struct *task, int nr)
        put_task_struct_many(task, nr);
 }
 
+/* must to be called somewhat shortly after putting a request */
+static inline void io_put_task(struct task_struct *task, int nr)
+{
+       if (likely(task == current))
+               task->io_uring->cached_refs += nr;
+       else
+               __io_put_task(task, nr);
+}
+
 void io_task_refs_refill(struct io_uring_task *tctx)
 {
        unsigned int refill = -tctx->cached_refs + IO_TCTX_REFS_CACHE_NR;
@@ -1241,7 +1272,7 @@ static void io_req_local_work_add(struct io_kiocb *req)
                percpu_ref_put(&ctx->refs);
                return;
        }
-       /* need it for the following io_cqring_wake() */
+       /* needed for the following wake up */
        smp_mb__after_atomic();
 
        if (unlikely(atomic_read(&req->task->io_uring->in_idle))) {
@@ -1252,10 +1283,11 @@ static void io_req_local_work_add(struct io_kiocb *req)
 
        if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
                atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
-
        if (ctx->has_evfd)
                io_eventfd_signal(ctx);
-       __io_cqring_wake(ctx);
+
+       if (READ_ONCE(ctx->cq_waiting))
+               wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
        percpu_ref_put(&ctx->refs);
 }
 
@@ -1296,20 +1328,19 @@ static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
        }
 }
 
-int __io_run_local_work(struct io_ring_ctx *ctx, bool *locked)
+static int __io_run_local_work(struct io_ring_ctx *ctx, bool *locked)
 {
        struct llist_node *node;
-       struct llist_node fake;
-       struct llist_node *current_final = NULL;
+       unsigned int loops = 0;
        int ret = 0;
-       unsigned int loops = 1;
 
        if (WARN_ON_ONCE(ctx->submitter_task != current))
                return -EEXIST;
-
-       node = io_llist_xchg(&ctx->work_llist, &fake);
+       if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
+               atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
 again:
-       while (node != current_final) {
+       node = io_llist_xchg(&ctx->work_llist, NULL);
+       while (node) {
                struct llist_node *next = node->next;
                struct io_kiocb *req = container_of(node, struct io_kiocb,
                                                    io_task_work.node);
@@ -1318,23 +1349,33 @@ again:
                ret++;
                node = next;
        }
+       loops++;
 
-       if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
-               atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
-
-       node = io_llist_cmpxchg(&ctx->work_llist, &fake, NULL);
-       if (node != &fake) {
-               loops++;
-               current_final = &fake;
-               node = io_llist_xchg(&ctx->work_llist, &fake);
+       if (!llist_empty(&ctx->work_llist))
                goto again;
-       }
-
-       if (*locked)
+       if (*locked) {
                io_submit_flush_completions(ctx);
+               if (!llist_empty(&ctx->work_llist))
+                       goto again;
+       }
        trace_io_uring_local_work_run(ctx, ret, loops);
        return ret;
+}
+
+static inline int io_run_local_work_locked(struct io_ring_ctx *ctx)
+{
+       bool locked;
+       int ret;
+
+       if (llist_empty(&ctx->work_llist))
+               return 0;
 
+       locked = true;
+       ret = __io_run_local_work(ctx, &locked);
+       /* shouldn't happen! */
+       if (WARN_ON_ONCE(!locked))
+               mutex_lock(&ctx->uring_lock);
+       return ret;
 }
 
 static int io_run_local_work(struct io_ring_ctx *ctx)
@@ -1461,7 +1502,7 @@ static void __io_submit_flush_completions(struct io_ring_ctx *ctx)
                        }
                }
        }
-       __io_cq_unlock_post(ctx);
+       __io_cq_unlock_post_flush(ctx);
 
        if (!wq_list_empty(&ctx->submit_state.compl_reqs)) {
                io_free_batch_list(ctx, state->compl_reqs.first);
@@ -2538,12 +2579,20 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
        do {
                unsigned long check_cq;
 
-               prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
-                                               TASK_INTERRUPTIBLE);
+               if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
+                       WRITE_ONCE(ctx->cq_waiting, 1);
+                       set_current_state(TASK_INTERRUPTIBLE);
+               } else {
+                       prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
+                                                       TASK_INTERRUPTIBLE);
+               }
+
                ret = io_cqring_wait_schedule(ctx, &iowq);
+               __set_current_state(TASK_RUNNING);
+               WRITE_ONCE(ctx->cq_waiting, 0);
+
                if (ret < 0)
                        break;
-               __set_current_state(TASK_RUNNING);
                /*
                 * Run task_work after scheduling and before io_should_wake().
                 * If we got woken because of task_work being processed, run it
@@ -2571,7 +2620,8 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
                cond_resched();
        } while (1);
 
-       finish_wait(&ctx->cq_wait, &iowq.wq);
+       if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN))
+               finish_wait(&ctx->cq_wait, &iowq.wq);
        restore_saved_sigmask_unless(ret == -EINTR);
 
        return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0;
@@ -2765,12 +2815,54 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
        kfree(ctx);
 }
 
+static __cold void io_activate_pollwq_cb(struct callback_head *cb)
+{
+       struct io_ring_ctx *ctx = container_of(cb, struct io_ring_ctx,
+                                              poll_wq_task_work);
+
+       mutex_lock(&ctx->uring_lock);
+       ctx->poll_activated = true;
+       mutex_unlock(&ctx->uring_lock);
+
+       /*
+        * Wake ups for some events between start of polling and activation
+        * might've been lost due to loose synchronisation.
+        */
+       wake_up_all(&ctx->poll_wq);
+       percpu_ref_put(&ctx->refs);
+}
+
+static __cold void io_activate_pollwq(struct io_ring_ctx *ctx)
+{
+       spin_lock(&ctx->completion_lock);
+       /* already activated or in progress */
+       if (ctx->poll_activated || ctx->poll_wq_task_work.func)
+               goto out;
+       if (WARN_ON_ONCE(!ctx->task_complete))
+               goto out;
+       if (!ctx->submitter_task)
+               goto out;
+       /*
+        * with ->submitter_task only the submitter task completes requests, we
+        * only need to sync with it, which is done by injecting a tw
+        */
+       init_task_work(&ctx->poll_wq_task_work, io_activate_pollwq_cb);
+       percpu_ref_get(&ctx->refs);
+       if (task_work_add(ctx->submitter_task, &ctx->poll_wq_task_work, TWA_SIGNAL))
+               percpu_ref_put(&ctx->refs);
+out:
+       spin_unlock(&ctx->completion_lock);
+}
+
 static __poll_t io_uring_poll(struct file *file, poll_table *wait)
 {
        struct io_ring_ctx *ctx = file->private_data;
        __poll_t mask = 0;
 
-       poll_wait(file, &ctx->cq_wait, wait);
+       if (unlikely(!ctx->poll_activated))
+               io_activate_pollwq(ctx);
+
+       poll_wait(file, &ctx->poll_wq, wait);
        /*
         * synchronizes with barrier from wq_has_sleeper call in
         * io_commit_cqring
@@ -3332,11 +3424,9 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
                }
                if (flags & IORING_ENTER_SQ_WAKEUP)
                        wake_up(&ctx->sq_data->wait);
-               if (flags & IORING_ENTER_SQ_WAIT) {
-                       ret = io_sqpoll_wait_sq(ctx);
-                       if (ret)
-                               goto out;
-               }
+               if (flags & IORING_ENTER_SQ_WAIT)
+                       io_sqpoll_wait_sq(ctx);
+
                ret = to_submit;
        } else if (to_submit) {
                ret = io_uring_add_tctx_node(ctx);
@@ -3577,6 +3667,13 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p,
                ctx->task_complete = true;
 
        /*
+        * lazy poll_wq activation relies on ->task_complete for synchronisation
+        * purposes, see io_activate_pollwq()
+        */
+       if (!ctx->task_complete)
+               ctx->poll_activated = true;
+
+       /*
         * When SETUP_IOPOLL and SETUP_SQPOLL are both enabled, user
         * space applications don't need to do io completion events
         * polling again, they can rely on io_sq_thread to do polling
@@ -3869,8 +3966,15 @@ static int io_register_enable_rings(struct io_ring_ctx *ctx)
        if (!(ctx->flags & IORING_SETUP_R_DISABLED))
                return -EBADFD;
 
-       if (ctx->flags & IORING_SETUP_SINGLE_ISSUER && !ctx->submitter_task)
+       if (ctx->flags & IORING_SETUP_SINGLE_ISSUER && !ctx->submitter_task) {
                WRITE_ONCE(ctx->submitter_task, get_task_struct(current));
+               /*
+                * Lazy activation attempts would fail if it was polled before
+                * submitter_task is set.
+                */
+               if (wq_has_sleeper(&ctx->poll_wq))
+                       io_activate_pollwq(ctx);
+       }
 
        if (ctx->restrictions.registered)
                ctx->restricted = 1;