io_uring: run timeouts from task_work
authorJens Axboe <axboe@kernel.dk>
Tue, 10 Aug 2021 21:11:51 +0000 (15:11 -0600)
committerJens Axboe <axboe@kernel.dk>
Mon, 23 Aug 2021 19:10:32 +0000 (13:10 -0600)
This is in preparation to making the completion lock work outside of
hard/soft IRQ context.

Add a timeout_lock to handle the ordering of timeout completions or
cancelations with the timeouts actually triggering.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
fs/io_uring.c

index 64241e2..b8d0956 100644 (file)
@@ -409,6 +409,8 @@ struct io_ring_ctx {
        struct {
                spinlock_t              completion_lock;
 
+               spinlock_t              timeout_lock;
+
                /*
                 * ->iopoll_list is protected by the ctx->uring_lock for
                 * io_uring instances that don't use IORING_SETUP_SQPOLL.
@@ -1188,6 +1190,7 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
        mutex_init(&ctx->uring_lock);
        init_waitqueue_head(&ctx->cq_wait);
        spin_lock_init(&ctx->completion_lock);
+       spin_lock_init(&ctx->timeout_lock);
        INIT_LIST_HEAD(&ctx->iopoll_list);
        INIT_LIST_HEAD(&ctx->defer_list);
        INIT_LIST_HEAD(&ctx->timeout_list);
@@ -1328,6 +1331,7 @@ static void io_queue_async_work(struct io_kiocb *req)
 
 static void io_kill_timeout(struct io_kiocb *req, int status)
        __must_hold(&req->ctx->completion_lock)
+       __must_hold(&req->ctx->timeout_lock)
 {
        struct io_timeout_data *io = req->async_data;
 
@@ -1355,9 +1359,12 @@ static void io_queue_deferred(struct io_ring_ctx *ctx)
 }
 
 static void io_flush_timeouts(struct io_ring_ctx *ctx)
+       __must_hold(&ctx->completion_lock)
 {
        u32 seq = ctx->cached_cq_tail - atomic_read(&ctx->cq_timeouts);
+       unsigned long flags;
 
+       spin_lock_irqsave(&ctx->timeout_lock, flags);
        while (!list_empty(&ctx->timeout_list)) {
                u32 events_needed, events_got;
                struct io_kiocb *req = list_first_entry(&ctx->timeout_list,
@@ -1382,6 +1389,7 @@ static void io_flush_timeouts(struct io_ring_ctx *ctx)
                io_kill_timeout(req, 0);
        }
        ctx->cq_last_tm_flush = seq;
+       spin_unlock_irqrestore(&ctx->timeout_lock, flags);
 }
 
 static void __io_commit_cqring_flush(struct io_ring_ctx *ctx)
@@ -5455,6 +5463,20 @@ err:
        return 0;
 }
 
+static void io_req_task_timeout(struct io_kiocb *req)
+{
+       struct io_ring_ctx *ctx = req->ctx;
+
+       spin_lock_irq(&ctx->completion_lock);
+       io_cqring_fill_event(ctx, req->user_data, -ETIME, 0);
+       io_commit_cqring(ctx);
+       spin_unlock_irq(&ctx->completion_lock);
+
+       io_cqring_ev_posted(ctx);
+       req_set_fail(req);
+       io_put_req(req);
+}
+
 static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
 {
        struct io_timeout_data *data = container_of(timer,
@@ -5463,24 +5485,20 @@ static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
        struct io_ring_ctx *ctx = req->ctx;
        unsigned long flags;
 
-       spin_lock_irqsave(&ctx->completion_lock, flags);
+       spin_lock_irqsave(&ctx->timeout_lock, flags);
        list_del_init(&req->timeout.list);
        atomic_set(&req->ctx->cq_timeouts,
                atomic_read(&req->ctx->cq_timeouts) + 1);
+       spin_unlock_irqrestore(&ctx->timeout_lock, flags);
 
-       io_cqring_fill_event(ctx, req->user_data, -ETIME, 0);
-       io_commit_cqring(ctx);
-       spin_unlock_irqrestore(&ctx->completion_lock, flags);
-
-       io_cqring_ev_posted(ctx);
-       req_set_fail(req);
-       io_put_req(req);
+       req->io_task_work.func = io_req_task_timeout;
+       io_req_task_work_add(req);
        return HRTIMER_NORESTART;
 }
 
 static struct io_kiocb *io_timeout_extract(struct io_ring_ctx *ctx,
                                           __u64 user_data)
-       __must_hold(&ctx->completion_lock)
+       __must_hold(&ctx->timeout_lock)
 {
        struct io_timeout_data *io;
        struct io_kiocb *req;
@@ -5502,7 +5520,7 @@ static struct io_kiocb *io_timeout_extract(struct io_ring_ctx *ctx,
 }
 
 static int io_timeout_cancel(struct io_ring_ctx *ctx, __u64 user_data)
-       __must_hold(&ctx->completion_lock)
+       __must_hold(&ctx->timeout_lock)
 {
        struct io_kiocb *req = io_timeout_extract(ctx, user_data);
 
@@ -5517,7 +5535,7 @@ static int io_timeout_cancel(struct io_ring_ctx *ctx, __u64 user_data)
 
 static int io_timeout_update(struct io_ring_ctx *ctx, __u64 user_data,
                             struct timespec64 *ts, enum hrtimer_mode mode)
-       __must_hold(&ctx->completion_lock)
+       __must_hold(&ctx->timeout_lock)
 {
        struct io_kiocb *req = io_timeout_extract(ctx, user_data);
        struct io_timeout_data *data;
@@ -5576,13 +5594,15 @@ static int io_timeout_remove(struct io_kiocb *req, unsigned int issue_flags)
        struct io_ring_ctx *ctx = req->ctx;
        int ret;
 
-       spin_lock_irq(&ctx->completion_lock);
+       spin_lock_irq(&ctx->timeout_lock);
        if (!(req->timeout_rem.flags & IORING_TIMEOUT_UPDATE))
                ret = io_timeout_cancel(ctx, tr->addr);
        else
                ret = io_timeout_update(ctx, tr->addr, &tr->ts,
                                        io_translate_timeout_mode(tr->flags));
+       spin_unlock_irq(&ctx->timeout_lock);
 
+       spin_lock_irq(&ctx->completion_lock);
        io_cqring_fill_event(ctx, req->user_data, ret, 0);
        io_commit_cqring(ctx);
        spin_unlock_irq(&ctx->completion_lock);
@@ -5637,7 +5657,7 @@ static int io_timeout(struct io_kiocb *req, unsigned int issue_flags)
        struct list_head *entry;
        u32 tail, off = req->timeout.off;
 
-       spin_lock_irq(&ctx->completion_lock);
+       spin_lock_irq(&ctx->timeout_lock);
 
        /*
         * sqe->off holds how many events that need to occur for this
@@ -5676,7 +5696,7 @@ add:
        list_add(&req->timeout.list, entry);
        data->timer.function = io_timeout_fn;
        hrtimer_start(&data->timer, timespec64_to_ktime(data->ts), data->mode);
-       spin_unlock_irq(&ctx->completion_lock);
+       spin_unlock_irq(&ctx->timeout_lock);
        return 0;
 }
 
@@ -5730,7 +5750,9 @@ static void io_async_find_and_cancel(struct io_ring_ctx *ctx,
        spin_lock_irqsave(&ctx->completion_lock, flags);
        if (ret != -ENOENT)
                goto done;
+       spin_lock(&ctx->timeout_lock);
        ret = io_timeout_cancel(ctx, sqe_addr);
+       spin_unlock(&ctx->timeout_lock);
        if (ret != -ENOENT)
                goto done;
        ret = io_poll_cancel(ctx, sqe_addr, false);
@@ -5772,7 +5794,9 @@ static int io_async_cancel(struct io_kiocb *req, unsigned int issue_flags)
        spin_lock_irq(&ctx->completion_lock);
        if (ret != -ENOENT)
                goto done;
+       spin_lock(&ctx->timeout_lock);
        ret = io_timeout_cancel(ctx, sqe_addr);
+       spin_unlock(&ctx->timeout_lock);
        if (ret != -ENOENT)
                goto done;
        ret = io_poll_cancel(ctx, sqe_addr, false);
@@ -8801,12 +8825,14 @@ static bool io_kill_timeouts(struct io_ring_ctx *ctx, struct task_struct *tsk,
        int canceled = 0;
 
        spin_lock_irq(&ctx->completion_lock);
+       spin_lock(&ctx->timeout_lock);
        list_for_each_entry_safe(req, tmp, &ctx->timeout_list, timeout.list) {
                if (io_match_task(req, tsk, cancel_all)) {
                        io_kill_timeout(req, -ECANCELED);
                        canceled++;
                }
        }
+       spin_unlock(&ctx->timeout_lock);
        if (canceled != 0)
                io_commit_cqring(ctx);
        spin_unlock_irq(&ctx->completion_lock);