io_uring: fix CQE reordering
authorPavel Begunkov <asml.silence@gmail.com>
Fri, 23 Sep 2022 13:53:25 +0000 (14:53 +0100)
committerJens Axboe <axboe@kernel.dk>
Fri, 23 Sep 2022 21:04:20 +0000 (15:04 -0600)
Overflowing CQEs may result in reordering, which is buggy in case of
links, F_MORE and so on. If we guarantee that we don't reorder for
the unlikely event of a CQ ring overflow, then we can further extend
this to not have to terminate multishot requests if it happens. For
other operations, like zerocopy sends, we have no choice but to honor
CQE ordering.

Reported-by: Dylan Yudaken <dylany@fb.com>
Signed-off-by: Pavel Begunkov <asml.silence@gmail.com>
Link: https://lore.kernel.org/r/ec3bc55687b0768bbe20fb62d7d06cfced7d7e70.1663892031.git.asml.silence@gmail.com
Signed-off-by: Jens Axboe <axboe@kernel.dk>
io_uring/io_uring.c
io_uring/io_uring.h

index f359e24..62d1f55 100644 (file)
@@ -609,7 +609,7 @@ static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
 
        io_cq_lock(ctx);
        while (!list_empty(&ctx->cq_overflow_list)) {
-               struct io_uring_cqe *cqe = io_get_cqe(ctx);
+               struct io_uring_cqe *cqe = io_get_cqe_overflow(ctx, true);
                struct io_overflow_cqe *ocqe;
 
                if (!cqe && !force)
@@ -736,12 +736,19 @@ bool io_req_cqe_overflow(struct io_kiocb *req)
  * control dependency is enough as we're using WRITE_ONCE to
  * fill the cq entry
  */
-struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx)
+struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx, bool overflow)
 {
        struct io_rings *rings = ctx->rings;
        unsigned int off = ctx->cached_cq_tail & (ctx->cq_entries - 1);
        unsigned int free, queued, len;
 
+       /*
+        * Posting into the CQ when there are pending overflowed CQEs may break
+        * ordering guarantees, which will affect links, F_MORE users and more.
+        * Force overflow the completion.
+        */
+       if (!overflow && (ctx->check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT)))
+               return NULL;
 
        /* userspace may cheat modifying the tail, be safe and do min */
        queued = min(__io_cqring_events(ctx), ctx->cq_entries);
@@ -2394,6 +2401,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
                if (ret < 0)
                        return ret;
                io_cqring_overflow_flush(ctx);
+
                if (io_cqring_events(ctx) >= min_events)
                        return 0;
        } while (ret > 0);
index d38173b..177bd55 100644 (file)
@@ -24,7 +24,7 @@ enum {
        IOU_STOP_MULTISHOT      = -ECANCELED,
 };
 
-struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx);
+struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx, bool overflow);
 bool io_req_cqe_overflow(struct io_kiocb *req);
 int io_run_task_work_sig(struct io_ring_ctx *ctx);
 int __io_run_local_work(struct io_ring_ctx *ctx, bool locked);
@@ -93,7 +93,8 @@ static inline void io_cq_lock(struct io_ring_ctx *ctx)
 
 void io_cq_unlock_post(struct io_ring_ctx *ctx);
 
-static inline struct io_uring_cqe *io_get_cqe(struct io_ring_ctx *ctx)
+static inline struct io_uring_cqe *io_get_cqe_overflow(struct io_ring_ctx *ctx,
+                                                      bool overflow)
 {
        if (likely(ctx->cqe_cached < ctx->cqe_sentinel)) {
                struct io_uring_cqe *cqe = ctx->cqe_cached;
@@ -105,7 +106,12 @@ static inline struct io_uring_cqe *io_get_cqe(struct io_ring_ctx *ctx)
                return cqe;
        }
 
-       return __io_get_cqe(ctx);
+       return __io_get_cqe(ctx, overflow);
+}
+
+static inline struct io_uring_cqe *io_get_cqe(struct io_ring_ctx *ctx)
+{
+       return io_get_cqe_overflow(ctx, false);
 }
 
 static inline bool __io_fill_cqe_req(struct io_ring_ctx *ctx,