static void __io_submit_flush_completions(struct io_ring_ctx *ctx);
static __cold void io_fallback_tw(struct io_uring_task *tctx);
-static struct kmem_cache *req_cachep;
+struct kmem_cache *req_cachep;
struct sock *io_uring_get_socket(struct file *file)
{
static inline void io_req_add_to_cache(struct io_kiocb *req, struct io_ring_ctx *ctx)
{
wq_stack_add_head(&req->comp_list, &ctx->submit_state.free_list);
+ kasan_poison_object_data(req_cachep, req);
}
static __cold void io_ring_ctx_ref_free(struct percpu_ref *ref)
fallback_work.work);
struct llist_node *node = llist_del_all(&ctx->fallback_llist);
struct io_kiocb *req, *tmp;
- bool locked = false;
+ bool locked = true;
- percpu_ref_get(&ctx->refs);
+ mutex_lock(&ctx->uring_lock);
llist_for_each_entry_safe(req, tmp, node, io_task_work.node)
req->io_task_work.func(req, &locked);
-
- if (locked) {
- io_submit_flush_completions(ctx);
- mutex_unlock(&ctx->uring_lock);
- }
- percpu_ref_put(&ctx->refs);
+ if (WARN_ON_ONCE(!locked))
+ return;
+ io_submit_flush_completions(ctx);
+ mutex_unlock(&ctx->uring_lock);
}
static int io_alloc_hash_table(struct io_hash_table *table, unsigned bits)
xa_init_flags(&ctx->personalities, XA_FLAGS_ALLOC1);
mutex_init(&ctx->uring_lock);
init_waitqueue_head(&ctx->cq_wait);
+ init_waitqueue_head(&ctx->poll_wq);
spin_lock_init(&ctx->completion_lock);
spin_lock_init(&ctx->timeout_lock);
INIT_WQ_LIST(&ctx->iopoll_list);
static void io_prep_async_work(struct io_kiocb *req)
{
- const struct io_op_def *def = &io_op_defs[req->opcode];
+ const struct io_issue_def *def = &io_issue_defs[req->opcode];
struct io_ring_ctx *ctx = req->ctx;
if (!(req->flags & REQ_F_CREDS)) {
void __io_commit_cqring_flush(struct io_ring_ctx *ctx)
{
+ if (ctx->poll_activated)
+ io_poll_wq_wake(ctx);
if (ctx->off_timeout_used)
io_flush_timeouts(ctx);
if (ctx->drain_active) {
io_cqring_wake(ctx);
}
+static inline void __io_cq_unlock_post_flush(struct io_ring_ctx *ctx)
+ __releases(ctx->completion_lock)
+{
+ io_commit_cqring(ctx);
+ __io_cq_unlock(ctx);
+ io_commit_cqring_flush(ctx);
+
+ /*
+ * As ->task_complete implies that the ring is single tasked, cq_wait
+ * may only be waited on by the current in io_cqring_wait(), but since
+ * it will re-check the wakeup conditions once we return we can safely
+ * skip waking it up.
+ */
+ if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN)) {
+ smp_mb();
+ __io_cqring_wake(ctx);
+ }
+}
+
void io_cq_unlock_post(struct io_ring_ctx *ctx)
__releases(ctx->completion_lock)
{
}
}
-/* Returns true if there are no backlogged entries after the flush */
static void __io_cqring_overflow_flush(struct io_ring_ctx *ctx)
{
size_t cqe_size = sizeof(struct io_uring_cqe);
io_cqring_do_overflow_flush(ctx);
}
-void __io_put_task(struct task_struct *task, int nr)
+/* can be called by any task */
+static void io_put_task_remote(struct task_struct *task, int nr)
{
struct io_uring_task *tctx = task->io_uring;
percpu_counter_sub(&tctx->inflight, nr);
- if (unlikely(atomic_read(&tctx->in_idle)))
+ if (unlikely(atomic_read(&tctx->in_cancel)))
wake_up(&tctx->wait);
put_task_struct_many(task, nr);
}
+/* used by a task to put its own references */
+static void io_put_task_local(struct task_struct *task, int nr)
+{
+ task->io_uring->cached_refs += nr;
+}
+
+/* must to be called somewhat shortly after putting a request */
+static inline void io_put_task(struct task_struct *task, int nr)
+{
+ if (likely(task == current))
+ io_put_task_local(task, nr);
+ else
+ io_put_task_remote(task, nr);
+}
+
void io_task_refs_refill(struct io_uring_task *tctx)
{
unsigned int refill = -tctx->cached_refs + IO_TCTX_REFS_CACHE_NR;
req->link = NULL;
}
}
+ io_put_kbuf_comp(req);
+ io_dismantle_req(req);
io_req_put_rsrc(req);
/*
* Selected buffer deallocation in io_clean_op() assumes that
* we don't hold ->completion_lock. Clean them here to avoid
* deadlocks.
*/
- io_put_kbuf_comp(req);
- io_dismantle_req(req);
- io_put_task(req->task, 1);
+ io_put_task_remote(req->task, 1);
wq_list_add_head(&req->comp_list, &ctx->locked_free_list);
ctx->locked_free_nr++;
}
void io_req_defer_failed(struct io_kiocb *req, s32 res)
__must_hold(&ctx->uring_lock)
{
- const struct io_op_def *def = &io_op_defs[req->opcode];
+ const struct io_cold_def *def = &io_cold_defs[req->opcode];
lockdep_assert_held(&req->ctx->uring_lock);
io_req_put_rsrc(req);
io_dismantle_req(req);
- io_put_task(req->task, 1);
+ io_put_task_remote(req->task, 1);
spin_lock(&ctx->completion_lock);
wq_list_add_head(&req->comp_list, &ctx->locked_free_list);
{
unsigned int count = 0;
- while (node != last) {
+ while (node && node != last) {
struct llist_node *next = node->next;
struct io_kiocb *req = container_of(node, struct io_kiocb,
io_task_work.node);
/* if not contended, grab and improve batching */
*locked = mutex_trylock(&(*ctx)->uring_lock);
percpu_ref_get(&(*ctx)->refs);
- }
+ } else if (!*locked)
+ *locked = mutex_trylock(&(*ctx)->uring_lock);
req->io_task_work.func(req, locked);
node = next;
count++;
+ if (unlikely(need_resched())) {
+ ctx_flush_and_put(*ctx, locked);
+ *ctx = NULL;
+ cond_resched();
+ }
}
return count;
task_work);
struct llist_node fake = {};
struct llist_node *node;
- unsigned int loops = 1;
- unsigned int count;
+ unsigned int loops = 0;
+ unsigned int count = 0;
if (unlikely(current->flags & PF_EXITING)) {
io_fallback_tw(tctx);
return;
}
- node = io_llist_xchg(&tctx->task_list, &fake);
- count = handle_tw_list(node, &ctx, &uring_locked, NULL);
- node = io_llist_cmpxchg(&tctx->task_list, &fake, NULL);
- while (node != &fake) {
+ do {
loops++;
node = io_llist_xchg(&tctx->task_list, &fake);
count += handle_tw_list(node, &ctx, &uring_locked, &fake);
+
+ /* skip expensive cmpxchg if there are items in the list */
+ if (READ_ONCE(tctx->task_list.first) != &fake)
+ continue;
+ if (uring_locked && !wq_list_empty(&ctx->submit_state.compl_reqs)) {
+ io_submit_flush_completions(ctx);
+ if (READ_ONCE(tctx->task_list.first) != &fake)
+ continue;
+ }
node = io_llist_cmpxchg(&tctx->task_list, &fake, NULL);
- }
+ } while (node != &fake);
ctx_flush_and_put(ctx, &uring_locked);
- /* relaxed read is enough as only the task itself sets ->in_idle */
- if (unlikely(atomic_read(&tctx->in_idle)))
+ /* relaxed read is enough as only the task itself sets ->in_cancel */
+ if (unlikely(atomic_read(&tctx->in_cancel)))
io_uring_drop_tctx_refs(current);
trace_io_uring_task_work_run(tctx, count, loops);
percpu_ref_get(&ctx->refs);
- if (!llist_add(&req->io_task_work.node, &ctx->work_llist)) {
- percpu_ref_put(&ctx->refs);
- return;
- }
- /* need it for the following io_cqring_wake() */
+ if (!llist_add(&req->io_task_work.node, &ctx->work_llist))
+ goto put_ref;
+
+ /* needed for the following wake up */
smp_mb__after_atomic();
- if (unlikely(atomic_read(&req->task->io_uring->in_idle))) {
+ if (unlikely(atomic_read(&req->task->io_uring->in_cancel))) {
io_move_task_work_from_local(ctx);
- percpu_ref_put(&ctx->refs);
- return;
+ goto put_ref;
}
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
-
if (ctx->has_evfd)
io_eventfd_signal(ctx);
- __io_cqring_wake(ctx);
+
+ if (READ_ONCE(ctx->cq_waiting))
+ wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
+
+put_ref:
percpu_ref_put(&ctx->refs);
}
}
}
-int __io_run_local_work(struct io_ring_ctx *ctx, bool *locked)
+static int __io_run_local_work(struct io_ring_ctx *ctx, bool *locked)
{
struct llist_node *node;
- struct llist_node fake;
- struct llist_node *current_final = NULL;
- int ret;
- unsigned int loops = 1;
+ unsigned int loops = 0;
+ int ret = 0;
- if (unlikely(ctx->submitter_task != current))
+ if (WARN_ON_ONCE(ctx->submitter_task != current))
return -EEXIST;
-
- node = io_llist_xchg(&ctx->work_llist, &fake);
- ret = 0;
+ if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
+ atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
again:
- while (node != current_final) {
+ node = io_llist_xchg(&ctx->work_llist, NULL);
+ while (node) {
struct llist_node *next = node->next;
struct io_kiocb *req = container_of(node, struct io_kiocb,
io_task_work.node);
ret++;
node = next;
}
+ loops++;
- if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
- atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
-
- node = io_llist_cmpxchg(&ctx->work_llist, &fake, NULL);
- if (node != &fake) {
- loops++;
- current_final = &fake;
- node = io_llist_xchg(&ctx->work_llist, &fake);
+ if (!llist_empty(&ctx->work_llist))
goto again;
- }
-
- if (*locked)
+ if (*locked) {
io_submit_flush_completions(ctx);
+ if (!llist_empty(&ctx->work_llist))
+ goto again;
+ }
trace_io_uring_local_work_run(ctx, ret, loops);
return ret;
-
}
-int io_run_local_work(struct io_ring_ctx *ctx)
+static inline int io_run_local_work_locked(struct io_ring_ctx *ctx)
{
bool locked;
int ret;
if (llist_empty(&ctx->work_llist))
return 0;
- __set_current_state(TASK_RUNNING);
- locked = mutex_trylock(&ctx->uring_lock);
+ locked = true;
+ ret = __io_run_local_work(ctx, &locked);
+ /* shouldn't happen! */
+ if (WARN_ON_ONCE(!locked))
+ mutex_lock(&ctx->uring_lock);
+ return ret;
+}
+
+static int io_run_local_work(struct io_ring_ctx *ctx)
+{
+ bool locked = mutex_trylock(&ctx->uring_lock);
+ int ret;
+
ret = __io_run_local_work(ctx, &locked);
if (locked)
mutex_unlock(&ctx->uring_lock);
{
io_tw_lock(req->ctx, locked);
/* req->task == current here, checking PF_EXITING is safe */
- if (likely(!(req->task->flags & PF_EXITING)))
- io_queue_sqe(req);
- else
+ if (unlikely(req->task->flags & PF_EXITING))
io_req_defer_failed(req, -EFAULT);
+ else if (req->flags & REQ_F_FORCE_ASYNC)
+ io_queue_iowq(req, locked);
+ else
+ io_queue_sqe(req);
}
void io_req_task_queue_fail(struct io_kiocb *req, int ret)
}
}
}
- __io_cq_unlock_post(ctx);
+ __io_cq_unlock_post_flush(ctx);
if (!wq_list_empty(&ctx->submit_state.compl_reqs)) {
io_free_batch_list(ctx, state->compl_reqs.first);
bool io_alloc_async_data(struct io_kiocb *req)
{
- WARN_ON_ONCE(!io_op_defs[req->opcode].async_size);
- req->async_data = kmalloc(io_op_defs[req->opcode].async_size, GFP_KERNEL);
+ WARN_ON_ONCE(!io_cold_defs[req->opcode].async_size);
+ req->async_data = kmalloc(io_cold_defs[req->opcode].async_size, GFP_KERNEL);
if (req->async_data) {
req->flags |= REQ_F_ASYNC_DATA;
return false;
int io_req_prep_async(struct io_kiocb *req)
{
- const struct io_op_def *def = &io_op_defs[req->opcode];
+ const struct io_cold_def *cdef = &io_cold_defs[req->opcode];
+ const struct io_issue_def *def = &io_issue_defs[req->opcode];
/* assign early for deferred execution for non-fixed file */
- if (def->needs_file && !(req->flags & REQ_F_FIXED_FILE))
+ if (def->needs_file && !(req->flags & REQ_F_FIXED_FILE) && !req->file)
req->file = io_file_get_normal(req, req->cqe.fd);
- if (!def->prep_async)
+ if (!cdef->prep_async)
return 0;
if (WARN_ON_ONCE(req_has_async_data(req)))
return -EFAULT;
- if (!io_op_defs[req->opcode].manual_alloc) {
+ if (!def->manual_alloc) {
if (io_alloc_async_data(req))
return -EAGAIN;
}
- return def->prep_async(req);
+ return cdef->prep_async(req);
}
static u32 io_get_sequence(struct io_kiocb *req)
}
if (req->flags & REQ_F_NEED_CLEANUP) {
- const struct io_op_def *def = &io_op_defs[req->opcode];
+ const struct io_cold_def *def = &io_cold_defs[req->opcode];
if (def->cleanup)
def->cleanup(req);
req->flags &= ~IO_REQ_CLEAN_FLAGS;
}
-static bool io_assign_file(struct io_kiocb *req, unsigned int issue_flags)
+static bool io_assign_file(struct io_kiocb *req, const struct io_issue_def *def,
+ unsigned int issue_flags)
{
- if (req->file || !io_op_defs[req->opcode].needs_file)
+ if (req->file || !def->needs_file)
return true;
if (req->flags & REQ_F_FIXED_FILE)
static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags)
{
- const struct io_op_def *def = &io_op_defs[req->opcode];
+ const struct io_issue_def *def = &io_issue_defs[req->opcode];
const struct cred *creds = NULL;
int ret;
- if (unlikely(!io_assign_file(req, issue_flags)))
+ if (unlikely(!io_assign_file(req, def, issue_flags)))
return -EBADF;
if (unlikely((req->flags & REQ_F_CREDS) && req->creds != current_cred()))
void io_wq_submit_work(struct io_wq_work *work)
{
struct io_kiocb *req = container_of(work, struct io_kiocb, work);
- const struct io_op_def *def = &io_op_defs[req->opcode];
+ const struct io_issue_def *def = &io_issue_defs[req->opcode];
unsigned int issue_flags = IO_URING_F_UNLOCKED | IO_URING_F_IOWQ;
bool needs_poll = false;
int ret = 0, err = -ECANCELED;
io_req_task_queue_fail(req, err);
return;
}
- if (!io_assign_file(req, issue_flags)) {
+ if (!io_assign_file(req, def, issue_flags)) {
err = -EBADF;
work->flags |= IO_WQ_WORK_CANCEL;
goto fail;
const struct io_uring_sqe *sqe)
__must_hold(&ctx->uring_lock)
{
- const struct io_op_def *def;
+ const struct io_issue_def *def;
unsigned int sqe_flags;
int personality;
u8 opcode;
req->opcode = 0;
return -EINVAL;
}
- def = &io_op_defs[opcode];
+ def = &io_issue_defs[opcode];
if (unlikely(sqe_flags & ~SQE_COMMON_FLAGS)) {
/* enforce forwards compatibility on users */
if (sqe_flags & ~SQE_VALID_FLAGS)
* used, it's important that those reads are done through READ_ONCE() to
* prevent a re-load down the line.
*/
-static const struct io_uring_sqe *io_get_sqe(struct io_ring_ctx *ctx)
+static bool io_get_sqe(struct io_ring_ctx *ctx, const struct io_uring_sqe **sqe)
{
unsigned head, mask = ctx->sq_entries - 1;
unsigned sq_idx = ctx->cached_sq_head++ & mask;
/* double index for 128-byte SQEs, twice as long */
if (ctx->flags & IORING_SETUP_SQE128)
head <<= 1;
- return &ctx->sq_sqes[head];
+ *sqe = &ctx->sq_sqes[head];
+ return true;
}
/* drop invalid entries */
ctx->cq_extra--;
WRITE_ONCE(ctx->rings->sq_dropped,
READ_ONCE(ctx->rings->sq_dropped) + 1);
- return NULL;
+ return false;
}
int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
const struct io_uring_sqe *sqe;
struct io_kiocb *req;
- if (unlikely(!io_alloc_req_refill(ctx)))
+ if (unlikely(!io_alloc_req(ctx, &req)))
break;
- req = io_alloc_req(ctx);
- sqe = io_get_sqe(ctx);
- if (unlikely(!sqe)) {
+ if (unlikely(!io_get_sqe(ctx, &sqe))) {
io_req_add_to_cache(req, ctx);
break;
}
struct io_ring_ctx *ctx;
unsigned cq_tail;
unsigned nr_timeouts;
+ ktime_t timeout;
};
static inline bool io_has_work(struct io_ring_ctx *ctx)
{
return test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq) ||
- ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) &&
- !llist_empty(&ctx->work_llist));
+ !llist_empty(&ctx->work_llist);
}
static inline bool io_should_wake(struct io_wait_queue *iowq)
static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
int wake_flags, void *key)
{
- struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue,
- wq);
- struct io_ring_ctx *ctx = iowq->ctx;
+ struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue, wq);
/*
* Cannot safely flush overflowed CQEs from here, ensure we wake up
* the task, and the next invocation will do it.
*/
- if (io_should_wake(iowq) || io_has_work(ctx))
+ if (io_should_wake(iowq) || io_has_work(iowq->ctx))
return autoremove_wake_function(curr, mode, wake_flags, key);
return -1;
}
int io_run_task_work_sig(struct io_ring_ctx *ctx)
{
- if (io_run_task_work_ctx(ctx) > 0)
+ if (!llist_empty(&ctx->work_llist)) {
+ __set_current_state(TASK_RUNNING);
+ if (io_run_local_work(ctx) > 0)
+ return 1;
+ }
+ if (io_run_task_work() > 0)
return 1;
if (task_sigpending(current))
return -EINTR;
/* when returns >0, the caller should retry */
static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx,
- struct io_wait_queue *iowq,
- ktime_t *timeout)
+ struct io_wait_queue *iowq)
{
- int ret;
- unsigned long check_cq;
-
- /* make sure we run task_work before checking for signals */
- ret = io_run_task_work_sig(ctx);
- if (ret || io_should_wake(iowq))
- return ret;
-
- check_cq = READ_ONCE(ctx->check_cq);
- if (unlikely(check_cq)) {
- /* let the caller flush overflows, retry */
- if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
- return 1;
- if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT))
- return -EBADR;
- }
- if (!schedule_hrtimeout(timeout, HRTIMER_MODE_ABS))
+ if (unlikely(READ_ONCE(ctx->check_cq)))
+ return 1;
+ if (unlikely(!llist_empty(&ctx->work_llist)))
+ return 1;
+ if (unlikely(test_thread_flag(TIF_NOTIFY_SIGNAL)))
+ return 1;
+ if (unlikely(task_sigpending(current)))
+ return -EINTR;
+ if (unlikely(io_should_wake(iowq)))
+ return 0;
+ if (iowq->timeout == KTIME_MAX)
+ schedule();
+ else if (!schedule_hrtimeout(&iowq->timeout, HRTIMER_MODE_ABS))
return -ETIME;
-
- /*
- * Run task_work after scheduling. If we got woken because of
- * task_work being processed, run it now rather than let the caller
- * do another wait loop.
- */
- ret = io_run_task_work_sig(ctx);
- return ret < 0 ? ret : 1;
+ return 0;
}
/*
{
struct io_wait_queue iowq;
struct io_rings *rings = ctx->rings;
- ktime_t timeout = KTIME_MAX;
int ret;
if (!io_allowed_run_tw(ctx))
return -EEXIST;
-
- do {
- /* always run at least 1 task work to process local work */
- ret = io_run_task_work_ctx(ctx);
- if (ret < 0)
- return ret;
- io_cqring_overflow_flush(ctx);
-
- /* if user messes with these they will just get an early return */
- if (__io_cqring_events_user(ctx) >= min_events)
- return 0;
- } while (ret > 0);
+ if (!llist_empty(&ctx->work_llist))
+ io_run_local_work(ctx);
+ io_run_task_work();
+ io_cqring_overflow_flush(ctx);
+ /* if user messes with these they will just get an early return */
+ if (__io_cqring_events_user(ctx) >= min_events)
+ return 0;
if (sig) {
#ifdef CONFIG_COMPAT
return ret;
}
- if (uts) {
- struct timespec64 ts;
-
- if (get_timespec64(&ts, uts))
- return -EFAULT;
- timeout = ktime_add_ns(timespec64_to_ktime(ts), ktime_get_ns());
- }
-
init_waitqueue_func_entry(&iowq.wq, io_wake_function);
iowq.wq.private = current;
INIT_LIST_HEAD(&iowq.wq.entry);
iowq.ctx = ctx;
iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
iowq.cq_tail = READ_ONCE(ctx->rings->cq.head) + min_events;
+ iowq.timeout = KTIME_MAX;
+
+ if (uts) {
+ struct timespec64 ts;
+
+ if (get_timespec64(&ts, uts))
+ return -EFAULT;
+ iowq.timeout = ktime_add_ns(timespec64_to_ktime(ts), ktime_get_ns());
+ }
trace_io_uring_cqring_wait(ctx, min_events);
do {
- if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) {
- finish_wait(&ctx->cq_wait, &iowq.wq);
- io_cqring_do_overflow_flush(ctx);
+ unsigned long check_cq;
+
+ if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
+ WRITE_ONCE(ctx->cq_waiting, 1);
+ set_current_state(TASK_INTERRUPTIBLE);
+ } else {
+ prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
+ TASK_INTERRUPTIBLE);
}
- prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
- TASK_INTERRUPTIBLE);
- ret = io_cqring_wait_schedule(ctx, &iowq, &timeout);
- if (__io_cqring_events_user(ctx) >= min_events)
+
+ ret = io_cqring_wait_schedule(ctx, &iowq);
+ __set_current_state(TASK_RUNNING);
+ WRITE_ONCE(ctx->cq_waiting, 0);
+
+ if (ret < 0)
break;
+ /*
+ * Run task_work after scheduling and before io_should_wake().
+ * If we got woken because of task_work being processed, run it
+ * now rather than let the caller do another wait loop.
+ */
+ io_run_task_work();
+ if (!llist_empty(&ctx->work_llist))
+ io_run_local_work(ctx);
+
+ check_cq = READ_ONCE(ctx->check_cq);
+ if (unlikely(check_cq)) {
+ /* let the caller flush overflows, retry */
+ if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
+ io_cqring_do_overflow_flush(ctx);
+ if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT)) {
+ ret = -EBADR;
+ break;
+ }
+ }
+
+ if (io_should_wake(&iowq)) {
+ ret = 0;
+ break;
+ }
cond_resched();
- } while (ret > 0);
+ } while (1);
- finish_wait(&ctx->cq_wait, &iowq.wq);
+ if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN))
+ finish_wait(&ctx->cq_wait, &iowq.wq);
restore_saved_sigmask_unless(ret == -EINTR);
return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0;
static void io_req_caches_free(struct io_ring_ctx *ctx)
{
+ struct io_kiocb *req;
int nr = 0;
mutex_lock(&ctx->uring_lock);
io_flush_cached_locked_reqs(ctx, &ctx->submit_state);
while (!io_req_cache_empty(ctx)) {
- struct io_kiocb *req = io_alloc_req(ctx);
-
+ req = io_extract_req(ctx);
kmem_cache_free(req_cachep, req);
nr++;
}
kfree(ctx);
}
+static __cold void io_activate_pollwq_cb(struct callback_head *cb)
+{
+ struct io_ring_ctx *ctx = container_of(cb, struct io_ring_ctx,
+ poll_wq_task_work);
+
+ mutex_lock(&ctx->uring_lock);
+ ctx->poll_activated = true;
+ mutex_unlock(&ctx->uring_lock);
+
+ /*
+ * Wake ups for some events between start of polling and activation
+ * might've been lost due to loose synchronisation.
+ */
+ wake_up_all(&ctx->poll_wq);
+ percpu_ref_put(&ctx->refs);
+}
+
+static __cold void io_activate_pollwq(struct io_ring_ctx *ctx)
+{
+ spin_lock(&ctx->completion_lock);
+ /* already activated or in progress */
+ if (ctx->poll_activated || ctx->poll_wq_task_work.func)
+ goto out;
+ if (WARN_ON_ONCE(!ctx->task_complete))
+ goto out;
+ if (!ctx->submitter_task)
+ goto out;
+ /*
+ * with ->submitter_task only the submitter task completes requests, we
+ * only need to sync with it, which is done by injecting a tw
+ */
+ init_task_work(&ctx->poll_wq_task_work, io_activate_pollwq_cb);
+ percpu_ref_get(&ctx->refs);
+ if (task_work_add(ctx->submitter_task, &ctx->poll_wq_task_work, TWA_SIGNAL))
+ percpu_ref_put(&ctx->refs);
+out:
+ spin_unlock(&ctx->completion_lock);
+}
+
static __poll_t io_uring_poll(struct file *file, poll_table *wait)
{
struct io_ring_ctx *ctx = file->private_data;
__poll_t mask = 0;
- poll_wait(file, &ctx->cq_wait, wait);
+ if (unlikely(!ctx->poll_activated))
+ io_activate_pollwq(ctx);
+
+ poll_wait(file, &ctx->poll_wq, wait);
/*
* synchronizes with barrier from wq_has_sleeper call in
* io_commit_cqring
* pushes them to do the flush.
*/
- if (io_cqring_events(ctx) || io_has_work(ctx))
+ if (__io_cqring_events_user(ctx) || io_has_work(ctx))
mask |= EPOLLIN | EPOLLRDNORM;
return mask;
work = container_of(cb, struct io_tctx_exit, task_work);
/*
- * When @in_idle, we're in cancellation and it's racy to remove the
+ * When @in_cancel, we're in cancellation and it's racy to remove the
* node. It'll be removed by the end of cancellation, just ignore it.
* tctx can be NULL if the queueing of this task_work raced with
* work cancelation off the exec path.
*/
- if (tctx && !atomic_read(&tctx->in_idle))
+ if (tctx && !atomic_read(&tctx->in_cancel))
io_uring_del_tctx_node((unsigned long)work->ctx);
complete(&work->completion);
}
while (!wq_list_empty(&ctx->iopoll_list)) {
io_iopoll_try_reap_events(ctx);
ret = true;
+ cond_resched();
}
}
- if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
+ if ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) &&
+ io_allowed_defer_tw_run(ctx))
ret |= io_run_local_work(ctx) > 0;
ret |= io_cancel_defer_files(ctx, task, cancel_all);
mutex_lock(&ctx->uring_lock);
if (tctx->io_wq)
io_wq_exit_start(tctx->io_wq);
- atomic_inc(&tctx->in_idle);
+ atomic_inc(&tctx->in_cancel);
do {
bool loop = false;
if (cancel_all) {
/*
* We shouldn't run task_works after cancel, so just leave
- * ->in_idle set for normal exit.
+ * ->in_cancel set for normal exit.
*/
- atomic_dec(&tctx->in_idle);
+ atomic_dec(&tctx->in_cancel);
/* for exec all current's requests should be gone, kill tctx */
__io_uring_free(current);
}
}
if (flags & IORING_ENTER_SQ_WAKEUP)
wake_up(&ctx->sq_data->wait);
- if (flags & IORING_ENTER_SQ_WAIT) {
- ret = io_sqpoll_wait_sq(ctx);
- if (ret)
- goto out;
- }
+ if (flags & IORING_ENTER_SQ_WAIT)
+ io_sqpoll_wait_sq(ctx);
+
ret = to_submit;
} else if (to_submit) {
ret = io_uring_add_tctx_node(ctx);
ctx->task_complete = true;
/*
+ * lazy poll_wq activation relies on ->task_complete for synchronisation
+ * purposes, see io_activate_pollwq()
+ */
+ if (!ctx->task_complete)
+ ctx->poll_activated = true;
+
+ /*
* When SETUP_IOPOLL and SETUP_SQPOLL are both enabled, user
* space applications don't need to do io completion events
* polling again, they can rely on io_sq_thread to do polling
IORING_FEAT_POLL_32BITS | IORING_FEAT_SQPOLL_NONFIXED |
IORING_FEAT_EXT_ARG | IORING_FEAT_NATIVE_WORKERS |
IORING_FEAT_RSRC_TAGS | IORING_FEAT_CQE_SKIP |
- IORING_FEAT_LINKED_FILE;
+ IORING_FEAT_LINKED_FILE | IORING_FEAT_REG_REG_RING;
if (copy_to_user(params, p, sizeof(*p))) {
ret = -EFAULT;
for (i = 0; i < nr_args; i++) {
p->ops[i].op = i;
- if (!io_op_defs[i].not_supported)
+ if (!io_issue_defs[i].not_supported)
p->ops[i].flags = IO_URING_OP_SUPPORTED;
}
p->ops_len = i;
if (!(ctx->flags & IORING_SETUP_R_DISABLED))
return -EBADFD;
- if (ctx->flags & IORING_SETUP_SINGLE_ISSUER && !ctx->submitter_task)
+ if (ctx->flags & IORING_SETUP_SINGLE_ISSUER && !ctx->submitter_task) {
WRITE_ONCE(ctx->submitter_task, get_task_struct(current));
+ /*
+ * Lazy activation attempts would fail if it was polled before
+ * submitter_task is set.
+ */
+ if (wq_has_sleeper(&ctx->poll_wq))
+ io_activate_pollwq(ctx);
+ }
if (ctx->restrictions.registered)
ctx->restricted = 1;
struct io_ring_ctx *ctx;
long ret = -EBADF;
struct fd f;
+ bool use_registered_ring;
+
+ use_registered_ring = !!(opcode & IORING_REGISTER_USE_REGISTERED_RING);
+ opcode &= ~IORING_REGISTER_USE_REGISTERED_RING;
if (opcode >= IORING_REGISTER_LAST)
return -EINVAL;
- f = fdget(fd);
- if (!f.file)
- return -EBADF;
+ if (use_registered_ring) {
+ /*
+ * Ring fd has been registered via IORING_REGISTER_RING_FDS, we
+ * need only dereference our task private array to find it.
+ */
+ struct io_uring_task *tctx = current->io_uring;
- ret = -EOPNOTSUPP;
- if (!io_is_uring_fops(f.file))
- goto out_fput;
+ if (unlikely(!tctx || fd >= IO_RINGFD_REG_MAX))
+ return -EINVAL;
+ fd = array_index_nospec(fd, IO_RINGFD_REG_MAX);
+ f.file = tctx->registered_rings[fd];
+ f.flags = 0;
+ if (unlikely(!f.file))
+ return -EBADF;
+ } else {
+ f = fdget(fd);
+ if (unlikely(!f.file))
+ return -EBADF;
+ ret = -EOPNOTSUPP;
+ if (!io_is_uring_fops(f.file))
+ goto out_fput;
+ }
ctx = f.file->private_data;