io_uring: move read/write related opcodes to their own file
author Jens Axboe <axboe@kernel.dk>
Mon, 13 Jun 2022 13:27:03 +0000 (07:27 -0600)
committer Jens Axboe <axboe@kernel.dk>
Mon, 25 Jul 2022 00:39:12 +0000 (18:39 -0600)
Signed-off-by: Jens Axboe <axboe@kernel.dk>
io_uring/Makefile
io_uring/io_uring.c
io_uring/io_uring.h
io_uring/rw.c [new file with mode: 0644]
io_uring/rw.h [new file with mode: 0644]
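
Note: the body of the new io_uring/rw.h is not part of this excerpt. The following is only a plausible sketch of what it declares, inferred from the functions this diff leaves non-static in rw.c (io_prep_rw, io_readv_writev_cleanup); the io_read/io_write and readv/writev async-prep prototypes, as well as the include-guard name, are assumptions for illustration, not content taken from the commit.

/* Sketch only -- not the actual io_uring/rw.h from this commit. */
#ifndef IOU_RW_H
#define IOU_RW_H

struct io_kiocb;
struct io_uring_sqe;

/* exported by rw.c in the hunks shown below */
int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe);
void io_readv_writev_cleanup(struct io_kiocb *req);

/* assumed: issue handlers and async-prep helpers the opcode table would need */
int io_read(struct io_kiocb *req, unsigned int issue_flags);
int io_write(struct io_kiocb *req, unsigned int issue_flags);
int io_readv_prep_async(struct io_kiocb *req);
int io_writev_prep_async(struct io_kiocb *req);

#endif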

index 360a83039c2a4f78ec943e2cd11480746400274d..d70deed65a0bb2c27588e7384924b89f6034fac8 100644 (file)
@@ -7,5 +7,5 @@ obj-$(CONFIG_IO_URING)          += io_uring.o xattr.o nop.o fs.o splice.o \
                                        openclose.o uring_cmd.o epoll.o \
                                        statx.o net.o msg_ring.o timeout.o \
                                        sqpoll.o fdinfo.o tctx.o poll.o \
-                                       cancel.o kbuf.o rsrc.o
+                                       cancel.o kbuf.o rsrc.o rw.o
 obj-$(CONFIG_IO_WQ)            += io-wq.o
index c0f1f79933ac2a58c6d9283a30da8b1dd4b343b8..0af61a6c29cfee51a6f501e332ccb225ccdc5083 100644 (file)
@@ -43,7 +43,6 @@
 #include <linux/init.h>
 #include <linux/errno.h>
 #include <linux/syscalls.h>
-#include <linux/compat.h>
 #include <net/compat.h>
 #include <linux/refcount.h>
 #include <linux/uio.h>
@@ -57,7 +56,6 @@
 #include <linux/mman.h>
 #include <linux/percpu.h>
 #include <linux/slab.h>
-#include <linux/blk-mq.h>
 #include <linux/bvec.h>
 #include <linux/net.h>
 #include <net/sock.h>
 #include <linux/sched/mm.h>
 #include <linux/uaccess.h>
 #include <linux/nospec.h>
-#include <linux/sizes.h>
 #include <linux/highmem.h>
 #include <linux/fsnotify.h>
 #include <linux/fadvise.h>
-#include <linux/eventpoll.h>
 #include <linux/task_work.h>
-#include <linux/pagemap.h>
 #include <linux/io_uring.h>
 #include <linux/audit.h>
 #include <linux/security.h>
 #include "timeout.h"
 #include "poll.h"
 #include "cancel.h"
+#include "rw.h"
 
 #define IORING_MAX_ENTRIES     32768
 #define IORING_MAX_CQ_ENTRIES  (2 * IORING_MAX_ENTRIES)
 #define IO_REQ_CACHE_SIZE              32
 #define IO_REQ_ALLOC_BATCH             8
 
-/*
- * First field must be the file pointer in all the
- * iocb unions! See also 'struct kiocb' in <linux/fs.h>
- */
-struct io_rw {
-       /* NOTE: kiocb has the file as the first member, so don't do it here */
-       struct kiocb                    kiocb;
-       u64                             addr;
-       u32                             len;
-       rwf_t                           flags;
-};
-
-struct io_rw_state {
-       struct iov_iter                 iter;
-       struct iov_iter_state           iter_state;
-       struct iovec                    fast_iov[UIO_FASTIOV];
-};
-
-struct io_async_rw {
-       struct io_rw_state              s;
-       const struct iovec              *free_iovec;
-       size_t                          bytes_done;
-       struct wait_page_queue          wpq;
-};
-
 enum {
        IO_CHECK_CQ_OVERFLOW_BIT,
        IO_CHECK_CQ_DROPPED_BIT,
@@ -184,9 +155,7 @@ static void io_dismantle_req(struct io_kiocb *req);
 static void io_clean_op(struct io_kiocb *req);
 static void io_queue_sqe(struct io_kiocb *req);
 
-static void io_req_task_queue(struct io_kiocb *req);
 static void __io_submit_flush_completions(struct io_ring_ctx *ctx);
-static int io_req_prep_async(struct io_kiocb *req);
 
 static void io_eventfd_signal(struct io_ring_ctx *ctx);
 
@@ -393,11 +362,6 @@ static bool req_need_defer(struct io_kiocb *req, u32 seq)
        return false;
 }
 
-static inline bool io_req_ffs_set(struct io_kiocb *req)
-{
-       return req->flags & REQ_F_FIXED_FILE;
-}
-
 static inline void io_req_track_inflight(struct io_kiocb *req)
 {
        if (!(req->flags & REQ_F_INFLIGHT)) {
@@ -489,7 +453,7 @@ static inline void io_req_add_compl_list(struct io_kiocb *req)
        wq_list_add_tail(&req->comp_list, &state->compl_reqs);
 }
 
-static void io_queue_iowq(struct io_kiocb *req, bool *dont_use)
+void io_queue_iowq(struct io_kiocb *req, bool *dont_use)
 {
        struct io_kiocb *link = io_prep_linked_timeout(req);
        struct io_uring_task *tctx = req->task->io_uring;
@@ -532,7 +496,7 @@ static __cold void io_queue_deferred(struct io_ring_ctx *ctx)
        }
 }
 
-static void __io_commit_cqring_flush(struct io_ring_ctx *ctx)
+void __io_commit_cqring_flush(struct io_ring_ctx *ctx)
 {
        if (ctx->off_timeout_used || ctx->drain_active) {
                spin_lock(&ctx->completion_lock);
@@ -547,60 +511,6 @@ static void __io_commit_cqring_flush(struct io_ring_ctx *ctx)
                io_eventfd_signal(ctx);
 }
 
-static inline unsigned int __io_cqring_events(struct io_ring_ctx *ctx)
-{
-       return ctx->cached_cq_tail - READ_ONCE(ctx->rings->cq.head);
-}
-
-/*
- * writes to the cq entry need to come after reading head; the
- * control dependency is enough as we're using WRITE_ONCE to
- * fill the cq entry
- */
-static noinline struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx)
-{
-       struct io_rings *rings = ctx->rings;
-       unsigned int off = ctx->cached_cq_tail & (ctx->cq_entries - 1);
-       unsigned int shift = 0;
-       unsigned int free, queued, len;
-
-       if (ctx->flags & IORING_SETUP_CQE32)
-               shift = 1;
-
-       /* userspace may cheat modifying the tail, be safe and do min */
-       queued = min(__io_cqring_events(ctx), ctx->cq_entries);
-       free = ctx->cq_entries - queued;
-       /* we need a contiguous range, limit based on the current array offset */
-       len = min(free, ctx->cq_entries - off);
-       if (!len)
-               return NULL;
-
-       ctx->cached_cq_tail++;
-       ctx->cqe_cached = &rings->cqes[off];
-       ctx->cqe_sentinel = ctx->cqe_cached + len;
-       ctx->cqe_cached++;
-       return &rings->cqes[off << shift];
-}
-
-static inline struct io_uring_cqe *io_get_cqe(struct io_ring_ctx *ctx)
-{
-       if (likely(ctx->cqe_cached < ctx->cqe_sentinel)) {
-               struct io_uring_cqe *cqe = ctx->cqe_cached;
-
-               if (ctx->flags & IORING_SETUP_CQE32) {
-                       unsigned int off = ctx->cqe_cached - ctx->rings->cqes;
-
-                       cqe += off;
-               }
-
-               ctx->cached_cq_tail++;
-               ctx->cqe_cached++;
-               return cqe;
-       }
-
-       return __io_get_cqe(ctx);
-}
-
 static void io_eventfd_signal(struct io_ring_ctx *ctx)
 {
        struct io_ev_fd *ev_fd;
@@ -628,17 +538,6 @@ out:
        rcu_read_unlock();
 }
 
-static inline void io_cqring_wake(struct io_ring_ctx *ctx)
-{
-       /*
-        * wake_up_all() may seem excessive, but io_wake_function() and
-        * io_should_wake() handle the termination of the loop and only
-        * wake as many waiters as we need to.
-        */
-       if (wq_has_sleeper(&ctx->cq_wait))
-               wake_up_all(&ctx->cq_wait);
-}
-
 /*
  * This should only get called when at least one event has been posted.
  * Some applications rely on the eventfd notification count only changing
@@ -655,16 +554,6 @@ void io_cqring_ev_posted(struct io_ring_ctx *ctx)
        io_cqring_wake(ctx);
 }
 
-static void io_cqring_ev_posted_iopoll(struct io_ring_ctx *ctx)
-{
-       if (unlikely(ctx->off_timeout_used || ctx->drain_active ||
-                    ctx->has_evfd))
-               __io_commit_cqring_flush(ctx);
-
-       if (ctx->flags & IORING_SETUP_SQPOLL)
-               io_cqring_wake(ctx);
-}
-
 /* Returns true if there are no backlogged entries after the flush */
 static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
 {
@@ -775,9 +664,8 @@ static __cold void io_uring_drop_tctx_refs(struct task_struct *task)
        }
 }
 
-static bool io_cqring_event_overflow(struct io_ring_ctx *ctx, u64 user_data,
-                                    s32 res, u32 cflags, u64 extra1,
-                                    u64 extra2)
+bool io_cqring_event_overflow(struct io_ring_ctx *ctx, u64 user_data, s32 res,
+                             u32 cflags, u64 extra1, u64 extra2)
 {
        struct io_overflow_cqe *ocqe;
        size_t ocq_size = sizeof(struct io_overflow_cqe);
@@ -814,59 +702,6 @@ static bool io_cqring_event_overflow(struct io_ring_ctx *ctx, u64 user_data,
        return true;
 }
 
-static inline bool __io_fill_cqe_req(struct io_ring_ctx *ctx,
-                                    struct io_kiocb *req)
-{
-       struct io_uring_cqe *cqe;
-
-       if (!(ctx->flags & IORING_SETUP_CQE32)) {
-               trace_io_uring_complete(req->ctx, req, req->cqe.user_data,
-                                       req->cqe.res, req->cqe.flags, 0, 0);
-
-               /*
-                * If we can't get a cq entry, userspace overflowed the
-                * submission (by quite a lot). Increment the overflow count in
-                * the ring.
-                */
-               cqe = io_get_cqe(ctx);
-               if (likely(cqe)) {
-                       memcpy(cqe, &req->cqe, sizeof(*cqe));
-                       return true;
-               }
-
-               return io_cqring_event_overflow(ctx, req->cqe.user_data,
-                                               req->cqe.res, req->cqe.flags,
-                                               0, 0);
-       } else {
-               u64 extra1 = 0, extra2 = 0;
-
-               if (req->flags & REQ_F_CQE32_INIT) {
-                       extra1 = req->extra1;
-                       extra2 = req->extra2;
-               }
-
-               trace_io_uring_complete(req->ctx, req, req->cqe.user_data,
-                                       req->cqe.res, req->cqe.flags, extra1, extra2);
-
-               /*
-                * If we can't get a cq entry, userspace overflowed the
-                * submission (by quite a lot). Increment the overflow count in
-                * the ring.
-                */
-               cqe = io_get_cqe(ctx);
-               if (likely(cqe)) {
-                       memcpy(cqe, &req->cqe, sizeof(struct io_uring_cqe));
-                       WRITE_ONCE(cqe->big_cqe[0], extra1);
-                       WRITE_ONCE(cqe->big_cqe[1], extra2);
-                       return true;
-               }
-
-               return io_cqring_event_overflow(ctx, req->cqe.user_data,
-                               req->cqe.res, req->cqe.flags,
-                               extra1, extra2);
-       }
-}
-
 bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res,
                     u32 cflags)
 {
@@ -1269,7 +1104,7 @@ void io_req_task_work_add(struct io_kiocb *req)
        __io_req_task_work_add(req, tctx, &tctx->task_list);
 }
 
-static void io_req_task_prio_work_add(struct io_kiocb *req)
+void io_req_task_prio_work_add(struct io_kiocb *req)
 {
        struct io_uring_task *tctx = req->task->io_uring;
 
@@ -1315,18 +1150,12 @@ void io_req_task_queue_fail(struct io_kiocb *req, int ret)
        io_req_task_work_add(req);
 }
 
-static void io_req_task_queue(struct io_kiocb *req)
+void io_req_task_queue(struct io_kiocb *req)
 {
        req->io_task_work.func = io_req_task_submit;
        io_req_task_work_add(req);
 }
 
-static void io_req_task_queue_reissue(struct io_kiocb *req)
-{
-       req->io_task_work.func = io_queue_iowq;
-       io_req_task_work_add(req);
-}
-
 void io_queue_next(struct io_kiocb *req)
 {
        struct io_kiocb *nxt = io_req_find_next(req);
@@ -1335,8 +1164,7 @@ void io_queue_next(struct io_kiocb *req)
                io_req_task_queue(nxt);
 }
 
-static void io_free_batch_list(struct io_ring_ctx *ctx,
-                               struct io_wq_work_node *node)
+void io_free_batch_list(struct io_ring_ctx *ctx, struct io_wq_work_node *node)
        __must_hold(&ctx->uring_lock)
 {
        struct task_struct *task = NULL;
@@ -1435,76 +1263,6 @@ static unsigned io_cqring_events(struct io_ring_ctx *ctx)
        return __io_cqring_events(ctx);
 }
 
-int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin)
-{
-       struct io_wq_work_node *pos, *start, *prev;
-       unsigned int poll_flags = BLK_POLL_NOSLEEP;
-       DEFINE_IO_COMP_BATCH(iob);
-       int nr_events = 0;
-
-       /*
-        * Only spin for completions if we don't have multiple devices hanging
-        * off our complete list.
-        */
-       if (ctx->poll_multi_queue || force_nonspin)
-               poll_flags |= BLK_POLL_ONESHOT;
-
-       wq_list_for_each(pos, start, &ctx->iopoll_list) {
-               struct io_kiocb *req = container_of(pos, struct io_kiocb, comp_list);
-               struct io_rw *rw = io_kiocb_to_cmd(req);
-               int ret;
-
-               /*
-                * Move completed and retryable entries to our local lists.
-                * If we find a request that requires polling, break out
-                * and complete those lists first, if we have entries there.
-                */
-               if (READ_ONCE(req->iopoll_completed))
-                       break;
-
-               ret = rw->kiocb.ki_filp->f_op->iopoll(&rw->kiocb, &iob, poll_flags);
-               if (unlikely(ret < 0))
-                       return ret;
-               else if (ret)
-                       poll_flags |= BLK_POLL_ONESHOT;
-
-               /* iopoll may have completed current req */
-               if (!rq_list_empty(iob.req_list) ||
-                   READ_ONCE(req->iopoll_completed))
-                       break;
-       }
-
-       if (!rq_list_empty(iob.req_list))
-               iob.complete(&iob);
-       else if (!pos)
-               return 0;
-
-       prev = start;
-       wq_list_for_each_resume(pos, prev) {
-               struct io_kiocb *req = container_of(pos, struct io_kiocb, comp_list);
-
-               /* order with io_complete_rw_iopoll(), e.g. ->result updates */
-               if (!smp_load_acquire(&req->iopoll_completed))
-                       break;
-               nr_events++;
-               if (unlikely(req->flags & REQ_F_CQE_SKIP))
-                       continue;
-
-               req->cqe.flags = io_put_kbuf(req, 0);
-               __io_fill_cqe_req(req->ctx, req);
-       }
-
-       if (unlikely(!nr_events))
-               return 0;
-
-       io_commit_cqring(ctx);
-       io_cqring_ev_posted_iopoll(ctx);
-       pos = start ? start->next : ctx->iopoll_list.first;
-       wq_list_cut(&ctx->iopoll_list, prev, start);
-       io_free_batch_list(ctx, pos);
-       return nr_events;
-}
-
 /*
  * We can't just wait for polled events to come to us, we have to actively
  * find and complete them.
@@ -1589,90 +1347,6 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
 
        return ret;
 }
-
-static void kiocb_end_write(struct io_kiocb *req)
-{
-       /*
-        * Tell lockdep we inherited freeze protection from submission
-        * thread.
-        */
-       if (req->flags & REQ_F_ISREG) {
-               struct super_block *sb = file_inode(req->file)->i_sb;
-
-               __sb_writers_acquired(sb, SB_FREEZE_WRITE);
-               sb_end_write(sb);
-       }
-}
-
-#ifdef CONFIG_BLOCK
-static bool io_resubmit_prep(struct io_kiocb *req)
-{
-       struct io_async_rw *io = req->async_data;
-
-       if (!req_has_async_data(req))
-               return !io_req_prep_async(req);
-       iov_iter_restore(&io->s.iter, &io->s.iter_state);
-       return true;
-}
-
-static bool io_rw_should_reissue(struct io_kiocb *req)
-{
-       umode_t mode = file_inode(req->file)->i_mode;
-       struct io_ring_ctx *ctx = req->ctx;
-
-       if (!S_ISBLK(mode) && !S_ISREG(mode))
-               return false;
-       if ((req->flags & REQ_F_NOWAIT) || (io_wq_current_is_worker() &&
-           !(ctx->flags & IORING_SETUP_IOPOLL)))
-               return false;
-       /*
-        * If ref is dying, we might be running poll reap from the exit work.
-        * Don't attempt to reissue from that path, just let it fail with
-        * -EAGAIN.
-        */
-       if (percpu_ref_is_dying(&ctx->refs))
-               return false;
-       /*
-        * Play it safe and assume not safe to re-import and reissue if we're
-        * not in the original thread group (or in task context).
-        */
-       if (!same_thread_group(req->task, current) || !in_task())
-               return false;
-       return true;
-}
-#else
-static bool io_resubmit_prep(struct io_kiocb *req)
-{
-       return false;
-}
-static bool io_rw_should_reissue(struct io_kiocb *req)
-{
-       return false;
-}
-#endif
-
-static bool __io_complete_rw_common(struct io_kiocb *req, long res)
-{
-       struct io_rw *rw = io_kiocb_to_cmd(req);
-
-       if (rw->kiocb.ki_flags & IOCB_WRITE) {
-               kiocb_end_write(req);
-               fsnotify_modify(req->file);
-       } else {
-               fsnotify_access(req->file);
-       }
-       if (unlikely(res != req->cqe.res)) {
-               if ((res == -EAGAIN || res == -EOPNOTSUPP) &&
-                   io_rw_should_reissue(req)) {
-                       req->flags |= REQ_F_REISSUE | REQ_F_PARTIAL_IO;
-                       return true;
-               }
-               req_set_fail(req);
-               req->cqe.res = res;
-       }
-       return false;
-}
-
 inline void io_req_task_complete(struct io_kiocb *req, bool *locked)
 {
        if (*locked) {
@@ -1685,46 +1359,6 @@ inline void io_req_task_complete(struct io_kiocb *req, bool *locked)
        }
 }
 
-static void __io_complete_rw(struct io_kiocb *req, long res,
-                            unsigned int issue_flags)
-{
-       if (__io_complete_rw_common(req, res))
-               return;
-       io_req_set_res(req, req->cqe.res, io_put_kbuf(req, issue_flags));
-       __io_req_complete(req, issue_flags);
-}
-
-static void io_complete_rw(struct kiocb *kiocb, long res)
-{
-       struct io_rw *rw = container_of(kiocb, struct io_rw, kiocb);
-       struct io_kiocb *req = cmd_to_io_kiocb(rw);
-
-       if (__io_complete_rw_common(req, res))
-               return;
-       io_req_set_res(req, res, 0);
-       req->io_task_work.func = io_req_task_complete;
-       io_req_task_prio_work_add(req);
-}
-
-static void io_complete_rw_iopoll(struct kiocb *kiocb, long res)
-{
-       struct io_rw *rw = container_of(kiocb, struct io_rw, kiocb);
-       struct io_kiocb *req = cmd_to_io_kiocb(rw);
-
-       if (kiocb->ki_flags & IOCB_WRITE)
-               kiocb_end_write(req);
-       if (unlikely(res != req->cqe.res)) {
-               if (res == -EAGAIN && io_rw_should_reissue(req)) {
-                       req->flags |= REQ_F_REISSUE | REQ_F_PARTIAL_IO;
-                       return;
-               }
-               req->cqe.res = res;
-       }
-
-       /* order with io_iopoll_complete() checking ->iopoll_completed */
-       smp_store_release(&req->iopoll_completed, 1);
-}
-
 /*
  * After the iocb has been issued, it's safe to be found on the poll list.
  * Adding the kiocb to the list AFTER submission ensures that we don't
@@ -1833,426 +1467,6 @@ unsigned int io_file_get_flags(struct file *file)
        return res;
 }
 
-static inline bool io_file_supports_nowait(struct io_kiocb *req)
-{
-       return req->flags & REQ_F_SUPPORT_NOWAIT;
-}
-
-static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe)
-{
-       struct io_rw *rw = io_kiocb_to_cmd(req);
-       unsigned ioprio;
-       int ret;
-
-       rw->kiocb.ki_pos = READ_ONCE(sqe->off);
-       /* used for fixed read/write too - just read unconditionally */
-       req->buf_index = READ_ONCE(sqe->buf_index);
-
-       if (req->opcode == IORING_OP_READ_FIXED ||
-           req->opcode == IORING_OP_WRITE_FIXED) {
-               struct io_ring_ctx *ctx = req->ctx;
-               u16 index;
-
-               if (unlikely(req->buf_index >= ctx->nr_user_bufs))
-                       return -EFAULT;
-               index = array_index_nospec(req->buf_index, ctx->nr_user_bufs);
-               req->imu = ctx->user_bufs[index];
-               io_req_set_rsrc_node(req, ctx, 0);
-       }
-
-       ioprio = READ_ONCE(sqe->ioprio);
-       if (ioprio) {
-               ret = ioprio_check_cap(ioprio);
-               if (ret)
-                       return ret;
-
-               rw->kiocb.ki_ioprio = ioprio;
-       } else {
-               rw->kiocb.ki_ioprio = get_current_ioprio();
-       }
-
-       rw->addr = READ_ONCE(sqe->addr);
-       rw->len = READ_ONCE(sqe->len);
-       rw->flags = READ_ONCE(sqe->rw_flags);
-       return 0;
-}
-
-static void io_readv_writev_cleanup(struct io_kiocb *req)
-{
-       struct io_async_rw *io = req->async_data;
-
-       kfree(io->free_iovec);
-}
-
-static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
-{
-       switch (ret) {
-       case -EIOCBQUEUED:
-               break;
-       case -ERESTARTSYS:
-       case -ERESTARTNOINTR:
-       case -ERESTARTNOHAND:
-       case -ERESTART_RESTARTBLOCK:
-               /*
-                * We can't just restart the syscall, since previously
-                * submitted sqes may already be in progress. Just fail this
-                * IO with EINTR.
-                */
-               ret = -EINTR;
-               fallthrough;
-       default:
-               kiocb->ki_complete(kiocb, ret);
-       }
-}
-
-static inline loff_t *io_kiocb_update_pos(struct io_kiocb *req)
-{
-       struct io_rw *rw = io_kiocb_to_cmd(req);
-
-       if (rw->kiocb.ki_pos != -1)
-               return &rw->kiocb.ki_pos;
-
-       if (!(req->file->f_mode & FMODE_STREAM)) {
-               req->flags |= REQ_F_CUR_POS;
-               rw->kiocb.ki_pos = req->file->f_pos;
-               return &rw->kiocb.ki_pos;
-       }
-
-       rw->kiocb.ki_pos = 0;
-       return NULL;
-}
-
-static void kiocb_done(struct io_kiocb *req, ssize_t ret,
-                      unsigned int issue_flags)
-{
-       struct io_async_rw *io = req->async_data;
-       struct io_rw *rw = io_kiocb_to_cmd(req);
-
-       /* add previously done IO, if any */
-       if (req_has_async_data(req) && io->bytes_done > 0) {
-               if (ret < 0)
-                       ret = io->bytes_done;
-               else
-                       ret += io->bytes_done;
-       }
-
-       if (req->flags & REQ_F_CUR_POS)
-               req->file->f_pos = rw->kiocb.ki_pos;
-       if (ret >= 0 && (rw->kiocb.ki_complete == io_complete_rw))
-               __io_complete_rw(req, ret, issue_flags);
-       else
-               io_rw_done(&rw->kiocb, ret);
-
-       if (req->flags & REQ_F_REISSUE) {
-               req->flags &= ~REQ_F_REISSUE;
-               if (io_resubmit_prep(req))
-                       io_req_task_queue_reissue(req);
-               else
-                       io_req_task_queue_fail(req, ret);
-       }
-}
-
-static int __io_import_fixed(struct io_kiocb *req, int ddir,
-                            struct iov_iter *iter, struct io_mapped_ubuf *imu)
-{
-       struct io_rw *rw = io_kiocb_to_cmd(req);
-       size_t len = rw->len;
-       u64 buf_end, buf_addr = rw->addr;
-       size_t offset;
-
-       if (unlikely(check_add_overflow(buf_addr, (u64)len, &buf_end)))
-               return -EFAULT;
-       /* not inside the mapped region */
-       if (unlikely(buf_addr < imu->ubuf || buf_end > imu->ubuf_end))
-               return -EFAULT;
-
-       /*
-        * May not be a start of buffer, set size appropriately
-        * and advance us to the beginning.
-        */
-       offset = buf_addr - imu->ubuf;
-       iov_iter_bvec(iter, ddir, imu->bvec, imu->nr_bvecs, offset + len);
-
-       if (offset) {
-               /*
-                * Don't use iov_iter_advance() here, as it's really slow for
-                * using the latter parts of a big fixed buffer - it iterates
-                * over each segment manually. We can cheat a bit here, because
-                * we know that:
-                *
-                * 1) it's a BVEC iter, we set it up
-                * 2) all bvecs are PAGE_SIZE in size, except potentially the
-                *    first and last bvec
-                *
-                * So just find our index, and adjust the iterator afterwards.
-                * If the offset is within the first bvec (or the whole first
-                * bvec, just use iov_iter_advance(). This makes it easier
-                * since we can just skip the first segment, which may not
-                * be PAGE_SIZE aligned.
-                */
-               const struct bio_vec *bvec = imu->bvec;
-
-               if (offset <= bvec->bv_len) {
-                       iov_iter_advance(iter, offset);
-               } else {
-                       unsigned long seg_skip;
-
-                       /* skip first vec */
-                       offset -= bvec->bv_len;
-                       seg_skip = 1 + (offset >> PAGE_SHIFT);
-
-                       iter->bvec = bvec + seg_skip;
-                       iter->nr_segs -= seg_skip;
-                       iter->count -= bvec->bv_len + offset;
-                       iter->iov_offset = offset & ~PAGE_MASK;
-               }
-       }
-
-       return 0;
-}
-
-static int io_import_fixed(struct io_kiocb *req, int rw, struct iov_iter *iter,
-                          unsigned int issue_flags)
-{
-       if (WARN_ON_ONCE(!req->imu))
-               return -EFAULT;
-       return __io_import_fixed(req, rw, iter, req->imu);
-}
-
-#ifdef CONFIG_COMPAT
-static ssize_t io_compat_import(struct io_kiocb *req, struct iovec *iov,
-                               unsigned int issue_flags)
-{
-       struct io_rw *rw = io_kiocb_to_cmd(req);
-       struct compat_iovec __user *uiov;
-       compat_ssize_t clen;
-       void __user *buf;
-       size_t len;
-
-       uiov = u64_to_user_ptr(rw->addr);
-       if (!access_ok(uiov, sizeof(*uiov)))
-               return -EFAULT;
-       if (__get_user(clen, &uiov->iov_len))
-               return -EFAULT;
-       if (clen < 0)
-               return -EINVAL;
-
-       len = clen;
-       buf = io_buffer_select(req, &len, issue_flags);
-       if (!buf)
-               return -ENOBUFS;
-       rw->addr = (unsigned long) buf;
-       iov[0].iov_base = buf;
-       rw->len = iov[0].iov_len = (compat_size_t) len;
-       return 0;
-}
-#endif
-
-static ssize_t __io_iov_buffer_select(struct io_kiocb *req, struct iovec *iov,
-                                     unsigned int issue_flags)
-{
-       struct io_rw *rw = io_kiocb_to_cmd(req);
-       struct iovec __user *uiov = u64_to_user_ptr(rw->addr);
-       void __user *buf;
-       ssize_t len;
-
-       if (copy_from_user(iov, uiov, sizeof(*uiov)))
-               return -EFAULT;
-
-       len = iov[0].iov_len;
-       if (len < 0)
-               return -EINVAL;
-       buf = io_buffer_select(req, &len, issue_flags);
-       if (!buf)
-               return -ENOBUFS;
-       rw->addr = (unsigned long) buf;
-       iov[0].iov_base = buf;
-       rw->len = iov[0].iov_len = len;
-       return 0;
-}
-
-static ssize_t io_iov_buffer_select(struct io_kiocb *req, struct iovec *iov,
-                                   unsigned int issue_flags)
-{
-       struct io_rw *rw = io_kiocb_to_cmd(req);
-
-       if (req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)) {
-               iov[0].iov_base = u64_to_user_ptr(rw->addr);
-               iov[0].iov_len = rw->len;
-               return 0;
-       }
-       if (rw->len != 1)
-               return -EINVAL;
-
-#ifdef CONFIG_COMPAT
-       if (req->ctx->compat)
-               return io_compat_import(req, iov, issue_flags);
-#endif
-
-       return __io_iov_buffer_select(req, iov, issue_flags);
-}
-
-static struct iovec *__io_import_iovec(int ddir, struct io_kiocb *req,
-                                      struct io_rw_state *s,
-                                      unsigned int issue_flags)
-{
-       struct io_rw *rw = io_kiocb_to_cmd(req);
-       struct iov_iter *iter = &s->iter;
-       u8 opcode = req->opcode;
-       struct iovec *iovec;
-       void __user *buf;
-       size_t sqe_len;
-       ssize_t ret;
-
-       if (opcode == IORING_OP_READ_FIXED || opcode == IORING_OP_WRITE_FIXED) {
-               ret = io_import_fixed(req, ddir, iter, issue_flags);
-               if (ret)
-                       return ERR_PTR(ret);
-               return NULL;
-       }
-
-       buf = u64_to_user_ptr(rw->addr);
-       sqe_len = rw->len;
-
-       if (opcode == IORING_OP_READ || opcode == IORING_OP_WRITE) {
-               if (io_do_buffer_select(req)) {
-                       buf = io_buffer_select(req, &sqe_len, issue_flags);
-                       if (!buf)
-                               return ERR_PTR(-ENOBUFS);
-                       rw->addr = (unsigned long) buf;
-                       rw->len = sqe_len;
-               }
-
-               ret = import_single_range(ddir, buf, sqe_len, s->fast_iov, iter);
-               if (ret)
-                       return ERR_PTR(ret);
-               return NULL;
-       }
-
-       iovec = s->fast_iov;
-       if (req->flags & REQ_F_BUFFER_SELECT) {
-               ret = io_iov_buffer_select(req, iovec, issue_flags);
-               if (ret)
-                       return ERR_PTR(ret);
-               iov_iter_init(iter, ddir, iovec, 1, iovec->iov_len);
-               return NULL;
-       }
-
-       ret = __import_iovec(ddir, buf, sqe_len, UIO_FASTIOV, &iovec, iter,
-                             req->ctx->compat);
-       if (unlikely(ret < 0))
-               return ERR_PTR(ret);
-       return iovec;
-}
-
-static inline int io_import_iovec(int rw, struct io_kiocb *req,
-                                 struct iovec **iovec, struct io_rw_state *s,
-                                 unsigned int issue_flags)
-{
-       *iovec = __io_import_iovec(rw, req, s, issue_flags);
-       if (unlikely(IS_ERR(*iovec)))
-               return PTR_ERR(*iovec);
-
-       iov_iter_save_state(&s->iter, &s->iter_state);
-       return 0;
-}
-
-static inline loff_t *io_kiocb_ppos(struct kiocb *kiocb)
-{
-       return (kiocb->ki_filp->f_mode & FMODE_STREAM) ? NULL : &kiocb->ki_pos;
-}
-
-/*
- * For files that don't have ->read_iter() and ->write_iter(), handle them
- * by looping over ->read() or ->write() manually.
- */
-static ssize_t loop_rw_iter(int ddir, struct io_rw *rw, struct iov_iter *iter)
-{
-       struct kiocb *kiocb = &rw->kiocb;
-       struct file *file = kiocb->ki_filp;
-       ssize_t ret = 0;
-       loff_t *ppos;
-
-       /*
-        * Don't support polled IO through this interface, and we can't
-        * support non-blocking either. For the latter, this just causes
-        * the kiocb to be handled from an async context.
-        */
-       if (kiocb->ki_flags & IOCB_HIPRI)
-               return -EOPNOTSUPP;
-       if ((kiocb->ki_flags & IOCB_NOWAIT) &&
-           !(kiocb->ki_filp->f_flags & O_NONBLOCK))
-               return -EAGAIN;
-
-       ppos = io_kiocb_ppos(kiocb);
-
-       while (iov_iter_count(iter)) {
-               struct iovec iovec;
-               ssize_t nr;
-
-               if (!iov_iter_is_bvec(iter)) {
-                       iovec = iov_iter_iovec(iter);
-               } else {
-                       iovec.iov_base = u64_to_user_ptr(rw->addr);
-                       iovec.iov_len = rw->len;
-               }
-
-               if (ddir == READ) {
-                       nr = file->f_op->read(file, iovec.iov_base,
-                                             iovec.iov_len, ppos);
-               } else {
-                       nr = file->f_op->write(file, iovec.iov_base,
-                                              iovec.iov_len, ppos);
-               }
-
-               if (nr < 0) {
-                       if (!ret)
-                               ret = nr;
-                       break;
-               }
-               ret += nr;
-               if (!iov_iter_is_bvec(iter)) {
-                       iov_iter_advance(iter, nr);
-               } else {
-                       rw->addr += nr;
-                       rw->len -= nr;
-                       if (!rw->len)
-                               break;
-               }
-               if (nr != iovec.iov_len)
-                       break;
-       }
-
-       return ret;
-}
-
-static void io_req_map_rw(struct io_kiocb *req, const struct iovec *iovec,
-                         const struct iovec *fast_iov, struct iov_iter *iter)
-{
-       struct io_async_rw *io = req->async_data;
-
-       memcpy(&io->s.iter, iter, sizeof(*iter));
-       io->free_iovec = iovec;
-       io->bytes_done = 0;
-       /* can only be fixed buffers, no need to do anything */
-       if (iov_iter_is_bvec(iter))
-               return;
-       if (!iovec) {
-               unsigned iov_off = 0;
-
-               io->s.iter.iov = io->s.fast_iov;
-               if (iter->iov != fast_iov) {
-                       iov_off = iter->iov - fast_iov;
-                       io->s.iter.iov += iov_off;
-               }
-               if (io->s.fast_iov != fast_iov)
-                       memcpy(io->s.fast_iov + iov_off, fast_iov + iov_off,
-                              sizeof(struct iovec) * iter->nr_segs);
-       } else {
-               req->flags |= REQ_F_NEED_CLEANUP;
-       }
-}
-
 bool io_alloc_async_data(struct io_kiocb *req)
 {
        WARN_ON_ONCE(!io_op_defs[req->opcode].async_size);
@@ -2264,448 +1478,13 @@ bool io_alloc_async_data(struct io_kiocb *req)
        return true;
 }
 
-static int io_setup_async_rw(struct io_kiocb *req, const struct iovec *iovec,
-                            struct io_rw_state *s, bool force)
-{
-       if (!force && !io_op_defs[req->opcode].prep_async)
-               return 0;
-       if (!req_has_async_data(req)) {
-               struct io_async_rw *iorw;
-
-               if (io_alloc_async_data(req)) {
-                       kfree(iovec);
-                       return -ENOMEM;
-               }
-
-               io_req_map_rw(req, iovec, s->fast_iov, &s->iter);
-               iorw = req->async_data;
-               /* we've copied and mapped the iter, ensure state is saved */
-               iov_iter_save_state(&iorw->s.iter, &iorw->s.iter_state);
-       }
-       return 0;
-}
-
-static inline int io_rw_prep_async(struct io_kiocb *req, int rw)
-{
-       struct io_async_rw *iorw = req->async_data;
-       struct iovec *iov;
-       int ret;
-
-       /* submission path, ->uring_lock should already be taken */
-       ret = io_import_iovec(rw, req, &iov, &iorw->s, 0);
-       if (unlikely(ret < 0))
-               return ret;
-
-       iorw->bytes_done = 0;
-       iorw->free_iovec = iov;
-       if (iov)
-               req->flags |= REQ_F_NEED_CLEANUP;
-       return 0;
-}
-
-static int io_readv_prep_async(struct io_kiocb *req)
-{
-       return io_rw_prep_async(req, READ);
-}
-
-static int io_writev_prep_async(struct io_kiocb *req)
-{
-       return io_rw_prep_async(req, WRITE);
-}
-
-/*
- * This is our waitqueue callback handler, registered through __folio_lock_async()
- * when we initially tried to do the IO with the iocb armed our waitqueue.
- * This gets called when the page is unlocked, and we generally expect that to
- * happen when the page IO is completed and the page is now uptodate. This will
- * queue a task_work based retry of the operation, attempting to copy the data
- * again. If the latter fails because the page was NOT uptodate, then we will
- * do a thread based blocking retry of the operation. That's the unexpected
- * slow path.
- */
-static int io_async_buf_func(struct wait_queue_entry *wait, unsigned mode,
-                            int sync, void *arg)
-{
-       struct wait_page_queue *wpq;
-       struct io_kiocb *req = wait->private;
-       struct io_rw *rw = io_kiocb_to_cmd(req);
-       struct wait_page_key *key = arg;
-
-       wpq = container_of(wait, struct wait_page_queue, wait);
-
-       if (!wake_page_match(wpq, key))
-               return 0;
-
-       rw->kiocb.ki_flags &= ~IOCB_WAITQ;
-       list_del_init(&wait->entry);
-       io_req_task_queue(req);
-       return 1;
-}
-
-/*
- * This controls whether a given IO request should be armed for async page
- * based retry. If we return false here, the request is handed to the async
- * worker threads for retry. If we're doing buffered reads on a regular file,
- * we prepare a private wait_page_queue entry and retry the operation. This
- * will either succeed because the page is now uptodate and unlocked, or it
- * will register a callback when the page is unlocked at IO completion. Through
- * that callback, io_uring uses task_work to setup a retry of the operation.
- * That retry will attempt the buffered read again. The retry will generally
- * succeed, or in rare cases where it fails, we then fall back to using the
- * async worker threads for a blocking retry.
- */
-static bool io_rw_should_retry(struct io_kiocb *req)
-{
-       struct io_async_rw *io = req->async_data;
-       struct wait_page_queue *wait = &io->wpq;
-       struct io_rw *rw = io_kiocb_to_cmd(req);
-       struct kiocb *kiocb = &rw->kiocb;
-
-       /* never retry for NOWAIT, we just complete with -EAGAIN */
-       if (req->flags & REQ_F_NOWAIT)
-               return false;
-
-       /* Only for buffered IO */
-       if (kiocb->ki_flags & (IOCB_DIRECT | IOCB_HIPRI))
-               return false;
-
-       /*
-        * just use poll if we can, and don't attempt if the fs doesn't
-        * support callback based unlocks
-        */
-       if (file_can_poll(req->file) || !(req->file->f_mode & FMODE_BUF_RASYNC))
-               return false;
-
-       wait->wait.func = io_async_buf_func;
-       wait->wait.private = req;
-       wait->wait.flags = 0;
-       INIT_LIST_HEAD(&wait->wait.entry);
-       kiocb->ki_flags |= IOCB_WAITQ;
-       kiocb->ki_flags &= ~IOCB_NOWAIT;
-       kiocb->ki_waitq = wait;
-       return true;
-}
-
-static inline int io_iter_do_read(struct io_rw *rw, struct iov_iter *iter)
-{
-       struct file *file = rw->kiocb.ki_filp;
-
-       if (likely(file->f_op->read_iter))
-               return call_read_iter(file, &rw->kiocb, iter);
-       else if (file->f_op->read)
-               return loop_rw_iter(READ, rw, iter);
-       else
-               return -EINVAL;
-}
-
-static bool need_read_all(struct io_kiocb *req)
-{
-       return req->flags & REQ_F_ISREG ||
-               S_ISBLK(file_inode(req->file)->i_mode);
-}
-
-static int io_rw_init_file(struct io_kiocb *req, fmode_t mode)
-{
-       struct io_rw *rw = io_kiocb_to_cmd(req);
-       struct kiocb *kiocb = &rw->kiocb;
-       struct io_ring_ctx *ctx = req->ctx;
-       struct file *file = req->file;
-       int ret;
-
-       if (unlikely(!file || !(file->f_mode & mode)))
-               return -EBADF;
-
-       if (!io_req_ffs_set(req))
-               req->flags |= io_file_get_flags(file) << REQ_F_SUPPORT_NOWAIT_BIT;
-
-       kiocb->ki_flags = iocb_flags(file);
-       ret = kiocb_set_rw_flags(kiocb, rw->flags);
-       if (unlikely(ret))
-               return ret;
-
-       /*
-        * If the file is marked O_NONBLOCK, still allow retry for it if it
-        * supports async. Otherwise it's impossible to use O_NONBLOCK files
-        * reliably. If not, or it IOCB_NOWAIT is set, don't retry.
-        */
-       if ((kiocb->ki_flags & IOCB_NOWAIT) ||
-           ((file->f_flags & O_NONBLOCK) && !io_file_supports_nowait(req)))
-               req->flags |= REQ_F_NOWAIT;
-
-       if (ctx->flags & IORING_SETUP_IOPOLL) {
-               if (!(kiocb->ki_flags & IOCB_DIRECT) || !file->f_op->iopoll)
-                       return -EOPNOTSUPP;
-
-               kiocb->private = NULL;
-               kiocb->ki_flags |= IOCB_HIPRI | IOCB_ALLOC_CACHE;
-               kiocb->ki_complete = io_complete_rw_iopoll;
-               req->iopoll_completed = 0;
-       } else {
-               if (kiocb->ki_flags & IOCB_HIPRI)
-                       return -EINVAL;
-               kiocb->ki_complete = io_complete_rw;
-       }
-
-       return 0;
-}
-
-static int io_read(struct io_kiocb *req, unsigned int issue_flags)
-{
-       struct io_rw *rw = io_kiocb_to_cmd(req);
-       struct io_rw_state __s, *s = &__s;
-       struct iovec *iovec;
-       struct kiocb *kiocb = &rw->kiocb;
-       bool force_nonblock = issue_flags & IO_URING_F_NONBLOCK;
-       struct io_async_rw *io;
-       ssize_t ret, ret2;
-       loff_t *ppos;
-
-       if (!req_has_async_data(req)) {
-               ret = io_import_iovec(READ, req, &iovec, s, issue_flags);
-               if (unlikely(ret < 0))
-                       return ret;
-       } else {
-               io = req->async_data;
-               s = &io->s;
-
-               /*
-                * Safe and required to re-import if we're using provided
-                * buffers, as we dropped the selected one before retry.
-                */
-               if (io_do_buffer_select(req)) {
-                       ret = io_import_iovec(READ, req, &iovec, s, issue_flags);
-                       if (unlikely(ret < 0))
-                               return ret;
-               }
-
-               /*
-                * We come here from an earlier attempt, restore our state to
-                * match in case it doesn't. It's cheap enough that we don't
-                * need to make this conditional.
-                */
-               iov_iter_restore(&s->iter, &s->iter_state);
-               iovec = NULL;
-       }
-       ret = io_rw_init_file(req, FMODE_READ);
-       if (unlikely(ret)) {
-               kfree(iovec);
-               return ret;
-       }
-       req->cqe.res = iov_iter_count(&s->iter);
-
-       if (force_nonblock) {
-               /* If the file doesn't support async, just async punt */
-               if (unlikely(!io_file_supports_nowait(req))) {
-                       ret = io_setup_async_rw(req, iovec, s, true);
-                       return ret ?: -EAGAIN;
-               }
-               kiocb->ki_flags |= IOCB_NOWAIT;
-       } else {
-               /* Ensure we clear previously set non-block flag */
-               kiocb->ki_flags &= ~IOCB_NOWAIT;
-       }
-
-       ppos = io_kiocb_update_pos(req);
-
-       ret = rw_verify_area(READ, req->file, ppos, req->cqe.res);
-       if (unlikely(ret)) {
-               kfree(iovec);
-               return ret;
-       }
-
-       ret = io_iter_do_read(rw, &s->iter);
-
-       if (ret == -EAGAIN || (req->flags & REQ_F_REISSUE)) {
-               req->flags &= ~REQ_F_REISSUE;
-               /* if we can poll, just do that */
-               if (req->opcode == IORING_OP_READ && file_can_poll(req->file))
-                       return -EAGAIN;
-               /* IOPOLL retry should happen for io-wq threads */
-               if (!force_nonblock && !(req->ctx->flags & IORING_SETUP_IOPOLL))
-                       goto done;
-               /* no retry on NONBLOCK nor RWF_NOWAIT */
-               if (req->flags & REQ_F_NOWAIT)
-                       goto done;
-               ret = 0;
-       } else if (ret == -EIOCBQUEUED) {
-               goto out_free;
-       } else if (ret == req->cqe.res || ret <= 0 || !force_nonblock ||
-                  (req->flags & REQ_F_NOWAIT) || !need_read_all(req)) {
-               /* read all, failed, already did sync or don't want to retry */
-               goto done;
-       }
-
-       /*
-        * Don't depend on the iter state matching what was consumed, or being
-        * untouched in case of error. Restore it and we'll advance it
-        * manually if we need to.
-        */
-       iov_iter_restore(&s->iter, &s->iter_state);
-
-       ret2 = io_setup_async_rw(req, iovec, s, true);
-       if (ret2)
-               return ret2;
-
-       iovec = NULL;
-       io = req->async_data;
-       s = &io->s;
-       /*
-        * Now use our persistent iterator and state, if we aren't already.
-        * We've restored and mapped the iter to match.
-        */
-
-       do {
-               /*
-                * We end up here because of a partial read, either from
-                * above or inside this loop. Advance the iter by the bytes
-                * that were consumed.
-                */
-               iov_iter_advance(&s->iter, ret);
-               if (!iov_iter_count(&s->iter))
-                       break;
-               io->bytes_done += ret;
-               iov_iter_save_state(&s->iter, &s->iter_state);
-
-               /* if we can retry, do so with the callbacks armed */
-               if (!io_rw_should_retry(req)) {
-                       kiocb->ki_flags &= ~IOCB_WAITQ;
-                       return -EAGAIN;
-               }
-
-               /*
-                * Now retry read with the IOCB_WAITQ parts set in the iocb. If
-                * we get -EIOCBQUEUED, then we'll get a notification when the
-                * desired page gets unlocked. We can also get a partial read
-                * here, and if we do, then just retry at the new offset.
-                */
-               ret = io_iter_do_read(rw, &s->iter);
-               if (ret == -EIOCBQUEUED)
-                       return IOU_ISSUE_SKIP_COMPLETE;
-               /* we got some bytes, but not all. retry. */
-               kiocb->ki_flags &= ~IOCB_WAITQ;
-               iov_iter_restore(&s->iter, &s->iter_state);
-       } while (ret > 0);
-done:
-       kiocb_done(req, ret, issue_flags);
-out_free:
-       /* it's faster to check here then delegate to kfree */
-       if (iovec)
-               kfree(iovec);
-       return IOU_ISSUE_SKIP_COMPLETE;
-}
-
-static int io_write(struct io_kiocb *req, unsigned int issue_flags)
-{
-       struct io_rw *rw = io_kiocb_to_cmd(req);
-       struct io_rw_state __s, *s = &__s;
-       struct iovec *iovec;
-       struct kiocb *kiocb = &rw->kiocb;
-       bool force_nonblock = issue_flags & IO_URING_F_NONBLOCK;
-       ssize_t ret, ret2;
-       loff_t *ppos;
-
-       if (!req_has_async_data(req)) {
-               ret = io_import_iovec(WRITE, req, &iovec, s, issue_flags);
-               if (unlikely(ret < 0))
-                       return ret;
-       } else {
-               struct io_async_rw *io = req->async_data;
-
-               s = &io->s;
-               iov_iter_restore(&s->iter, &s->iter_state);
-               iovec = NULL;
-       }
-       ret = io_rw_init_file(req, FMODE_WRITE);
-       if (unlikely(ret)) {
-               kfree(iovec);
-               return ret;
-       }
-       req->cqe.res = iov_iter_count(&s->iter);
-
-       if (force_nonblock) {
-               /* If the file doesn't support async, just async punt */
-               if (unlikely(!io_file_supports_nowait(req)))
-                       goto copy_iov;
-
-               /* file path doesn't support NOWAIT for non-direct_IO */
-               if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT) &&
-                   (req->flags & REQ_F_ISREG))
-                       goto copy_iov;
-
-               kiocb->ki_flags |= IOCB_NOWAIT;
-       } else {
-               /* Ensure we clear previously set non-block flag */
-               kiocb->ki_flags &= ~IOCB_NOWAIT;
-       }
-
-       ppos = io_kiocb_update_pos(req);
-
-       ret = rw_verify_area(WRITE, req->file, ppos, req->cqe.res);
-       if (unlikely(ret))
-               goto out_free;
-
-       /*
-        * Open-code file_start_write here to grab freeze protection,
-        * which will be released by another thread in
-        * io_complete_rw().  Fool lockdep by telling it the lock got
-        * released so that it doesn't complain about the held lock when
-        * we return to userspace.
-        */
-       if (req->flags & REQ_F_ISREG) {
-               sb_start_write(file_inode(req->file)->i_sb);
-               __sb_writers_release(file_inode(req->file)->i_sb,
-                                       SB_FREEZE_WRITE);
-       }
-       kiocb->ki_flags |= IOCB_WRITE;
-
-       if (likely(req->file->f_op->write_iter))
-               ret2 = call_write_iter(req->file, kiocb, &s->iter);
-       else if (req->file->f_op->write)
-               ret2 = loop_rw_iter(WRITE, rw, &s->iter);
-       else
-               ret2 = -EINVAL;
-
-       if (req->flags & REQ_F_REISSUE) {
-               req->flags &= ~REQ_F_REISSUE;
-               ret2 = -EAGAIN;
-       }
-
-       /*
-        * Raw bdev writes will return -EOPNOTSUPP for IOCB_NOWAIT. Just
-        * retry them without IOCB_NOWAIT.
-        */
-       if (ret2 == -EOPNOTSUPP && (kiocb->ki_flags & IOCB_NOWAIT))
-               ret2 = -EAGAIN;
-       /* no retry on NONBLOCK nor RWF_NOWAIT */
-       if (ret2 == -EAGAIN && (req->flags & REQ_F_NOWAIT))
-               goto done;
-       if (!force_nonblock || ret2 != -EAGAIN) {
-               /* IOPOLL retry should happen for io-wq threads */
-               if (ret2 == -EAGAIN && (req->ctx->flags & IORING_SETUP_IOPOLL))
-                       goto copy_iov;
-done:
-               kiocb_done(req, ret2, issue_flags);
-               ret = IOU_ISSUE_SKIP_COMPLETE;
-       } else {
-copy_iov:
-               iov_iter_restore(&s->iter, &s->iter_state);
-               ret = io_setup_async_rw(req, iovec, s, false);
-               return ret ?: -EAGAIN;
-       }
-out_free:
-       /* it's reportedly faster than delegating the null check to kfree() */
-       if (iovec)
-               kfree(iovec);
-       return ret;
-}
-
 static __maybe_unused int io_eopnotsupp_prep(struct io_kiocb *kiocb,
                                             const struct io_uring_sqe *sqe)
 {
        return -EOPNOTSUPP;
 }
 
-static int io_req_prep_async(struct io_kiocb *req)
+int io_req_prep_async(struct io_kiocb *req)
 {
        const struct io_op_def *def = &io_op_defs[req->opcode];
 
index 71afb46070e36484d312f8c3f215851c3f4d8c0e..22e6e52c42d261730d363774ef230773ec536f4b 100644 (file)
 #include <linux/lockdep.h>
 #include "io_uring_types.h"
 
+#ifndef CREATE_TRACE_POINTS
+#include <trace/events/io_uring.h>
+#endif
+
 enum {
        IOU_OK                  = 0,
        IOU_ISSUE_SKIP_COMPLETE = -EIOCBQUEUED,
 };
 
+bool io_cqring_event_overflow(struct io_ring_ctx *ctx, u64 user_data, s32 res,
+                             u32 cflags, u64 extra1, u64 extra2);
+
+static inline unsigned int __io_cqring_events(struct io_ring_ctx *ctx)
+{
+       return ctx->cached_cq_tail - READ_ONCE(ctx->rings->cq.head);
+}
+
+/*
+ * writes to the cq entry need to come after reading head; the
+ * control dependency is enough as we're using WRITE_ONCE to
+ * fill the cq entry
+ */
+static inline struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx)
+{
+       struct io_rings *rings = ctx->rings;
+       unsigned int off = ctx->cached_cq_tail & (ctx->cq_entries - 1);
+       unsigned int shift = 0;
+       unsigned int free, queued, len;
+
+       if (ctx->flags & IORING_SETUP_CQE32)
+               shift = 1;
+
+       /* userspace may cheat modifying the tail, be safe and do min */
+       queued = min(__io_cqring_events(ctx), ctx->cq_entries);
+       free = ctx->cq_entries - queued;
+       /* we need a contiguous range, limit based on the current array offset */
+       len = min(free, ctx->cq_entries - off);
+       if (!len)
+               return NULL;
+
+       ctx->cached_cq_tail++;
+       ctx->cqe_cached = &rings->cqes[off];
+       ctx->cqe_sentinel = ctx->cqe_cached + len;
+       ctx->cqe_cached++;
+       return &rings->cqes[off << shift];
+}
+
+static inline struct io_uring_cqe *io_get_cqe(struct io_ring_ctx *ctx)
+{
+       if (likely(ctx->cqe_cached < ctx->cqe_sentinel)) {
+               struct io_uring_cqe *cqe = ctx->cqe_cached;
+
+               if (ctx->flags & IORING_SETUP_CQE32) {
+                       unsigned int off = ctx->cqe_cached - ctx->rings->cqes;
+
+                       cqe += off;
+               }
+
+               ctx->cached_cq_tail++;
+               ctx->cqe_cached++;
+               return cqe;
+       }
+
+       return __io_get_cqe(ctx);
+}
+
+static inline bool __io_fill_cqe_req(struct io_ring_ctx *ctx,
+                                    struct io_kiocb *req)
+{
+       struct io_uring_cqe *cqe;
+
+       if (!(ctx->flags & IORING_SETUP_CQE32)) {
+               trace_io_uring_complete(req->ctx, req, req->cqe.user_data,
+                                       req->cqe.res, req->cqe.flags, 0, 0);
+
+               /*
+                * If we can't get a cq entry, userspace overflowed the
+                * submission (by quite a lot). Increment the overflow count in
+                * the ring.
+                */
+               cqe = io_get_cqe(ctx);
+               if (likely(cqe)) {
+                       memcpy(cqe, &req->cqe, sizeof(*cqe));
+                       return true;
+               }
+
+               return io_cqring_event_overflow(ctx, req->cqe.user_data,
+                                               req->cqe.res, req->cqe.flags,
+                                               0, 0);
+       } else {
+               u64 extra1 = 0, extra2 = 0;
+
+               if (req->flags & REQ_F_CQE32_INIT) {
+                       extra1 = req->extra1;
+                       extra2 = req->extra2;
+               }
+
+               trace_io_uring_complete(req->ctx, req, req->cqe.user_data,
+                                       req->cqe.res, req->cqe.flags, extra1, extra2);
+
+               /*
+                * If we can't get a cq entry, userspace overflowed the
+                * submission (by quite a lot). Increment the overflow count in
+                * the ring.
+                */
+               cqe = io_get_cqe(ctx);
+               if (likely(cqe)) {
+                       memcpy(cqe, &req->cqe, sizeof(struct io_uring_cqe));
+                       WRITE_ONCE(cqe->big_cqe[0], extra1);
+                       WRITE_ONCE(cqe->big_cqe[1], extra2);
+                       return true;
+               }
+
+               return io_cqring_event_overflow(ctx, req->cqe.user_data,
+                               req->cqe.res, req->cqe.flags,
+                               extra1, extra2);
+       }
+}
+
 static inline void req_set_fail(struct io_kiocb *req)
 {
        req->flags |= REQ_F_FAIL;
@@ -64,6 +178,17 @@ static inline void io_commit_cqring(struct io_ring_ctx *ctx)
        smp_store_release(&ctx->rings->cq.tail, ctx->cached_cq_tail);
 }
 
+static inline void io_cqring_wake(struct io_ring_ctx *ctx)
+{
+       /*
+        * wake_up_all() may seem excessive, but io_wake_function() and
+        * io_should_wake() handle the termination of the loop and only
+        * wake as many waiters as we need to.
+        */
+       if (wq_has_sleeper(&ctx->cq_wait))
+               wake_up_all(&ctx->cq_wait);
+}
+
 static inline bool io_sqring_full(struct io_ring_ctx *ctx)
 {
        struct io_rings *r = ctx->rings;
@@ -100,6 +225,7 @@ void __io_req_complete_post(struct io_kiocb *req);
 bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res,
                     u32 cflags);
 void io_cqring_ev_posted(struct io_ring_ctx *ctx);
+void __io_commit_cqring_flush(struct io_ring_ctx *ctx);
 
 struct page **io_pin_pages(unsigned long ubuf, unsigned long len, int *npages);
 
@@ -110,7 +236,10 @@ struct file *io_file_get_fixed(struct io_kiocb *req, int fd,
 bool io_is_uring_fops(struct file *file);
 bool io_alloc_async_data(struct io_kiocb *req);
 void io_req_task_work_add(struct io_kiocb *req);
+void io_req_task_prio_work_add(struct io_kiocb *req);
 void io_req_tw_post_queue(struct io_kiocb *req, s32 res, u32 cflags);
+void io_req_task_queue(struct io_kiocb *req);
+void io_queue_iowq(struct io_kiocb *req, bool *dont_use);
 void io_req_task_complete(struct io_kiocb *req, bool *locked);
 void io_req_task_queue_fail(struct io_kiocb *req, int ret);
 void io_req_task_submit(struct io_kiocb *req, bool *locked);
@@ -122,6 +251,8 @@ int io_uring_alloc_task_context(struct task_struct *task,
 int io_poll_issue(struct io_kiocb *req, bool *locked);
 int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr);
 int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin);
+void io_free_batch_list(struct io_ring_ctx *ctx, struct io_wq_work_node *node);
+int io_req_prep_async(struct io_kiocb *req);
 
 struct io_wq_work *io_wq_free_work(struct io_wq_work *work);
 void io_wq_submit_work(struct io_wq_work *work);
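
The __io_get_cqe()/io_get_cqe() helpers moved into io_uring.h above track completions by their logical index in rings->cqes and only double that index when dereferencing a ring created with IORING_SETUP_CQE32 (the "off << shift" return and the "cqe += off" fixup). A standalone userspace sketch of that index math follows; it is illustrative only, not kernel code, and the variable names are local to the example.

/* Illustrative only: mapping a logical CQE index to an array slot when
 * IORING_SETUP_CQE32 doubles the size of each completion entry. */
#include <stdio.h>

int main(void)
{
	unsigned int cq_entries = 8;		/* power of two, as in the ring setup */
	unsigned int cached_cq_tail = 13;	/* example tail value */
	unsigned int cqe32 = 1;			/* ring created with IORING_SETUP_CQE32 */

	unsigned int shift = cqe32 ? 1 : 0;
	unsigned int idx = cached_cq_tail & (cq_entries - 1);	/* logical index: 5 */

	/* A 32-byte CQE occupies two 16-byte slots, so the real slot is idx << shift.
	 * io_get_cqe() recovers this from a cached pointer at cqes + idx by adding
	 * idx once more: (cqes + idx) + idx == cqes + (idx << 1). */
	printf("logical index %u -> array slot %u\n", idx, idx << shift);
	return 0;
}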
diff --git a/io_uring/rw.c b/io_uring/rw.c
new file mode 100644 (file)
index 0000000..f0b6019
--- /dev/null
@@ -0,0 +1,1099 @@
+// SPDX-License-Identifier: GPL-2.0
+#include <linux/kernel.h>
+#include <linux/errno.h>
+#include <linux/fs.h>
+#include <linux/file.h>
+#include <linux/blk-mq.h>
+#include <linux/mm.h>
+#include <linux/slab.h>
+#include <linux/fsnotify.h>
+#include <linux/poll.h>
+#include <linux/nospec.h>
+#include <linux/compat.h>
+#include <linux/io_uring.h>
+
+#include <uapi/linux/io_uring.h>
+
+#include "io_uring_types.h"
+#include "io_uring.h"
+#include "opdef.h"
+#include "kbuf.h"
+#include "rsrc.h"
+#include "rw.h"
+
+struct io_rw {
+       /* NOTE: kiocb has the file as the first member, so don't do it here */
+       struct kiocb                    kiocb;
+       u64                             addr;
+       u32                             len;
+       rwf_t                           flags;
+};
+
+static inline bool io_file_supports_nowait(struct io_kiocb *req)
+{
+       return req->flags & REQ_F_SUPPORT_NOWAIT;
+}
+
+int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe)
+{
+       struct io_rw *rw = io_kiocb_to_cmd(req);
+       unsigned ioprio;
+       int ret;
+
+       rw->kiocb.ki_pos = READ_ONCE(sqe->off);
+       /* used for fixed read/write too - just read unconditionally */
+       req->buf_index = READ_ONCE(sqe->buf_index);
+
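+       /*
+        * For fixed buffer reads/writes, resolve the registered buffer at
+        * prep time and pin the rsrc node so the buffer table can't be
+        * torn down while the request is in flight.
+        */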
+       if (req->opcode == IORING_OP_READ_FIXED ||
+           req->opcode == IORING_OP_WRITE_FIXED) {
+               struct io_ring_ctx *ctx = req->ctx;
+               u16 index;
+
+               if (unlikely(req->buf_index >= ctx->nr_user_bufs))
+                       return -EFAULT;
+               index = array_index_nospec(req->buf_index, ctx->nr_user_bufs);
+               req->imu = ctx->user_bufs[index];
+               io_req_set_rsrc_node(req, ctx, 0);
+       }
+
+       ioprio = READ_ONCE(sqe->ioprio);
+       if (ioprio) {
+               ret = ioprio_check_cap(ioprio);
+               if (ret)
+                       return ret;
+
+               rw->kiocb.ki_ioprio = ioprio;
+       } else {
+               rw->kiocb.ki_ioprio = get_current_ioprio();
+       }
+
+       rw->addr = READ_ONCE(sqe->addr);
+       rw->len = READ_ONCE(sqe->len);
+       rw->flags = READ_ONCE(sqe->rw_flags);
+       return 0;
+}
+
+void io_readv_writev_cleanup(struct io_kiocb *req)
+{
+       struct io_async_rw *io = req->async_data;
+
+       kfree(io->free_iovec);
+}
+
+static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
+{
+       switch (ret) {
+       case -EIOCBQUEUED:
+               break;
+       case -ERESTARTSYS:
+       case -ERESTARTNOINTR:
+       case -ERESTARTNOHAND:
+       case -ERESTART_RESTARTBLOCK:
+               /*
+                * We can't just restart the syscall, since previously
+                * submitted sqes may already be in progress. Just fail this
+                * IO with EINTR.
+                */
+               ret = -EINTR;
+               fallthrough;
+       default:
+               kiocb->ki_complete(kiocb, ret);
+       }
+}
+
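+/*
+ * Figure out the file position to use for this IO. A ki_pos of -1 means
+ * "use the file's position": for non-stream files we latch ->f_pos and
+ * flag the request so the position is written back on completion, while
+ * stream files have no position at all.
+ */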
+static inline loff_t *io_kiocb_update_pos(struct io_kiocb *req)
+{
+       struct io_rw *rw = io_kiocb_to_cmd(req);
+
+       if (rw->kiocb.ki_pos != -1)
+               return &rw->kiocb.ki_pos;
+
+       if (!(req->file->f_mode & FMODE_STREAM)) {
+               req->flags |= REQ_F_CUR_POS;
+               rw->kiocb.ki_pos = req->file->f_pos;
+               return &rw->kiocb.ki_pos;
+       }
+
+       rw->kiocb.ki_pos = 0;
+       return NULL;
+}
+
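+/* queue a task_work based reissue of the request via the io-wq path */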
+static void io_req_task_queue_reissue(struct io_kiocb *req)
+{
+       req->io_task_work.func = io_queue_iowq;
+       io_req_task_work_add(req);
+}
+
+#ifdef CONFIG_BLOCK
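+/*
+ * Restore (or first set up) the saved iterator state so that a request
+ * which failed with -EAGAIN can be resubmitted from task context.
+ */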
+static bool io_resubmit_prep(struct io_kiocb *req)
+{
+       struct io_async_rw *io = req->async_data;
+
+       if (!req_has_async_data(req))
+               return !io_req_prep_async(req);
+       iov_iter_restore(&io->s.iter, &io->s.iter_state);
+       return true;
+}
+
+static bool io_rw_should_reissue(struct io_kiocb *req)
+{
+       umode_t mode = file_inode(req->file)->i_mode;
+       struct io_ring_ctx *ctx = req->ctx;
+
+       if (!S_ISBLK(mode) && !S_ISREG(mode))
+               return false;
+       if ((req->flags & REQ_F_NOWAIT) || (io_wq_current_is_worker() &&
+           !(ctx->flags & IORING_SETUP_IOPOLL)))
+               return false;
+       /*
+        * If ref is dying, we might be running poll reap from the exit work.
+        * Don't attempt to reissue from that path, just let it fail with
+        * -EAGAIN.
+        */
+       if (percpu_ref_is_dying(&ctx->refs))
+               return false;
+       /*
+        * Play it safe and assume not safe to re-import and reissue if we're
+        * not in the original thread group (or in task context).
+        */
+       if (!same_thread_group(req->task, current) || !in_task())
+               return false;
+       return true;
+}
+#else
+static bool io_resubmit_prep(struct io_kiocb *req)
+{
+       return false;
+}
+static bool io_rw_should_reissue(struct io_kiocb *req)
+{
+       return false;
+}
+#endif
+
+static void kiocb_end_write(struct io_kiocb *req)
+{
+       /*
+        * Tell lockdep we inherited freeze protection from submission
+        * thread.
+        */
+       if (req->flags & REQ_F_ISREG) {
+               struct super_block *sb = file_inode(req->file)->i_sb;
+
+               __sb_writers_acquired(sb, SB_FREEZE_WRITE);
+               sb_end_write(sb);
+       }
+}
+
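+/*
+ * Common completion handling for reads and writes: release write freeze
+ * protection, generate fsnotify events, and check for a short or failed
+ * result. Returns true if the request was marked for reissue, in which
+ * case the caller must not complete it here.
+ */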
+static bool __io_complete_rw_common(struct io_kiocb *req, long res)
+{
+       struct io_rw *rw = io_kiocb_to_cmd(req);
+
+       if (rw->kiocb.ki_flags & IOCB_WRITE) {
+               kiocb_end_write(req);
+               fsnotify_modify(req->file);
+       } else {
+               fsnotify_access(req->file);
+       }
+       if (unlikely(res != req->cqe.res)) {
+               if ((res == -EAGAIN || res == -EOPNOTSUPP) &&
+                   io_rw_should_reissue(req)) {
+                       req->flags |= REQ_F_REISSUE | REQ_F_PARTIAL_IO;
+                       return true;
+               }
+               req_set_fail(req);
+               req->cqe.res = res;
+       }
+       return false;
+}
+
+static void __io_complete_rw(struct io_kiocb *req, long res,
+                            unsigned int issue_flags)
+{
+       if (__io_complete_rw_common(req, res))
+               return;
+       io_req_set_res(req, req->cqe.res, io_put_kbuf(req, issue_flags));
+       __io_req_complete(req, issue_flags);
+}
+
+static void io_complete_rw(struct kiocb *kiocb, long res)
+{
+       struct io_rw *rw = container_of(kiocb, struct io_rw, kiocb);
+       struct io_kiocb *req = cmd_to_io_kiocb(rw);
+
+       if (__io_complete_rw_common(req, res))
+               return;
+       io_req_set_res(req, res, 0);
+       req->io_task_work.func = io_req_task_complete;
+       io_req_task_prio_work_add(req);
+}
+
+static void io_complete_rw_iopoll(struct kiocb *kiocb, long res)
+{
+       struct io_rw *rw = container_of(kiocb, struct io_rw, kiocb);
+       struct io_kiocb *req = cmd_to_io_kiocb(rw);
+
+       if (kiocb->ki_flags & IOCB_WRITE)
+               kiocb_end_write(req);
+       if (unlikely(res != req->cqe.res)) {
+               if (res == -EAGAIN && io_rw_should_reissue(req)) {
+                       req->flags |= REQ_F_REISSUE | REQ_F_PARTIAL_IO;
+                       return;
+               }
+               req->cqe.res = res;
+       }
+
+       /* order with io_iopoll_complete() checking ->iopoll_completed */
+       smp_store_release(&req->iopoll_completed, 1);
+}
+
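+/*
+ * Finish an issue attempt: fold in bytes completed by an earlier partial
+ * attempt, write back the file position if we latched it, then either
+ * complete the request or hand it off for reissue.
+ */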
+static void kiocb_done(struct io_kiocb *req, ssize_t ret,
+                      unsigned int issue_flags)
+{
+       struct io_async_rw *io = req->async_data;
+       struct io_rw *rw = io_kiocb_to_cmd(req);
+
+       /* add previously done IO, if any */
+       if (req_has_async_data(req) && io->bytes_done > 0) {
+               if (ret < 0)
+                       ret = io->bytes_done;
+               else
+                       ret += io->bytes_done;
+       }
+
+       if (req->flags & REQ_F_CUR_POS)
+               req->file->f_pos = rw->kiocb.ki_pos;
+       if (ret >= 0 && (rw->kiocb.ki_complete == io_complete_rw))
+               __io_complete_rw(req, ret, issue_flags);
+       else
+               io_rw_done(&rw->kiocb, ret);
+
+       if (req->flags & REQ_F_REISSUE) {
+               req->flags &= ~REQ_F_REISSUE;
+               if (io_resubmit_prep(req))
+                       io_req_task_queue_reissue(req);
+               else
+                       io_req_task_queue_fail(req, ret);
+       }
+}
+
+static int __io_import_fixed(struct io_kiocb *req, int ddir,
+                            struct iov_iter *iter, struct io_mapped_ubuf *imu)
+{
+       struct io_rw *rw = io_kiocb_to_cmd(req);
+       size_t len = rw->len;
+       u64 buf_end, buf_addr = rw->addr;
+       size_t offset;
+
+       if (unlikely(check_add_overflow(buf_addr, (u64)len, &buf_end)))
+               return -EFAULT;
+       /* not inside the mapped region */
+       if (unlikely(buf_addr < imu->ubuf || buf_end > imu->ubuf_end))
+               return -EFAULT;
+
+       /*
+        * May not be the start of the buffer; set the size appropriately
+        * and advance us to the beginning.
+        */
+       offset = buf_addr - imu->ubuf;
+       iov_iter_bvec(iter, ddir, imu->bvec, imu->nr_bvecs, offset + len);
+
+       if (offset) {
+               /*
+                * Don't use iov_iter_advance() here, as it's really slow for
+                * using the latter parts of a big fixed buffer - it iterates
+                * over each segment manually. We can cheat a bit here, because
+                * we know that:
+                *
+                * 1) it's a BVEC iter, we set it up
+                * 2) all bvecs are PAGE_SIZE in size, except potentially the
+                *    first and last bvec
+                *
+                * So just find our index, and adjust the iterator afterwards.
+                * If the offset is within the first bvec (or is the whole first
+                * bvec), just use iov_iter_advance(). This makes it easier
+                * since we can just skip the first segment, which may not
+                * be PAGE_SIZE aligned.
+                */
+               const struct bio_vec *bvec = imu->bvec;
+
+               if (offset <= bvec->bv_len) {
+                       iov_iter_advance(iter, offset);
+               } else {
+                       unsigned long seg_skip;
+
+                       /* skip first vec */
+                       offset -= bvec->bv_len;
+                       seg_skip = 1 + (offset >> PAGE_SHIFT);
+
+                       iter->bvec = bvec + seg_skip;
+                       iter->nr_segs -= seg_skip;
+                       iter->count -= bvec->bv_len + offset;
+                       iter->iov_offset = offset & ~PAGE_MASK;
+               }
+       }
+
+       return 0;
+}
+
+static int io_import_fixed(struct io_kiocb *req, int rw, struct iov_iter *iter,
+                          unsigned int issue_flags)
+{
+       if (WARN_ON_ONCE(!req->imu))
+               return -EFAULT;
+       return __io_import_fixed(req, rw, iter, req->imu);
+}
+
+#ifdef CONFIG_COMPAT
+static ssize_t io_compat_import(struct io_kiocb *req, struct iovec *iov,
+                               unsigned int issue_flags)
+{
+       struct io_rw *rw = io_kiocb_to_cmd(req);
+       struct compat_iovec __user *uiov;
+       compat_ssize_t clen;
+       void __user *buf;
+       size_t len;
+
+       uiov = u64_to_user_ptr(rw->addr);
+       if (!access_ok(uiov, sizeof(*uiov)))
+               return -EFAULT;
+       if (__get_user(clen, &uiov->iov_len))
+               return -EFAULT;
+       if (clen < 0)
+               return -EINVAL;
+
+       len = clen;
+       buf = io_buffer_select(req, &len, issue_flags);
+       if (!buf)
+               return -ENOBUFS;
+       rw->addr = (unsigned long) buf;
+       iov[0].iov_base = buf;
+       rw->len = iov[0].iov_len = (compat_size_t) len;
+       return 0;
+}
+#endif
+
+static ssize_t __io_iov_buffer_select(struct io_kiocb *req, struct iovec *iov,
+                                     unsigned int issue_flags)
+{
+       struct io_rw *rw = io_kiocb_to_cmd(req);
+       struct iovec __user *uiov = u64_to_user_ptr(rw->addr);
+       void __user *buf;
+       ssize_t len;
+
+       if (copy_from_user(iov, uiov, sizeof(*uiov)))
+               return -EFAULT;
+
+       len = iov[0].iov_len;
+       if (len < 0)
+               return -EINVAL;
+       buf = io_buffer_select(req, &len, issue_flags);
+       if (!buf)
+               return -ENOBUFS;
+       rw->addr = (unsigned long) buf;
+       iov[0].iov_base = buf;
+       rw->len = iov[0].iov_len = len;
+       return 0;
+}
+
+static ssize_t io_iov_buffer_select(struct io_kiocb *req, struct iovec *iov,
+                                   unsigned int issue_flags)
+{
+       struct io_rw *rw = io_kiocb_to_cmd(req);
+
+       if (req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)) {
+               iov[0].iov_base = u64_to_user_ptr(rw->addr);
+               iov[0].iov_len = rw->len;
+               return 0;
+       }
+       if (rw->len != 1)
+               return -EINVAL;
+
+#ifdef CONFIG_COMPAT
+       if (req->ctx->compat)
+               return io_compat_import(req, iov, issue_flags);
+#endif
+
+       return __io_iov_buffer_select(req, iov, issue_flags);
+}
+
+static struct iovec *__io_import_iovec(int ddir, struct io_kiocb *req,
+                                      struct io_rw_state *s,
+                                      unsigned int issue_flags)
+{
+       struct io_rw *rw = io_kiocb_to_cmd(req);
+       struct iov_iter *iter = &s->iter;
+       u8 opcode = req->opcode;
+       struct iovec *iovec;
+       void __user *buf;
+       size_t sqe_len;
+       ssize_t ret;
+
+       if (opcode == IORING_OP_READ_FIXED || opcode == IORING_OP_WRITE_FIXED) {
+               ret = io_import_fixed(req, ddir, iter, issue_flags);
+               if (ret)
+                       return ERR_PTR(ret);
+               return NULL;
+       }
+
+       buf = u64_to_user_ptr(rw->addr);
+       sqe_len = rw->len;
+
+       if (opcode == IORING_OP_READ || opcode == IORING_OP_WRITE) {
+               if (io_do_buffer_select(req)) {
+                       buf = io_buffer_select(req, &sqe_len, issue_flags);
+                       if (!buf)
+                               return ERR_PTR(-ENOBUFS);
+                       rw->addr = (unsigned long) buf;
+                       rw->len = sqe_len;
+               }
+
+               ret = import_single_range(ddir, buf, sqe_len, s->fast_iov, iter);
+               if (ret)
+                       return ERR_PTR(ret);
+               return NULL;
+       }
+
+       iovec = s->fast_iov;
+       if (req->flags & REQ_F_BUFFER_SELECT) {
+               ret = io_iov_buffer_select(req, iovec, issue_flags);
+               if (ret)
+                       return ERR_PTR(ret);
+               iov_iter_init(iter, ddir, iovec, 1, iovec->iov_len);
+               return NULL;
+       }
+
+       ret = __import_iovec(ddir, buf, sqe_len, UIO_FASTIOV, &iovec, iter,
+                             req->ctx->compat);
+       if (unlikely(ret < 0))
+               return ERR_PTR(ret);
+       return iovec;
+}
+
+static inline int io_import_iovec(int rw, struct io_kiocb *req,
+                                 struct iovec **iovec, struct io_rw_state *s,
+                                 unsigned int issue_flags)
+{
+       *iovec = __io_import_iovec(rw, req, s, issue_flags);
+       if (unlikely(IS_ERR(*iovec)))
+               return PTR_ERR(*iovec);
+
+       iov_iter_save_state(&s->iter, &s->iter_state);
+       return 0;
+}
+
+static inline loff_t *io_kiocb_ppos(struct kiocb *kiocb)
+{
+       return (kiocb->ki_filp->f_mode & FMODE_STREAM) ? NULL : &kiocb->ki_pos;
+}
+
+/*
+ * For files that don't have ->read_iter() and ->write_iter(), handle them
+ * by looping over ->read() or ->write() manually.
+ */
+static ssize_t loop_rw_iter(int ddir, struct io_rw *rw, struct iov_iter *iter)
+{
+       struct kiocb *kiocb = &rw->kiocb;
+       struct file *file = kiocb->ki_filp;
+       ssize_t ret = 0;
+       loff_t *ppos;
+
+       /*
+        * Don't support polled IO through this interface, and we can't
+        * support non-blocking either. For the latter, this just causes
+        * the kiocb to be handled from an async context.
+        */
+       if (kiocb->ki_flags & IOCB_HIPRI)
+               return -EOPNOTSUPP;
+       if ((kiocb->ki_flags & IOCB_NOWAIT) &&
+           !(kiocb->ki_filp->f_flags & O_NONBLOCK))
+               return -EAGAIN;
+
+       ppos = io_kiocb_ppos(kiocb);
+
+       while (iov_iter_count(iter)) {
+               struct iovec iovec;
+               ssize_t nr;
+
+               if (!iov_iter_is_bvec(iter)) {
+                       iovec = iov_iter_iovec(iter);
+               } else {
+                       iovec.iov_base = u64_to_user_ptr(rw->addr);
+                       iovec.iov_len = rw->len;
+               }
+
+               if (ddir == READ) {
+                       nr = file->f_op->read(file, iovec.iov_base,
+                                             iovec.iov_len, ppos);
+               } else {
+                       nr = file->f_op->write(file, iovec.iov_base,
+                                              iovec.iov_len, ppos);
+               }
+
+               if (nr < 0) {
+                       if (!ret)
+                               ret = nr;
+                       break;
+               }
+               ret += nr;
+               if (!iov_iter_is_bvec(iter)) {
+                       iov_iter_advance(iter, nr);
+               } else {
+                       rw->addr += nr;
+                       rw->len -= nr;
+                       if (!rw->len)
+                               break;
+               }
+               if (nr != iovec.iov_len)
+                       break;
+       }
+
+       return ret;
+}
+
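+/*
+ * Stash the iterator in the request's async data so the IO can be retried
+ * from a context where the on-stack submission state is gone. Inline
+ * (fast_iov) vectors are copied; a heap-allocated iovec is handed over
+ * as-is and freed at cleanup time.
+ */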
+static void io_req_map_rw(struct io_kiocb *req, const struct iovec *iovec,
+                         const struct iovec *fast_iov, struct iov_iter *iter)
+{
+       struct io_async_rw *io = req->async_data;
+
+       memcpy(&io->s.iter, iter, sizeof(*iter));
+       io->free_iovec = iovec;
+       io->bytes_done = 0;
+       /* can only be fixed buffers, no need to do anything */
+       if (iov_iter_is_bvec(iter))
+               return;
+       if (!iovec) {
+               unsigned iov_off = 0;
+
+               io->s.iter.iov = io->s.fast_iov;
+               if (iter->iov != fast_iov) {
+                       iov_off = iter->iov - fast_iov;
+                       io->s.iter.iov += iov_off;
+               }
+               if (io->s.fast_iov != fast_iov)
+                       memcpy(io->s.fast_iov + iov_off, fast_iov + iov_off,
+                              sizeof(struct iovec) * iter->nr_segs);
+       } else {
+               req->flags |= REQ_F_NEED_CLEANUP;
+       }
+}
+
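+/*
+ * Make sure the request has async data holding a saved copy of the
+ * iterator, so a punt to io-wq or a later retry can restart from stable
+ * state. Skipped when not forced and the opcode never needs async prep,
+ * or when async data already exists.
+ */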
+static int io_setup_async_rw(struct io_kiocb *req, const struct iovec *iovec,
+                            struct io_rw_state *s, bool force)
+{
+       if (!force && !io_op_defs[req->opcode].prep_async)
+               return 0;
+       if (!req_has_async_data(req)) {
+               struct io_async_rw *iorw;
+
+               if (io_alloc_async_data(req)) {
+                       kfree(iovec);
+                       return -ENOMEM;
+               }
+
+               io_req_map_rw(req, iovec, s->fast_iov, &s->iter);
+               iorw = req->async_data;
+               /* we've copied and mapped the iter, ensure state is saved */
+               iov_iter_save_state(&iorw->s.iter, &iorw->s.iter_state);
+       }
+       return 0;
+}
+
+static inline int io_rw_prep_async(struct io_kiocb *req, int rw)
+{
+       struct io_async_rw *iorw = req->async_data;
+       struct iovec *iov;
+       int ret;
+
+       /* submission path, ->uring_lock should already be taken */
+       ret = io_import_iovec(rw, req, &iov, &iorw->s, 0);
+       if (unlikely(ret < 0))
+               return ret;
+
+       iorw->bytes_done = 0;
+       iorw->free_iovec = iov;
+       if (iov)
+               req->flags |= REQ_F_NEED_CLEANUP;
+       return 0;
+}
+
+int io_readv_prep_async(struct io_kiocb *req)
+{
+       return io_rw_prep_async(req, READ);
+}
+
+int io_writev_prep_async(struct io_kiocb *req)
+{
+       return io_rw_prep_async(req, WRITE);
+}
+
+/*
+ * This is our waitqueue callback handler, registered through __folio_lock_async()
+ * when we initially tried to do the IO with the iocb and it armed our waitqueue.
+ * This gets called when the page is unlocked, and we generally expect that to
+ * happen when the page IO is completed and the page is now uptodate. This will
+ * queue a task_work based retry of the operation, attempting to copy the data
+ * again. If the latter fails because the page was NOT uptodate, then we will
+ * do a thread based blocking retry of the operation. That's the unexpected
+ * slow path.
+ */
+static int io_async_buf_func(struct wait_queue_entry *wait, unsigned mode,
+                            int sync, void *arg)
+{
+       struct wait_page_queue *wpq;
+       struct io_kiocb *req = wait->private;
+       struct io_rw *rw = io_kiocb_to_cmd(req);
+       struct wait_page_key *key = arg;
+
+       wpq = container_of(wait, struct wait_page_queue, wait);
+
+       if (!wake_page_match(wpq, key))
+               return 0;
+
+       rw->kiocb.ki_flags &= ~IOCB_WAITQ;
+       list_del_init(&wait->entry);
+       io_req_task_queue(req);
+       return 1;
+}
+
+/*
+ * This controls whether a given IO request should be armed for async page
+ * based retry. If we return false here, the request is handed to the async
+ * worker threads for retry. If we're doing buffered reads on a regular file,
+ * we prepare a private wait_page_queue entry and retry the operation. This
+ * will either succeed because the page is now uptodate and unlocked, or it
+ * will register a callback when the page is unlocked at IO completion. Through
+ * that callback, io_uring uses task_work to set up a retry of the operation.
+ * That retry will attempt the buffered read again. The retry will generally
+ * succeed, or in rare cases where it fails, we then fall back to using the
+ * async worker threads for a blocking retry.
+ */
+static bool io_rw_should_retry(struct io_kiocb *req)
+{
+       struct io_async_rw *io = req->async_data;
+       struct wait_page_queue *wait = &io->wpq;
+       struct io_rw *rw = io_kiocb_to_cmd(req);
+       struct kiocb *kiocb = &rw->kiocb;
+
+       /* never retry for NOWAIT, we just complete with -EAGAIN */
+       if (req->flags & REQ_F_NOWAIT)
+               return false;
+
+       /* Only for buffered IO */
+       if (kiocb->ki_flags & (IOCB_DIRECT | IOCB_HIPRI))
+               return false;
+
+       /*
+        * Just use poll if we can, and don't attempt this if the filesystem
+        * doesn't support callback-based unlocks.
+        */
+       if (file_can_poll(req->file) || !(req->file->f_mode & FMODE_BUF_RASYNC))
+               return false;
+
+       wait->wait.func = io_async_buf_func;
+       wait->wait.private = req;
+       wait->wait.flags = 0;
+       INIT_LIST_HEAD(&wait->wait.entry);
+       kiocb->ki_flags |= IOCB_WAITQ;
+       kiocb->ki_flags &= ~IOCB_NOWAIT;
+       kiocb->ki_waitq = wait;
+       return true;
+}
+
+static inline int io_iter_do_read(struct io_rw *rw, struct iov_iter *iter)
+{
+       struct file *file = rw->kiocb.ki_filp;
+
+       if (likely(file->f_op->read_iter))
+               return call_read_iter(file, &rw->kiocb, iter);
+       else if (file->f_op->read)
+               return loop_rw_iter(READ, rw, iter);
+       else
+               return -EINVAL;
+}
+
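+/*
+ * Regular files and block devices are expected to return the full amount
+ * requested, so a short read on those should be retried rather than
+ * completed with a partial result.
+ */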
+static bool need_read_all(struct io_kiocb *req)
+{
+       return req->flags & REQ_F_ISREG ||
+               S_ISBLK(file_inode(req->file)->i_mode);
+}
+
+static inline bool io_req_ffs_set(struct io_kiocb *req)
+{
+       return req->flags & REQ_F_FIXED_FILE;
+}
+
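+/*
+ * Per-issue kiocb setup: validate the file mode, apply the RWF_* flags
+ * from the sqe, and select the completion handler depending on whether
+ * the ring runs in IOPOLL mode.
+ */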
+static int io_rw_init_file(struct io_kiocb *req, fmode_t mode)
+{
+       struct io_rw *rw = io_kiocb_to_cmd(req);
+       struct kiocb *kiocb = &rw->kiocb;
+       struct io_ring_ctx *ctx = req->ctx;
+       struct file *file = req->file;
+       int ret;
+
+       if (unlikely(!file || !(file->f_mode & mode)))
+               return -EBADF;
+
+       if (!io_req_ffs_set(req))
+               req->flags |= io_file_get_flags(file) << REQ_F_SUPPORT_NOWAIT_BIT;
+
+       kiocb->ki_flags = iocb_flags(file);
+       ret = kiocb_set_rw_flags(kiocb, rw->flags);
+       if (unlikely(ret))
+               return ret;
+
+       /*
+        * If the file is marked O_NONBLOCK, still allow retry for it if it
+        * supports async. Otherwise it's impossible to use O_NONBLOCK files
+        * reliably. If not, or if IOCB_NOWAIT is set, don't retry.
+        */
+       if ((kiocb->ki_flags & IOCB_NOWAIT) ||
+           ((file->f_flags & O_NONBLOCK) && !io_file_supports_nowait(req)))
+               req->flags |= REQ_F_NOWAIT;
+
+       if (ctx->flags & IORING_SETUP_IOPOLL) {
+               if (!(kiocb->ki_flags & IOCB_DIRECT) || !file->f_op->iopoll)
+                       return -EOPNOTSUPP;
+
+               kiocb->private = NULL;
+               kiocb->ki_flags |= IOCB_HIPRI | IOCB_ALLOC_CACHE;
+               kiocb->ki_complete = io_complete_rw_iopoll;
+               req->iopoll_completed = 0;
+       } else {
+               if (kiocb->ki_flags & IOCB_HIPRI)
+                       return -EINVAL;
+               kiocb->ki_complete = io_complete_rw;
+       }
+
+       return 0;
+}
+
+int io_read(struct io_kiocb *req, unsigned int issue_flags)
+{
+       struct io_rw *rw = io_kiocb_to_cmd(req);
+       struct io_rw_state __s, *s = &__s;
+       struct iovec *iovec;
+       struct kiocb *kiocb = &rw->kiocb;
+       bool force_nonblock = issue_flags & IO_URING_F_NONBLOCK;
+       struct io_async_rw *io;
+       ssize_t ret, ret2;
+       loff_t *ppos;
+
+       if (!req_has_async_data(req)) {
+               ret = io_import_iovec(READ, req, &iovec, s, issue_flags);
+               if (unlikely(ret < 0))
+                       return ret;
+       } else {
+               io = req->async_data;
+               s = &io->s;
+
+               /*
+                * Safe and required to re-import if we're using provided
+                * buffers, as we dropped the selected one before retry.
+                */
+               if (io_do_buffer_select(req)) {
+                       ret = io_import_iovec(READ, req, &iovec, s, issue_flags);
+                       if (unlikely(ret < 0))
+                               return ret;
+               }
+
+               /*
+                * We come here from an earlier attempt, restore our state to
+                * match in case it doesn't. It's cheap enough that we don't
+                * need to make this conditional.
+                */
+               iov_iter_restore(&s->iter, &s->iter_state);
+               iovec = NULL;
+       }
+       ret = io_rw_init_file(req, FMODE_READ);
+       if (unlikely(ret)) {
+               kfree(iovec);
+               return ret;
+       }
+       req->cqe.res = iov_iter_count(&s->iter);
+
+       if (force_nonblock) {
+               /* If the file doesn't support async, just async punt */
+               if (unlikely(!io_file_supports_nowait(req))) {
+                       ret = io_setup_async_rw(req, iovec, s, true);
+                       return ret ?: -EAGAIN;
+               }
+               kiocb->ki_flags |= IOCB_NOWAIT;
+       } else {
+               /* Ensure we clear previously set non-block flag */
+               kiocb->ki_flags &= ~IOCB_NOWAIT;
+       }
+
+       ppos = io_kiocb_update_pos(req);
+
+       ret = rw_verify_area(READ, req->file, ppos, req->cqe.res);
+       if (unlikely(ret)) {
+               kfree(iovec);
+               return ret;
+       }
+
+       ret = io_iter_do_read(rw, &s->iter);
+
+       if (ret == -EAGAIN || (req->flags & REQ_F_REISSUE)) {
+               req->flags &= ~REQ_F_REISSUE;
+               /* if we can poll, just do that */
+               if (req->opcode == IORING_OP_READ && file_can_poll(req->file))
+                       return -EAGAIN;
+               /* IOPOLL retry should happen for io-wq threads */
+               if (!force_nonblock && !(req->ctx->flags & IORING_SETUP_IOPOLL))
+                       goto done;
+               /* no retry on NONBLOCK nor RWF_NOWAIT */
+               if (req->flags & REQ_F_NOWAIT)
+                       goto done;
+               ret = 0;
+       } else if (ret == -EIOCBQUEUED) {
+               goto out_free;
+       } else if (ret == req->cqe.res || ret <= 0 || !force_nonblock ||
+                  (req->flags & REQ_F_NOWAIT) || !need_read_all(req)) {
+               /* read all, failed, already did sync or don't want to retry */
+               goto done;
+       }
+
+       /*
+        * Don't depend on the iter state matching what was consumed, or being
+        * untouched in case of error. Restore it and we'll advance it
+        * manually if we need to.
+        */
+       iov_iter_restore(&s->iter, &s->iter_state);
+
+       ret2 = io_setup_async_rw(req, iovec, s, true);
+       if (ret2)
+               return ret2;
+
+       iovec = NULL;
+       io = req->async_data;
+       s = &io->s;
+       /*
+        * Now use our persistent iterator and state, if we aren't already.
+        * We've restored and mapped the iter to match.
+        */
+
+       do {
+               /*
+                * We end up here because of a partial read, either from
+                * above or inside this loop. Advance the iter by the bytes
+                * that were consumed.
+                */
+               iov_iter_advance(&s->iter, ret);
+               if (!iov_iter_count(&s->iter))
+                       break;
+               io->bytes_done += ret;
+               iov_iter_save_state(&s->iter, &s->iter_state);
+
+               /* if we can retry, do so with the callbacks armed */
+               if (!io_rw_should_retry(req)) {
+                       kiocb->ki_flags &= ~IOCB_WAITQ;
+                       return -EAGAIN;
+               }
+
+               /*
+                * Now retry read with the IOCB_WAITQ parts set in the iocb. If
+                * we get -EIOCBQUEUED, then we'll get a notification when the
+                * desired page gets unlocked. We can also get a partial read
+                * here, and if we do, then just retry at the new offset.
+                */
+               ret = io_iter_do_read(rw, &s->iter);
+               if (ret == -EIOCBQUEUED)
+                       return IOU_ISSUE_SKIP_COMPLETE;
+               /* we got some bytes, but not all. retry. */
+               kiocb->ki_flags &= ~IOCB_WAITQ;
+               iov_iter_restore(&s->iter, &s->iter_state);
+       } while (ret > 0);
+done:
+       kiocb_done(req, ret, issue_flags);
+out_free:
+       /* it's faster to check here than to delegate to kfree */
+       if (iovec)
+               kfree(iovec);
+       return IOU_ISSUE_SKIP_COMPLETE;
+}
+
+int io_write(struct io_kiocb *req, unsigned int issue_flags)
+{
+       struct io_rw *rw = io_kiocb_to_cmd(req);
+       struct io_rw_state __s, *s = &__s;
+       struct iovec *iovec;
+       struct kiocb *kiocb = &rw->kiocb;
+       bool force_nonblock = issue_flags & IO_URING_F_NONBLOCK;
+       ssize_t ret, ret2;
+       loff_t *ppos;
+
+       if (!req_has_async_data(req)) {
+               ret = io_import_iovec(WRITE, req, &iovec, s, issue_flags);
+               if (unlikely(ret < 0))
+                       return ret;
+       } else {
+               struct io_async_rw *io = req->async_data;
+
+               s = &io->s;
+               iov_iter_restore(&s->iter, &s->iter_state);
+               iovec = NULL;
+       }
+       ret = io_rw_init_file(req, FMODE_WRITE);
+       if (unlikely(ret)) {
+               kfree(iovec);
+               return ret;
+       }
+       req->cqe.res = iov_iter_count(&s->iter);
+
+       if (force_nonblock) {
+               /* If the file doesn't support async, just async punt */
+               if (unlikely(!io_file_supports_nowait(req)))
+                       goto copy_iov;
+
+               /* the file write path doesn't support NOWAIT for non-direct IO */
+               if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT) &&
+                   (req->flags & REQ_F_ISREG))
+                       goto copy_iov;
+
+               kiocb->ki_flags |= IOCB_NOWAIT;
+       } else {
+               /* Ensure we clear previously set non-block flag */
+               kiocb->ki_flags &= ~IOCB_NOWAIT;
+       }
+
+       ppos = io_kiocb_update_pos(req);
+
+       ret = rw_verify_area(WRITE, req->file, ppos, req->cqe.res);
+       if (unlikely(ret))
+               goto out_free;
+
+       /*
+        * Open-code file_start_write here to grab freeze protection,
+        * which will be released by another thread in
+        * io_complete_rw().  Fool lockdep by telling it the lock got
+        * released so that it doesn't complain about the held lock when
+        * we return to userspace.
+        */
+       if (req->flags & REQ_F_ISREG) {
+               sb_start_write(file_inode(req->file)->i_sb);
+               __sb_writers_release(file_inode(req->file)->i_sb,
+                                       SB_FREEZE_WRITE);
+       }
+       kiocb->ki_flags |= IOCB_WRITE;
+
+       if (likely(req->file->f_op->write_iter))
+               ret2 = call_write_iter(req->file, kiocb, &s->iter);
+       else if (req->file->f_op->write)
+               ret2 = loop_rw_iter(WRITE, rw, &s->iter);
+       else
+               ret2 = -EINVAL;
+
+       if (req->flags & REQ_F_REISSUE) {
+               req->flags &= ~REQ_F_REISSUE;
+               ret2 = -EAGAIN;
+       }
+
+       /*
+        * Raw bdev writes will return -EOPNOTSUPP for IOCB_NOWAIT. Just
+        * retry them without IOCB_NOWAIT.
+        */
+       if (ret2 == -EOPNOTSUPP && (kiocb->ki_flags & IOCB_NOWAIT))
+               ret2 = -EAGAIN;
+       /* no retry on NONBLOCK nor RWF_NOWAIT */
+       if (ret2 == -EAGAIN && (req->flags & REQ_F_NOWAIT))
+               goto done;
+       if (!force_nonblock || ret2 != -EAGAIN) {
+               /* IOPOLL retry should happen for io-wq threads */
+               if (ret2 == -EAGAIN && (req->ctx->flags & IORING_SETUP_IOPOLL))
+                       goto copy_iov;
+done:
+               kiocb_done(req, ret2, issue_flags);
+               ret = IOU_ISSUE_SKIP_COMPLETE;
+       } else {
+copy_iov:
+               iov_iter_restore(&s->iter, &s->iter_state);
+               ret = io_setup_async_rw(req, iovec, s, false);
+               return ret ?: -EAGAIN;
+       }
+out_free:
+       /* it's reportedly faster than delegating the null check to kfree() */
+       if (iovec)
+               kfree(iovec);
+       return ret;
+}
+
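+/*
+ * IOPOLL completions are reaped by the submitting task itself, so only
+ * flush deferred CQ work when needed and only wake CQ waiters for SQPOLL,
+ * where another task may be waiting on the ring.
+ */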
+static void io_cqring_ev_posted_iopoll(struct io_ring_ctx *ctx)
+{
+       if (unlikely(ctx->off_timeout_used || ctx->drain_active ||
+                    ctx->has_evfd))
+               __io_commit_cqring_flush(ctx);
+
+       if (ctx->flags & IORING_SETUP_SQPOLL)
+               io_cqring_wake(ctx);
+}
+
+int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin)
+{
+       struct io_wq_work_node *pos, *start, *prev;
+       unsigned int poll_flags = BLK_POLL_NOSLEEP;
+       DEFINE_IO_COMP_BATCH(iob);
+       int nr_events = 0;
+
+       /*
+        * Only spin for completions if we don't have multiple devices hanging
+        * off our complete list.
+        */
+       if (ctx->poll_multi_queue || force_nonspin)
+               poll_flags |= BLK_POLL_ONESHOT;
+
+       wq_list_for_each(pos, start, &ctx->iopoll_list) {
+               struct io_kiocb *req = container_of(pos, struct io_kiocb, comp_list);
+               struct io_rw *rw = io_kiocb_to_cmd(req);
+               int ret;
+
+               /*
+                * If we find a request that has already completed, break
+                * out and reap the entries that have finished so far
+                * instead of polling for more.
+                */
+               if (READ_ONCE(req->iopoll_completed))
+                       break;
+
+               ret = rw->kiocb.ki_filp->f_op->iopoll(&rw->kiocb, &iob, poll_flags);
+               if (unlikely(ret < 0))
+                       return ret;
+               else if (ret)
+                       poll_flags |= BLK_POLL_ONESHOT;
+
+               /* iopoll may have completed current req */
+               if (!rq_list_empty(iob.req_list) ||
+                   READ_ONCE(req->iopoll_completed))
+                       break;
+       }
+
+       if (!rq_list_empty(iob.req_list))
+               iob.complete(&iob);
+       else if (!pos)
+               return 0;
+
+       prev = start;
+       wq_list_for_each_resume(pos, prev) {
+               struct io_kiocb *req = container_of(pos, struct io_kiocb, comp_list);
+
+               /* order with io_complete_rw_iopoll(), e.g. ->result updates */
+               if (!smp_load_acquire(&req->iopoll_completed))
+                       break;
+               nr_events++;
+               if (unlikely(req->flags & REQ_F_CQE_SKIP))
+                       continue;
+
+               req->cqe.flags = io_put_kbuf(req, 0);
+               __io_fill_cqe_req(req->ctx, req);
+       }
+
+       if (unlikely(!nr_events))
+               return 0;
+
+       io_commit_cqring(ctx);
+       io_cqring_ev_posted_iopoll(ctx);
+       pos = start ? start->next : ctx->iopoll_list.first;
+       wq_list_cut(&ctx->iopoll_list, prev, start);
+       io_free_batch_list(ctx, pos);
+       return nr_events;
+}
diff --git a/io_uring/rw.h b/io_uring/rw.h
new file mode 100644 (file)
index 0000000..0204c3f
--- /dev/null
@@ -0,0 +1,23 @@
+// SPDX-License-Identifier: GPL-2.0
+
+#include <linux/pagemap.h>
+
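+/* submission-time iovec/iterator state, saved so a read/write can be retried */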
+struct io_rw_state {
+       struct iov_iter                 iter;
+       struct iov_iter_state           iter_state;
+       struct iovec                    fast_iov[UIO_FASTIOV];
+};
+
+struct io_async_rw {
+       struct io_rw_state              s;
+       const struct iovec              *free_iovec;
+       size_t                          bytes_done;
+       struct wait_page_queue          wpq;
+};
+
+int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe);
+int io_read(struct io_kiocb *req, unsigned int issue_flags);
+int io_readv_prep_async(struct io_kiocb *req);
+int io_write(struct io_kiocb *req, unsigned int issue_flags);
+int io_writev_prep_async(struct io_kiocb *req);
+void io_readv_writev_cleanup(struct io_kiocb *req);