io_uring: fix fget leak when fs don't support nowait buffered read
[platform/kernel/linux-rpi.git] / io_uring / io_uring.c
index db623b3..7625597 100644 (file)
@@ -151,7 +151,7 @@ static void io_move_task_work_from_local(struct io_ring_ctx *ctx);
 static void __io_submit_flush_completions(struct io_ring_ctx *ctx);
 static __cold void io_fallback_tw(struct io_uring_task *tctx);
 
-static struct kmem_cache *req_cachep;
+struct kmem_cache *req_cachep;
 
 struct sock *io_uring_get_socket(struct file *file)
 {
@@ -230,6 +230,7 @@ static inline void req_fail_link_node(struct io_kiocb *req, int res)
 static inline void io_req_add_to_cache(struct io_kiocb *req, struct io_ring_ctx *ctx)
 {
        wq_stack_add_head(&req->comp_list, &ctx->submit_state.free_list);
+       kasan_poison_object_data(req_cachep, req);
 }
 
 static __cold void io_ring_ctx_ref_free(struct percpu_ref *ref)
@@ -245,17 +246,15 @@ static __cold void io_fallback_req_func(struct work_struct *work)
                                                fallback_work.work);
        struct llist_node *node = llist_del_all(&ctx->fallback_llist);
        struct io_kiocb *req, *tmp;
-       bool locked = false;
+       bool locked = true;
 
-       percpu_ref_get(&ctx->refs);
+       mutex_lock(&ctx->uring_lock);
        llist_for_each_entry_safe(req, tmp, node, io_task_work.node)
                req->io_task_work.func(req, &locked);
-
-       if (locked) {
-               io_submit_flush_completions(ctx);
-               mutex_unlock(&ctx->uring_lock);
-       }
-       percpu_ref_put(&ctx->refs);
+       if (WARN_ON_ONCE(!locked))
+               return;
+       io_submit_flush_completions(ctx);
+       mutex_unlock(&ctx->uring_lock);
 }
 
 static int io_alloc_hash_table(struct io_hash_table *table, unsigned bits)
@@ -316,6 +315,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
        xa_init_flags(&ctx->personalities, XA_FLAGS_ALLOC1);
        mutex_init(&ctx->uring_lock);
        init_waitqueue_head(&ctx->cq_wait);
+       init_waitqueue_head(&ctx->poll_wq);
        spin_lock_init(&ctx->completion_lock);
        spin_lock_init(&ctx->timeout_lock);
        INIT_WQ_LIST(&ctx->iopoll_list);
@@ -407,7 +407,7 @@ static inline void io_arm_ltimeout(struct io_kiocb *req)
 
 static void io_prep_async_work(struct io_kiocb *req)
 {
-       const struct io_op_def *def = &io_op_defs[req->opcode];
+       const struct io_issue_def *def = &io_issue_defs[req->opcode];
        struct io_ring_ctx *ctx = req->ctx;
 
        if (!(req->flags & REQ_F_CREDS)) {
@@ -572,6 +572,8 @@ static void io_eventfd_flush_signal(struct io_ring_ctx *ctx)
 
 void __io_commit_cqring_flush(struct io_ring_ctx *ctx)
 {
+       if (ctx->poll_activated)
+               io_poll_wq_wake(ctx);
        if (ctx->off_timeout_used)
                io_flush_timeouts(ctx);
        if (ctx->drain_active) {
@@ -618,6 +620,25 @@ static inline void __io_cq_unlock_post(struct io_ring_ctx *ctx)
        io_cqring_wake(ctx);
 }
 
+static inline void __io_cq_unlock_post_flush(struct io_ring_ctx *ctx)
+       __releases(ctx->completion_lock)
+{
+       io_commit_cqring(ctx);
+       __io_cq_unlock(ctx);
+       io_commit_cqring_flush(ctx);
+
+       /*
+        * As ->task_complete implies that the ring is single tasked, cq_wait
+        * may only be waited on by the current in io_cqring_wait(), but since
+        * it will re-check the wakeup conditions once we return we can safely
+        * skip waking it up.
+        */
+       if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN)) {
+               smp_mb();
+               __io_cqring_wake(ctx);
+       }
+}
+
 void io_cq_unlock_post(struct io_ring_ctx *ctx)
        __releases(ctx->completion_lock)
 {
@@ -645,7 +666,6 @@ static void io_cqring_overflow_kill(struct io_ring_ctx *ctx)
        }
 }
 
-/* Returns true if there are no backlogged entries after the flush */
 static void __io_cqring_overflow_flush(struct io_ring_ctx *ctx)
 {
        size_t cqe_size = sizeof(struct io_uring_cqe);
@@ -693,16 +713,32 @@ static void io_cqring_overflow_flush(struct io_ring_ctx *ctx)
                io_cqring_do_overflow_flush(ctx);
 }
 
-void __io_put_task(struct task_struct *task, int nr)
+/* can be called by any task */
+static void io_put_task_remote(struct task_struct *task, int nr)
 {
        struct io_uring_task *tctx = task->io_uring;
 
        percpu_counter_sub(&tctx->inflight, nr);
-       if (unlikely(atomic_read(&tctx->in_idle)))
+       if (unlikely(atomic_read(&tctx->in_cancel)))
                wake_up(&tctx->wait);
        put_task_struct_many(task, nr);
 }
 
+/* used by a task to put its own references */
+static void io_put_task_local(struct task_struct *task, int nr)
+{
+       task->io_uring->cached_refs += nr;
+}
+
+/* must to be called somewhat shortly after putting a request */
+static inline void io_put_task(struct task_struct *task, int nr)
+{
+       if (likely(task == current))
+               io_put_task_local(task, nr);
+       else
+               io_put_task_remote(task, nr);
+}
+
 void io_task_refs_refill(struct io_uring_task *tctx)
 {
        unsigned int refill = -tctx->cached_refs + IO_TCTX_REFS_CACHE_NR;
@@ -945,15 +981,15 @@ static void __io_req_complete_post(struct io_kiocb *req)
                                req->link = NULL;
                        }
                }
+               io_put_kbuf_comp(req);
+               io_dismantle_req(req);
                io_req_put_rsrc(req);
                /*
                 * Selected buffer deallocation in io_clean_op() assumes that
                 * we don't hold ->completion_lock. Clean them here to avoid
                 * deadlocks.
                 */
-               io_put_kbuf_comp(req);
-               io_dismantle_req(req);
-               io_put_task(req->task, 1);
+               io_put_task_remote(req->task, 1);
                wq_list_add_head(&req->comp_list, &ctx->locked_free_list);
                ctx->locked_free_nr++;
        }
@@ -980,7 +1016,7 @@ void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags)
 void io_req_defer_failed(struct io_kiocb *req, s32 res)
        __must_hold(&ctx->uring_lock)
 {
-       const struct io_op_def *def = &io_op_defs[req->opcode];
+       const struct io_cold_def *def = &io_cold_defs[req->opcode];
 
        lockdep_assert_held(&req->ctx->uring_lock);
 
@@ -1076,7 +1112,7 @@ __cold void io_free_req(struct io_kiocb *req)
 
        io_req_put_rsrc(req);
        io_dismantle_req(req);
-       io_put_task(req->task, 1);
+       io_put_task_remote(req->task, 1);
 
        spin_lock(&ctx->completion_lock);
        wq_list_add_head(&req->comp_list, &ctx->locked_free_list);
@@ -1130,7 +1166,7 @@ static unsigned int handle_tw_list(struct llist_node *node,
 {
        unsigned int count = 0;
 
-       while (node != last) {
+       while (node && node != last) {
                struct llist_node *next = node->next;
                struct io_kiocb *req = container_of(node, struct io_kiocb,
                                                    io_task_work.node);
@@ -1143,10 +1179,16 @@ static unsigned int handle_tw_list(struct llist_node *node,
                        /* if not contended, grab and improve batching */
                        *locked = mutex_trylock(&(*ctx)->uring_lock);
                        percpu_ref_get(&(*ctx)->refs);
-               }
+               } else if (!*locked)
+                       *locked = mutex_trylock(&(*ctx)->uring_lock);
                req->io_task_work.func(req, locked);
                node = next;
                count++;
+               if (unlikely(need_resched())) {
+                       ctx_flush_and_put(*ctx, locked);
+                       *ctx = NULL;
+                       cond_resched();
+               }
        }
 
        return count;
@@ -1190,28 +1232,34 @@ void tctx_task_work(struct callback_head *cb)
                                                  task_work);
        struct llist_node fake = {};
        struct llist_node *node;
-       unsigned int loops = 1;
-       unsigned int count;
+       unsigned int loops = 0;
+       unsigned int count = 0;
 
        if (unlikely(current->flags & PF_EXITING)) {
                io_fallback_tw(tctx);
                return;
        }
 
-       node = io_llist_xchg(&tctx->task_list, &fake);
-       count = handle_tw_list(node, &ctx, &uring_locked, NULL);
-       node = io_llist_cmpxchg(&tctx->task_list, &fake, NULL);
-       while (node != &fake) {
+       do {
                loops++;
                node = io_llist_xchg(&tctx->task_list, &fake);
                count += handle_tw_list(node, &ctx, &uring_locked, &fake);
+
+               /* skip expensive cmpxchg if there are items in the list */
+               if (READ_ONCE(tctx->task_list.first) != &fake)
+                       continue;
+               if (uring_locked && !wq_list_empty(&ctx->submit_state.compl_reqs)) {
+                       io_submit_flush_completions(ctx);
+                       if (READ_ONCE(tctx->task_list.first) != &fake)
+                               continue;
+               }
                node = io_llist_cmpxchg(&tctx->task_list, &fake, NULL);
-       }
+       } while (node != &fake);
 
        ctx_flush_and_put(ctx, &uring_locked);
 
-       /* relaxed read is enough as only the task itself sets ->in_idle */
-       if (unlikely(atomic_read(&tctx->in_idle)))
+       /* relaxed read is enough as only the task itself sets ->in_cancel */
+       if (unlikely(atomic_read(&tctx->in_cancel)))
                io_uring_drop_tctx_refs(current);
 
        trace_io_uring_task_work_run(tctx, count, loops);
@@ -1237,25 +1285,26 @@ static void io_req_local_work_add(struct io_kiocb *req)
 
        percpu_ref_get(&ctx->refs);
 
-       if (!llist_add(&req->io_task_work.node, &ctx->work_llist)) {
-               percpu_ref_put(&ctx->refs);
-               return;
-       }
-       /* need it for the following io_cqring_wake() */
+       if (!llist_add(&req->io_task_work.node, &ctx->work_llist))
+               goto put_ref;
+
+       /* needed for the following wake up */
        smp_mb__after_atomic();
 
-       if (unlikely(atomic_read(&req->task->io_uring->in_idle))) {
+       if (unlikely(atomic_read(&req->task->io_uring->in_cancel))) {
                io_move_task_work_from_local(ctx);
-               percpu_ref_put(&ctx->refs);
-               return;
+               goto put_ref;
        }
 
        if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
                atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
-
        if (ctx->has_evfd)
                io_eventfd_signal(ctx);
-       __io_cqring_wake(ctx);
+
+       if (READ_ONCE(ctx->cq_waiting))
+               wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
+
+put_ref:
        percpu_ref_put(&ctx->refs);
 }
 
@@ -1296,21 +1345,19 @@ static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
        }
 }
 
-int __io_run_local_work(struct io_ring_ctx *ctx, bool *locked)
+static int __io_run_local_work(struct io_ring_ctx *ctx, bool *locked)
 {
        struct llist_node *node;
-       struct llist_node fake;
-       struct llist_node *current_final = NULL;
-       int ret;
-       unsigned int loops = 1;
+       unsigned int loops = 0;
+       int ret = 0;
 
-       if (unlikely(ctx->submitter_task != current))
+       if (WARN_ON_ONCE(ctx->submitter_task != current))
                return -EEXIST;
-
-       node = io_llist_xchg(&ctx->work_llist, &fake);
-       ret = 0;
+       if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
+               atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
 again:
-       while (node != current_final) {
+       node = io_llist_xchg(&ctx->work_llist, NULL);
+       while (node) {
                struct llist_node *next = node->next;
                struct io_kiocb *req = container_of(node, struct io_kiocb,
                                                    io_task_work.node);
@@ -1319,26 +1366,20 @@ again:
                ret++;
                node = next;
        }
+       loops++;
 
-       if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
-               atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
-
-       node = io_llist_cmpxchg(&ctx->work_llist, &fake, NULL);
-       if (node != &fake) {
-               loops++;
-               current_final = &fake;
-               node = io_llist_xchg(&ctx->work_llist, &fake);
+       if (!llist_empty(&ctx->work_llist))
                goto again;
-       }
-
-       if (*locked)
+       if (*locked) {
                io_submit_flush_completions(ctx);
+               if (!llist_empty(&ctx->work_llist))
+                       goto again;
+       }
        trace_io_uring_local_work_run(ctx, ret, loops);
        return ret;
-
 }
 
-int io_run_local_work(struct io_ring_ctx *ctx)
+static inline int io_run_local_work_locked(struct io_ring_ctx *ctx)
 {
        bool locked;
        int ret;
@@ -1346,8 +1387,19 @@ int io_run_local_work(struct io_ring_ctx *ctx)
        if (llist_empty(&ctx->work_llist))
                return 0;
 
-       __set_current_state(TASK_RUNNING);
-       locked = mutex_trylock(&ctx->uring_lock);
+       locked = true;
+       ret = __io_run_local_work(ctx, &locked);
+       /* shouldn't happen! */
+       if (WARN_ON_ONCE(!locked))
+               mutex_lock(&ctx->uring_lock);
+       return ret;
+}
+
+static int io_run_local_work(struct io_ring_ctx *ctx)
+{
+       bool locked = mutex_trylock(&ctx->uring_lock);
+       int ret;
+
        ret = __io_run_local_work(ctx, &locked);
        if (locked)
                mutex_unlock(&ctx->uring_lock);
@@ -1365,10 +1417,12 @@ void io_req_task_submit(struct io_kiocb *req, bool *locked)
 {
        io_tw_lock(req->ctx, locked);
        /* req->task == current here, checking PF_EXITING is safe */
-       if (likely(!(req->task->flags & PF_EXITING)))
-               io_queue_sqe(req);
-       else
+       if (unlikely(req->task->flags & PF_EXITING))
                io_req_defer_failed(req, -EFAULT);
+       else if (req->flags & REQ_F_FORCE_ASYNC)
+               io_queue_iowq(req, locked);
+       else
+               io_queue_sqe(req);
 }
 
 void io_req_task_queue_fail(struct io_kiocb *req, int ret)
@@ -1467,7 +1521,7 @@ static void __io_submit_flush_completions(struct io_ring_ctx *ctx)
                        }
                }
        }
-       __io_cq_unlock_post(ctx);
+       __io_cq_unlock_post_flush(ctx);
 
        if (!wq_list_empty(&ctx->submit_state.compl_reqs)) {
                io_free_batch_list(ctx, state->compl_reqs.first);
@@ -1708,8 +1762,8 @@ unsigned int io_file_get_flags(struct file *file)
 
 bool io_alloc_async_data(struct io_kiocb *req)
 {
-       WARN_ON_ONCE(!io_op_defs[req->opcode].async_size);
-       req->async_data = kmalloc(io_op_defs[req->opcode].async_size, GFP_KERNEL);
+       WARN_ON_ONCE(!io_cold_defs[req->opcode].async_size);
+       req->async_data = kmalloc(io_cold_defs[req->opcode].async_size, GFP_KERNEL);
        if (req->async_data) {
                req->flags |= REQ_F_ASYNC_DATA;
                return false;
@@ -1719,20 +1773,21 @@ bool io_alloc_async_data(struct io_kiocb *req)
 
 int io_req_prep_async(struct io_kiocb *req)
 {
-       const struct io_op_def *def = &io_op_defs[req->opcode];
+       const struct io_cold_def *cdef = &io_cold_defs[req->opcode];
+       const struct io_issue_def *def = &io_issue_defs[req->opcode];
 
        /* assign early for deferred execution for non-fixed file */
-       if (def->needs_file && !(req->flags & REQ_F_FIXED_FILE))
+       if (def->needs_file && !(req->flags & REQ_F_FIXED_FILE) && !req->file)
                req->file = io_file_get_normal(req, req->cqe.fd);
-       if (!def->prep_async)
+       if (!cdef->prep_async)
                return 0;
        if (WARN_ON_ONCE(req_has_async_data(req)))
                return -EFAULT;
-       if (!io_op_defs[req->opcode].manual_alloc) {
+       if (!def->manual_alloc) {
                if (io_alloc_async_data(req))
                        return -EAGAIN;
        }
-       return def->prep_async(req);
+       return cdef->prep_async(req);
 }
 
 static u32 io_get_sequence(struct io_kiocb *req)
@@ -1796,7 +1851,7 @@ static void io_clean_op(struct io_kiocb *req)
        }
 
        if (req->flags & REQ_F_NEED_CLEANUP) {
-               const struct io_op_def *def = &io_op_defs[req->opcode];
+               const struct io_cold_def *def = &io_cold_defs[req->opcode];
 
                if (def->cleanup)
                        def->cleanup(req);
@@ -1820,9 +1875,10 @@ static void io_clean_op(struct io_kiocb *req)
        req->flags &= ~IO_REQ_CLEAN_FLAGS;
 }
 
-static bool io_assign_file(struct io_kiocb *req, unsigned int issue_flags)
+static bool io_assign_file(struct io_kiocb *req, const struct io_issue_def *def,
+                          unsigned int issue_flags)
 {
-       if (req->file || !io_op_defs[req->opcode].needs_file)
+       if (req->file || !def->needs_file)
                return true;
 
        if (req->flags & REQ_F_FIXED_FILE)
@@ -1835,11 +1891,11 @@ static bool io_assign_file(struct io_kiocb *req, unsigned int issue_flags)
 
 static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags)
 {
-       const struct io_op_def *def = &io_op_defs[req->opcode];
+       const struct io_issue_def *def = &io_issue_defs[req->opcode];
        const struct cred *creds = NULL;
        int ret;
 
-       if (unlikely(!io_assign_file(req, issue_flags)))
+       if (unlikely(!io_assign_file(req, def, issue_flags)))
                return -EBADF;
 
        if (unlikely((req->flags & REQ_F_CREDS) && req->creds != current_cred()))
@@ -1889,7 +1945,7 @@ struct io_wq_work *io_wq_free_work(struct io_wq_work *work)
 void io_wq_submit_work(struct io_wq_work *work)
 {
        struct io_kiocb *req = container_of(work, struct io_kiocb, work);
-       const struct io_op_def *def = &io_op_defs[req->opcode];
+       const struct io_issue_def *def = &io_issue_defs[req->opcode];
        unsigned int issue_flags = IO_URING_F_UNLOCKED | IO_URING_F_IOWQ;
        bool needs_poll = false;
        int ret = 0, err = -ECANCELED;
@@ -1908,7 +1964,7 @@ fail:
                io_req_task_queue_fail(req, err);
                return;
        }
-       if (!io_assign_file(req, issue_flags)) {
+       if (!io_assign_file(req, def, issue_flags)) {
                err = -EBADF;
                work->flags |= IO_WQ_WORK_CANCEL;
                goto fail;
@@ -2104,7 +2160,7 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req,
                       const struct io_uring_sqe *sqe)
        __must_hold(&ctx->uring_lock)
 {
-       const struct io_op_def *def;
+       const struct io_issue_def *def;
        unsigned int sqe_flags;
        int personality;
        u8 opcode;
@@ -2122,7 +2178,7 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req,
                req->opcode = 0;
                return -EINVAL;
        }
-       def = &io_op_defs[opcode];
+       def = &io_issue_defs[opcode];
        if (unlikely(sqe_flags & ~SQE_COMMON_FLAGS)) {
                /* enforce forwards compatibility on users */
                if (sqe_flags & ~SQE_VALID_FLAGS)
@@ -2333,7 +2389,7 @@ static void io_commit_sqring(struct io_ring_ctx *ctx)
  * used, it's important that those reads are done through READ_ONCE() to
  * prevent a re-load down the line.
  */
-static const struct io_uring_sqe *io_get_sqe(struct io_ring_ctx *ctx)
+static bool io_get_sqe(struct io_ring_ctx *ctx, const struct io_uring_sqe **sqe)
 {
        unsigned head, mask = ctx->sq_entries - 1;
        unsigned sq_idx = ctx->cached_sq_head++ & mask;
@@ -2351,14 +2407,15 @@ static const struct io_uring_sqe *io_get_sqe(struct io_ring_ctx *ctx)
                /* double index for 128-byte SQEs, twice as long */
                if (ctx->flags & IORING_SETUP_SQE128)
                        head <<= 1;
-               return &ctx->sq_sqes[head];
+               *sqe = &ctx->sq_sqes[head];
+               return true;
        }
 
        /* drop invalid entries */
        ctx->cq_extra--;
        WRITE_ONCE(ctx->rings->sq_dropped,
                   READ_ONCE(ctx->rings->sq_dropped) + 1);
-       return NULL;
+       return false;
 }
 
 int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
@@ -2379,11 +2436,9 @@ int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
                const struct io_uring_sqe *sqe;
                struct io_kiocb *req;
 
-               if (unlikely(!io_alloc_req_refill(ctx)))
+               if (unlikely(!io_alloc_req(ctx, &req)))
                        break;
-               req = io_alloc_req(ctx);
-               sqe = io_get_sqe(ctx);
-               if (unlikely(!sqe)) {
+               if (unlikely(!io_get_sqe(ctx, &sqe))) {
                        io_req_add_to_cache(req, ctx);
                        break;
                }
@@ -2418,13 +2473,13 @@ struct io_wait_queue {
        struct io_ring_ctx *ctx;
        unsigned cq_tail;
        unsigned nr_timeouts;
+       ktime_t timeout;
 };
 
 static inline bool io_has_work(struct io_ring_ctx *ctx)
 {
        return test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq) ||
-              ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) &&
-               !llist_empty(&ctx->work_llist));
+              !llist_empty(&ctx->work_llist);
 }
 
 static inline bool io_should_wake(struct io_wait_queue *iowq)
@@ -2443,22 +2498,25 @@ static inline bool io_should_wake(struct io_wait_queue *iowq)
 static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
                            int wake_flags, void *key)
 {
-       struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue,
-                                                       wq);
-       struct io_ring_ctx *ctx = iowq->ctx;
+       struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue, wq);
 
        /*
         * Cannot safely flush overflowed CQEs from here, ensure we wake up
         * the task, and the next invocation will do it.
         */
-       if (io_should_wake(iowq) || io_has_work(ctx))
+       if (io_should_wake(iowq) || io_has_work(iowq->ctx))
                return autoremove_wake_function(curr, mode, wake_flags, key);
        return -1;
 }
 
 int io_run_task_work_sig(struct io_ring_ctx *ctx)
 {
-       if (io_run_task_work_ctx(ctx) > 0)
+       if (!llist_empty(&ctx->work_llist)) {
+               __set_current_state(TASK_RUNNING);
+               if (io_run_local_work(ctx) > 0)
+                       return 1;
+       }
+       if (io_run_task_work() > 0)
                return 1;
        if (task_sigpending(current))
                return -EINTR;
@@ -2467,35 +2525,23 @@ int io_run_task_work_sig(struct io_ring_ctx *ctx)
 
 /* when returns >0, the caller should retry */
 static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx,
-                                         struct io_wait_queue *iowq,
-                                         ktime_t *timeout)
+                                         struct io_wait_queue *iowq)
 {
-       int ret;
-       unsigned long check_cq;
-
-       /* make sure we run task_work before checking for signals */
-       ret = io_run_task_work_sig(ctx);
-       if (ret || io_should_wake(iowq))
-               return ret;
-
-       check_cq = READ_ONCE(ctx->check_cq);
-       if (unlikely(check_cq)) {
-               /* let the caller flush overflows, retry */
-               if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
-                       return 1;
-               if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT))
-                       return -EBADR;
-       }
-       if (!schedule_hrtimeout(timeout, HRTIMER_MODE_ABS))
+       if (unlikely(READ_ONCE(ctx->check_cq)))
+               return 1;
+       if (unlikely(!llist_empty(&ctx->work_llist)))
+               return 1;
+       if (unlikely(test_thread_flag(TIF_NOTIFY_SIGNAL)))
+               return 1;
+       if (unlikely(task_sigpending(current)))
+               return -EINTR;
+       if (unlikely(io_should_wake(iowq)))
+               return 0;
+       if (iowq->timeout == KTIME_MAX)
+               schedule();
+       else if (!schedule_hrtimeout(&iowq->timeout, HRTIMER_MODE_ABS))
                return -ETIME;
-
-       /*
-        * Run task_work after scheduling. If we got woken because of
-        * task_work being processed, run it now rather than let the caller
-        * do another wait loop.
-        */
-       ret = io_run_task_work_sig(ctx);
-       return ret < 0 ? ret : 1;
+       return 0;
 }
 
 /*
@@ -2508,23 +2554,17 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
 {
        struct io_wait_queue iowq;
        struct io_rings *rings = ctx->rings;
-       ktime_t timeout = KTIME_MAX;
        int ret;
 
        if (!io_allowed_run_tw(ctx))
                return -EEXIST;
-
-       do {
-               /* always run at least 1 task work to process local work */
-               ret = io_run_task_work_ctx(ctx);
-               if (ret < 0)
-                       return ret;
-               io_cqring_overflow_flush(ctx);
-
-               /* if user messes with these they will just get an early return */
-               if (__io_cqring_events_user(ctx) >= min_events)
-                       return 0;
-       } while (ret > 0);
+       if (!llist_empty(&ctx->work_llist))
+               io_run_local_work(ctx);
+       io_run_task_work();
+       io_cqring_overflow_flush(ctx);
+       /* if user messes with these they will just get an early return */
+       if (__io_cqring_events_user(ctx) >= min_events)
+               return 0;
 
        if (sig) {
 #ifdef CONFIG_COMPAT
@@ -2539,36 +2579,69 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
                        return ret;
        }
 
-       if (uts) {
-               struct timespec64 ts;
-
-               if (get_timespec64(&ts, uts))
-                       return -EFAULT;
-               timeout = ktime_add_ns(timespec64_to_ktime(ts), ktime_get_ns());
-       }
-
        init_waitqueue_func_entry(&iowq.wq, io_wake_function);
        iowq.wq.private = current;
        INIT_LIST_HEAD(&iowq.wq.entry);
        iowq.ctx = ctx;
        iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
        iowq.cq_tail = READ_ONCE(ctx->rings->cq.head) + min_events;
+       iowq.timeout = KTIME_MAX;
+
+       if (uts) {
+               struct timespec64 ts;
+
+               if (get_timespec64(&ts, uts))
+                       return -EFAULT;
+               iowq.timeout = ktime_add_ns(timespec64_to_ktime(ts), ktime_get_ns());
+       }
 
        trace_io_uring_cqring_wait(ctx, min_events);
        do {
-               if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) {
-                       finish_wait(&ctx->cq_wait, &iowq.wq);
-                       io_cqring_do_overflow_flush(ctx);
+               unsigned long check_cq;
+
+               if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
+                       WRITE_ONCE(ctx->cq_waiting, 1);
+                       set_current_state(TASK_INTERRUPTIBLE);
+               } else {
+                       prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
+                                                       TASK_INTERRUPTIBLE);
                }
-               prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
-                                               TASK_INTERRUPTIBLE);
-               ret = io_cqring_wait_schedule(ctx, &iowq, &timeout);
-               if (__io_cqring_events_user(ctx) >= min_events)
+
+               ret = io_cqring_wait_schedule(ctx, &iowq);
+               __set_current_state(TASK_RUNNING);
+               WRITE_ONCE(ctx->cq_waiting, 0);
+
+               if (ret < 0)
                        break;
+               /*
+                * Run task_work after scheduling and before io_should_wake().
+                * If we got woken because of task_work being processed, run it
+                * now rather than let the caller do another wait loop.
+                */
+               io_run_task_work();
+               if (!llist_empty(&ctx->work_llist))
+                       io_run_local_work(ctx);
+
+               check_cq = READ_ONCE(ctx->check_cq);
+               if (unlikely(check_cq)) {
+                       /* let the caller flush overflows, retry */
+                       if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
+                               io_cqring_do_overflow_flush(ctx);
+                       if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT)) {
+                               ret = -EBADR;
+                               break;
+                       }
+               }
+
+               if (io_should_wake(&iowq)) {
+                       ret = 0;
+                       break;
+               }
                cond_resched();
-       } while (ret > 0);
+       } while (1);
 
-       finish_wait(&ctx->cq_wait, &iowq.wq);
+       if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN))
+               finish_wait(&ctx->cq_wait, &iowq.wq);
        restore_saved_sigmask_unless(ret == -EINTR);
 
        return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0;
@@ -2683,14 +2756,14 @@ static int io_eventfd_unregister(struct io_ring_ctx *ctx)
 
 static void io_req_caches_free(struct io_ring_ctx *ctx)
 {
+       struct io_kiocb *req;
        int nr = 0;
 
        mutex_lock(&ctx->uring_lock);
        io_flush_cached_locked_reqs(ctx, &ctx->submit_state);
 
        while (!io_req_cache_empty(ctx)) {
-               struct io_kiocb *req = io_alloc_req(ctx);
-
+               req = io_extract_req(ctx);
                kmem_cache_free(req_cachep, req);
                nr++;
        }
@@ -2762,12 +2835,54 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
        kfree(ctx);
 }
 
+static __cold void io_activate_pollwq_cb(struct callback_head *cb)
+{
+       struct io_ring_ctx *ctx = container_of(cb, struct io_ring_ctx,
+                                              poll_wq_task_work);
+
+       mutex_lock(&ctx->uring_lock);
+       ctx->poll_activated = true;
+       mutex_unlock(&ctx->uring_lock);
+
+       /*
+        * Wake ups for some events between start of polling and activation
+        * might've been lost due to loose synchronisation.
+        */
+       wake_up_all(&ctx->poll_wq);
+       percpu_ref_put(&ctx->refs);
+}
+
+static __cold void io_activate_pollwq(struct io_ring_ctx *ctx)
+{
+       spin_lock(&ctx->completion_lock);
+       /* already activated or in progress */
+       if (ctx->poll_activated || ctx->poll_wq_task_work.func)
+               goto out;
+       if (WARN_ON_ONCE(!ctx->task_complete))
+               goto out;
+       if (!ctx->submitter_task)
+               goto out;
+       /*
+        * with ->submitter_task only the submitter task completes requests, we
+        * only need to sync with it, which is done by injecting a tw
+        */
+       init_task_work(&ctx->poll_wq_task_work, io_activate_pollwq_cb);
+       percpu_ref_get(&ctx->refs);
+       if (task_work_add(ctx->submitter_task, &ctx->poll_wq_task_work, TWA_SIGNAL))
+               percpu_ref_put(&ctx->refs);
+out:
+       spin_unlock(&ctx->completion_lock);
+}
+
 static __poll_t io_uring_poll(struct file *file, poll_table *wait)
 {
        struct io_ring_ctx *ctx = file->private_data;
        __poll_t mask = 0;
 
-       poll_wait(file, &ctx->cq_wait, wait);
+       if (unlikely(!ctx->poll_activated))
+               io_activate_pollwq(ctx);
+
+       poll_wait(file, &ctx->poll_wq, wait);
        /*
         * synchronizes with barrier from wq_has_sleeper call in
         * io_commit_cqring
@@ -2790,7 +2905,7 @@ static __poll_t io_uring_poll(struct file *file, poll_table *wait)
         * pushes them to do the flush.
         */
 
-       if (io_cqring_events(ctx) || io_has_work(ctx))
+       if (__io_cqring_events_user(ctx) || io_has_work(ctx))
                mask |= EPOLLIN | EPOLLRDNORM;
 
        return mask;
@@ -2822,12 +2937,12 @@ static __cold void io_tctx_exit_cb(struct callback_head *cb)
 
        work = container_of(cb, struct io_tctx_exit, task_work);
        /*
-        * When @in_idle, we're in cancellation and it's racy to remove the
+        * When @in_cancel, we're in cancellation and it's racy to remove the
         * node. It'll be removed by the end of cancellation, just ignore it.
         * tctx can be NULL if the queueing of this task_work raced with
         * work cancelation off the exec path.
         */
-       if (tctx && !atomic_read(&tctx->in_idle))
+       if (tctx && !atomic_read(&tctx->in_cancel))
                io_uring_del_tctx_node((unsigned long)work->ctx);
        complete(&work->completion);
 }
@@ -3053,10 +3168,12 @@ static __cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
                while (!wq_list_empty(&ctx->iopoll_list)) {
                        io_iopoll_try_reap_events(ctx);
                        ret = true;
+                       cond_resched();
                }
        }
 
-       if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
+       if ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) &&
+           io_allowed_defer_tw_run(ctx))
                ret |= io_run_local_work(ctx) > 0;
        ret |= io_cancel_defer_files(ctx, task, cancel_all);
        mutex_lock(&ctx->uring_lock);
@@ -3093,7 +3210,7 @@ __cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd)
        if (tctx->io_wq)
                io_wq_exit_start(tctx->io_wq);
 
-       atomic_inc(&tctx->in_idle);
+       atomic_inc(&tctx->in_cancel);
        do {
                bool loop = false;
 
@@ -3144,9 +3261,9 @@ __cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd)
        if (cancel_all) {
                /*
                 * We shouldn't run task_works after cancel, so just leave
-                * ->in_idle set for normal exit.
+                * ->in_cancel set for normal exit.
                 */
-               atomic_dec(&tctx->in_idle);
+               atomic_dec(&tctx->in_cancel);
                /* for exec all current's requests should be gone, kill tctx */
                __io_uring_free(current);
        }
@@ -3328,11 +3445,9 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
                }
                if (flags & IORING_ENTER_SQ_WAKEUP)
                        wake_up(&ctx->sq_data->wait);
-               if (flags & IORING_ENTER_SQ_WAIT) {
-                       ret = io_sqpoll_wait_sq(ctx);
-                       if (ret)
-                               goto out;
-               }
+               if (flags & IORING_ENTER_SQ_WAIT)
+                       io_sqpoll_wait_sq(ctx);
+
                ret = to_submit;
        } else if (to_submit) {
                ret = io_uring_add_tctx_node(ctx);
@@ -3573,6 +3688,13 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p,
                ctx->task_complete = true;
 
        /*
+        * lazy poll_wq activation relies on ->task_complete for synchronisation
+        * purposes, see io_activate_pollwq()
+        */
+       if (!ctx->task_complete)
+               ctx->poll_activated = true;
+
+       /*
         * When SETUP_IOPOLL and SETUP_SQPOLL are both enabled, user
         * space applications don't need to do io completion events
         * polling again, they can rely on io_sq_thread to do polling
@@ -3663,7 +3785,7 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p,
                        IORING_FEAT_POLL_32BITS | IORING_FEAT_SQPOLL_NONFIXED |
                        IORING_FEAT_EXT_ARG | IORING_FEAT_NATIVE_WORKERS |
                        IORING_FEAT_RSRC_TAGS | IORING_FEAT_CQE_SKIP |
-                       IORING_FEAT_LINKED_FILE;
+                       IORING_FEAT_LINKED_FILE | IORING_FEAT_REG_REG_RING;
 
        if (copy_to_user(params, p, sizeof(*p))) {
                ret = -EFAULT;
@@ -3760,7 +3882,7 @@ static __cold int io_probe(struct io_ring_ctx *ctx, void __user *arg,
 
        for (i = 0; i < nr_args; i++) {
                p->ops[i].op = i;
-               if (!io_op_defs[i].not_supported)
+               if (!io_issue_defs[i].not_supported)
                        p->ops[i].flags = IO_URING_OP_SUPPORTED;
        }
        p->ops_len = i;
@@ -3865,8 +3987,15 @@ static int io_register_enable_rings(struct io_ring_ctx *ctx)
        if (!(ctx->flags & IORING_SETUP_R_DISABLED))
                return -EBADFD;
 
-       if (ctx->flags & IORING_SETUP_SINGLE_ISSUER && !ctx->submitter_task)
+       if (ctx->flags & IORING_SETUP_SINGLE_ISSUER && !ctx->submitter_task) {
                WRITE_ONCE(ctx->submitter_task, get_task_struct(current));
+               /*
+                * Lazy activation attempts would fail if it was polled before
+                * submitter_task is set.
+                */
+               if (wq_has_sleeper(&ctx->poll_wq))
+                       io_activate_pollwq(ctx);
+       }
 
        if (ctx->restrictions.registered)
                ctx->restricted = 1;
@@ -4177,17 +4306,36 @@ SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
        struct io_ring_ctx *ctx;
        long ret = -EBADF;
        struct fd f;
+       bool use_registered_ring;
+
+       use_registered_ring = !!(opcode & IORING_REGISTER_USE_REGISTERED_RING);
+       opcode &= ~IORING_REGISTER_USE_REGISTERED_RING;
 
        if (opcode >= IORING_REGISTER_LAST)
                return -EINVAL;
 
-       f = fdget(fd);
-       if (!f.file)
-               return -EBADF;
+       if (use_registered_ring) {
+               /*
+                * Ring fd has been registered via IORING_REGISTER_RING_FDS, we
+                * need only dereference our task private array to find it.
+                */
+               struct io_uring_task *tctx = current->io_uring;
 
-       ret = -EOPNOTSUPP;
-       if (!io_is_uring_fops(f.file))
-               goto out_fput;
+               if (unlikely(!tctx || fd >= IO_RINGFD_REG_MAX))
+                       return -EINVAL;
+               fd = array_index_nospec(fd, IO_RINGFD_REG_MAX);
+               f.file = tctx->registered_rings[fd];
+               f.flags = 0;
+               if (unlikely(!f.file))
+                       return -EBADF;
+       } else {
+               f = fdget(fd);
+               if (unlikely(!f.file))
+                       return -EBADF;
+               ret = -EOPNOTSUPP;
+               if (!io_is_uring_fops(f.file))
+                       goto out_fput;
+       }
 
        ctx = f.file->private_data;