io_uring: make IORING_POLL_ADD and IORING_POLL_REMOVE deferrable
[platform/kernel/linux-starfive.git] / fs / io_uring.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Shared application/kernel submission and completion ring pairs, for
4  * supporting fast/efficient IO.
5  *
6  * A note on the read/write ordering memory barriers that are matched between
7  * the application and kernel side.
8  *
9  * After the application reads the CQ ring tail, it must use an
10  * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
11  * before writing the tail (using smp_load_acquire to read the tail will
12  * do). It also needs a smp_mb() before updating CQ head (ordering the
13  * entry load(s) with the head store), pairing with an implicit barrier
14  * through a control-dependency in io_get_cqring (smp_store_release to
15  * store head will do). Failure to do so could lead to reading invalid
16  * CQ entries.
17  *
18  * Likewise, the application must use an appropriate smp_wmb() before
19  * writing the SQ tail (ordering SQ entry stores with the tail store),
20  * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
21  * to store the tail will do). And it needs a barrier ordering the SQ
22  * head load before writing new SQ entries (smp_load_acquire to read
23  * head will do).
24  *
25  * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
26  * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
27  * updating the SQ tail; a full memory barrier smp_mb() is needed
28  * between.
29  *
30  * Also see the examples in the liburing library:
31  *
32  *      git://git.kernel.dk/liburing
33  *
34  * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
35  * from data shared between the kernel and application. This is done both
36  * for ordering purposes, but also to ensure that once a value is loaded from
37  * data that the application could potentially modify, it remains stable.
38  *
39  * Copyright (C) 2018-2019 Jens Axboe
40  * Copyright (c) 2018-2019 Christoph Hellwig
41  */
42 #include <linux/kernel.h>
43 #include <linux/init.h>
44 #include <linux/errno.h>
45 #include <linux/syscalls.h>
46 #include <linux/compat.h>
47 #include <linux/refcount.h>
48 #include <linux/uio.h>
49
50 #include <linux/sched/signal.h>
51 #include <linux/fs.h>
52 #include <linux/file.h>
53 #include <linux/fdtable.h>
54 #include <linux/mm.h>
55 #include <linux/mman.h>
56 #include <linux/mmu_context.h>
57 #include <linux/percpu.h>
58 #include <linux/slab.h>
59 #include <linux/kthread.h>
60 #include <linux/blkdev.h>
61 #include <linux/bvec.h>
62 #include <linux/net.h>
63 #include <net/sock.h>
64 #include <net/af_unix.h>
65 #include <net/scm.h>
66 #include <linux/anon_inodes.h>
67 #include <linux/sched/mm.h>
68 #include <linux/uaccess.h>
69 #include <linux/nospec.h>
70 #include <linux/sizes.h>
71 #include <linux/hugetlb.h>
72 #include <linux/highmem.h>
73
74 #define CREATE_TRACE_POINTS
75 #include <trace/events/io_uring.h>
76
77 #include <uapi/linux/io_uring.h>
78
79 #include "internal.h"
80 #include "io-wq.h"
81
82 #define IORING_MAX_ENTRIES      32768
83 #define IORING_MAX_CQ_ENTRIES   (2 * IORING_MAX_ENTRIES)
84
85 /*
86  * Shift of 9 is 512 entries, or exactly one page on 64-bit archs
87  */
88 #define IORING_FILE_TABLE_SHIFT 9
89 #define IORING_MAX_FILES_TABLE  (1U << IORING_FILE_TABLE_SHIFT)
90 #define IORING_FILE_TABLE_MASK  (IORING_MAX_FILES_TABLE - 1)
91 #define IORING_MAX_FIXED_FILES  (64 * IORING_MAX_FILES_TABLE)
92
93 struct io_uring {
94         u32 head ____cacheline_aligned_in_smp;
95         u32 tail ____cacheline_aligned_in_smp;
96 };
97
98 /*
99  * This data is shared with the application through the mmap at offsets
100  * IORING_OFF_SQ_RING and IORING_OFF_CQ_RING.
101  *
102  * The offsets to the member fields are published through struct
103  * io_sqring_offsets when calling io_uring_setup.
104  */
105 struct io_rings {
106         /*
107          * Head and tail offsets into the ring; the offsets need to be
108          * masked to get valid indices.
109          *
110          * The kernel controls head of the sq ring and the tail of the cq ring,
111          * and the application controls tail of the sq ring and the head of the
112          * cq ring.
113          */
114         struct io_uring         sq, cq;
115         /*
116          * Bitmasks to apply to head and tail offsets (constant, equals
117          * ring_entries - 1)
118          */
119         u32                     sq_ring_mask, cq_ring_mask;
120         /* Ring sizes (constant, power of 2) */
121         u32                     sq_ring_entries, cq_ring_entries;
122         /*
123          * Number of invalid entries dropped by the kernel due to
124          * invalid index stored in array
125          *
126          * Written by the kernel, shouldn't be modified by the
127          * application (i.e. get number of "new events" by comparing to
128          * cached value).
129          *
130          * After a new SQ head value was read by the application this
131          * counter includes all submissions that were dropped reaching
132          * the new SQ head (and possibly more).
133          */
134         u32                     sq_dropped;
135         /*
136          * Runtime flags
137          *
138          * Written by the kernel, shouldn't be modified by the
139          * application.
140          *
141          * The application needs a full memory barrier before checking
142          * for IORING_SQ_NEED_WAKEUP after updating the sq tail.
143          */
144         u32                     sq_flags;
145         /*
146          * Number of completion events lost because the queue was full;
147          * this should be avoided by the application by making sure
148          * there are not more requests pending than there is space in
149          * the completion queue.
150          *
151          * Written by the kernel, shouldn't be modified by the
152          * application (i.e. get number of "new events" by comparing to
153          * cached value).
154          *
155          * As completion events come in out of order this counter is not
156          * ordered with any other data.
157          */
158         u32                     cq_overflow;
159         /*
160          * Ring buffer of completion events.
161          *
162          * The kernel writes completion events fresh every time they are
163          * produced, so the application is allowed to modify pending
164          * entries.
165          */
166         struct io_uring_cqe     cqes[] ____cacheline_aligned_in_smp;
167 };
168
169 struct io_mapped_ubuf {
170         u64             ubuf;
171         size_t          len;
172         struct          bio_vec *bvec;
173         unsigned int    nr_bvecs;
174 };
175
176 struct fixed_file_table {
177         struct file             **files;
178 };
179
180 struct io_ring_ctx {
181         struct {
182                 struct percpu_ref       refs;
183         } ____cacheline_aligned_in_smp;
184
185         struct {
186                 unsigned int            flags;
187                 bool                    compat;
188                 bool                    account_mem;
189                 bool                    cq_overflow_flushed;
190                 bool                    drain_next;
191
192                 /*
193                  * Ring buffer of indices into array of io_uring_sqe, which is
194                  * mmapped by the application using the IORING_OFF_SQES offset.
195                  *
196                  * This indirection could e.g. be used to assign fixed
197                  * io_uring_sqe entries to operations and only submit them to
198                  * the queue when needed.
199                  *
200                  * The kernel modifies neither the indices array nor the entries
201                  * array.
202                  */
203                 u32                     *sq_array;
204                 unsigned                cached_sq_head;
205                 unsigned                sq_entries;
206                 unsigned                sq_mask;
207                 unsigned                sq_thread_idle;
208                 unsigned                cached_sq_dropped;
209                 atomic_t                cached_cq_overflow;
210                 struct io_uring_sqe     *sq_sqes;
211
212                 struct list_head        defer_list;
213                 struct list_head        timeout_list;
214                 struct list_head        cq_overflow_list;
215
216                 wait_queue_head_t       inflight_wait;
217         } ____cacheline_aligned_in_smp;
218
219         struct io_rings *rings;
220
221         /* IO offload */
222         struct io_wq            *io_wq;
223         struct task_struct      *sqo_thread;    /* if using sq thread polling */
224         struct mm_struct        *sqo_mm;
225         wait_queue_head_t       sqo_wait;
226
227         /*
228          * If used, fixed file set. Writers must ensure that ->refs is dead,
229          * readers must ensure that ->refs is alive as long as the file* is
230          * used. Only updated through io_uring_register(2).
231          */
232         struct fixed_file_table *file_table;
233         unsigned                nr_user_files;
234
235         /* if used, fixed mapped user buffers */
236         unsigned                nr_user_bufs;
237         struct io_mapped_ubuf   *user_bufs;
238
239         struct user_struct      *user;
240
241         const struct cred       *creds;
242
243         /* 0 is for ctx quiesce/reinit/free, 1 is for sqo_thread started */
244         struct completion       *completions;
245
246         /* if all else fails... */
247         struct io_kiocb         *fallback_req;
248
249 #if defined(CONFIG_UNIX)
250         struct socket           *ring_sock;
251 #endif
252
253         struct {
254                 unsigned                cached_cq_tail;
255                 unsigned                cq_entries;
256                 unsigned                cq_mask;
257                 atomic_t                cq_timeouts;
258                 struct wait_queue_head  cq_wait;
259                 struct fasync_struct    *cq_fasync;
260                 struct eventfd_ctx      *cq_ev_fd;
261         } ____cacheline_aligned_in_smp;
262
263         struct {
264                 struct mutex            uring_lock;
265                 wait_queue_head_t       wait;
266         } ____cacheline_aligned_in_smp;
267
268         struct {
269                 spinlock_t              completion_lock;
270                 bool                    poll_multi_file;
271                 /*
272                  * ->poll_list is protected by the ctx->uring_lock for
273                  * io_uring instances that don't use IORING_SETUP_SQPOLL.
274                  * For SQPOLL, only the single threaded io_sq_thread() will
275                  * manipulate the list, hence no extra locking is needed there.
276                  */
277                 struct list_head        poll_list;
278                 struct hlist_head       *cancel_hash;
279                 unsigned                cancel_hash_bits;
280
281                 spinlock_t              inflight_lock;
282                 struct list_head        inflight_list;
283         } ____cacheline_aligned_in_smp;
284 };
285
286 /*
287  * First field must be the file pointer in all the
288  * iocb unions! See also 'struct kiocb' in <linux/fs.h>
289  */
290 struct io_poll_iocb {
291         struct file                     *file;
292         union {
293                 struct wait_queue_head  *head;
294                 u64                     addr;
295         };
296         __poll_t                        events;
297         bool                            done;
298         bool                            canceled;
299         struct wait_queue_entry         wait;
300 };
301
302 struct io_timeout_data {
303         struct io_kiocb                 *req;
304         struct hrtimer                  timer;
305         struct timespec64               ts;
306         enum hrtimer_mode               mode;
307         u32                             seq_offset;
308 };
309
310 struct io_accept {
311         struct file                     *file;
312         struct sockaddr __user          *addr;
313         int __user                      *addr_len;
314         int                             flags;
315 };
316
317 struct io_sync {
318         struct file                     *file;
319         loff_t                          len;
320         loff_t                          off;
321         int                             flags;
322 };
323
324 struct io_async_connect {
325         struct sockaddr_storage         address;
326 };
327
328 struct io_async_msghdr {
329         struct iovec                    fast_iov[UIO_FASTIOV];
330         struct iovec                    *iov;
331         struct sockaddr __user          *uaddr;
332         struct msghdr                   msg;
333 };
334
335 struct io_async_rw {
336         struct iovec                    fast_iov[UIO_FASTIOV];
337         struct iovec                    *iov;
338         ssize_t                         nr_segs;
339         ssize_t                         size;
340 };
341
342 struct io_async_ctx {
343         struct io_uring_sqe             sqe;
344         union {
345                 struct io_async_rw      rw;
346                 struct io_async_msghdr  msg;
347                 struct io_async_connect connect;
348                 struct io_timeout_data  timeout;
349         };
350 };
351
352 /*
353  * NOTE! Each of the iocb union members has the file pointer
354  * as the first entry in their struct definition. So you can
355  * access the file pointer through any of the sub-structs,
356  * or directly as just 'ki_filp' in this struct.
357  */
358 struct io_kiocb {
359         union {
360                 struct file             *file;
361                 struct kiocb            rw;
362                 struct io_poll_iocb     poll;
363                 struct io_accept        accept;
364                 struct io_sync          sync;
365         };
366
367         const struct io_uring_sqe       *sqe;
368         struct io_async_ctx             *io;
369         struct file                     *ring_file;
370         int                             ring_fd;
371         bool                            has_user;
372         bool                            in_async;
373         bool                            needs_fixed_file;
374
375         struct io_ring_ctx      *ctx;
376         union {
377                 struct list_head        list;
378                 struct hlist_node       hash_node;
379         };
380         struct list_head        link_list;
381         unsigned int            flags;
382         refcount_t              refs;
383 #define REQ_F_NOWAIT            1       /* must not punt to workers */
384 #define REQ_F_IOPOLL_COMPLETED  2       /* polled IO has completed */
385 #define REQ_F_FIXED_FILE        4       /* ctx owns file */
386 #define REQ_F_LINK_NEXT         8       /* already grabbed next link */
387 #define REQ_F_IO_DRAIN          16      /* drain existing IO first */
388 #define REQ_F_IO_DRAINED        32      /* drain done */
389 #define REQ_F_LINK              64      /* linked sqes */
390 #define REQ_F_LINK_TIMEOUT      128     /* has linked timeout */
391 #define REQ_F_FAIL_LINK         256     /* fail rest of links */
392 #define REQ_F_DRAIN_LINK        512     /* link should be fully drained */
393 #define REQ_F_TIMEOUT           1024    /* timeout request */
394 #define REQ_F_ISREG             2048    /* regular file */
395 #define REQ_F_MUST_PUNT         4096    /* must be punted even for NONBLOCK */
396 #define REQ_F_TIMEOUT_NOSEQ     8192    /* no timeout sequence */
397 #define REQ_F_INFLIGHT          16384   /* on inflight list */
398 #define REQ_F_COMP_LOCKED       32768   /* completion under lock */
399 #define REQ_F_HARDLINK          65536   /* doesn't sever on completion < 0 */
400 #define REQ_F_PREPPED           131072  /* request already opcode prepared */
401         u64                     user_data;
402         u32                     result;
403         u32                     sequence;
404
405         struct list_head        inflight_entry;
406
407         struct io_wq_work       work;
408 };
409
410 #define IO_PLUG_THRESHOLD               2
411 #define IO_IOPOLL_BATCH                 8
412
413 struct io_submit_state {
414         struct blk_plug         plug;
415
416         /*
417          * io_kiocb alloc cache
418          */
419         void                    *reqs[IO_IOPOLL_BATCH];
420         unsigned                int free_reqs;
421         unsigned                int cur_req;
422
423         /*
424          * File reference cache
425          */
426         struct file             *file;
427         unsigned int            fd;
428         unsigned int            has_refs;
429         unsigned int            used_refs;
430         unsigned int            ios_left;
431 };
432
433 static void io_wq_submit_work(struct io_wq_work **workptr);
434 static void io_cqring_fill_event(struct io_kiocb *req, long res);
435 static void __io_free_req(struct io_kiocb *req);
436 static void io_put_req(struct io_kiocb *req);
437 static void io_double_put_req(struct io_kiocb *req);
438 static void __io_double_put_req(struct io_kiocb *req);
439 static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req);
440 static void io_queue_linked_timeout(struct io_kiocb *req);
441
442 static struct kmem_cache *req_cachep;
443
444 static const struct file_operations io_uring_fops;
445
446 struct sock *io_uring_get_socket(struct file *file)
447 {
448 #if defined(CONFIG_UNIX)
449         if (file->f_op == &io_uring_fops) {
450                 struct io_ring_ctx *ctx = file->private_data;
451
452                 return ctx->ring_sock->sk;
453         }
454 #endif
455         return NULL;
456 }
457 EXPORT_SYMBOL(io_uring_get_socket);
458
459 static void io_ring_ctx_ref_free(struct percpu_ref *ref)
460 {
461         struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
462
463         complete(&ctx->completions[0]);
464 }
465
466 static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
467 {
468         struct io_ring_ctx *ctx;
469         int hash_bits;
470
471         ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
472         if (!ctx)
473                 return NULL;
474
475         ctx->fallback_req = kmem_cache_alloc(req_cachep, GFP_KERNEL);
476         if (!ctx->fallback_req)
477                 goto err;
478
479         ctx->completions = kmalloc(2 * sizeof(struct completion), GFP_KERNEL);
480         if (!ctx->completions)
481                 goto err;
482
483         /*
484          * Use 5 bits less than the max cq entries, that should give us around
485          * 32 entries per hash list if totally full and uniformly spread.
486          */
487         hash_bits = ilog2(p->cq_entries);
488         hash_bits -= 5;
489         if (hash_bits <= 0)
490                 hash_bits = 1;
491         ctx->cancel_hash_bits = hash_bits;
492         ctx->cancel_hash = kmalloc((1U << hash_bits) * sizeof(struct hlist_head),
493                                         GFP_KERNEL);
494         if (!ctx->cancel_hash)
495                 goto err;
496         __hash_init(ctx->cancel_hash, 1U << hash_bits);
497
498         if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free,
499                             PERCPU_REF_ALLOW_REINIT, GFP_KERNEL))
500                 goto err;
501
502         ctx->flags = p->flags;
503         init_waitqueue_head(&ctx->cq_wait);
504         INIT_LIST_HEAD(&ctx->cq_overflow_list);
505         init_completion(&ctx->completions[0]);
506         init_completion(&ctx->completions[1]);
507         mutex_init(&ctx->uring_lock);
508         init_waitqueue_head(&ctx->wait);
509         spin_lock_init(&ctx->completion_lock);
510         INIT_LIST_HEAD(&ctx->poll_list);
511         INIT_LIST_HEAD(&ctx->defer_list);
512         INIT_LIST_HEAD(&ctx->timeout_list);
513         init_waitqueue_head(&ctx->inflight_wait);
514         spin_lock_init(&ctx->inflight_lock);
515         INIT_LIST_HEAD(&ctx->inflight_list);
516         return ctx;
517 err:
518         if (ctx->fallback_req)
519                 kmem_cache_free(req_cachep, ctx->fallback_req);
520         kfree(ctx->completions);
521         kfree(ctx->cancel_hash);
522         kfree(ctx);
523         return NULL;
524 }
525
526 static inline bool __req_need_defer(struct io_kiocb *req)
527 {
528         struct io_ring_ctx *ctx = req->ctx;
529
530         return req->sequence != ctx->cached_cq_tail + ctx->cached_sq_dropped
531                                         + atomic_read(&ctx->cached_cq_overflow);
532 }
533
534 static inline bool req_need_defer(struct io_kiocb *req)
535 {
536         if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) == REQ_F_IO_DRAIN)
537                 return __req_need_defer(req);
538
539         return false;
540 }
541
542 static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
543 {
544         struct io_kiocb *req;
545
546         req = list_first_entry_or_null(&ctx->defer_list, struct io_kiocb, list);
547         if (req && !req_need_defer(req)) {
548                 list_del_init(&req->list);
549                 return req;
550         }
551
552         return NULL;
553 }
554
555 static struct io_kiocb *io_get_timeout_req(struct io_ring_ctx *ctx)
556 {
557         struct io_kiocb *req;
558
559         req = list_first_entry_or_null(&ctx->timeout_list, struct io_kiocb, list);
560         if (req) {
561                 if (req->flags & REQ_F_TIMEOUT_NOSEQ)
562                         return NULL;
563                 if (!__req_need_defer(req)) {
564                         list_del_init(&req->list);
565                         return req;
566                 }
567         }
568
569         return NULL;
570 }
571
572 static void __io_commit_cqring(struct io_ring_ctx *ctx)
573 {
574         struct io_rings *rings = ctx->rings;
575
576         if (ctx->cached_cq_tail != READ_ONCE(rings->cq.tail)) {
577                 /* order cqe stores with ring update */
578                 smp_store_release(&rings->cq.tail, ctx->cached_cq_tail);
579
580                 if (wq_has_sleeper(&ctx->cq_wait)) {
581                         wake_up_interruptible(&ctx->cq_wait);
582                         kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
583                 }
584         }
585 }
586
587 static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe)
588 {
589         u8 opcode = READ_ONCE(sqe->opcode);
590
591         return !(opcode == IORING_OP_READ_FIXED ||
592                  opcode == IORING_OP_WRITE_FIXED);
593 }
594
595 static inline bool io_prep_async_work(struct io_kiocb *req,
596                                       struct io_kiocb **link)
597 {
598         bool do_hashed = false;
599
600         if (req->sqe) {
601                 switch (req->sqe->opcode) {
602                 case IORING_OP_WRITEV:
603                 case IORING_OP_WRITE_FIXED:
604                         /* only regular files should be hashed for writes */
605                         if (req->flags & REQ_F_ISREG)
606                                 do_hashed = true;
607                         /* fall-through */
608                 case IORING_OP_READV:
609                 case IORING_OP_READ_FIXED:
610                 case IORING_OP_SENDMSG:
611                 case IORING_OP_RECVMSG:
612                 case IORING_OP_ACCEPT:
613                 case IORING_OP_POLL_ADD:
614                 case IORING_OP_CONNECT:
615                         /*
616                          * We know REQ_F_ISREG is not set on some of these
617                          * opcodes, but this enables us to keep the check in
618                          * just one place.
619                          */
620                         if (!(req->flags & REQ_F_ISREG))
621                                 req->work.flags |= IO_WQ_WORK_UNBOUND;
622                         break;
623                 }
624                 if (io_sqe_needs_user(req->sqe))
625                         req->work.flags |= IO_WQ_WORK_NEEDS_USER;
626         }
627
628         *link = io_prep_linked_timeout(req);
629         return do_hashed;
630 }
631
632 static inline void io_queue_async_work(struct io_kiocb *req)
633 {
634         struct io_ring_ctx *ctx = req->ctx;
635         struct io_kiocb *link;
636         bool do_hashed;
637
638         do_hashed = io_prep_async_work(req, &link);
639
640         trace_io_uring_queue_async_work(ctx, do_hashed, req, &req->work,
641                                         req->flags);
642         if (!do_hashed) {
643                 io_wq_enqueue(ctx->io_wq, &req->work);
644         } else {
645                 io_wq_enqueue_hashed(ctx->io_wq, &req->work,
646                                         file_inode(req->file));
647         }
648
649         if (link)
650                 io_queue_linked_timeout(link);
651 }
652
653 static void io_kill_timeout(struct io_kiocb *req)
654 {
655         int ret;
656
657         ret = hrtimer_try_to_cancel(&req->io->timeout.timer);
658         if (ret != -1) {
659                 atomic_inc(&req->ctx->cq_timeouts);
660                 list_del_init(&req->list);
661                 io_cqring_fill_event(req, 0);
662                 io_put_req(req);
663         }
664 }
665
666 static void io_kill_timeouts(struct io_ring_ctx *ctx)
667 {
668         struct io_kiocb *req, *tmp;
669
670         spin_lock_irq(&ctx->completion_lock);
671         list_for_each_entry_safe(req, tmp, &ctx->timeout_list, list)
672                 io_kill_timeout(req);
673         spin_unlock_irq(&ctx->completion_lock);
674 }
675
676 static void io_commit_cqring(struct io_ring_ctx *ctx)
677 {
678         struct io_kiocb *req;
679
680         while ((req = io_get_timeout_req(ctx)) != NULL)
681                 io_kill_timeout(req);
682
683         __io_commit_cqring(ctx);
684
685         while ((req = io_get_deferred_req(ctx)) != NULL) {
686                 req->flags |= REQ_F_IO_DRAINED;
687                 io_queue_async_work(req);
688         }
689 }
690
691 static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
692 {
693         struct io_rings *rings = ctx->rings;
694         unsigned tail;
695
696         tail = ctx->cached_cq_tail;
697         /*
698          * writes to the cq entry need to come after reading head; the
699          * control dependency is enough as we're using WRITE_ONCE to
700          * fill the cq entry
701          */
702         if (tail - READ_ONCE(rings->cq.head) == rings->cq_ring_entries)
703                 return NULL;
704
705         ctx->cached_cq_tail++;
706         return &rings->cqes[tail & ctx->cq_mask];
707 }
708
709 static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
710 {
711         if (waitqueue_active(&ctx->wait))
712                 wake_up(&ctx->wait);
713         if (waitqueue_active(&ctx->sqo_wait))
714                 wake_up(&ctx->sqo_wait);
715         if (ctx->cq_ev_fd)
716                 eventfd_signal(ctx->cq_ev_fd, 1);
717 }
718
719 /* Returns true if there are no backlogged entries after the flush */
720 static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
721 {
722         struct io_rings *rings = ctx->rings;
723         struct io_uring_cqe *cqe;
724         struct io_kiocb *req;
725         unsigned long flags;
726         LIST_HEAD(list);
727
728         if (!force) {
729                 if (list_empty_careful(&ctx->cq_overflow_list))
730                         return true;
731                 if ((ctx->cached_cq_tail - READ_ONCE(rings->cq.head) ==
732                     rings->cq_ring_entries))
733                         return false;
734         }
735
736         spin_lock_irqsave(&ctx->completion_lock, flags);
737
738         /* if force is set, the ring is going away. always drop after that */
739         if (force)
740                 ctx->cq_overflow_flushed = true;
741
742         cqe = NULL;
743         while (!list_empty(&ctx->cq_overflow_list)) {
744                 cqe = io_get_cqring(ctx);
745                 if (!cqe && !force)
746                         break;
747
748                 req = list_first_entry(&ctx->cq_overflow_list, struct io_kiocb,
749                                                 list);
750                 list_move(&req->list, &list);
751                 if (cqe) {
752                         WRITE_ONCE(cqe->user_data, req->user_data);
753                         WRITE_ONCE(cqe->res, req->result);
754                         WRITE_ONCE(cqe->flags, 0);
755                 } else {
756                         WRITE_ONCE(ctx->rings->cq_overflow,
757                                 atomic_inc_return(&ctx->cached_cq_overflow));
758                 }
759         }
760
761         io_commit_cqring(ctx);
762         spin_unlock_irqrestore(&ctx->completion_lock, flags);
763         io_cqring_ev_posted(ctx);
764
765         while (!list_empty(&list)) {
766                 req = list_first_entry(&list, struct io_kiocb, list);
767                 list_del(&req->list);
768                 io_put_req(req);
769         }
770
771         return cqe != NULL;
772 }
773
774 static void io_cqring_fill_event(struct io_kiocb *req, long res)
775 {
776         struct io_ring_ctx *ctx = req->ctx;
777         struct io_uring_cqe *cqe;
778
779         trace_io_uring_complete(ctx, req->user_data, res);
780
781         /*
782          * If we can't get a cq entry, userspace overflowed the
783          * submission (by quite a lot). Increment the overflow count in
784          * the ring.
785          */
786         cqe = io_get_cqring(ctx);
787         if (likely(cqe)) {
788                 WRITE_ONCE(cqe->user_data, req->user_data);
789                 WRITE_ONCE(cqe->res, res);
790                 WRITE_ONCE(cqe->flags, 0);
791         } else if (ctx->cq_overflow_flushed) {
792                 WRITE_ONCE(ctx->rings->cq_overflow,
793                                 atomic_inc_return(&ctx->cached_cq_overflow));
794         } else {
795                 refcount_inc(&req->refs);
796                 req->result = res;
797                 list_add_tail(&req->list, &ctx->cq_overflow_list);
798         }
799 }
800
801 static void io_cqring_add_event(struct io_kiocb *req, long res)
802 {
803         struct io_ring_ctx *ctx = req->ctx;
804         unsigned long flags;
805
806         spin_lock_irqsave(&ctx->completion_lock, flags);
807         io_cqring_fill_event(req, res);
808         io_commit_cqring(ctx);
809         spin_unlock_irqrestore(&ctx->completion_lock, flags);
810
811         io_cqring_ev_posted(ctx);
812 }
813
814 static inline bool io_is_fallback_req(struct io_kiocb *req)
815 {
816         return req == (struct io_kiocb *)
817                         ((unsigned long) req->ctx->fallback_req & ~1UL);
818 }
819
820 static struct io_kiocb *io_get_fallback_req(struct io_ring_ctx *ctx)
821 {
822         struct io_kiocb *req;
823
824         req = ctx->fallback_req;
825         if (!test_and_set_bit_lock(0, (unsigned long *) ctx->fallback_req))
826                 return req;
827
828         return NULL;
829 }
830
831 static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
832                                    struct io_submit_state *state)
833 {
834         gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
835         struct io_kiocb *req;
836
837         if (!percpu_ref_tryget(&ctx->refs))
838                 return NULL;
839
840         if (!state) {
841                 req = kmem_cache_alloc(req_cachep, gfp);
842                 if (unlikely(!req))
843                         goto fallback;
844         } else if (!state->free_reqs) {
845                 size_t sz;
846                 int ret;
847
848                 sz = min_t(size_t, state->ios_left, ARRAY_SIZE(state->reqs));
849                 ret = kmem_cache_alloc_bulk(req_cachep, gfp, sz, state->reqs);
850
851                 /*
852                  * Bulk alloc is all-or-nothing. If we fail to get a batch,
853                  * retry single alloc to be on the safe side.
854                  */
855                 if (unlikely(ret <= 0)) {
856                         state->reqs[0] = kmem_cache_alloc(req_cachep, gfp);
857                         if (!state->reqs[0])
858                                 goto fallback;
859                         ret = 1;
860                 }
861                 state->free_reqs = ret - 1;
862                 state->cur_req = 1;
863                 req = state->reqs[0];
864         } else {
865                 req = state->reqs[state->cur_req];
866                 state->free_reqs--;
867                 state->cur_req++;
868         }
869
870 got_it:
871         req->io = NULL;
872         req->ring_file = NULL;
873         req->file = NULL;
874         req->ctx = ctx;
875         req->flags = 0;
876         /* one is dropped after submission, the other at completion */
877         refcount_set(&req->refs, 2);
878         req->result = 0;
879         INIT_IO_WORK(&req->work, io_wq_submit_work);
880         return req;
881 fallback:
882         req = io_get_fallback_req(ctx);
883         if (req)
884                 goto got_it;
885         percpu_ref_put(&ctx->refs);
886         return NULL;
887 }
888
889 static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr)
890 {
891         if (*nr) {
892                 kmem_cache_free_bulk(req_cachep, *nr, reqs);
893                 percpu_ref_put_many(&ctx->refs, *nr);
894                 *nr = 0;
895         }
896 }
897
898 static void __io_free_req(struct io_kiocb *req)
899 {
900         struct io_ring_ctx *ctx = req->ctx;
901
902         if (req->io)
903                 kfree(req->io);
904         if (req->file && !(req->flags & REQ_F_FIXED_FILE))
905                 fput(req->file);
906         if (req->flags & REQ_F_INFLIGHT) {
907                 unsigned long flags;
908
909                 spin_lock_irqsave(&ctx->inflight_lock, flags);
910                 list_del(&req->inflight_entry);
911                 if (waitqueue_active(&ctx->inflight_wait))
912                         wake_up(&ctx->inflight_wait);
913                 spin_unlock_irqrestore(&ctx->inflight_lock, flags);
914         }
915         percpu_ref_put(&ctx->refs);
916         if (likely(!io_is_fallback_req(req)))
917                 kmem_cache_free(req_cachep, req);
918         else
919                 clear_bit_unlock(0, (unsigned long *) ctx->fallback_req);
920 }
921
922 static bool io_link_cancel_timeout(struct io_kiocb *req)
923 {
924         struct io_ring_ctx *ctx = req->ctx;
925         int ret;
926
927         ret = hrtimer_try_to_cancel(&req->io->timeout.timer);
928         if (ret != -1) {
929                 io_cqring_fill_event(req, -ECANCELED);
930                 io_commit_cqring(ctx);
931                 req->flags &= ~REQ_F_LINK;
932                 io_put_req(req);
933                 return true;
934         }
935
936         return false;
937 }
938
939 static void io_req_link_next(struct io_kiocb *req, struct io_kiocb **nxtptr)
940 {
941         struct io_ring_ctx *ctx = req->ctx;
942         bool wake_ev = false;
943
944         /* Already got next link */
945         if (req->flags & REQ_F_LINK_NEXT)
946                 return;
947
948         /*
949          * The list should never be empty when we are called here. But could
950          * potentially happen if the chain is messed up, check to be on the
951          * safe side.
952          */
953         while (!list_empty(&req->link_list)) {
954                 struct io_kiocb *nxt = list_first_entry(&req->link_list,
955                                                 struct io_kiocb, link_list);
956
957                 if (unlikely((req->flags & REQ_F_LINK_TIMEOUT) &&
958                              (nxt->flags & REQ_F_TIMEOUT))) {
959                         list_del_init(&nxt->link_list);
960                         wake_ev |= io_link_cancel_timeout(nxt);
961                         req->flags &= ~REQ_F_LINK_TIMEOUT;
962                         continue;
963                 }
964
965                 list_del_init(&req->link_list);
966                 if (!list_empty(&nxt->link_list))
967                         nxt->flags |= REQ_F_LINK;
968                 *nxtptr = nxt;
969                 break;
970         }
971
972         req->flags |= REQ_F_LINK_NEXT;
973         if (wake_ev)
974                 io_cqring_ev_posted(ctx);
975 }
976
977 /*
978  * Called if REQ_F_LINK is set, and we fail the head request
979  */
980 static void io_fail_links(struct io_kiocb *req)
981 {
982         struct io_ring_ctx *ctx = req->ctx;
983         unsigned long flags;
984
985         spin_lock_irqsave(&ctx->completion_lock, flags);
986
987         while (!list_empty(&req->link_list)) {
988                 struct io_kiocb *link = list_first_entry(&req->link_list,
989                                                 struct io_kiocb, link_list);
990
991                 list_del_init(&link->link_list);
992                 trace_io_uring_fail_link(req, link);
993
994                 if ((req->flags & REQ_F_LINK_TIMEOUT) &&
995                     link->sqe->opcode == IORING_OP_LINK_TIMEOUT) {
996                         io_link_cancel_timeout(link);
997                 } else {
998                         io_cqring_fill_event(link, -ECANCELED);
999                         __io_double_put_req(link);
1000                 }
1001                 req->flags &= ~REQ_F_LINK_TIMEOUT;
1002         }
1003
1004         io_commit_cqring(ctx);
1005         spin_unlock_irqrestore(&ctx->completion_lock, flags);
1006         io_cqring_ev_posted(ctx);
1007 }
1008
1009 static void io_req_find_next(struct io_kiocb *req, struct io_kiocb **nxt)
1010 {
1011         if (likely(!(req->flags & REQ_F_LINK)))
1012                 return;
1013
1014         /*
1015          * If LINK is set, we have dependent requests in this chain. If we
1016          * didn't fail this request, queue the first one up, moving any other
1017          * dependencies to the next request. In case of failure, fail the rest
1018          * of the chain.
1019          */
1020         if (req->flags & REQ_F_FAIL_LINK) {
1021                 io_fail_links(req);
1022         } else if ((req->flags & (REQ_F_LINK_TIMEOUT | REQ_F_COMP_LOCKED)) ==
1023                         REQ_F_LINK_TIMEOUT) {
1024                 struct io_ring_ctx *ctx = req->ctx;
1025                 unsigned long flags;
1026
1027                 /*
1028                  * If this is a timeout link, we could be racing with the
1029                  * timeout timer. Grab the completion lock for this case to
1030                  * protect against that.
1031                  */
1032                 spin_lock_irqsave(&ctx->completion_lock, flags);
1033                 io_req_link_next(req, nxt);
1034                 spin_unlock_irqrestore(&ctx->completion_lock, flags);
1035         } else {
1036                 io_req_link_next(req, nxt);
1037         }
1038 }
1039
1040 static void io_free_req(struct io_kiocb *req)
1041 {
1042         struct io_kiocb *nxt = NULL;
1043
1044         io_req_find_next(req, &nxt);
1045         __io_free_req(req);
1046
1047         if (nxt)
1048                 io_queue_async_work(nxt);
1049 }
1050
1051 /*
1052  * Drop reference to request, return next in chain (if there is one) if this
1053  * was the last reference to this request.
1054  */
1055 __attribute__((nonnull))
1056 static void io_put_req_find_next(struct io_kiocb *req, struct io_kiocb **nxtptr)
1057 {
1058         io_req_find_next(req, nxtptr);
1059
1060         if (refcount_dec_and_test(&req->refs))
1061                 __io_free_req(req);
1062 }
1063
1064 static void io_put_req(struct io_kiocb *req)
1065 {
1066         if (refcount_dec_and_test(&req->refs))
1067                 io_free_req(req);
1068 }
1069
1070 /*
1071  * Must only be used if we don't need to care about links, usually from
1072  * within the completion handling itself.
1073  */
1074 static void __io_double_put_req(struct io_kiocb *req)
1075 {
1076         /* drop both submit and complete references */
1077         if (refcount_sub_and_test(2, &req->refs))
1078                 __io_free_req(req);
1079 }
1080
1081 static void io_double_put_req(struct io_kiocb *req)
1082 {
1083         /* drop both submit and complete references */
1084         if (refcount_sub_and_test(2, &req->refs))
1085                 io_free_req(req);
1086 }
1087
1088 static unsigned io_cqring_events(struct io_ring_ctx *ctx, bool noflush)
1089 {
1090         struct io_rings *rings = ctx->rings;
1091
1092         /*
1093          * noflush == true is from the waitqueue handler, just ensure we wake
1094          * up the task, and the next invocation will flush the entries. We
1095          * cannot safely to it from here.
1096          */
1097         if (noflush && !list_empty(&ctx->cq_overflow_list))
1098                 return -1U;
1099
1100         io_cqring_overflow_flush(ctx, false);
1101
1102         /* See comment at the top of this file */
1103         smp_rmb();
1104         return READ_ONCE(rings->cq.tail) - READ_ONCE(rings->cq.head);
1105 }
1106
1107 static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
1108 {
1109         struct io_rings *rings = ctx->rings;
1110
1111         /* make sure SQ entry isn't read before tail */
1112         return smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head;
1113 }
1114
1115 /*
1116  * Find and free completed poll iocbs
1117  */
1118 static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
1119                                struct list_head *done)
1120 {
1121         void *reqs[IO_IOPOLL_BATCH];
1122         struct io_kiocb *req;
1123         int to_free;
1124
1125         to_free = 0;
1126         while (!list_empty(done)) {
1127                 req = list_first_entry(done, struct io_kiocb, list);
1128                 list_del(&req->list);
1129
1130                 io_cqring_fill_event(req, req->result);
1131                 (*nr_events)++;
1132
1133                 if (refcount_dec_and_test(&req->refs)) {
1134                         /* If we're not using fixed files, we have to pair the
1135                          * completion part with the file put. Use regular
1136                          * completions for those, only batch free for fixed
1137                          * file and non-linked commands.
1138                          */
1139                         if (((req->flags & (REQ_F_FIXED_FILE|REQ_F_LINK)) ==
1140                             REQ_F_FIXED_FILE) && !io_is_fallback_req(req) &&
1141                             !req->io) {
1142                                 reqs[to_free++] = req;
1143                                 if (to_free == ARRAY_SIZE(reqs))
1144                                         io_free_req_many(ctx, reqs, &to_free);
1145                         } else {
1146                                 io_free_req(req);
1147                         }
1148                 }
1149         }
1150
1151         io_commit_cqring(ctx);
1152         io_free_req_many(ctx, reqs, &to_free);
1153 }
1154
1155 static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
1156                         long min)
1157 {
1158         struct io_kiocb *req, *tmp;
1159         LIST_HEAD(done);
1160         bool spin;
1161         int ret;
1162
1163         /*
1164          * Only spin for completions if we don't have multiple devices hanging
1165          * off our complete list, and we're under the requested amount.
1166          */
1167         spin = !ctx->poll_multi_file && *nr_events < min;
1168
1169         ret = 0;
1170         list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) {
1171                 struct kiocb *kiocb = &req->rw;
1172
1173                 /*
1174                  * Move completed entries to our local list. If we find a
1175                  * request that requires polling, break out and complete
1176                  * the done list first, if we have entries there.
1177                  */
1178                 if (req->flags & REQ_F_IOPOLL_COMPLETED) {
1179                         list_move_tail(&req->list, &done);
1180                         continue;
1181                 }
1182                 if (!list_empty(&done))
1183                         break;
1184
1185                 ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
1186                 if (ret < 0)
1187                         break;
1188
1189                 if (ret && spin)
1190                         spin = false;
1191                 ret = 0;
1192         }
1193
1194         if (!list_empty(&done))
1195                 io_iopoll_complete(ctx, nr_events, &done);
1196
1197         return ret;
1198 }
1199
1200 /*
1201  * Poll for a minimum of 'min' events. Note that if min == 0 we consider that a
1202  * non-spinning poll check - we'll still enter the driver poll loop, but only
1203  * as a non-spinning completion check.
1204  */
1205 static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
1206                                 long min)
1207 {
1208         while (!list_empty(&ctx->poll_list) && !need_resched()) {
1209                 int ret;
1210
1211                 ret = io_do_iopoll(ctx, nr_events, min);
1212                 if (ret < 0)
1213                         return ret;
1214                 if (!min || *nr_events >= min)
1215                         return 0;
1216         }
1217
1218         return 1;
1219 }
1220
1221 /*
1222  * We can't just wait for polled events to come to us, we have to actively
1223  * find and complete them.
1224  */
1225 static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
1226 {
1227         if (!(ctx->flags & IORING_SETUP_IOPOLL))
1228                 return;
1229
1230         mutex_lock(&ctx->uring_lock);
1231         while (!list_empty(&ctx->poll_list)) {
1232                 unsigned int nr_events = 0;
1233
1234                 io_iopoll_getevents(ctx, &nr_events, 1);
1235
1236                 /*
1237                  * Ensure we allow local-to-the-cpu processing to take place,
1238                  * in this case we need to ensure that we reap all events.
1239                  */
1240                 cond_resched();
1241         }
1242         mutex_unlock(&ctx->uring_lock);
1243 }
1244
1245 static int __io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
1246                             long min)
1247 {
1248         int iters = 0, ret = 0;
1249
1250         do {
1251                 int tmin = 0;
1252
1253                 /*
1254                  * Don't enter poll loop if we already have events pending.
1255                  * If we do, we can potentially be spinning for commands that
1256                  * already triggered a CQE (eg in error).
1257                  */
1258                 if (io_cqring_events(ctx, false))
1259                         break;
1260
1261                 /*
1262                  * If a submit got punted to a workqueue, we can have the
1263                  * application entering polling for a command before it gets
1264                  * issued. That app will hold the uring_lock for the duration
1265                  * of the poll right here, so we need to take a breather every
1266                  * now and then to ensure that the issue has a chance to add
1267                  * the poll to the issued list. Otherwise we can spin here
1268                  * forever, while the workqueue is stuck trying to acquire the
1269                  * very same mutex.
1270                  */
1271                 if (!(++iters & 7)) {
1272                         mutex_unlock(&ctx->uring_lock);
1273                         mutex_lock(&ctx->uring_lock);
1274                 }
1275
1276                 if (*nr_events < min)
1277                         tmin = min - *nr_events;
1278
1279                 ret = io_iopoll_getevents(ctx, nr_events, tmin);
1280                 if (ret <= 0)
1281                         break;
1282                 ret = 0;
1283         } while (min && !*nr_events && !need_resched());
1284
1285         return ret;
1286 }
1287
1288 static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
1289                            long min)
1290 {
1291         int ret;
1292
1293         /*
1294          * We disallow the app entering submit/complete with polling, but we
1295          * still need to lock the ring to prevent racing with polled issue
1296          * that got punted to a workqueue.
1297          */
1298         mutex_lock(&ctx->uring_lock);
1299         ret = __io_iopoll_check(ctx, nr_events, min);
1300         mutex_unlock(&ctx->uring_lock);
1301         return ret;
1302 }
1303
1304 static void kiocb_end_write(struct io_kiocb *req)
1305 {
1306         /*
1307          * Tell lockdep we inherited freeze protection from submission
1308          * thread.
1309          */
1310         if (req->flags & REQ_F_ISREG) {
1311                 struct inode *inode = file_inode(req->file);
1312
1313                 __sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE);
1314         }
1315         file_end_write(req->file);
1316 }
1317
1318 static inline void req_set_fail_links(struct io_kiocb *req)
1319 {
1320         if ((req->flags & (REQ_F_LINK | REQ_F_HARDLINK)) == REQ_F_LINK)
1321                 req->flags |= REQ_F_FAIL_LINK;
1322 }
1323
1324 static void io_complete_rw_common(struct kiocb *kiocb, long res)
1325 {
1326         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
1327
1328         if (kiocb->ki_flags & IOCB_WRITE)
1329                 kiocb_end_write(req);
1330
1331         if (res != req->result)
1332                 req_set_fail_links(req);
1333         io_cqring_add_event(req, res);
1334 }
1335
1336 static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
1337 {
1338         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
1339
1340         io_complete_rw_common(kiocb, res);
1341         io_put_req(req);
1342 }
1343
1344 static struct io_kiocb *__io_complete_rw(struct kiocb *kiocb, long res)
1345 {
1346         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
1347         struct io_kiocb *nxt = NULL;
1348
1349         io_complete_rw_common(kiocb, res);
1350         io_put_req_find_next(req, &nxt);
1351
1352         return nxt;
1353 }
1354
1355 static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
1356 {
1357         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
1358
1359         if (kiocb->ki_flags & IOCB_WRITE)
1360                 kiocb_end_write(req);
1361
1362         if (res != req->result)
1363                 req_set_fail_links(req);
1364         req->result = res;
1365         if (res != -EAGAIN)
1366                 req->flags |= REQ_F_IOPOLL_COMPLETED;
1367 }
1368
1369 /*
1370  * After the iocb has been issued, it's safe to be found on the poll list.
1371  * Adding the kiocb to the list AFTER submission ensures that we don't
1372  * find it from a io_iopoll_getevents() thread before the issuer is done
1373  * accessing the kiocb cookie.
1374  */
1375 static void io_iopoll_req_issued(struct io_kiocb *req)
1376 {
1377         struct io_ring_ctx *ctx = req->ctx;
1378
1379         /*
1380          * Track whether we have multiple files in our lists. This will impact
1381          * how we do polling eventually, not spinning if we're on potentially
1382          * different devices.
1383          */
1384         if (list_empty(&ctx->poll_list)) {
1385                 ctx->poll_multi_file = false;
1386         } else if (!ctx->poll_multi_file) {
1387                 struct io_kiocb *list_req;
1388
1389                 list_req = list_first_entry(&ctx->poll_list, struct io_kiocb,
1390                                                 list);
1391                 if (list_req->rw.ki_filp != req->rw.ki_filp)
1392                         ctx->poll_multi_file = true;
1393         }
1394
1395         /*
1396          * For fast devices, IO may have already completed. If it has, add
1397          * it to the front so we find it first.
1398          */
1399         if (req->flags & REQ_F_IOPOLL_COMPLETED)
1400                 list_add(&req->list, &ctx->poll_list);
1401         else
1402                 list_add_tail(&req->list, &ctx->poll_list);
1403 }
1404
1405 static void io_file_put(struct io_submit_state *state)
1406 {
1407         if (state->file) {
1408                 int diff = state->has_refs - state->used_refs;
1409
1410                 if (diff)
1411                         fput_many(state->file, diff);
1412                 state->file = NULL;
1413         }
1414 }
1415
1416 /*
1417  * Get as many references to a file as we have IOs left in this submission,
1418  * assuming most submissions are for one file, or at least that each file
1419  * has more than one submission.
1420  */
1421 static struct file *io_file_get(struct io_submit_state *state, int fd)
1422 {
1423         if (!state)
1424                 return fget(fd);
1425
1426         if (state->file) {
1427                 if (state->fd == fd) {
1428                         state->used_refs++;
1429                         state->ios_left--;
1430                         return state->file;
1431                 }
1432                 io_file_put(state);
1433         }
1434         state->file = fget_many(fd, state->ios_left);
1435         if (!state->file)
1436                 return NULL;
1437
1438         state->fd = fd;
1439         state->has_refs = state->ios_left;
1440         state->used_refs = 1;
1441         state->ios_left--;
1442         return state->file;
1443 }
1444
1445 /*
1446  * If we tracked the file through the SCM inflight mechanism, we could support
1447  * any file. For now, just ensure that anything potentially problematic is done
1448  * inline.
1449  */
1450 static bool io_file_supports_async(struct file *file)
1451 {
1452         umode_t mode = file_inode(file)->i_mode;
1453
1454         if (S_ISBLK(mode) || S_ISCHR(mode) || S_ISSOCK(mode))
1455                 return true;
1456         if (S_ISREG(mode) && file->f_op != &io_uring_fops)
1457                 return true;
1458
1459         return false;
1460 }
1461
1462 static int io_prep_rw(struct io_kiocb *req, bool force_nonblock)
1463 {
1464         const struct io_uring_sqe *sqe = req->sqe;
1465         struct io_ring_ctx *ctx = req->ctx;
1466         struct kiocb *kiocb = &req->rw;
1467         unsigned ioprio;
1468         int ret;
1469
1470         if (!req->file)
1471                 return -EBADF;
1472
1473         if (S_ISREG(file_inode(req->file)->i_mode))
1474                 req->flags |= REQ_F_ISREG;
1475
1476         kiocb->ki_pos = READ_ONCE(sqe->off);
1477         kiocb->ki_flags = iocb_flags(kiocb->ki_filp);
1478         kiocb->ki_hint = ki_hint_validate(file_write_hint(kiocb->ki_filp));
1479
1480         ioprio = READ_ONCE(sqe->ioprio);
1481         if (ioprio) {
1482                 ret = ioprio_check_cap(ioprio);
1483                 if (ret)
1484                         return ret;
1485
1486                 kiocb->ki_ioprio = ioprio;
1487         } else
1488                 kiocb->ki_ioprio = get_current_ioprio();
1489
1490         ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
1491         if (unlikely(ret))
1492                 return ret;
1493
1494         /* don't allow async punt if RWF_NOWAIT was requested */
1495         if ((kiocb->ki_flags & IOCB_NOWAIT) ||
1496             (req->file->f_flags & O_NONBLOCK))
1497                 req->flags |= REQ_F_NOWAIT;
1498
1499         if (force_nonblock)
1500                 kiocb->ki_flags |= IOCB_NOWAIT;
1501
1502         if (ctx->flags & IORING_SETUP_IOPOLL) {
1503                 if (!(kiocb->ki_flags & IOCB_DIRECT) ||
1504                     !kiocb->ki_filp->f_op->iopoll)
1505                         return -EOPNOTSUPP;
1506
1507                 kiocb->ki_flags |= IOCB_HIPRI;
1508                 kiocb->ki_complete = io_complete_rw_iopoll;
1509                 req->result = 0;
1510         } else {
1511                 if (kiocb->ki_flags & IOCB_HIPRI)
1512                         return -EINVAL;
1513                 kiocb->ki_complete = io_complete_rw;
1514         }
1515         return 0;
1516 }
1517
1518 static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
1519 {
1520         switch (ret) {
1521         case -EIOCBQUEUED:
1522                 break;
1523         case -ERESTARTSYS:
1524         case -ERESTARTNOINTR:
1525         case -ERESTARTNOHAND:
1526         case -ERESTART_RESTARTBLOCK:
1527                 /*
1528                  * We can't just restart the syscall, since previously
1529                  * submitted sqes may already be in progress. Just fail this
1530                  * IO with EINTR.
1531                  */
1532                 ret = -EINTR;
1533                 /* fall through */
1534         default:
1535                 kiocb->ki_complete(kiocb, ret, 0);
1536         }
1537 }
1538
1539 static void kiocb_done(struct kiocb *kiocb, ssize_t ret, struct io_kiocb **nxt,
1540                        bool in_async)
1541 {
1542         if (in_async && ret >= 0 && kiocb->ki_complete == io_complete_rw)
1543                 *nxt = __io_complete_rw(kiocb, ret);
1544         else
1545                 io_rw_done(kiocb, ret);
1546 }
1547
1548 static ssize_t io_import_fixed(struct io_ring_ctx *ctx, int rw,
1549                                const struct io_uring_sqe *sqe,
1550                                struct iov_iter *iter)
1551 {
1552         size_t len = READ_ONCE(sqe->len);
1553         struct io_mapped_ubuf *imu;
1554         unsigned index, buf_index;
1555         size_t offset;
1556         u64 buf_addr;
1557
1558         /* attempt to use fixed buffers without having provided iovecs */
1559         if (unlikely(!ctx->user_bufs))
1560                 return -EFAULT;
1561
1562         buf_index = READ_ONCE(sqe->buf_index);
1563         if (unlikely(buf_index >= ctx->nr_user_bufs))
1564                 return -EFAULT;
1565
1566         index = array_index_nospec(buf_index, ctx->nr_user_bufs);
1567         imu = &ctx->user_bufs[index];
1568         buf_addr = READ_ONCE(sqe->addr);
1569
1570         /* overflow */
1571         if (buf_addr + len < buf_addr)
1572                 return -EFAULT;
1573         /* not inside the mapped region */
1574         if (buf_addr < imu->ubuf || buf_addr + len > imu->ubuf + imu->len)
1575                 return -EFAULT;
1576
1577         /*
1578          * May not be a start of buffer, set size appropriately
1579          * and advance us to the beginning.
1580          */
1581         offset = buf_addr - imu->ubuf;
1582         iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len);
1583
1584         if (offset) {
1585                 /*
1586                  * Don't use iov_iter_advance() here, as it's really slow for
1587                  * using the latter parts of a big fixed buffer - it iterates
1588                  * over each segment manually. We can cheat a bit here, because
1589                  * we know that:
1590                  *
1591                  * 1) it's a BVEC iter, we set it up
1592                  * 2) all bvecs are PAGE_SIZE in size, except potentially the
1593                  *    first and last bvec
1594                  *
1595                  * So just find our index, and adjust the iterator afterwards.
1596                  * If the offset is within the first bvec (or the whole first
1597                  * bvec, just use iov_iter_advance(). This makes it easier
1598                  * since we can just skip the first segment, which may not
1599                  * be PAGE_SIZE aligned.
1600                  */
1601                 const struct bio_vec *bvec = imu->bvec;
1602
1603                 if (offset <= bvec->bv_len) {
1604                         iov_iter_advance(iter, offset);
1605                 } else {
1606                         unsigned long seg_skip;
1607
1608                         /* skip first vec */
1609                         offset -= bvec->bv_len;
1610                         seg_skip = 1 + (offset >> PAGE_SHIFT);
1611
1612                         iter->bvec = bvec + seg_skip;
1613                         iter->nr_segs -= seg_skip;
1614                         iter->count -= bvec->bv_len + offset;
1615                         iter->iov_offset = offset & ~PAGE_MASK;
1616                 }
1617         }
1618
1619         return len;
1620 }
1621
1622 static ssize_t io_import_iovec(int rw, struct io_kiocb *req,
1623                                struct iovec **iovec, struct iov_iter *iter)
1624 {
1625         const struct io_uring_sqe *sqe = req->sqe;
1626         void __user *buf = u64_to_user_ptr(READ_ONCE(sqe->addr));
1627         size_t sqe_len = READ_ONCE(sqe->len);
1628         u8 opcode;
1629
1630         /*
1631          * We're reading ->opcode for the second time, but the first read
1632          * doesn't care whether it's _FIXED or not, so it doesn't matter
1633          * whether ->opcode changes concurrently. The first read does care
1634          * about whether it is a READ or a WRITE, so we don't trust this read
1635          * for that purpose and instead let the caller pass in the read/write
1636          * flag.
1637          */
1638         opcode = READ_ONCE(sqe->opcode);
1639         if (opcode == IORING_OP_READ_FIXED || opcode == IORING_OP_WRITE_FIXED) {
1640                 *iovec = NULL;
1641                 return io_import_fixed(req->ctx, rw, sqe, iter);
1642         }
1643
1644         if (req->io) {
1645                 struct io_async_rw *iorw = &req->io->rw;
1646
1647                 *iovec = iorw->iov;
1648                 iov_iter_init(iter, rw, *iovec, iorw->nr_segs, iorw->size);
1649                 if (iorw->iov == iorw->fast_iov)
1650                         *iovec = NULL;
1651                 return iorw->size;
1652         }
1653
1654         if (!req->has_user)
1655                 return -EFAULT;
1656
1657 #ifdef CONFIG_COMPAT
1658         if (req->ctx->compat)
1659                 return compat_import_iovec(rw, buf, sqe_len, UIO_FASTIOV,
1660                                                 iovec, iter);
1661 #endif
1662
1663         return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
1664 }
1665
1666 /*
1667  * For files that don't have ->read_iter() and ->write_iter(), handle them
1668  * by looping over ->read() or ->write() manually.
1669  */
1670 static ssize_t loop_rw_iter(int rw, struct file *file, struct kiocb *kiocb,
1671                            struct iov_iter *iter)
1672 {
1673         ssize_t ret = 0;
1674
1675         /*
1676          * Don't support polled IO through this interface, and we can't
1677          * support non-blocking either. For the latter, this just causes
1678          * the kiocb to be handled from an async context.
1679          */
1680         if (kiocb->ki_flags & IOCB_HIPRI)
1681                 return -EOPNOTSUPP;
1682         if (kiocb->ki_flags & IOCB_NOWAIT)
1683                 return -EAGAIN;
1684
1685         while (iov_iter_count(iter)) {
1686                 struct iovec iovec;
1687                 ssize_t nr;
1688
1689                 if (!iov_iter_is_bvec(iter)) {
1690                         iovec = iov_iter_iovec(iter);
1691                 } else {
1692                         /* fixed buffers import bvec */
1693                         iovec.iov_base = kmap(iter->bvec->bv_page)
1694                                                 + iter->iov_offset;
1695                         iovec.iov_len = min(iter->count,
1696                                         iter->bvec->bv_len - iter->iov_offset);
1697                 }
1698
1699                 if (rw == READ) {
1700                         nr = file->f_op->read(file, iovec.iov_base,
1701                                               iovec.iov_len, &kiocb->ki_pos);
1702                 } else {
1703                         nr = file->f_op->write(file, iovec.iov_base,
1704                                                iovec.iov_len, &kiocb->ki_pos);
1705                 }
1706
1707                 if (iov_iter_is_bvec(iter))
1708                         kunmap(iter->bvec->bv_page);
1709
1710                 if (nr < 0) {
1711                         if (!ret)
1712                                 ret = nr;
1713                         break;
1714                 }
1715                 ret += nr;
1716                 if (nr != iovec.iov_len)
1717                         break;
1718                 iov_iter_advance(iter, nr);
1719         }
1720
1721         return ret;
1722 }
1723
1724 static void io_req_map_rw(struct io_kiocb *req, ssize_t io_size,
1725                           struct iovec *iovec, struct iovec *fast_iov,
1726                           struct iov_iter *iter)
1727 {
1728         req->io->rw.nr_segs = iter->nr_segs;
1729         req->io->rw.size = io_size;
1730         req->io->rw.iov = iovec;
1731         if (!req->io->rw.iov) {
1732                 req->io->rw.iov = req->io->rw.fast_iov;
1733                 memcpy(req->io->rw.iov, fast_iov,
1734                         sizeof(struct iovec) * iter->nr_segs);
1735         }
1736 }
1737
1738 static int io_alloc_async_ctx(struct io_kiocb *req)
1739 {
1740         req->io = kmalloc(sizeof(*req->io), GFP_KERNEL);
1741         if (req->io) {
1742                 memcpy(&req->io->sqe, req->sqe, sizeof(req->io->sqe));
1743                 req->sqe = &req->io->sqe;
1744                 return 0;
1745         }
1746
1747         return 1;
1748 }
1749
1750 static void io_rw_async(struct io_wq_work **workptr)
1751 {
1752         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
1753         struct iovec *iov = NULL;
1754
1755         if (req->io->rw.iov != req->io->rw.fast_iov)
1756                 iov = req->io->rw.iov;
1757         io_wq_submit_work(workptr);
1758         kfree(iov);
1759 }
1760
1761 static int io_setup_async_rw(struct io_kiocb *req, ssize_t io_size,
1762                              struct iovec *iovec, struct iovec *fast_iov,
1763                              struct iov_iter *iter)
1764 {
1765         if (!req->io && io_alloc_async_ctx(req))
1766                 return -ENOMEM;
1767
1768         io_req_map_rw(req, io_size, iovec, fast_iov, iter);
1769         req->work.func = io_rw_async;
1770         return 0;
1771 }
1772
1773 static int io_read_prep(struct io_kiocb *req, struct iovec **iovec,
1774                         struct iov_iter *iter, bool force_nonblock)
1775 {
1776         ssize_t ret;
1777
1778         ret = io_prep_rw(req, force_nonblock);
1779         if (ret)
1780                 return ret;
1781
1782         if (unlikely(!(req->file->f_mode & FMODE_READ)))
1783                 return -EBADF;
1784
1785         return io_import_iovec(READ, req, iovec, iter);
1786 }
1787
1788 static int io_read(struct io_kiocb *req, struct io_kiocb **nxt,
1789                    bool force_nonblock)
1790 {
1791         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1792         struct kiocb *kiocb = &req->rw;
1793         struct iov_iter iter;
1794         struct file *file;
1795         size_t iov_count;
1796         ssize_t io_size, ret;
1797
1798         if (!req->io) {
1799                 ret = io_read_prep(req, &iovec, &iter, force_nonblock);
1800                 if (ret < 0)
1801                         return ret;
1802         } else {
1803                 ret = io_import_iovec(READ, req, &iovec, &iter);
1804                 if (ret < 0)
1805                         return ret;
1806         }
1807
1808         file = req->file;
1809         io_size = ret;
1810         if (req->flags & REQ_F_LINK)
1811                 req->result = io_size;
1812
1813         /*
1814          * If the file doesn't support async, mark it as REQ_F_MUST_PUNT so
1815          * we know to async punt it even if it was opened O_NONBLOCK
1816          */
1817         if (force_nonblock && !io_file_supports_async(file)) {
1818                 req->flags |= REQ_F_MUST_PUNT;
1819                 goto copy_iov;
1820         }
1821
1822         iov_count = iov_iter_count(&iter);
1823         ret = rw_verify_area(READ, file, &kiocb->ki_pos, iov_count);
1824         if (!ret) {
1825                 ssize_t ret2;
1826
1827                 if (file->f_op->read_iter)
1828                         ret2 = call_read_iter(file, kiocb, &iter);
1829                 else
1830                         ret2 = loop_rw_iter(READ, file, kiocb, &iter);
1831
1832                 /*
1833                  * In case of a short read, punt to async. This can happen
1834                  * if we have data partially cached. Alternatively we can
1835                  * return the short read, in which case the application will
1836                  * need to issue another SQE and wait for it. That SQE will
1837                  * need async punt anyway, so it's more efficient to do it
1838                  * here.
1839                  */
1840                 if (force_nonblock && !(req->flags & REQ_F_NOWAIT) &&
1841                     (req->flags & REQ_F_ISREG) &&
1842                     ret2 > 0 && ret2 < io_size)
1843                         ret2 = -EAGAIN;
1844                 /* Catch -EAGAIN return for forced non-blocking submission */
1845                 if (!force_nonblock || ret2 != -EAGAIN) {
1846                         kiocb_done(kiocb, ret2, nxt, req->in_async);
1847                 } else {
1848 copy_iov:
1849                         ret = io_setup_async_rw(req, io_size, iovec,
1850                                                 inline_vecs, &iter);
1851                         if (ret)
1852                                 goto out_free;
1853                         return -EAGAIN;
1854                 }
1855         }
1856 out_free:
1857         if (!io_wq_current_is_worker())
1858                 kfree(iovec);
1859         return ret;
1860 }
1861
1862 static int io_write_prep(struct io_kiocb *req, struct iovec **iovec,
1863                          struct iov_iter *iter, bool force_nonblock)
1864 {
1865         ssize_t ret;
1866
1867         ret = io_prep_rw(req, force_nonblock);
1868         if (ret)
1869                 return ret;
1870
1871         if (unlikely(!(req->file->f_mode & FMODE_WRITE)))
1872                 return -EBADF;
1873
1874         return io_import_iovec(WRITE, req, iovec, iter);
1875 }
1876
1877 static int io_write(struct io_kiocb *req, struct io_kiocb **nxt,
1878                     bool force_nonblock)
1879 {
1880         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1881         struct kiocb *kiocb = &req->rw;
1882         struct iov_iter iter;
1883         struct file *file;
1884         size_t iov_count;
1885         ssize_t ret, io_size;
1886
1887         if (!req->io) {
1888                 ret = io_write_prep(req, &iovec, &iter, force_nonblock);
1889                 if (ret < 0)
1890                         return ret;
1891         } else {
1892                 ret = io_import_iovec(WRITE, req, &iovec, &iter);
1893                 if (ret < 0)
1894                         return ret;
1895         }
1896
1897         file = kiocb->ki_filp;
1898         io_size = ret;
1899         if (req->flags & REQ_F_LINK)
1900                 req->result = io_size;
1901
1902         /*
1903          * If the file doesn't support async, mark it as REQ_F_MUST_PUNT so
1904          * we know to async punt it even if it was opened O_NONBLOCK
1905          */
1906         if (force_nonblock && !io_file_supports_async(req->file)) {
1907                 req->flags |= REQ_F_MUST_PUNT;
1908                 goto copy_iov;
1909         }
1910
1911         /* file path doesn't support NOWAIT for non-direct_IO */
1912         if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT) &&
1913             (req->flags & REQ_F_ISREG))
1914                 goto copy_iov;
1915
1916         iov_count = iov_iter_count(&iter);
1917         ret = rw_verify_area(WRITE, file, &kiocb->ki_pos, iov_count);
1918         if (!ret) {
1919                 ssize_t ret2;
1920
1921                 /*
1922                  * Open-code file_start_write here to grab freeze protection,
1923                  * which will be released by another thread in
1924                  * io_complete_rw().  Fool lockdep by telling it the lock got
1925                  * released so that it doesn't complain about the held lock when
1926                  * we return to userspace.
1927                  */
1928                 if (req->flags & REQ_F_ISREG) {
1929                         __sb_start_write(file_inode(file)->i_sb,
1930                                                 SB_FREEZE_WRITE, true);
1931                         __sb_writers_release(file_inode(file)->i_sb,
1932                                                 SB_FREEZE_WRITE);
1933                 }
1934                 kiocb->ki_flags |= IOCB_WRITE;
1935
1936                 if (file->f_op->write_iter)
1937                         ret2 = call_write_iter(file, kiocb, &iter);
1938                 else
1939                         ret2 = loop_rw_iter(WRITE, file, kiocb, &iter);
1940                 if (!force_nonblock || ret2 != -EAGAIN) {
1941                         kiocb_done(kiocb, ret2, nxt, req->in_async);
1942                 } else {
1943 copy_iov:
1944                         ret = io_setup_async_rw(req, io_size, iovec,
1945                                                 inline_vecs, &iter);
1946                         if (ret)
1947                                 goto out_free;
1948                         return -EAGAIN;
1949                 }
1950         }
1951 out_free:
1952         if (!io_wq_current_is_worker())
1953                 kfree(iovec);
1954         return ret;
1955 }
1956
1957 /*
1958  * IORING_OP_NOP just posts a completion event, nothing else.
1959  */
1960 static int io_nop(struct io_kiocb *req)
1961 {
1962         struct io_ring_ctx *ctx = req->ctx;
1963
1964         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1965                 return -EINVAL;
1966
1967         io_cqring_add_event(req, 0);
1968         io_put_req(req);
1969         return 0;
1970 }
1971
1972 static int io_prep_fsync(struct io_kiocb *req)
1973 {
1974         const struct io_uring_sqe *sqe = req->sqe;
1975         struct io_ring_ctx *ctx = req->ctx;
1976
1977         if (req->flags & REQ_F_PREPPED)
1978                 return 0;
1979         if (!req->file)
1980                 return -EBADF;
1981
1982         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1983                 return -EINVAL;
1984         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
1985                 return -EINVAL;
1986
1987         req->sync.flags = READ_ONCE(sqe->fsync_flags);
1988         if (unlikely(req->sync.flags & ~IORING_FSYNC_DATASYNC))
1989                 return -EINVAL;
1990
1991         req->sync.off = READ_ONCE(sqe->off);
1992         req->sync.len = READ_ONCE(sqe->len);
1993         req->flags |= REQ_F_PREPPED;
1994         return 0;
1995 }
1996
1997 static bool io_req_cancelled(struct io_kiocb *req)
1998 {
1999         if (req->work.flags & IO_WQ_WORK_CANCEL) {
2000                 req_set_fail_links(req);
2001                 io_cqring_add_event(req, -ECANCELED);
2002                 io_put_req(req);
2003                 return true;
2004         }
2005
2006         return false;
2007 }
2008
2009 static void io_fsync_finish(struct io_wq_work **workptr)
2010 {
2011         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2012         loff_t end = req->sync.off + req->sync.len;
2013         struct io_kiocb *nxt = NULL;
2014         int ret;
2015
2016         if (io_req_cancelled(req))
2017                 return;
2018
2019         ret = vfs_fsync_range(req->rw.ki_filp, req->sync.off,
2020                                 end > 0 ? end : LLONG_MAX,
2021                                 req->sync.flags & IORING_FSYNC_DATASYNC);
2022         if (ret < 0)
2023                 req_set_fail_links(req);
2024         io_cqring_add_event(req, ret);
2025         io_put_req_find_next(req, &nxt);
2026         if (nxt)
2027                 *workptr = &nxt->work;
2028 }
2029
2030 static int io_fsync(struct io_kiocb *req, struct io_kiocb **nxt,
2031                     bool force_nonblock)
2032 {
2033         struct io_wq_work *work, *old_work;
2034         int ret;
2035
2036         ret = io_prep_fsync(req);
2037         if (ret)
2038                 return ret;
2039
2040         /* fsync always requires a blocking context */
2041         if (force_nonblock) {
2042                 io_put_req(req);
2043                 req->work.func = io_fsync_finish;
2044                 return -EAGAIN;
2045         }
2046
2047         work = old_work = &req->work;
2048         io_fsync_finish(&work);
2049         if (work && work != old_work)
2050                 *nxt = container_of(work, struct io_kiocb, work);
2051         return 0;
2052 }
2053
2054 static int io_prep_sfr(struct io_kiocb *req)
2055 {
2056         const struct io_uring_sqe *sqe = req->sqe;
2057         struct io_ring_ctx *ctx = req->ctx;
2058
2059         if (req->flags & REQ_F_PREPPED)
2060                 return 0;
2061         if (!req->file)
2062                 return -EBADF;
2063
2064         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
2065                 return -EINVAL;
2066         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
2067                 return -EINVAL;
2068
2069         req->sync.off = READ_ONCE(sqe->off);
2070         req->sync.len = READ_ONCE(sqe->len);
2071         req->sync.flags = READ_ONCE(sqe->sync_range_flags);
2072         req->flags |= REQ_F_PREPPED;
2073         return 0;
2074 }
2075
2076 static void io_sync_file_range_finish(struct io_wq_work **workptr)
2077 {
2078         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2079         struct io_kiocb *nxt = NULL;
2080         int ret;
2081
2082         if (io_req_cancelled(req))
2083                 return;
2084
2085         ret = sync_file_range(req->rw.ki_filp, req->sync.off, req->sync.len,
2086                                 req->sync.flags);
2087         if (ret < 0)
2088                 req_set_fail_links(req);
2089         io_cqring_add_event(req, ret);
2090         io_put_req_find_next(req, &nxt);
2091         if (nxt)
2092                 *workptr = &nxt->work;
2093 }
2094
2095 static int io_sync_file_range(struct io_kiocb *req, struct io_kiocb **nxt,
2096                               bool force_nonblock)
2097 {
2098         struct io_wq_work *work, *old_work;
2099         int ret;
2100
2101         ret = io_prep_sfr(req);
2102         if (ret)
2103                 return ret;
2104
2105         /* sync_file_range always requires a blocking context */
2106         if (force_nonblock) {
2107                 io_put_req(req);
2108                 req->work.func = io_sync_file_range_finish;
2109                 return -EAGAIN;
2110         }
2111
2112         work = old_work = &req->work;
2113         io_sync_file_range_finish(&work);
2114         if (work && work != old_work)
2115                 *nxt = container_of(work, struct io_kiocb, work);
2116         return 0;
2117 }
2118
2119 #if defined(CONFIG_NET)
2120 static void io_sendrecv_async(struct io_wq_work **workptr)
2121 {
2122         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2123         struct iovec *iov = NULL;
2124
2125         if (req->io->rw.iov != req->io->rw.fast_iov)
2126                 iov = req->io->msg.iov;
2127         io_wq_submit_work(workptr);
2128         kfree(iov);
2129 }
2130 #endif
2131
2132 static int io_sendmsg_prep(struct io_kiocb *req, struct io_async_ctx *io)
2133 {
2134 #if defined(CONFIG_NET)
2135         const struct io_uring_sqe *sqe = req->sqe;
2136         struct user_msghdr __user *msg;
2137         unsigned flags;
2138
2139         flags = READ_ONCE(sqe->msg_flags);
2140         msg = (struct user_msghdr __user *)(unsigned long) READ_ONCE(sqe->addr);
2141         io->msg.iov = io->msg.fast_iov;
2142         return sendmsg_copy_msghdr(&io->msg.msg, msg, flags, &io->msg.iov);
2143 #else
2144         return 0;
2145 #endif
2146 }
2147
2148 static int io_sendmsg(struct io_kiocb *req, struct io_kiocb **nxt,
2149                       bool force_nonblock)
2150 {
2151 #if defined(CONFIG_NET)
2152         const struct io_uring_sqe *sqe = req->sqe;
2153         struct io_async_msghdr *kmsg = NULL;
2154         struct socket *sock;
2155         int ret;
2156
2157         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2158                 return -EINVAL;
2159
2160         sock = sock_from_file(req->file, &ret);
2161         if (sock) {
2162                 struct io_async_ctx io;
2163                 struct sockaddr_storage addr;
2164                 unsigned flags;
2165
2166                 flags = READ_ONCE(sqe->msg_flags);
2167                 if (flags & MSG_DONTWAIT)
2168                         req->flags |= REQ_F_NOWAIT;
2169                 else if (force_nonblock)
2170                         flags |= MSG_DONTWAIT;
2171
2172                 if (req->io) {
2173                         kmsg = &req->io->msg;
2174                         kmsg->msg.msg_name = &addr;
2175                         /* if iov is set, it's allocated already */
2176                         if (!kmsg->iov)
2177                                 kmsg->iov = kmsg->fast_iov;
2178                         kmsg->msg.msg_iter.iov = kmsg->iov;
2179                 } else {
2180                         kmsg = &io.msg;
2181                         kmsg->msg.msg_name = &addr;
2182                         ret = io_sendmsg_prep(req, &io);
2183                         if (ret)
2184                                 goto out;
2185                 }
2186
2187                 ret = __sys_sendmsg_sock(sock, &kmsg->msg, flags);
2188                 if (force_nonblock && ret == -EAGAIN) {
2189                         if (req->io)
2190                                 return -EAGAIN;
2191                         if (io_alloc_async_ctx(req))
2192                                 return -ENOMEM;
2193                         memcpy(&req->io->msg, &io.msg, sizeof(io.msg));
2194                         req->work.func = io_sendrecv_async;
2195                         return -EAGAIN;
2196                 }
2197                 if (ret == -ERESTARTSYS)
2198                         ret = -EINTR;
2199         }
2200
2201 out:
2202         if (!io_wq_current_is_worker() && kmsg && kmsg->iov != kmsg->fast_iov)
2203                 kfree(kmsg->iov);
2204         io_cqring_add_event(req, ret);
2205         if (ret < 0)
2206                 req_set_fail_links(req);
2207         io_put_req_find_next(req, nxt);
2208         return 0;
2209 #else
2210         return -EOPNOTSUPP;
2211 #endif
2212 }
2213
2214 static int io_recvmsg_prep(struct io_kiocb *req, struct io_async_ctx *io)
2215 {
2216 #if defined(CONFIG_NET)
2217         const struct io_uring_sqe *sqe = req->sqe;
2218         struct user_msghdr __user *msg;
2219         unsigned flags;
2220
2221         flags = READ_ONCE(sqe->msg_flags);
2222         msg = (struct user_msghdr __user *)(unsigned long) READ_ONCE(sqe->addr);
2223         io->msg.iov = io->msg.fast_iov;
2224         return recvmsg_copy_msghdr(&io->msg.msg, msg, flags, &io->msg.uaddr,
2225                                         &io->msg.iov);
2226 #else
2227         return 0;
2228 #endif
2229 }
2230
2231 static int io_recvmsg(struct io_kiocb *req, struct io_kiocb **nxt,
2232                       bool force_nonblock)
2233 {
2234 #if defined(CONFIG_NET)
2235         const struct io_uring_sqe *sqe = req->sqe;
2236         struct io_async_msghdr *kmsg = NULL;
2237         struct socket *sock;
2238         int ret;
2239
2240         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2241                 return -EINVAL;
2242
2243         sock = sock_from_file(req->file, &ret);
2244         if (sock) {
2245                 struct user_msghdr __user *msg;
2246                 struct io_async_ctx io;
2247                 struct sockaddr_storage addr;
2248                 unsigned flags;
2249
2250                 flags = READ_ONCE(sqe->msg_flags);
2251                 if (flags & MSG_DONTWAIT)
2252                         req->flags |= REQ_F_NOWAIT;
2253                 else if (force_nonblock)
2254                         flags |= MSG_DONTWAIT;
2255
2256                 msg = (struct user_msghdr __user *) (unsigned long)
2257                         READ_ONCE(sqe->addr);
2258                 if (req->io) {
2259                         kmsg = &req->io->msg;
2260                         kmsg->msg.msg_name = &addr;
2261                         /* if iov is set, it's allocated already */
2262                         if (!kmsg->iov)
2263                                 kmsg->iov = kmsg->fast_iov;
2264                         kmsg->msg.msg_iter.iov = kmsg->iov;
2265                 } else {
2266                         kmsg = &io.msg;
2267                         kmsg->msg.msg_name = &addr;
2268                         ret = io_recvmsg_prep(req, &io);
2269                         if (ret)
2270                                 goto out;
2271                 }
2272
2273                 ret = __sys_recvmsg_sock(sock, &kmsg->msg, msg, kmsg->uaddr, flags);
2274                 if (force_nonblock && ret == -EAGAIN) {
2275                         if (req->io)
2276                                 return -EAGAIN;
2277                         if (io_alloc_async_ctx(req))
2278                                 return -ENOMEM;
2279                         memcpy(&req->io->msg, &io.msg, sizeof(io.msg));
2280                         req->work.func = io_sendrecv_async;
2281                         return -EAGAIN;
2282                 }
2283                 if (ret == -ERESTARTSYS)
2284                         ret = -EINTR;
2285         }
2286
2287 out:
2288         if (!io_wq_current_is_worker() && kmsg && kmsg->iov != kmsg->fast_iov)
2289                 kfree(kmsg->iov);
2290         io_cqring_add_event(req, ret);
2291         if (ret < 0)
2292                 req_set_fail_links(req);
2293         io_put_req_find_next(req, nxt);
2294         return 0;
2295 #else
2296         return -EOPNOTSUPP;
2297 #endif
2298 }
2299
2300 static int io_accept_prep(struct io_kiocb *req)
2301 {
2302 #if defined(CONFIG_NET)
2303         const struct io_uring_sqe *sqe = req->sqe;
2304         struct io_accept *accept = &req->accept;
2305
2306         if (req->flags & REQ_F_PREPPED)
2307                 return 0;
2308
2309         if (unlikely(req->ctx->flags & (IORING_SETUP_IOPOLL|IORING_SETUP_SQPOLL)))
2310                 return -EINVAL;
2311         if (sqe->ioprio || sqe->len || sqe->buf_index)
2312                 return -EINVAL;
2313
2314         accept->addr = (struct sockaddr __user *)
2315                                 (unsigned long) READ_ONCE(sqe->addr);
2316         accept->addr_len = (int __user *) (unsigned long) READ_ONCE(sqe->addr2);
2317         accept->flags = READ_ONCE(sqe->accept_flags);
2318         req->flags |= REQ_F_PREPPED;
2319         return 0;
2320 #else
2321         return -EOPNOTSUPP;
2322 #endif
2323 }
2324
2325 #if defined(CONFIG_NET)
2326 static int __io_accept(struct io_kiocb *req, struct io_kiocb **nxt,
2327                        bool force_nonblock)
2328 {
2329         struct io_accept *accept = &req->accept;
2330         unsigned file_flags;
2331         int ret;
2332
2333         file_flags = force_nonblock ? O_NONBLOCK : 0;
2334         ret = __sys_accept4_file(req->file, file_flags, accept->addr,
2335                                         accept->addr_len, accept->flags);
2336         if (ret == -EAGAIN && force_nonblock)
2337                 return -EAGAIN;
2338         if (ret == -ERESTARTSYS)
2339                 ret = -EINTR;
2340         if (ret < 0)
2341                 req_set_fail_links(req);
2342         io_cqring_add_event(req, ret);
2343         io_put_req_find_next(req, nxt);
2344         return 0;
2345 }
2346
2347 static void io_accept_finish(struct io_wq_work **workptr)
2348 {
2349         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2350         struct io_kiocb *nxt = NULL;
2351
2352         if (io_req_cancelled(req))
2353                 return;
2354         __io_accept(req, &nxt, false);
2355         if (nxt)
2356                 *workptr = &nxt->work;
2357 }
2358 #endif
2359
2360 static int io_accept(struct io_kiocb *req, struct io_kiocb **nxt,
2361                      bool force_nonblock)
2362 {
2363 #if defined(CONFIG_NET)
2364         int ret;
2365
2366         ret = io_accept_prep(req);
2367         if (ret)
2368                 return ret;
2369
2370         ret = __io_accept(req, nxt, force_nonblock);
2371         if (ret == -EAGAIN && force_nonblock) {
2372                 req->work.func = io_accept_finish;
2373                 req->work.flags |= IO_WQ_WORK_NEEDS_FILES;
2374                 io_put_req(req);
2375                 return -EAGAIN;
2376         }
2377         return 0;
2378 #else
2379         return -EOPNOTSUPP;
2380 #endif
2381 }
2382
2383 static int io_connect_prep(struct io_kiocb *req, struct io_async_ctx *io)
2384 {
2385 #if defined(CONFIG_NET)
2386         const struct io_uring_sqe *sqe = req->sqe;
2387         struct sockaddr __user *addr;
2388         int addr_len;
2389
2390         addr = (struct sockaddr __user *) (unsigned long) READ_ONCE(sqe->addr);
2391         addr_len = READ_ONCE(sqe->addr2);
2392         return move_addr_to_kernel(addr, addr_len, &io->connect.address);
2393 #else
2394         return 0;
2395 #endif
2396 }
2397
2398 static int io_connect(struct io_kiocb *req, struct io_kiocb **nxt,
2399                       bool force_nonblock)
2400 {
2401 #if defined(CONFIG_NET)
2402         const struct io_uring_sqe *sqe = req->sqe;
2403         struct io_async_ctx __io, *io;
2404         unsigned file_flags;
2405         int addr_len, ret;
2406
2407         if (unlikely(req->ctx->flags & (IORING_SETUP_IOPOLL|IORING_SETUP_SQPOLL)))
2408                 return -EINVAL;
2409         if (sqe->ioprio || sqe->len || sqe->buf_index || sqe->rw_flags)
2410                 return -EINVAL;
2411
2412         addr_len = READ_ONCE(sqe->addr2);
2413         file_flags = force_nonblock ? O_NONBLOCK : 0;
2414
2415         if (req->io) {
2416                 io = req->io;
2417         } else {
2418                 ret = io_connect_prep(req, &__io);
2419                 if (ret)
2420                         goto out;
2421                 io = &__io;
2422         }
2423
2424         ret = __sys_connect_file(req->file, &io->connect.address, addr_len,
2425                                         file_flags);
2426         if ((ret == -EAGAIN || ret == -EINPROGRESS) && force_nonblock) {
2427                 if (req->io)
2428                         return -EAGAIN;
2429                 if (io_alloc_async_ctx(req)) {
2430                         ret = -ENOMEM;
2431                         goto out;
2432                 }
2433                 memcpy(&req->io->connect, &__io.connect, sizeof(__io.connect));
2434                 return -EAGAIN;
2435         }
2436         if (ret == -ERESTARTSYS)
2437                 ret = -EINTR;
2438 out:
2439         if (ret < 0)
2440                 req_set_fail_links(req);
2441         io_cqring_add_event(req, ret);
2442         io_put_req_find_next(req, nxt);
2443         return 0;
2444 #else
2445         return -EOPNOTSUPP;
2446 #endif
2447 }
2448
2449 static void io_poll_remove_one(struct io_kiocb *req)
2450 {
2451         struct io_poll_iocb *poll = &req->poll;
2452
2453         spin_lock(&poll->head->lock);
2454         WRITE_ONCE(poll->canceled, true);
2455         if (!list_empty(&poll->wait.entry)) {
2456                 list_del_init(&poll->wait.entry);
2457                 io_queue_async_work(req);
2458         }
2459         spin_unlock(&poll->head->lock);
2460         hash_del(&req->hash_node);
2461 }
2462
2463 static void io_poll_remove_all(struct io_ring_ctx *ctx)
2464 {
2465         struct hlist_node *tmp;
2466         struct io_kiocb *req;
2467         int i;
2468
2469         spin_lock_irq(&ctx->completion_lock);
2470         for (i = 0; i < (1U << ctx->cancel_hash_bits); i++) {
2471                 struct hlist_head *list;
2472
2473                 list = &ctx->cancel_hash[i];
2474                 hlist_for_each_entry_safe(req, tmp, list, hash_node)
2475                         io_poll_remove_one(req);
2476         }
2477         spin_unlock_irq(&ctx->completion_lock);
2478 }
2479
2480 static int io_poll_cancel(struct io_ring_ctx *ctx, __u64 sqe_addr)
2481 {
2482         struct hlist_head *list;
2483         struct io_kiocb *req;
2484
2485         list = &ctx->cancel_hash[hash_long(sqe_addr, ctx->cancel_hash_bits)];
2486         hlist_for_each_entry(req, list, hash_node) {
2487                 if (sqe_addr == req->user_data) {
2488                         io_poll_remove_one(req);
2489                         return 0;
2490                 }
2491         }
2492
2493         return -ENOENT;
2494 }
2495
2496 static int io_poll_remove_prep(struct io_kiocb *req)
2497 {
2498         const struct io_uring_sqe *sqe = req->sqe;
2499
2500         if (req->flags & REQ_F_PREPPED)
2501                 return 0;
2502         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2503                 return -EINVAL;
2504         if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index ||
2505             sqe->poll_events)
2506                 return -EINVAL;
2507
2508         req->poll.addr = READ_ONCE(sqe->addr);
2509         req->flags |= REQ_F_PREPPED;
2510         return 0;
2511 }
2512
2513 /*
2514  * Find a running poll command that matches one specified in sqe->addr,
2515  * and remove it if found.
2516  */
2517 static int io_poll_remove(struct io_kiocb *req)
2518 {
2519         struct io_ring_ctx *ctx = req->ctx;
2520         u64 addr;
2521         int ret;
2522
2523         ret = io_poll_remove_prep(req);
2524         if (ret)
2525                 return ret;
2526
2527         addr = req->poll.addr;
2528         spin_lock_irq(&ctx->completion_lock);
2529         ret = io_poll_cancel(ctx, addr);
2530         spin_unlock_irq(&ctx->completion_lock);
2531
2532         io_cqring_add_event(req, ret);
2533         if (ret < 0)
2534                 req_set_fail_links(req);
2535         io_put_req(req);
2536         return 0;
2537 }
2538
2539 static void io_poll_complete(struct io_kiocb *req, __poll_t mask, int error)
2540 {
2541         struct io_ring_ctx *ctx = req->ctx;
2542
2543         req->poll.done = true;
2544         if (error)
2545                 io_cqring_fill_event(req, error);
2546         else
2547                 io_cqring_fill_event(req, mangle_poll(mask));
2548         io_commit_cqring(ctx);
2549 }
2550
2551 static void io_poll_complete_work(struct io_wq_work **workptr)
2552 {
2553         struct io_wq_work *work = *workptr;
2554         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
2555         struct io_poll_iocb *poll = &req->poll;
2556         struct poll_table_struct pt = { ._key = poll->events };
2557         struct io_ring_ctx *ctx = req->ctx;
2558         struct io_kiocb *nxt = NULL;
2559         __poll_t mask = 0;
2560         int ret = 0;
2561
2562         if (work->flags & IO_WQ_WORK_CANCEL) {
2563                 WRITE_ONCE(poll->canceled, true);
2564                 ret = -ECANCELED;
2565         } else if (READ_ONCE(poll->canceled)) {
2566                 ret = -ECANCELED;
2567         }
2568
2569         if (ret != -ECANCELED)
2570                 mask = vfs_poll(poll->file, &pt) & poll->events;
2571
2572         /*
2573          * Note that ->ki_cancel callers also delete iocb from active_reqs after
2574          * calling ->ki_cancel.  We need the ctx_lock roundtrip here to
2575          * synchronize with them.  In the cancellation case the list_del_init
2576          * itself is not actually needed, but harmless so we keep it in to
2577          * avoid further branches in the fast path.
2578          */
2579         spin_lock_irq(&ctx->completion_lock);
2580         if (!mask && ret != -ECANCELED) {
2581                 add_wait_queue(poll->head, &poll->wait);
2582                 spin_unlock_irq(&ctx->completion_lock);
2583                 return;
2584         }
2585         hash_del(&req->hash_node);
2586         io_poll_complete(req, mask, ret);
2587         spin_unlock_irq(&ctx->completion_lock);
2588
2589         io_cqring_ev_posted(ctx);
2590
2591         if (ret < 0)
2592                 req_set_fail_links(req);
2593         io_put_req_find_next(req, &nxt);
2594         if (nxt)
2595                 *workptr = &nxt->work;
2596 }
2597
2598 static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
2599                         void *key)
2600 {
2601         struct io_poll_iocb *poll = wait->private;
2602         struct io_kiocb *req = container_of(poll, struct io_kiocb, poll);
2603         struct io_ring_ctx *ctx = req->ctx;
2604         __poll_t mask = key_to_poll(key);
2605         unsigned long flags;
2606
2607         /* for instances that support it check for an event match first: */
2608         if (mask && !(mask & poll->events))
2609                 return 0;
2610
2611         list_del_init(&poll->wait.entry);
2612
2613         /*
2614          * Run completion inline if we can. We're using trylock here because
2615          * we are violating the completion_lock -> poll wq lock ordering.
2616          * If we have a link timeout we're going to need the completion_lock
2617          * for finalizing the request, mark us as having grabbed that already.
2618          */
2619         if (mask && spin_trylock_irqsave(&ctx->completion_lock, flags)) {
2620                 hash_del(&req->hash_node);
2621                 io_poll_complete(req, mask, 0);
2622                 req->flags |= REQ_F_COMP_LOCKED;
2623                 io_put_req(req);
2624                 spin_unlock_irqrestore(&ctx->completion_lock, flags);
2625
2626                 io_cqring_ev_posted(ctx);
2627         } else {
2628                 io_queue_async_work(req);
2629         }
2630
2631         return 1;
2632 }
2633
2634 struct io_poll_table {
2635         struct poll_table_struct pt;
2636         struct io_kiocb *req;
2637         int error;
2638 };
2639
2640 static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
2641                                struct poll_table_struct *p)
2642 {
2643         struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);
2644
2645         if (unlikely(pt->req->poll.head)) {
2646                 pt->error = -EINVAL;
2647                 return;
2648         }
2649
2650         pt->error = 0;
2651         pt->req->poll.head = head;
2652         add_wait_queue(head, &pt->req->poll.wait);
2653 }
2654
2655 static void io_poll_req_insert(struct io_kiocb *req)
2656 {
2657         struct io_ring_ctx *ctx = req->ctx;
2658         struct hlist_head *list;
2659
2660         list = &ctx->cancel_hash[hash_long(req->user_data, ctx->cancel_hash_bits)];
2661         hlist_add_head(&req->hash_node, list);
2662 }
2663
2664 static int io_poll_add_prep(struct io_kiocb *req)
2665 {
2666         const struct io_uring_sqe *sqe = req->sqe;
2667         struct io_poll_iocb *poll = &req->poll;
2668         u16 events;
2669
2670         if (req->flags & REQ_F_PREPPED)
2671                 return 0;
2672         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2673                 return -EINVAL;
2674         if (sqe->addr || sqe->ioprio || sqe->off || sqe->len || sqe->buf_index)
2675                 return -EINVAL;
2676         if (!poll->file)
2677                 return -EBADF;
2678
2679         req->flags |= REQ_F_PREPPED;
2680         events = READ_ONCE(sqe->poll_events);
2681         poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;
2682         return 0;
2683 }
2684
2685 static int io_poll_add(struct io_kiocb *req, struct io_kiocb **nxt)
2686 {
2687         struct io_poll_iocb *poll = &req->poll;
2688         struct io_ring_ctx *ctx = req->ctx;
2689         struct io_poll_table ipt;
2690         bool cancel = false;
2691         __poll_t mask;
2692         int ret;
2693
2694         ret = io_poll_add_prep(req);
2695         if (ret)
2696                 return ret;
2697
2698         INIT_IO_WORK(&req->work, io_poll_complete_work);
2699         INIT_HLIST_NODE(&req->hash_node);
2700
2701         poll->head = NULL;
2702         poll->done = false;
2703         poll->canceled = false;
2704
2705         ipt.pt._qproc = io_poll_queue_proc;
2706         ipt.pt._key = poll->events;
2707         ipt.req = req;
2708         ipt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */
2709
2710         /* initialized the list so that we can do list_empty checks */
2711         INIT_LIST_HEAD(&poll->wait.entry);
2712         init_waitqueue_func_entry(&poll->wait, io_poll_wake);
2713         poll->wait.private = poll;
2714
2715         INIT_LIST_HEAD(&req->list);
2716
2717         mask = vfs_poll(poll->file, &ipt.pt) & poll->events;
2718
2719         spin_lock_irq(&ctx->completion_lock);
2720         if (likely(poll->head)) {
2721                 spin_lock(&poll->head->lock);
2722                 if (unlikely(list_empty(&poll->wait.entry))) {
2723                         if (ipt.error)
2724                                 cancel = true;
2725                         ipt.error = 0;
2726                         mask = 0;
2727                 }
2728                 if (mask || ipt.error)
2729                         list_del_init(&poll->wait.entry);
2730                 else if (cancel)
2731                         WRITE_ONCE(poll->canceled, true);
2732                 else if (!poll->done) /* actually waiting for an event */
2733                         io_poll_req_insert(req);
2734                 spin_unlock(&poll->head->lock);
2735         }
2736         if (mask) { /* no async, we'd stolen it */
2737                 ipt.error = 0;
2738                 io_poll_complete(req, mask, 0);
2739         }
2740         spin_unlock_irq(&ctx->completion_lock);
2741
2742         if (mask) {
2743                 io_cqring_ev_posted(ctx);
2744                 io_put_req_find_next(req, nxt);
2745         }
2746         return ipt.error;
2747 }
2748
2749 static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
2750 {
2751         struct io_timeout_data *data = container_of(timer,
2752                                                 struct io_timeout_data, timer);
2753         struct io_kiocb *req = data->req;
2754         struct io_ring_ctx *ctx = req->ctx;
2755         unsigned long flags;
2756
2757         atomic_inc(&ctx->cq_timeouts);
2758
2759         spin_lock_irqsave(&ctx->completion_lock, flags);
2760         /*
2761          * We could be racing with timeout deletion. If the list is empty,
2762          * then timeout lookup already found it and will be handling it.
2763          */
2764         if (!list_empty(&req->list)) {
2765                 struct io_kiocb *prev;
2766
2767                 /*
2768                  * Adjust the reqs sequence before the current one because it
2769                  * will consume a slot in the cq_ring and the cq_tail
2770                  * pointer will be increased, otherwise other timeout reqs may
2771                  * return in advance without waiting for enough wait_nr.
2772                  */
2773                 prev = req;
2774                 list_for_each_entry_continue_reverse(prev, &ctx->timeout_list, list)
2775                         prev->sequence++;
2776                 list_del_init(&req->list);
2777         }
2778
2779         io_cqring_fill_event(req, -ETIME);
2780         io_commit_cqring(ctx);
2781         spin_unlock_irqrestore(&ctx->completion_lock, flags);
2782
2783         io_cqring_ev_posted(ctx);
2784         req_set_fail_links(req);
2785         io_put_req(req);
2786         return HRTIMER_NORESTART;
2787 }
2788
2789 static int io_timeout_cancel(struct io_ring_ctx *ctx, __u64 user_data)
2790 {
2791         struct io_kiocb *req;
2792         int ret = -ENOENT;
2793
2794         list_for_each_entry(req, &ctx->timeout_list, list) {
2795                 if (user_data == req->user_data) {
2796                         list_del_init(&req->list);
2797                         ret = 0;
2798                         break;
2799                 }
2800         }
2801
2802         if (ret == -ENOENT)
2803                 return ret;
2804
2805         ret = hrtimer_try_to_cancel(&req->io->timeout.timer);
2806         if (ret == -1)
2807                 return -EALREADY;
2808
2809         req_set_fail_links(req);
2810         io_cqring_fill_event(req, -ECANCELED);
2811         io_put_req(req);
2812         return 0;
2813 }
2814
2815 /*
2816  * Remove or update an existing timeout command
2817  */
2818 static int io_timeout_remove(struct io_kiocb *req)
2819 {
2820         const struct io_uring_sqe *sqe = req->sqe;
2821         struct io_ring_ctx *ctx = req->ctx;
2822         unsigned flags;
2823         int ret;
2824
2825         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
2826                 return -EINVAL;
2827         if (sqe->flags || sqe->ioprio || sqe->buf_index || sqe->len)
2828                 return -EINVAL;
2829         flags = READ_ONCE(sqe->timeout_flags);
2830         if (flags)
2831                 return -EINVAL;
2832
2833         spin_lock_irq(&ctx->completion_lock);
2834         ret = io_timeout_cancel(ctx, READ_ONCE(sqe->addr));
2835
2836         io_cqring_fill_event(req, ret);
2837         io_commit_cqring(ctx);
2838         spin_unlock_irq(&ctx->completion_lock);
2839         io_cqring_ev_posted(ctx);
2840         if (ret < 0)
2841                 req_set_fail_links(req);
2842         io_put_req(req);
2843         return 0;
2844 }
2845
2846 static int io_timeout_prep(struct io_kiocb *req, struct io_async_ctx *io,
2847                            bool is_timeout_link)
2848 {
2849         const struct io_uring_sqe *sqe = req->sqe;
2850         struct io_timeout_data *data;
2851         unsigned flags;
2852
2853         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2854                 return -EINVAL;
2855         if (sqe->ioprio || sqe->buf_index || sqe->len != 1)
2856                 return -EINVAL;
2857         if (sqe->off && is_timeout_link)
2858                 return -EINVAL;
2859         flags = READ_ONCE(sqe->timeout_flags);
2860         if (flags & ~IORING_TIMEOUT_ABS)
2861                 return -EINVAL;
2862
2863         data = &io->timeout;
2864         data->req = req;
2865         req->flags |= REQ_F_TIMEOUT;
2866
2867         if (get_timespec64(&data->ts, u64_to_user_ptr(sqe->addr)))
2868                 return -EFAULT;
2869
2870         if (flags & IORING_TIMEOUT_ABS)
2871                 data->mode = HRTIMER_MODE_ABS;
2872         else
2873                 data->mode = HRTIMER_MODE_REL;
2874
2875         hrtimer_init(&data->timer, CLOCK_MONOTONIC, data->mode);
2876         return 0;
2877 }
2878
2879 static int io_timeout(struct io_kiocb *req)
2880 {
2881         const struct io_uring_sqe *sqe = req->sqe;
2882         unsigned count;
2883         struct io_ring_ctx *ctx = req->ctx;
2884         struct io_timeout_data *data;
2885         struct list_head *entry;
2886         unsigned span = 0;
2887         int ret;
2888
2889         if (!req->io) {
2890                 if (io_alloc_async_ctx(req))
2891                         return -ENOMEM;
2892                 ret = io_timeout_prep(req, req->io, false);
2893                 if (ret)
2894                         return ret;
2895         }
2896         data = &req->io->timeout;
2897
2898         /*
2899          * sqe->off holds how many events that need to occur for this
2900          * timeout event to be satisfied. If it isn't set, then this is
2901          * a pure timeout request, sequence isn't used.
2902          */
2903         count = READ_ONCE(sqe->off);
2904         if (!count) {
2905                 req->flags |= REQ_F_TIMEOUT_NOSEQ;
2906                 spin_lock_irq(&ctx->completion_lock);
2907                 entry = ctx->timeout_list.prev;
2908                 goto add;
2909         }
2910
2911         req->sequence = ctx->cached_sq_head + count - 1;
2912         data->seq_offset = count;
2913
2914         /*
2915          * Insertion sort, ensuring the first entry in the list is always
2916          * the one we need first.
2917          */
2918         spin_lock_irq(&ctx->completion_lock);
2919         list_for_each_prev(entry, &ctx->timeout_list) {
2920                 struct io_kiocb *nxt = list_entry(entry, struct io_kiocb, list);
2921                 unsigned nxt_sq_head;
2922                 long long tmp, tmp_nxt;
2923                 u32 nxt_offset = nxt->io->timeout.seq_offset;
2924
2925                 if (nxt->flags & REQ_F_TIMEOUT_NOSEQ)
2926                         continue;
2927
2928                 /*
2929                  * Since cached_sq_head + count - 1 can overflow, use type long
2930                  * long to store it.
2931                  */
2932                 tmp = (long long)ctx->cached_sq_head + count - 1;
2933                 nxt_sq_head = nxt->sequence - nxt_offset + 1;
2934                 tmp_nxt = (long long)nxt_sq_head + nxt_offset - 1;
2935
2936                 /*
2937                  * cached_sq_head may overflow, and it will never overflow twice
2938                  * once there is some timeout req still be valid.
2939                  */
2940                 if (ctx->cached_sq_head < nxt_sq_head)
2941                         tmp += UINT_MAX;
2942
2943                 if (tmp > tmp_nxt)
2944                         break;
2945
2946                 /*
2947                  * Sequence of reqs after the insert one and itself should
2948                  * be adjusted because each timeout req consumes a slot.
2949                  */
2950                 span++;
2951                 nxt->sequence++;
2952         }
2953         req->sequence -= span;
2954 add:
2955         list_add(&req->list, entry);
2956         data->timer.function = io_timeout_fn;
2957         hrtimer_start(&data->timer, timespec64_to_ktime(data->ts), data->mode);
2958         spin_unlock_irq(&ctx->completion_lock);
2959         return 0;
2960 }
2961
2962 static bool io_cancel_cb(struct io_wq_work *work, void *data)
2963 {
2964         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
2965
2966         return req->user_data == (unsigned long) data;
2967 }
2968
2969 static int io_async_cancel_one(struct io_ring_ctx *ctx, void *sqe_addr)
2970 {
2971         enum io_wq_cancel cancel_ret;
2972         int ret = 0;
2973
2974         cancel_ret = io_wq_cancel_cb(ctx->io_wq, io_cancel_cb, sqe_addr);
2975         switch (cancel_ret) {
2976         case IO_WQ_CANCEL_OK:
2977                 ret = 0;
2978                 break;
2979         case IO_WQ_CANCEL_RUNNING:
2980                 ret = -EALREADY;
2981                 break;
2982         case IO_WQ_CANCEL_NOTFOUND:
2983                 ret = -ENOENT;
2984                 break;
2985         }
2986
2987         return ret;
2988 }
2989
2990 static void io_async_find_and_cancel(struct io_ring_ctx *ctx,
2991                                      struct io_kiocb *req, __u64 sqe_addr,
2992                                      struct io_kiocb **nxt, int success_ret)
2993 {
2994         unsigned long flags;
2995         int ret;
2996
2997         ret = io_async_cancel_one(ctx, (void *) (unsigned long) sqe_addr);
2998         if (ret != -ENOENT) {
2999                 spin_lock_irqsave(&ctx->completion_lock, flags);
3000                 goto done;
3001         }
3002
3003         spin_lock_irqsave(&ctx->completion_lock, flags);
3004         ret = io_timeout_cancel(ctx, sqe_addr);
3005         if (ret != -ENOENT)
3006                 goto done;
3007         ret = io_poll_cancel(ctx, sqe_addr);
3008 done:
3009         if (!ret)
3010                 ret = success_ret;
3011         io_cqring_fill_event(req, ret);
3012         io_commit_cqring(ctx);
3013         spin_unlock_irqrestore(&ctx->completion_lock, flags);
3014         io_cqring_ev_posted(ctx);
3015
3016         if (ret < 0)
3017                 req_set_fail_links(req);
3018         io_put_req_find_next(req, nxt);
3019 }
3020
3021 static int io_async_cancel(struct io_kiocb *req, struct io_kiocb **nxt)
3022 {
3023         const struct io_uring_sqe *sqe = req->sqe;
3024         struct io_ring_ctx *ctx = req->ctx;
3025
3026         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
3027                 return -EINVAL;
3028         if (sqe->flags || sqe->ioprio || sqe->off || sqe->len ||
3029             sqe->cancel_flags)
3030                 return -EINVAL;
3031
3032         io_async_find_and_cancel(ctx, req, READ_ONCE(sqe->addr), nxt, 0);
3033         return 0;
3034 }
3035
3036 static int io_req_defer_prep(struct io_kiocb *req)
3037 {
3038         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
3039         struct io_async_ctx *io = req->io;
3040         struct iov_iter iter;
3041         ssize_t ret;
3042
3043         switch (io->sqe.opcode) {
3044         case IORING_OP_READV:
3045         case IORING_OP_READ_FIXED:
3046                 /* ensure prep does right import */
3047                 req->io = NULL;
3048                 ret = io_read_prep(req, &iovec, &iter, true);
3049                 req->io = io;
3050                 if (ret < 0)
3051                         break;
3052                 io_req_map_rw(req, ret, iovec, inline_vecs, &iter);
3053                 ret = 0;
3054                 break;
3055         case IORING_OP_WRITEV:
3056         case IORING_OP_WRITE_FIXED:
3057                 /* ensure prep does right import */
3058                 req->io = NULL;
3059                 ret = io_write_prep(req, &iovec, &iter, true);
3060                 req->io = io;
3061                 if (ret < 0)
3062                         break;
3063                 io_req_map_rw(req, ret, iovec, inline_vecs, &iter);
3064                 ret = 0;
3065                 break;
3066         case IORING_OP_POLL_ADD:
3067                 ret = io_poll_add_prep(req);
3068                 break;
3069         case IORING_OP_POLL_REMOVE:
3070                 ret = io_poll_remove_prep(req);
3071                 break;
3072         case IORING_OP_FSYNC:
3073                 ret = io_prep_fsync(req);
3074                 break;
3075         case IORING_OP_SYNC_FILE_RANGE:
3076                 ret = io_prep_sfr(req);
3077                 break;
3078         case IORING_OP_SENDMSG:
3079                 ret = io_sendmsg_prep(req, io);
3080                 break;
3081         case IORING_OP_RECVMSG:
3082                 ret = io_recvmsg_prep(req, io);
3083                 break;
3084         case IORING_OP_CONNECT:
3085                 ret = io_connect_prep(req, io);
3086                 break;
3087         case IORING_OP_TIMEOUT:
3088                 ret = io_timeout_prep(req, io, false);
3089                 break;
3090         case IORING_OP_LINK_TIMEOUT:
3091                 ret = io_timeout_prep(req, io, true);
3092                 break;
3093         case IORING_OP_ACCEPT:
3094                 ret = io_accept_prep(req);
3095                 break;
3096         default:
3097                 ret = 0;
3098                 break;
3099         }
3100
3101         return ret;
3102 }
3103
3104 static int io_req_defer(struct io_kiocb *req)
3105 {
3106         struct io_ring_ctx *ctx = req->ctx;
3107         int ret;
3108
3109         /* Still need defer if there is pending req in defer list. */
3110         if (!req_need_defer(req) && list_empty(&ctx->defer_list))
3111                 return 0;
3112
3113         if (io_alloc_async_ctx(req))
3114                 return -EAGAIN;
3115
3116         ret = io_req_defer_prep(req);
3117         if (ret < 0)
3118                 return ret;
3119
3120         spin_lock_irq(&ctx->completion_lock);
3121         if (!req_need_defer(req) && list_empty(&ctx->defer_list)) {
3122                 spin_unlock_irq(&ctx->completion_lock);
3123                 return 0;
3124         }
3125
3126         trace_io_uring_defer(ctx, req, req->user_data);
3127         list_add_tail(&req->list, &ctx->defer_list);
3128         spin_unlock_irq(&ctx->completion_lock);
3129         return -EIOCBQUEUED;
3130 }
3131
3132 __attribute__((nonnull))
3133 static int io_issue_sqe(struct io_kiocb *req, struct io_kiocb **nxt,
3134                         bool force_nonblock)
3135 {
3136         int ret, opcode;
3137         struct io_ring_ctx *ctx = req->ctx;
3138
3139         opcode = READ_ONCE(req->sqe->opcode);
3140         switch (opcode) {
3141         case IORING_OP_NOP:
3142                 ret = io_nop(req);
3143                 break;
3144         case IORING_OP_READV:
3145                 if (unlikely(req->sqe->buf_index))
3146                         return -EINVAL;
3147                 ret = io_read(req, nxt, force_nonblock);
3148                 break;
3149         case IORING_OP_WRITEV:
3150                 if (unlikely(req->sqe->buf_index))
3151                         return -EINVAL;
3152                 ret = io_write(req, nxt, force_nonblock);
3153                 break;
3154         case IORING_OP_READ_FIXED:
3155                 ret = io_read(req, nxt, force_nonblock);
3156                 break;
3157         case IORING_OP_WRITE_FIXED:
3158                 ret = io_write(req, nxt, force_nonblock);
3159                 break;
3160         case IORING_OP_FSYNC:
3161                 ret = io_fsync(req, nxt, force_nonblock);
3162                 break;
3163         case IORING_OP_POLL_ADD:
3164                 ret = io_poll_add(req, nxt);
3165                 break;
3166         case IORING_OP_POLL_REMOVE:
3167                 ret = io_poll_remove(req);
3168                 break;
3169         case IORING_OP_SYNC_FILE_RANGE:
3170                 ret = io_sync_file_range(req, nxt, force_nonblock);
3171                 break;
3172         case IORING_OP_SENDMSG:
3173                 ret = io_sendmsg(req, nxt, force_nonblock);
3174                 break;
3175         case IORING_OP_RECVMSG:
3176                 ret = io_recvmsg(req, nxt, force_nonblock);
3177                 break;
3178         case IORING_OP_TIMEOUT:
3179                 ret = io_timeout(req);
3180                 break;
3181         case IORING_OP_TIMEOUT_REMOVE:
3182                 ret = io_timeout_remove(req);
3183                 break;
3184         case IORING_OP_ACCEPT:
3185                 ret = io_accept(req, nxt, force_nonblock);
3186                 break;
3187         case IORING_OP_CONNECT:
3188                 ret = io_connect(req, nxt, force_nonblock);
3189                 break;
3190         case IORING_OP_ASYNC_CANCEL:
3191                 ret = io_async_cancel(req, nxt);
3192                 break;
3193         default:
3194                 ret = -EINVAL;
3195                 break;
3196         }
3197
3198         if (ret)
3199                 return ret;
3200
3201         if (ctx->flags & IORING_SETUP_IOPOLL) {
3202                 if (req->result == -EAGAIN)
3203                         return -EAGAIN;
3204
3205                 io_iopoll_req_issued(req);
3206         }
3207
3208         return 0;
3209 }
3210
3211 static void io_link_work_cb(struct io_wq_work **workptr)
3212 {
3213         struct io_wq_work *work = *workptr;
3214         struct io_kiocb *link = work->data;
3215
3216         io_queue_linked_timeout(link);
3217         work->func = io_wq_submit_work;
3218 }
3219
3220 static void io_wq_submit_work(struct io_wq_work **workptr)
3221 {
3222         struct io_wq_work *work = *workptr;
3223         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
3224         struct io_kiocb *nxt = NULL;
3225         int ret = 0;
3226
3227         /* Ensure we clear previously set non-block flag */
3228         req->rw.ki_flags &= ~IOCB_NOWAIT;
3229
3230         if (work->flags & IO_WQ_WORK_CANCEL)
3231                 ret = -ECANCELED;
3232
3233         if (!ret) {
3234                 req->has_user = (work->flags & IO_WQ_WORK_HAS_MM) != 0;
3235                 req->in_async = true;
3236                 do {
3237                         ret = io_issue_sqe(req, &nxt, false);
3238                         /*
3239                          * We can get EAGAIN for polled IO even though we're
3240                          * forcing a sync submission from here, since we can't
3241                          * wait for request slots on the block side.
3242                          */
3243                         if (ret != -EAGAIN)
3244                                 break;
3245                         cond_resched();
3246                 } while (1);
3247         }
3248
3249         /* drop submission reference */
3250         io_put_req(req);
3251
3252         if (ret) {
3253                 req_set_fail_links(req);
3254                 io_cqring_add_event(req, ret);
3255                 io_put_req(req);
3256         }
3257
3258         /* if a dependent link is ready, pass it back */
3259         if (!ret && nxt) {
3260                 struct io_kiocb *link;
3261
3262                 io_prep_async_work(nxt, &link);
3263                 *workptr = &nxt->work;
3264                 if (link) {
3265                         nxt->work.flags |= IO_WQ_WORK_CB;
3266                         nxt->work.func = io_link_work_cb;
3267                         nxt->work.data = link;
3268                 }
3269         }
3270 }
3271
3272 static bool io_req_op_valid(int op)
3273 {
3274         return op >= IORING_OP_NOP && op < IORING_OP_LAST;
3275 }
3276
3277 static int io_op_needs_file(const struct io_uring_sqe *sqe)
3278 {
3279         int op = READ_ONCE(sqe->opcode);
3280
3281         switch (op) {
3282         case IORING_OP_NOP:
3283         case IORING_OP_POLL_REMOVE:
3284         case IORING_OP_TIMEOUT:
3285         case IORING_OP_TIMEOUT_REMOVE:
3286         case IORING_OP_ASYNC_CANCEL:
3287         case IORING_OP_LINK_TIMEOUT:
3288                 return 0;
3289         default:
3290                 if (io_req_op_valid(op))
3291                         return 1;
3292                 return -EINVAL;
3293         }
3294 }
3295
3296 static inline struct file *io_file_from_index(struct io_ring_ctx *ctx,
3297                                               int index)
3298 {
3299         struct fixed_file_table *table;
3300
3301         table = &ctx->file_table[index >> IORING_FILE_TABLE_SHIFT];
3302         return table->files[index & IORING_FILE_TABLE_MASK];
3303 }
3304
3305 static int io_req_set_file(struct io_submit_state *state, struct io_kiocb *req)
3306 {
3307         struct io_ring_ctx *ctx = req->ctx;
3308         unsigned flags;
3309         int fd, ret;
3310
3311         flags = READ_ONCE(req->sqe->flags);
3312         fd = READ_ONCE(req->sqe->fd);
3313
3314         if (flags & IOSQE_IO_DRAIN)
3315                 req->flags |= REQ_F_IO_DRAIN;
3316
3317         ret = io_op_needs_file(req->sqe);
3318         if (ret <= 0)
3319                 return ret;
3320
3321         if (flags & IOSQE_FIXED_FILE) {
3322                 if (unlikely(!ctx->file_table ||
3323                     (unsigned) fd >= ctx->nr_user_files))
3324                         return -EBADF;
3325                 fd = array_index_nospec(fd, ctx->nr_user_files);
3326                 req->file = io_file_from_index(ctx, fd);
3327                 if (!req->file)
3328                         return -EBADF;
3329                 req->flags |= REQ_F_FIXED_FILE;
3330         } else {
3331                 if (req->needs_fixed_file)
3332                         return -EBADF;
3333                 trace_io_uring_file_get(ctx, fd);
3334                 req->file = io_file_get(state, fd);
3335                 if (unlikely(!req->file))
3336                         return -EBADF;
3337         }
3338
3339         return 0;
3340 }
3341
3342 static int io_grab_files(struct io_kiocb *req)
3343 {
3344         int ret = -EBADF;
3345         struct io_ring_ctx *ctx = req->ctx;
3346
3347         rcu_read_lock();
3348         spin_lock_irq(&ctx->inflight_lock);
3349         /*
3350          * We use the f_ops->flush() handler to ensure that we can flush
3351          * out work accessing these files if the fd is closed. Check if
3352          * the fd has changed since we started down this path, and disallow
3353          * this operation if it has.
3354          */
3355         if (fcheck(req->ring_fd) == req->ring_file) {
3356                 list_add(&req->inflight_entry, &ctx->inflight_list);
3357                 req->flags |= REQ_F_INFLIGHT;
3358                 req->work.files = current->files;
3359                 ret = 0;
3360         }
3361         spin_unlock_irq(&ctx->inflight_lock);
3362         rcu_read_unlock();
3363
3364         return ret;
3365 }
3366
3367 static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer)
3368 {
3369         struct io_timeout_data *data = container_of(timer,
3370                                                 struct io_timeout_data, timer);
3371         struct io_kiocb *req = data->req;
3372         struct io_ring_ctx *ctx = req->ctx;
3373         struct io_kiocb *prev = NULL;
3374         unsigned long flags;
3375
3376         spin_lock_irqsave(&ctx->completion_lock, flags);
3377
3378         /*
3379          * We don't expect the list to be empty, that will only happen if we
3380          * race with the completion of the linked work.
3381          */
3382         if (!list_empty(&req->link_list)) {
3383                 prev = list_entry(req->link_list.prev, struct io_kiocb,
3384                                   link_list);
3385                 if (refcount_inc_not_zero(&prev->refs)) {
3386                         list_del_init(&req->link_list);
3387                         prev->flags &= ~REQ_F_LINK_TIMEOUT;
3388                 } else
3389                         prev = NULL;
3390         }
3391
3392         spin_unlock_irqrestore(&ctx->completion_lock, flags);
3393
3394         if (prev) {
3395                 req_set_fail_links(prev);
3396                 io_async_find_and_cancel(ctx, req, prev->user_data, NULL,
3397                                                 -ETIME);
3398                 io_put_req(prev);
3399         } else {
3400                 io_cqring_add_event(req, -ETIME);
3401                 io_put_req(req);
3402         }
3403         return HRTIMER_NORESTART;
3404 }
3405
3406 static void io_queue_linked_timeout(struct io_kiocb *req)
3407 {
3408         struct io_ring_ctx *ctx = req->ctx;
3409
3410         /*
3411          * If the list is now empty, then our linked request finished before
3412          * we got a chance to setup the timer
3413          */
3414         spin_lock_irq(&ctx->completion_lock);
3415         if (!list_empty(&req->link_list)) {
3416                 struct io_timeout_data *data = &req->io->timeout;
3417
3418                 data->timer.function = io_link_timeout_fn;
3419                 hrtimer_start(&data->timer, timespec64_to_ktime(data->ts),
3420                                 data->mode);
3421         }
3422         spin_unlock_irq(&ctx->completion_lock);
3423
3424         /* drop submission reference */
3425         io_put_req(req);
3426 }
3427
3428 static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req)
3429 {
3430         struct io_kiocb *nxt;
3431
3432         if (!(req->flags & REQ_F_LINK))
3433                 return NULL;
3434
3435         nxt = list_first_entry_or_null(&req->link_list, struct io_kiocb,
3436                                         link_list);
3437         if (!nxt || nxt->sqe->opcode != IORING_OP_LINK_TIMEOUT)
3438                 return NULL;
3439
3440         req->flags |= REQ_F_LINK_TIMEOUT;
3441         return nxt;
3442 }
3443
3444 static void __io_queue_sqe(struct io_kiocb *req)
3445 {
3446         struct io_kiocb *linked_timeout;
3447         struct io_kiocb *nxt = NULL;
3448         int ret;
3449
3450 again:
3451         linked_timeout = io_prep_linked_timeout(req);
3452
3453         ret = io_issue_sqe(req, &nxt, true);
3454
3455         /*
3456          * We async punt it if the file wasn't marked NOWAIT, or if the file
3457          * doesn't support non-blocking read/write attempts
3458          */
3459         if (ret == -EAGAIN && (!(req->flags & REQ_F_NOWAIT) ||
3460             (req->flags & REQ_F_MUST_PUNT))) {
3461                 if (req->work.flags & IO_WQ_WORK_NEEDS_FILES) {
3462                         ret = io_grab_files(req);
3463                         if (ret)
3464                                 goto err;
3465                 }
3466
3467                 /*
3468                  * Queued up for async execution, worker will release
3469                  * submit reference when the iocb is actually submitted.
3470                  */
3471                 io_queue_async_work(req);
3472                 goto done_req;
3473         }
3474
3475 err:
3476         /* drop submission reference */
3477         io_put_req(req);
3478
3479         if (linked_timeout) {
3480                 if (!ret)
3481                         io_queue_linked_timeout(linked_timeout);
3482                 else
3483                         io_put_req(linked_timeout);
3484         }
3485
3486         /* and drop final reference, if we failed */
3487         if (ret) {
3488                 io_cqring_add_event(req, ret);
3489                 req_set_fail_links(req);
3490                 io_put_req(req);
3491         }
3492 done_req:
3493         if (nxt) {
3494                 req = nxt;
3495                 nxt = NULL;
3496                 goto again;
3497         }
3498 }
3499
3500 static void io_queue_sqe(struct io_kiocb *req)
3501 {
3502         int ret;
3503
3504         if (unlikely(req->ctx->drain_next)) {
3505                 req->flags |= REQ_F_IO_DRAIN;
3506                 req->ctx->drain_next = false;
3507         }
3508         req->ctx->drain_next = (req->flags & REQ_F_DRAIN_LINK);
3509
3510         ret = io_req_defer(req);
3511         if (ret) {
3512                 if (ret != -EIOCBQUEUED) {
3513                         io_cqring_add_event(req, ret);
3514                         req_set_fail_links(req);
3515                         io_double_put_req(req);
3516                 }
3517         } else
3518                 __io_queue_sqe(req);
3519 }
3520
3521 static inline void io_queue_link_head(struct io_kiocb *req)
3522 {
3523         if (unlikely(req->flags & REQ_F_FAIL_LINK)) {
3524                 io_cqring_add_event(req, -ECANCELED);
3525                 io_double_put_req(req);
3526         } else
3527                 io_queue_sqe(req);
3528 }
3529
3530 #define SQE_VALID_FLAGS (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK| \
3531                                 IOSQE_IO_HARDLINK)
3532
3533 static bool io_submit_sqe(struct io_kiocb *req, struct io_submit_state *state,
3534                           struct io_kiocb **link)
3535 {
3536         struct io_ring_ctx *ctx = req->ctx;
3537         int ret;
3538
3539         req->user_data = req->sqe->user_data;
3540
3541         /* enforce forwards compatibility on users */
3542         if (unlikely(req->sqe->flags & ~SQE_VALID_FLAGS)) {
3543                 ret = -EINVAL;
3544                 goto err_req;
3545         }
3546
3547         ret = io_req_set_file(state, req);
3548         if (unlikely(ret)) {
3549 err_req:
3550                 io_cqring_add_event(req, ret);
3551                 io_double_put_req(req);
3552                 return false;
3553         }
3554
3555         /*
3556          * If we already have a head request, queue this one for async
3557          * submittal once the head completes. If we don't have a head but
3558          * IOSQE_IO_LINK is set in the sqe, start a new head. This one will be
3559          * submitted sync once the chain is complete. If none of those
3560          * conditions are true (normal request), then just queue it.
3561          */
3562         if (*link) {
3563                 struct io_kiocb *prev = *link;
3564
3565                 if (req->sqe->flags & IOSQE_IO_DRAIN)
3566                         (*link)->flags |= REQ_F_DRAIN_LINK | REQ_F_IO_DRAIN;
3567
3568                 if (req->sqe->flags & IOSQE_IO_HARDLINK)
3569                         req->flags |= REQ_F_HARDLINK;
3570
3571                 if (io_alloc_async_ctx(req)) {
3572                         ret = -EAGAIN;
3573                         goto err_req;
3574                 }
3575
3576                 ret = io_req_defer_prep(req);
3577                 if (ret) {
3578                         /* fail even hard links since we don't submit */
3579                         prev->flags |= REQ_F_FAIL_LINK;
3580                         goto err_req;
3581                 }
3582                 trace_io_uring_link(ctx, req, prev);
3583                 list_add_tail(&req->link_list, &prev->link_list);
3584         } else if (req->sqe->flags & (IOSQE_IO_LINK|IOSQE_IO_HARDLINK)) {
3585                 req->flags |= REQ_F_LINK;
3586                 if (req->sqe->flags & IOSQE_IO_HARDLINK)
3587                         req->flags |= REQ_F_HARDLINK;
3588
3589                 INIT_LIST_HEAD(&req->link_list);
3590                 *link = req;
3591         } else {
3592                 io_queue_sqe(req);
3593         }
3594
3595         return true;
3596 }
3597
3598 /*
3599  * Batched submission is done, ensure local IO is flushed out.
3600  */
3601 static void io_submit_state_end(struct io_submit_state *state)
3602 {
3603         blk_finish_plug(&state->plug);
3604         io_file_put(state);
3605         if (state->free_reqs)
3606                 kmem_cache_free_bulk(req_cachep, state->free_reqs,
3607                                         &state->reqs[state->cur_req]);
3608 }
3609
3610 /*
3611  * Start submission side cache.
3612  */
3613 static void io_submit_state_start(struct io_submit_state *state,
3614                                   unsigned int max_ios)
3615 {
3616         blk_start_plug(&state->plug);
3617         state->free_reqs = 0;
3618         state->file = NULL;
3619         state->ios_left = max_ios;
3620 }
3621
3622 static void io_commit_sqring(struct io_ring_ctx *ctx)
3623 {
3624         struct io_rings *rings = ctx->rings;
3625
3626         if (ctx->cached_sq_head != READ_ONCE(rings->sq.head)) {
3627                 /*
3628                  * Ensure any loads from the SQEs are done at this point,
3629                  * since once we write the new head, the application could
3630                  * write new data to them.
3631                  */
3632                 smp_store_release(&rings->sq.head, ctx->cached_sq_head);
3633         }
3634 }
3635
3636 /*
3637  * Fetch an sqe, if one is available. Note that req->sqe will point to memory
3638  * that is mapped by userspace. This means that care needs to be taken to
3639  * ensure that reads are stable, as we cannot rely on userspace always
3640  * being a good citizen. If members of the sqe are validated and then later
3641  * used, it's important that those reads are done through READ_ONCE() to
3642  * prevent a re-load down the line.
3643  */
3644 static bool io_get_sqring(struct io_ring_ctx *ctx, struct io_kiocb *req)
3645 {
3646         struct io_rings *rings = ctx->rings;
3647         u32 *sq_array = ctx->sq_array;
3648         unsigned head;
3649
3650         /*
3651          * The cached sq head (or cq tail) serves two purposes:
3652          *
3653          * 1) allows us to batch the cost of updating the user visible
3654          *    head updates.
3655          * 2) allows the kernel side to track the head on its own, even
3656          *    though the application is the one updating it.
3657          */
3658         head = ctx->cached_sq_head;
3659         /* make sure SQ entry isn't read before tail */
3660         if (unlikely(head == smp_load_acquire(&rings->sq.tail)))
3661                 return false;
3662
3663         head = READ_ONCE(sq_array[head & ctx->sq_mask]);
3664         if (likely(head < ctx->sq_entries)) {
3665                 /*
3666                  * All io need record the previous position, if LINK vs DARIN,
3667                  * it can be used to mark the position of the first IO in the
3668                  * link list.
3669                  */
3670                 req->sequence = ctx->cached_sq_head;
3671                 req->sqe = &ctx->sq_sqes[head];
3672                 ctx->cached_sq_head++;
3673                 return true;
3674         }
3675
3676         /* drop invalid entries */
3677         ctx->cached_sq_head++;
3678         ctx->cached_sq_dropped++;
3679         WRITE_ONCE(rings->sq_dropped, ctx->cached_sq_dropped);
3680         return false;
3681 }
3682
3683 static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
3684                           struct file *ring_file, int ring_fd,
3685                           struct mm_struct **mm, bool async)
3686 {
3687         struct io_submit_state state, *statep = NULL;
3688         struct io_kiocb *link = NULL;
3689         int i, submitted = 0;
3690         bool mm_fault = false;
3691
3692         /* if we have a backlog and couldn't flush it all, return BUSY */
3693         if (!list_empty(&ctx->cq_overflow_list) &&
3694             !io_cqring_overflow_flush(ctx, false))
3695                 return -EBUSY;
3696
3697         if (nr > IO_PLUG_THRESHOLD) {
3698                 io_submit_state_start(&state, nr);
3699                 statep = &state;
3700         }
3701
3702         for (i = 0; i < nr; i++) {
3703                 struct io_kiocb *req;
3704                 unsigned int sqe_flags;
3705
3706                 req = io_get_req(ctx, statep);
3707                 if (unlikely(!req)) {
3708                         if (!submitted)
3709                                 submitted = -EAGAIN;
3710                         break;
3711                 }
3712                 if (!io_get_sqring(ctx, req)) {
3713                         __io_free_req(req);
3714                         break;
3715                 }
3716
3717                 if (io_sqe_needs_user(req->sqe) && !*mm) {
3718                         mm_fault = mm_fault || !mmget_not_zero(ctx->sqo_mm);
3719                         if (!mm_fault) {
3720                                 use_mm(ctx->sqo_mm);
3721                                 *mm = ctx->sqo_mm;
3722                         }
3723                 }
3724
3725                 submitted++;
3726                 sqe_flags = req->sqe->flags;
3727
3728                 req->ring_file = ring_file;
3729                 req->ring_fd = ring_fd;
3730                 req->has_user = *mm != NULL;
3731                 req->in_async = async;
3732                 req->needs_fixed_file = async;
3733                 trace_io_uring_submit_sqe(ctx, req->sqe->user_data,
3734                                           true, async);
3735                 if (!io_submit_sqe(req, statep, &link))
3736                         break;
3737                 /*
3738                  * If previous wasn't linked and we have a linked command,
3739                  * that's the end of the chain. Submit the previous link.
3740                  */
3741                 if (!(sqe_flags & (IOSQE_IO_LINK|IOSQE_IO_HARDLINK)) && link) {
3742                         io_queue_link_head(link);
3743                         link = NULL;
3744                 }
3745         }
3746
3747         if (link)
3748                 io_queue_link_head(link);
3749         if (statep)
3750                 io_submit_state_end(&state);
3751
3752          /* Commit SQ ring head once we've consumed and submitted all SQEs */
3753         io_commit_sqring(ctx);
3754
3755         return submitted;
3756 }
3757
3758 static int io_sq_thread(void *data)
3759 {
3760         struct io_ring_ctx *ctx = data;
3761         struct mm_struct *cur_mm = NULL;
3762         const struct cred *old_cred;
3763         mm_segment_t old_fs;
3764         DEFINE_WAIT(wait);
3765         unsigned inflight;
3766         unsigned long timeout;
3767         int ret;
3768
3769         complete(&ctx->completions[1]);
3770
3771         old_fs = get_fs();
3772         set_fs(USER_DS);
3773         old_cred = override_creds(ctx->creds);
3774
3775         ret = timeout = inflight = 0;
3776         while (!kthread_should_park()) {
3777                 unsigned int to_submit;
3778
3779                 if (inflight) {
3780                         unsigned nr_events = 0;
3781
3782                         if (ctx->flags & IORING_SETUP_IOPOLL) {
3783                                 /*
3784                                  * inflight is the count of the maximum possible
3785                                  * entries we submitted, but it can be smaller
3786                                  * if we dropped some of them. If we don't have
3787                                  * poll entries available, then we know that we
3788                                  * have nothing left to poll for. Reset the
3789                                  * inflight count to zero in that case.
3790                                  */
3791                                 mutex_lock(&ctx->uring_lock);
3792                                 if (!list_empty(&ctx->poll_list))
3793                                         __io_iopoll_check(ctx, &nr_events, 0);
3794                                 else
3795                                         inflight = 0;
3796                                 mutex_unlock(&ctx->uring_lock);
3797                         } else {
3798                                 /*
3799                                  * Normal IO, just pretend everything completed.
3800                                  * We don't have to poll completions for that.
3801                                  */
3802                                 nr_events = inflight;
3803                         }
3804
3805                         inflight -= nr_events;
3806                         if (!inflight)
3807                                 timeout = jiffies + ctx->sq_thread_idle;
3808                 }
3809
3810                 to_submit = io_sqring_entries(ctx);
3811
3812                 /*
3813                  * If submit got -EBUSY, flag us as needing the application
3814                  * to enter the kernel to reap and flush events.
3815                  */
3816                 if (!to_submit || ret == -EBUSY) {
3817                         /*
3818                          * We're polling. If we're within the defined idle
3819                          * period, then let us spin without work before going
3820                          * to sleep. The exception is if we got EBUSY doing
3821                          * more IO, we should wait for the application to
3822                          * reap events and wake us up.
3823                          */
3824                         if (inflight ||
3825                             (!time_after(jiffies, timeout) && ret != -EBUSY)) {
3826                                 cond_resched();
3827                                 continue;
3828                         }
3829
3830                         /*
3831                          * Drop cur_mm before scheduling, we can't hold it for
3832                          * long periods (or over schedule()). Do this before
3833                          * adding ourselves to the waitqueue, as the unuse/drop
3834                          * may sleep.
3835                          */
3836                         if (cur_mm) {
3837                                 unuse_mm(cur_mm);
3838                                 mmput(cur_mm);
3839                                 cur_mm = NULL;
3840                         }
3841
3842                         prepare_to_wait(&ctx->sqo_wait, &wait,
3843                                                 TASK_INTERRUPTIBLE);
3844
3845                         /* Tell userspace we may need a wakeup call */
3846                         ctx->rings->sq_flags |= IORING_SQ_NEED_WAKEUP;
3847                         /* make sure to read SQ tail after writing flags */
3848                         smp_mb();
3849
3850                         to_submit = io_sqring_entries(ctx);
3851                         if (!to_submit || ret == -EBUSY) {
3852                                 if (kthread_should_park()) {
3853                                         finish_wait(&ctx->sqo_wait, &wait);
3854                                         break;
3855                                 }
3856                                 if (signal_pending(current))
3857                                         flush_signals(current);
3858                                 schedule();
3859                                 finish_wait(&ctx->sqo_wait, &wait);
3860
3861                                 ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
3862                                 continue;
3863                         }
3864                         finish_wait(&ctx->sqo_wait, &wait);
3865
3866                         ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
3867                 }
3868
3869                 to_submit = min(to_submit, ctx->sq_entries);
3870                 mutex_lock(&ctx->uring_lock);
3871                 ret = io_submit_sqes(ctx, to_submit, NULL, -1, &cur_mm, true);
3872                 mutex_unlock(&ctx->uring_lock);
3873                 if (ret > 0)
3874                         inflight += ret;
3875         }
3876
3877         set_fs(old_fs);
3878         if (cur_mm) {
3879                 unuse_mm(cur_mm);
3880                 mmput(cur_mm);
3881         }
3882         revert_creds(old_cred);
3883
3884         kthread_parkme();
3885
3886         return 0;
3887 }
3888
3889 struct io_wait_queue {
3890         struct wait_queue_entry wq;
3891         struct io_ring_ctx *ctx;
3892         unsigned to_wait;
3893         unsigned nr_timeouts;
3894 };
3895
3896 static inline bool io_should_wake(struct io_wait_queue *iowq, bool noflush)
3897 {
3898         struct io_ring_ctx *ctx = iowq->ctx;
3899
3900         /*
3901          * Wake up if we have enough events, or if a timeout occurred since we
3902          * started waiting. For timeouts, we always want to return to userspace,
3903          * regardless of event count.
3904          */
3905         return io_cqring_events(ctx, noflush) >= iowq->to_wait ||
3906                         atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
3907 }
3908
3909 static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
3910                             int wake_flags, void *key)
3911 {
3912         struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue,
3913                                                         wq);
3914
3915         /* use noflush == true, as we can't safely rely on locking context */
3916         if (!io_should_wake(iowq, true))
3917                 return -1;
3918
3919         return autoremove_wake_function(curr, mode, wake_flags, key);
3920 }
3921
3922 /*
3923  * Wait until events become available, if we don't already have some. The
3924  * application must reap them itself, as they reside on the shared cq ring.
3925  */
3926 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
3927                           const sigset_t __user *sig, size_t sigsz)
3928 {
3929         struct io_wait_queue iowq = {
3930                 .wq = {
3931                         .private        = current,
3932                         .func           = io_wake_function,
3933                         .entry          = LIST_HEAD_INIT(iowq.wq.entry),
3934                 },
3935                 .ctx            = ctx,
3936                 .to_wait        = min_events,
3937         };
3938         struct io_rings *rings = ctx->rings;
3939         int ret = 0;
3940
3941         if (io_cqring_events(ctx, false) >= min_events)
3942                 return 0;
3943
3944         if (sig) {
3945 #ifdef CONFIG_COMPAT
3946                 if (in_compat_syscall())
3947                         ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig,
3948                                                       sigsz);
3949                 else
3950 #endif
3951                         ret = set_user_sigmask(sig, sigsz);
3952
3953                 if (ret)
3954                         return ret;
3955         }
3956
3957         iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
3958         trace_io_uring_cqring_wait(ctx, min_events);
3959         do {
3960                 prepare_to_wait_exclusive(&ctx->wait, &iowq.wq,
3961                                                 TASK_INTERRUPTIBLE);
3962                 if (io_should_wake(&iowq, false))
3963                         break;
3964                 schedule();
3965                 if (signal_pending(current)) {
3966                         ret = -EINTR;
3967                         break;
3968                 }
3969         } while (1);
3970         finish_wait(&ctx->wait, &iowq.wq);
3971
3972         restore_saved_sigmask_unless(ret == -EINTR);
3973
3974         return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0;
3975 }
3976
3977 static void __io_sqe_files_unregister(struct io_ring_ctx *ctx)
3978 {
3979 #if defined(CONFIG_UNIX)
3980         if (ctx->ring_sock) {
3981                 struct sock *sock = ctx->ring_sock->sk;
3982                 struct sk_buff *skb;
3983
3984                 while ((skb = skb_dequeue(&sock->sk_receive_queue)) != NULL)
3985                         kfree_skb(skb);
3986         }
3987 #else
3988         int i;
3989
3990         for (i = 0; i < ctx->nr_user_files; i++) {
3991                 struct file *file;
3992
3993                 file = io_file_from_index(ctx, i);
3994                 if (file)
3995                         fput(file);
3996         }
3997 #endif
3998 }
3999
4000 static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
4001 {
4002         unsigned nr_tables, i;
4003
4004         if (!ctx->file_table)
4005                 return -ENXIO;
4006
4007         __io_sqe_files_unregister(ctx);
4008         nr_tables = DIV_ROUND_UP(ctx->nr_user_files, IORING_MAX_FILES_TABLE);
4009         for (i = 0; i < nr_tables; i++)
4010                 kfree(ctx->file_table[i].files);
4011         kfree(ctx->file_table);
4012         ctx->file_table = NULL;
4013         ctx->nr_user_files = 0;
4014         return 0;
4015 }
4016
4017 static void io_sq_thread_stop(struct io_ring_ctx *ctx)
4018 {
4019         if (ctx->sqo_thread) {
4020                 wait_for_completion(&ctx->completions[1]);
4021                 /*
4022                  * The park is a bit of a work-around, without it we get
4023                  * warning spews on shutdown with SQPOLL set and affinity
4024                  * set to a single CPU.
4025                  */
4026                 kthread_park(ctx->sqo_thread);
4027                 kthread_stop(ctx->sqo_thread);
4028                 ctx->sqo_thread = NULL;
4029         }
4030 }
4031
4032 static void io_finish_async(struct io_ring_ctx *ctx)
4033 {
4034         io_sq_thread_stop(ctx);
4035
4036         if (ctx->io_wq) {
4037                 io_wq_destroy(ctx->io_wq);
4038                 ctx->io_wq = NULL;
4039         }
4040 }
4041
4042 #if defined(CONFIG_UNIX)
4043 static void io_destruct_skb(struct sk_buff *skb)
4044 {
4045         struct io_ring_ctx *ctx = skb->sk->sk_user_data;
4046
4047         if (ctx->io_wq)
4048                 io_wq_flush(ctx->io_wq);
4049
4050         unix_destruct_scm(skb);
4051 }
4052
4053 /*
4054  * Ensure the UNIX gc is aware of our file set, so we are certain that
4055  * the io_uring can be safely unregistered on process exit, even if we have
4056  * loops in the file referencing.
4057  */
4058 static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset)
4059 {
4060         struct sock *sk = ctx->ring_sock->sk;
4061         struct scm_fp_list *fpl;
4062         struct sk_buff *skb;
4063         int i, nr_files;
4064
4065         if (!capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN)) {
4066                 unsigned long inflight = ctx->user->unix_inflight + nr;
4067
4068                 if (inflight > task_rlimit(current, RLIMIT_NOFILE))
4069                         return -EMFILE;
4070         }
4071
4072         fpl = kzalloc(sizeof(*fpl), GFP_KERNEL);
4073         if (!fpl)
4074                 return -ENOMEM;
4075
4076         skb = alloc_skb(0, GFP_KERNEL);
4077         if (!skb) {
4078                 kfree(fpl);
4079                 return -ENOMEM;
4080         }
4081
4082         skb->sk = sk;
4083
4084         nr_files = 0;
4085         fpl->user = get_uid(ctx->user);
4086         for (i = 0; i < nr; i++) {
4087                 struct file *file = io_file_from_index(ctx, i + offset);
4088
4089                 if (!file)
4090                         continue;
4091                 fpl->fp[nr_files] = get_file(file);
4092                 unix_inflight(fpl->user, fpl->fp[nr_files]);
4093                 nr_files++;
4094         }
4095
4096         if (nr_files) {
4097                 fpl->max = SCM_MAX_FD;
4098                 fpl->count = nr_files;
4099                 UNIXCB(skb).fp = fpl;
4100                 skb->destructor = io_destruct_skb;
4101                 refcount_add(skb->truesize, &sk->sk_wmem_alloc);
4102                 skb_queue_head(&sk->sk_receive_queue, skb);
4103
4104                 for (i = 0; i < nr_files; i++)
4105                         fput(fpl->fp[i]);
4106         } else {
4107                 kfree_skb(skb);
4108                 kfree(fpl);
4109         }
4110
4111         return 0;
4112 }
4113
4114 /*
4115  * If UNIX sockets are enabled, fd passing can cause a reference cycle which
4116  * causes regular reference counting to break down. We rely on the UNIX
4117  * garbage collection to take care of this problem for us.
4118  */
4119 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
4120 {
4121         unsigned left, total;
4122         int ret = 0;
4123
4124         total = 0;
4125         left = ctx->nr_user_files;
4126         while (left) {
4127                 unsigned this_files = min_t(unsigned, left, SCM_MAX_FD);
4128
4129                 ret = __io_sqe_files_scm(ctx, this_files, total);
4130                 if (ret)
4131                         break;
4132                 left -= this_files;
4133                 total += this_files;
4134         }
4135
4136         if (!ret)
4137                 return 0;
4138
4139         while (total < ctx->nr_user_files) {
4140                 struct file *file = io_file_from_index(ctx, total);
4141
4142                 if (file)
4143                         fput(file);
4144                 total++;
4145         }
4146
4147         return ret;
4148 }
4149 #else
4150 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
4151 {
4152         return 0;
4153 }
4154 #endif
4155
4156 static int io_sqe_alloc_file_tables(struct io_ring_ctx *ctx, unsigned nr_tables,
4157                                     unsigned nr_files)
4158 {
4159         int i;
4160
4161         for (i = 0; i < nr_tables; i++) {
4162                 struct fixed_file_table *table = &ctx->file_table[i];
4163                 unsigned this_files;
4164
4165                 this_files = min(nr_files, IORING_MAX_FILES_TABLE);
4166                 table->files = kcalloc(this_files, sizeof(struct file *),
4167                                         GFP_KERNEL);
4168                 if (!table->files)
4169                         break;
4170                 nr_files -= this_files;
4171         }
4172
4173         if (i == nr_tables)
4174                 return 0;
4175
4176         for (i = 0; i < nr_tables; i++) {
4177                 struct fixed_file_table *table = &ctx->file_table[i];
4178                 kfree(table->files);
4179         }
4180         return 1;
4181 }
4182
4183 static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
4184                                  unsigned nr_args)
4185 {
4186         __s32 __user *fds = (__s32 __user *) arg;
4187         unsigned nr_tables;
4188         int fd, ret = 0;
4189         unsigned i;
4190
4191         if (ctx->file_table)
4192                 return -EBUSY;
4193         if (!nr_args)
4194                 return -EINVAL;
4195         if (nr_args > IORING_MAX_FIXED_FILES)
4196                 return -EMFILE;
4197
4198         nr_tables = DIV_ROUND_UP(nr_args, IORING_MAX_FILES_TABLE);
4199         ctx->file_table = kcalloc(nr_tables, sizeof(struct fixed_file_table),
4200                                         GFP_KERNEL);
4201         if (!ctx->file_table)
4202                 return -ENOMEM;
4203
4204         if (io_sqe_alloc_file_tables(ctx, nr_tables, nr_args)) {
4205                 kfree(ctx->file_table);
4206                 ctx->file_table = NULL;
4207                 return -ENOMEM;
4208         }
4209
4210         for (i = 0; i < nr_args; i++, ctx->nr_user_files++) {
4211                 struct fixed_file_table *table;
4212                 unsigned index;
4213
4214                 ret = -EFAULT;
4215                 if (copy_from_user(&fd, &fds[i], sizeof(fd)))
4216                         break;
4217                 /* allow sparse sets */
4218                 if (fd == -1) {
4219                         ret = 0;
4220                         continue;
4221                 }
4222
4223                 table = &ctx->file_table[i >> IORING_FILE_TABLE_SHIFT];
4224                 index = i & IORING_FILE_TABLE_MASK;
4225                 table->files[index] = fget(fd);
4226
4227                 ret = -EBADF;
4228                 if (!table->files[index])
4229                         break;
4230                 /*
4231                  * Don't allow io_uring instances to be registered. If UNIX
4232                  * isn't enabled, then this causes a reference cycle and this
4233                  * instance can never get freed. If UNIX is enabled we'll
4234                  * handle it just fine, but there's still no point in allowing
4235                  * a ring fd as it doesn't support regular read/write anyway.
4236                  */
4237                 if (table->files[index]->f_op == &io_uring_fops) {
4238                         fput(table->files[index]);
4239                         break;
4240                 }
4241                 ret = 0;
4242         }
4243
4244         if (ret) {
4245                 for (i = 0; i < ctx->nr_user_files; i++) {
4246                         struct file *file;
4247
4248                         file = io_file_from_index(ctx, i);
4249                         if (file)
4250                                 fput(file);
4251                 }
4252                 for (i = 0; i < nr_tables; i++)
4253                         kfree(ctx->file_table[i].files);
4254
4255                 kfree(ctx->file_table);
4256                 ctx->file_table = NULL;
4257                 ctx->nr_user_files = 0;
4258                 return ret;
4259         }
4260
4261         ret = io_sqe_files_scm(ctx);
4262         if (ret)
4263                 io_sqe_files_unregister(ctx);
4264
4265         return ret;
4266 }
4267
4268 static void io_sqe_file_unregister(struct io_ring_ctx *ctx, int index)
4269 {
4270 #if defined(CONFIG_UNIX)
4271         struct file *file = io_file_from_index(ctx, index);
4272         struct sock *sock = ctx->ring_sock->sk;
4273         struct sk_buff_head list, *head = &sock->sk_receive_queue;
4274         struct sk_buff *skb;
4275         int i;
4276
4277         __skb_queue_head_init(&list);
4278
4279         /*
4280          * Find the skb that holds this file in its SCM_RIGHTS. When found,
4281          * remove this entry and rearrange the file array.
4282          */
4283         skb = skb_dequeue(head);
4284         while (skb) {
4285                 struct scm_fp_list *fp;
4286
4287                 fp = UNIXCB(skb).fp;
4288                 for (i = 0; i < fp->count; i++) {
4289                         int left;
4290
4291                         if (fp->fp[i] != file)
4292                                 continue;
4293
4294                         unix_notinflight(fp->user, fp->fp[i]);
4295                         left = fp->count - 1 - i;
4296                         if (left) {
4297                                 memmove(&fp->fp[i], &fp->fp[i + 1],
4298                                                 left * sizeof(struct file *));
4299                         }
4300                         fp->count--;
4301                         if (!fp->count) {
4302                                 kfree_skb(skb);
4303                                 skb = NULL;
4304                         } else {
4305                                 __skb_queue_tail(&list, skb);
4306                         }
4307                         fput(file);
4308                         file = NULL;
4309                         break;
4310                 }
4311
4312                 if (!file)
4313                         break;
4314
4315                 __skb_queue_tail(&list, skb);
4316
4317                 skb = skb_dequeue(head);
4318         }
4319
4320         if (skb_peek(&list)) {
4321                 spin_lock_irq(&head->lock);
4322                 while ((skb = __skb_dequeue(&list)) != NULL)
4323                         __skb_queue_tail(head, skb);
4324                 spin_unlock_irq(&head->lock);
4325         }
4326 #else
4327         fput(io_file_from_index(ctx, index));
4328 #endif
4329 }
4330
4331 static int io_sqe_file_register(struct io_ring_ctx *ctx, struct file *file,
4332                                 int index)
4333 {
4334 #if defined(CONFIG_UNIX)
4335         struct sock *sock = ctx->ring_sock->sk;
4336         struct sk_buff_head *head = &sock->sk_receive_queue;
4337         struct sk_buff *skb;
4338
4339         /*
4340          * See if we can merge this file into an existing skb SCM_RIGHTS
4341          * file set. If there's no room, fall back to allocating a new skb
4342          * and filling it in.
4343          */
4344         spin_lock_irq(&head->lock);
4345         skb = skb_peek(head);
4346         if (skb) {
4347                 struct scm_fp_list *fpl = UNIXCB(skb).fp;
4348
4349                 if (fpl->count < SCM_MAX_FD) {
4350                         __skb_unlink(skb, head);
4351                         spin_unlock_irq(&head->lock);
4352                         fpl->fp[fpl->count] = get_file(file);
4353                         unix_inflight(fpl->user, fpl->fp[fpl->count]);
4354                         fpl->count++;
4355                         spin_lock_irq(&head->lock);
4356                         __skb_queue_head(head, skb);
4357                 } else {
4358                         skb = NULL;
4359                 }
4360         }
4361         spin_unlock_irq(&head->lock);
4362
4363         if (skb) {
4364                 fput(file);
4365                 return 0;
4366         }
4367
4368         return __io_sqe_files_scm(ctx, 1, index);
4369 #else
4370         return 0;
4371 #endif
4372 }
4373
4374 static int io_sqe_files_update(struct io_ring_ctx *ctx, void __user *arg,
4375                                unsigned nr_args)
4376 {
4377         struct io_uring_files_update up;
4378         __s32 __user *fds;
4379         int fd, i, err;
4380         __u32 done;
4381
4382         if (!ctx->file_table)
4383                 return -ENXIO;
4384         if (!nr_args)
4385                 return -EINVAL;
4386         if (copy_from_user(&up, arg, sizeof(up)))
4387                 return -EFAULT;
4388         if (check_add_overflow(up.offset, nr_args, &done))
4389                 return -EOVERFLOW;
4390         if (done > ctx->nr_user_files)
4391                 return -EINVAL;
4392
4393         done = 0;
4394         fds = (__s32 __user *) up.fds;
4395         while (nr_args) {
4396                 struct fixed_file_table *table;
4397                 unsigned index;
4398
4399                 err = 0;
4400                 if (copy_from_user(&fd, &fds[done], sizeof(fd))) {
4401                         err = -EFAULT;
4402                         break;
4403                 }
4404                 i = array_index_nospec(up.offset, ctx->nr_user_files);
4405                 table = &ctx->file_table[i >> IORING_FILE_TABLE_SHIFT];
4406                 index = i & IORING_FILE_TABLE_MASK;
4407                 if (table->files[index]) {
4408                         io_sqe_file_unregister(ctx, i);
4409                         table->files[index] = NULL;
4410                 }
4411                 if (fd != -1) {
4412                         struct file *file;
4413
4414                         file = fget(fd);
4415                         if (!file) {
4416                                 err = -EBADF;
4417                                 break;
4418                         }
4419                         /*
4420                          * Don't allow io_uring instances to be registered. If
4421                          * UNIX isn't enabled, then this causes a reference
4422                          * cycle and this instance can never get freed. If UNIX
4423                          * is enabled we'll handle it just fine, but there's
4424                          * still no point in allowing a ring fd as it doesn't
4425                          * support regular read/write anyway.
4426                          */
4427                         if (file->f_op == &io_uring_fops) {
4428                                 fput(file);
4429                                 err = -EBADF;
4430                                 break;
4431                         }
4432                         table->files[index] = file;
4433                         err = io_sqe_file_register(ctx, file, i);
4434                         if (err)
4435                                 break;
4436                 }
4437                 nr_args--;
4438                 done++;
4439                 up.offset++;
4440         }
4441
4442         return done ? done : err;
4443 }
4444
4445 static void io_put_work(struct io_wq_work *work)
4446 {
4447         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
4448
4449         io_put_req(req);
4450 }
4451
4452 static void io_get_work(struct io_wq_work *work)
4453 {
4454         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
4455
4456         refcount_inc(&req->refs);
4457 }
4458
4459 static int io_sq_offload_start(struct io_ring_ctx *ctx,
4460                                struct io_uring_params *p)
4461 {
4462         struct io_wq_data data;
4463         unsigned concurrency;
4464         int ret;
4465
4466         init_waitqueue_head(&ctx->sqo_wait);
4467         mmgrab(current->mm);
4468         ctx->sqo_mm = current->mm;
4469
4470         if (ctx->flags & IORING_SETUP_SQPOLL) {
4471                 ret = -EPERM;
4472                 if (!capable(CAP_SYS_ADMIN))
4473                         goto err;
4474
4475                 ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
4476                 if (!ctx->sq_thread_idle)
4477                         ctx->sq_thread_idle = HZ;
4478
4479                 if (p->flags & IORING_SETUP_SQ_AFF) {
4480                         int cpu = p->sq_thread_cpu;
4481
4482                         ret = -EINVAL;
4483                         if (cpu >= nr_cpu_ids)
4484                                 goto err;
4485                         if (!cpu_online(cpu))
4486                                 goto err;
4487
4488                         ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
4489                                                         ctx, cpu,
4490                                                         "io_uring-sq");
4491                 } else {
4492                         ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
4493                                                         "io_uring-sq");
4494                 }
4495                 if (IS_ERR(ctx->sqo_thread)) {
4496                         ret = PTR_ERR(ctx->sqo_thread);
4497                         ctx->sqo_thread = NULL;
4498                         goto err;
4499                 }
4500                 wake_up_process(ctx->sqo_thread);
4501         } else if (p->flags & IORING_SETUP_SQ_AFF) {
4502                 /* Can't have SQ_AFF without SQPOLL */
4503                 ret = -EINVAL;
4504                 goto err;
4505         }
4506
4507         data.mm = ctx->sqo_mm;
4508         data.user = ctx->user;
4509         data.creds = ctx->creds;
4510         data.get_work = io_get_work;
4511         data.put_work = io_put_work;
4512
4513         /* Do QD, or 4 * CPUS, whatever is smallest */
4514         concurrency = min(ctx->sq_entries, 4 * num_online_cpus());
4515         ctx->io_wq = io_wq_create(concurrency, &data);
4516         if (IS_ERR(ctx->io_wq)) {
4517                 ret = PTR_ERR(ctx->io_wq);
4518                 ctx->io_wq = NULL;
4519                 goto err;
4520         }
4521
4522         return 0;
4523 err:
4524         io_finish_async(ctx);
4525         mmdrop(ctx->sqo_mm);
4526         ctx->sqo_mm = NULL;
4527         return ret;
4528 }
4529
4530 static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages)
4531 {
4532         atomic_long_sub(nr_pages, &user->locked_vm);
4533 }
4534
4535 static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
4536 {
4537         unsigned long page_limit, cur_pages, new_pages;
4538
4539         /* Don't allow more pages than we can safely lock */
4540         page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT;
4541
4542         do {
4543                 cur_pages = atomic_long_read(&user->locked_vm);
4544                 new_pages = cur_pages + nr_pages;
4545                 if (new_pages > page_limit)
4546                         return -ENOMEM;
4547         } while (atomic_long_cmpxchg(&user->locked_vm, cur_pages,
4548                                         new_pages) != cur_pages);
4549
4550         return 0;
4551 }
4552
4553 static void io_mem_free(void *ptr)
4554 {
4555         struct page *page;
4556
4557         if (!ptr)
4558                 return;
4559
4560         page = virt_to_head_page(ptr);
4561         if (put_page_testzero(page))
4562                 free_compound_page(page);
4563 }
4564
4565 static void *io_mem_alloc(size_t size)
4566 {
4567         gfp_t gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP |
4568                                 __GFP_NORETRY;
4569
4570         return (void *) __get_free_pages(gfp_flags, get_order(size));
4571 }
4572
4573 static unsigned long rings_size(unsigned sq_entries, unsigned cq_entries,
4574                                 size_t *sq_offset)
4575 {
4576         struct io_rings *rings;
4577         size_t off, sq_array_size;
4578
4579         off = struct_size(rings, cqes, cq_entries);
4580         if (off == SIZE_MAX)
4581                 return SIZE_MAX;
4582
4583 #ifdef CONFIG_SMP
4584         off = ALIGN(off, SMP_CACHE_BYTES);
4585         if (off == 0)
4586                 return SIZE_MAX;
4587 #endif
4588
4589         sq_array_size = array_size(sizeof(u32), sq_entries);
4590         if (sq_array_size == SIZE_MAX)
4591                 return SIZE_MAX;
4592
4593         if (check_add_overflow(off, sq_array_size, &off))
4594                 return SIZE_MAX;
4595
4596         if (sq_offset)
4597                 *sq_offset = off;
4598
4599         return off;
4600 }
4601
4602 static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries)
4603 {
4604         size_t pages;
4605
4606         pages = (size_t)1 << get_order(
4607                 rings_size(sq_entries, cq_entries, NULL));
4608         pages += (size_t)1 << get_order(
4609                 array_size(sizeof(struct io_uring_sqe), sq_entries));
4610
4611         return pages;
4612 }
4613
4614 static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
4615 {
4616         int i, j;
4617
4618         if (!ctx->user_bufs)
4619                 return -ENXIO;
4620
4621         for (i = 0; i < ctx->nr_user_bufs; i++) {
4622                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
4623
4624                 for (j = 0; j < imu->nr_bvecs; j++)
4625                         put_user_page(imu->bvec[j].bv_page);
4626
4627                 if (ctx->account_mem)
4628                         io_unaccount_mem(ctx->user, imu->nr_bvecs);
4629                 kvfree(imu->bvec);
4630                 imu->nr_bvecs = 0;
4631         }
4632
4633         kfree(ctx->user_bufs);
4634         ctx->user_bufs = NULL;
4635         ctx->nr_user_bufs = 0;
4636         return 0;
4637 }
4638
4639 static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst,
4640                        void __user *arg, unsigned index)
4641 {
4642         struct iovec __user *src;
4643
4644 #ifdef CONFIG_COMPAT
4645         if (ctx->compat) {
4646                 struct compat_iovec __user *ciovs;
4647                 struct compat_iovec ciov;
4648
4649                 ciovs = (struct compat_iovec __user *) arg;
4650                 if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov)))
4651                         return -EFAULT;
4652
4653                 dst->iov_base = (void __user *) (unsigned long) ciov.iov_base;
4654                 dst->iov_len = ciov.iov_len;
4655                 return 0;
4656         }
4657 #endif
4658         src = (struct iovec __user *) arg;
4659         if (copy_from_user(dst, &src[index], sizeof(*dst)))
4660                 return -EFAULT;
4661         return 0;
4662 }
4663
4664 static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
4665                                   unsigned nr_args)
4666 {
4667         struct vm_area_struct **vmas = NULL;
4668         struct page **pages = NULL;
4669         int i, j, got_pages = 0;
4670         int ret = -EINVAL;
4671
4672         if (ctx->user_bufs)
4673                 return -EBUSY;
4674         if (!nr_args || nr_args > UIO_MAXIOV)
4675                 return -EINVAL;
4676
4677         ctx->user_bufs = kcalloc(nr_args, sizeof(struct io_mapped_ubuf),
4678                                         GFP_KERNEL);
4679         if (!ctx->user_bufs)
4680                 return -ENOMEM;
4681
4682         for (i = 0; i < nr_args; i++) {
4683                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
4684                 unsigned long off, start, end, ubuf;
4685                 int pret, nr_pages;
4686                 struct iovec iov;
4687                 size_t size;
4688
4689                 ret = io_copy_iov(ctx, &iov, arg, i);
4690                 if (ret)
4691                         goto err;
4692
4693                 /*
4694                  * Don't impose further limits on the size and buffer
4695                  * constraints here, we'll -EINVAL later when IO is
4696                  * submitted if they are wrong.
4697                  */
4698                 ret = -EFAULT;
4699                 if (!iov.iov_base || !iov.iov_len)
4700                         goto err;
4701
4702                 /* arbitrary limit, but we need something */
4703                 if (iov.iov_len > SZ_1G)
4704                         goto err;
4705
4706                 ubuf = (unsigned long) iov.iov_base;
4707                 end = (ubuf + iov.iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT;
4708                 start = ubuf >> PAGE_SHIFT;
4709                 nr_pages = end - start;
4710
4711                 if (ctx->account_mem) {
4712                         ret = io_account_mem(ctx->user, nr_pages);
4713                         if (ret)
4714                                 goto err;
4715                 }
4716
4717                 ret = 0;
4718                 if (!pages || nr_pages > got_pages) {
4719                         kfree(vmas);
4720                         kfree(pages);
4721                         pages = kvmalloc_array(nr_pages, sizeof(struct page *),
4722                                                 GFP_KERNEL);
4723                         vmas = kvmalloc_array(nr_pages,
4724                                         sizeof(struct vm_area_struct *),
4725                                         GFP_KERNEL);
4726                         if (!pages || !vmas) {
4727                                 ret = -ENOMEM;
4728                                 if (ctx->account_mem)
4729                                         io_unaccount_mem(ctx->user, nr_pages);
4730                                 goto err;
4731                         }
4732                         got_pages = nr_pages;
4733                 }
4734
4735                 imu->bvec = kvmalloc_array(nr_pages, sizeof(struct bio_vec),
4736                                                 GFP_KERNEL);
4737                 ret = -ENOMEM;
4738                 if (!imu->bvec) {
4739                         if (ctx->account_mem)
4740                                 io_unaccount_mem(ctx->user, nr_pages);
4741                         goto err;
4742                 }
4743
4744                 ret = 0;
4745                 down_read(&current->mm->mmap_sem);
4746                 pret = get_user_pages(ubuf, nr_pages,
4747                                       FOLL_WRITE | FOLL_LONGTERM,
4748                                       pages, vmas);
4749                 if (pret == nr_pages) {
4750                         /* don't support file backed memory */
4751                         for (j = 0; j < nr_pages; j++) {
4752                                 struct vm_area_struct *vma = vmas[j];
4753
4754                                 if (vma->vm_file &&
4755                                     !is_file_hugepages(vma->vm_file)) {
4756                                         ret = -EOPNOTSUPP;
4757                                         break;
4758                                 }
4759                         }
4760                 } else {
4761                         ret = pret < 0 ? pret : -EFAULT;
4762                 }
4763                 up_read(&current->mm->mmap_sem);
4764                 if (ret) {
4765                         /*
4766                          * if we did partial map, or found file backed vmas,
4767                          * release any pages we did get
4768                          */
4769                         if (pret > 0)
4770                                 put_user_pages(pages, pret);
4771                         if (ctx->account_mem)
4772                                 io_unaccount_mem(ctx->user, nr_pages);
4773                         kvfree(imu->bvec);
4774                         goto err;
4775                 }
4776
4777                 off = ubuf & ~PAGE_MASK;
4778                 size = iov.iov_len;
4779                 for (j = 0; j < nr_pages; j++) {
4780                         size_t vec_len;
4781
4782                         vec_len = min_t(size_t, size, PAGE_SIZE - off);
4783                         imu->bvec[j].bv_page = pages[j];
4784                         imu->bvec[j].bv_len = vec_len;
4785                         imu->bvec[j].bv_offset = off;
4786                         off = 0;
4787                         size -= vec_len;
4788                 }
4789                 /* store original address for later verification */
4790                 imu->ubuf = ubuf;
4791                 imu->len = iov.iov_len;
4792                 imu->nr_bvecs = nr_pages;
4793
4794                 ctx->nr_user_bufs++;
4795         }
4796         kvfree(pages);
4797         kvfree(vmas);
4798         return 0;
4799 err:
4800         kvfree(pages);
4801         kvfree(vmas);
4802         io_sqe_buffer_unregister(ctx);
4803         return ret;
4804 }
4805
4806 static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg)
4807 {
4808         __s32 __user *fds = arg;
4809         int fd;
4810
4811         if (ctx->cq_ev_fd)
4812                 return -EBUSY;
4813
4814         if (copy_from_user(&fd, fds, sizeof(*fds)))
4815                 return -EFAULT;
4816
4817         ctx->cq_ev_fd = eventfd_ctx_fdget(fd);
4818         if (IS_ERR(ctx->cq_ev_fd)) {
4819                 int ret = PTR_ERR(ctx->cq_ev_fd);
4820                 ctx->cq_ev_fd = NULL;
4821                 return ret;
4822         }
4823
4824         return 0;
4825 }
4826
4827 static int io_eventfd_unregister(struct io_ring_ctx *ctx)
4828 {
4829         if (ctx->cq_ev_fd) {
4830                 eventfd_ctx_put(ctx->cq_ev_fd);
4831                 ctx->cq_ev_fd = NULL;
4832                 return 0;
4833         }
4834
4835         return -ENXIO;
4836 }
4837
4838 static void io_ring_ctx_free(struct io_ring_ctx *ctx)
4839 {
4840         io_finish_async(ctx);
4841         if (ctx->sqo_mm)
4842                 mmdrop(ctx->sqo_mm);
4843
4844         io_iopoll_reap_events(ctx);
4845         io_sqe_buffer_unregister(ctx);
4846         io_sqe_files_unregister(ctx);
4847         io_eventfd_unregister(ctx);
4848
4849 #if defined(CONFIG_UNIX)
4850         if (ctx->ring_sock) {
4851                 ctx->ring_sock->file = NULL; /* so that iput() is called */
4852                 sock_release(ctx->ring_sock);
4853         }
4854 #endif
4855
4856         io_mem_free(ctx->rings);
4857         io_mem_free(ctx->sq_sqes);
4858
4859         percpu_ref_exit(&ctx->refs);
4860         if (ctx->account_mem)
4861                 io_unaccount_mem(ctx->user,
4862                                 ring_pages(ctx->sq_entries, ctx->cq_entries));
4863         free_uid(ctx->user);
4864         put_cred(ctx->creds);
4865         kfree(ctx->completions);
4866         kfree(ctx->cancel_hash);
4867         kmem_cache_free(req_cachep, ctx->fallback_req);
4868         kfree(ctx);
4869 }
4870
4871 static __poll_t io_uring_poll(struct file *file, poll_table *wait)
4872 {
4873         struct io_ring_ctx *ctx = file->private_data;
4874         __poll_t mask = 0;
4875
4876         poll_wait(file, &ctx->cq_wait, wait);
4877         /*
4878          * synchronizes with barrier from wq_has_sleeper call in
4879          * io_commit_cqring
4880          */
4881         smp_rmb();
4882         if (READ_ONCE(ctx->rings->sq.tail) - ctx->cached_sq_head !=
4883             ctx->rings->sq_ring_entries)
4884                 mask |= EPOLLOUT | EPOLLWRNORM;
4885         if (READ_ONCE(ctx->rings->cq.head) != ctx->cached_cq_tail)
4886                 mask |= EPOLLIN | EPOLLRDNORM;
4887
4888         return mask;
4889 }
4890
4891 static int io_uring_fasync(int fd, struct file *file, int on)
4892 {
4893         struct io_ring_ctx *ctx = file->private_data;
4894
4895         return fasync_helper(fd, file, on, &ctx->cq_fasync);
4896 }
4897
4898 static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
4899 {
4900         mutex_lock(&ctx->uring_lock);
4901         percpu_ref_kill(&ctx->refs);
4902         mutex_unlock(&ctx->uring_lock);
4903
4904         io_kill_timeouts(ctx);
4905         io_poll_remove_all(ctx);
4906
4907         if (ctx->io_wq)
4908                 io_wq_cancel_all(ctx->io_wq);
4909
4910         io_iopoll_reap_events(ctx);
4911         /* if we failed setting up the ctx, we might not have any rings */
4912         if (ctx->rings)
4913                 io_cqring_overflow_flush(ctx, true);
4914         wait_for_completion(&ctx->completions[0]);
4915         io_ring_ctx_free(ctx);
4916 }
4917
4918 static int io_uring_release(struct inode *inode, struct file *file)
4919 {
4920         struct io_ring_ctx *ctx = file->private_data;
4921
4922         file->private_data = NULL;
4923         io_ring_ctx_wait_and_kill(ctx);
4924         return 0;
4925 }
4926
4927 static void io_uring_cancel_files(struct io_ring_ctx *ctx,
4928                                   struct files_struct *files)
4929 {
4930         struct io_kiocb *req;
4931         DEFINE_WAIT(wait);
4932
4933         while (!list_empty_careful(&ctx->inflight_list)) {
4934                 struct io_kiocb *cancel_req = NULL;
4935
4936                 spin_lock_irq(&ctx->inflight_lock);
4937                 list_for_each_entry(req, &ctx->inflight_list, inflight_entry) {
4938                         if (req->work.files != files)
4939                                 continue;
4940                         /* req is being completed, ignore */
4941                         if (!refcount_inc_not_zero(&req->refs))
4942                                 continue;
4943                         cancel_req = req;
4944                         break;
4945                 }
4946                 if (cancel_req)
4947                         prepare_to_wait(&ctx->inflight_wait, &wait,
4948                                                 TASK_UNINTERRUPTIBLE);
4949                 spin_unlock_irq(&ctx->inflight_lock);
4950
4951                 /* We need to keep going until we don't find a matching req */
4952                 if (!cancel_req)
4953                         break;
4954
4955                 io_wq_cancel_work(ctx->io_wq, &cancel_req->work);
4956                 io_put_req(cancel_req);
4957                 schedule();
4958         }
4959         finish_wait(&ctx->inflight_wait, &wait);
4960 }
4961
4962 static int io_uring_flush(struct file *file, void *data)
4963 {
4964         struct io_ring_ctx *ctx = file->private_data;
4965
4966         io_uring_cancel_files(ctx, data);
4967         if (fatal_signal_pending(current) || (current->flags & PF_EXITING)) {
4968                 io_cqring_overflow_flush(ctx, true);
4969                 io_wq_cancel_all(ctx->io_wq);
4970         }
4971         return 0;
4972 }
4973
4974 static void *io_uring_validate_mmap_request(struct file *file,
4975                                             loff_t pgoff, size_t sz)
4976 {
4977         struct io_ring_ctx *ctx = file->private_data;
4978         loff_t offset = pgoff << PAGE_SHIFT;
4979         struct page *page;
4980         void *ptr;
4981
4982         switch (offset) {
4983         case IORING_OFF_SQ_RING:
4984         case IORING_OFF_CQ_RING:
4985                 ptr = ctx->rings;
4986                 break;
4987         case IORING_OFF_SQES:
4988                 ptr = ctx->sq_sqes;
4989                 break;
4990         default:
4991                 return ERR_PTR(-EINVAL);
4992         }
4993
4994         page = virt_to_head_page(ptr);
4995         if (sz > page_size(page))
4996                 return ERR_PTR(-EINVAL);
4997
4998         return ptr;
4999 }
5000
5001 #ifdef CONFIG_MMU
5002
5003 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
5004 {
5005         size_t sz = vma->vm_end - vma->vm_start;
5006         unsigned long pfn;
5007         void *ptr;
5008
5009         ptr = io_uring_validate_mmap_request(file, vma->vm_pgoff, sz);
5010         if (IS_ERR(ptr))
5011                 return PTR_ERR(ptr);
5012
5013         pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
5014         return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
5015 }
5016
5017 #else /* !CONFIG_MMU */
5018
5019 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
5020 {
5021         return vma->vm_flags & (VM_SHARED | VM_MAYSHARE) ? 0 : -EINVAL;
5022 }
5023
5024 static unsigned int io_uring_nommu_mmap_capabilities(struct file *file)
5025 {
5026         return NOMMU_MAP_DIRECT | NOMMU_MAP_READ | NOMMU_MAP_WRITE;
5027 }
5028
5029 static unsigned long io_uring_nommu_get_unmapped_area(struct file *file,
5030         unsigned long addr, unsigned long len,
5031         unsigned long pgoff, unsigned long flags)
5032 {
5033         void *ptr;
5034
5035         ptr = io_uring_validate_mmap_request(file, pgoff, len);
5036         if (IS_ERR(ptr))
5037                 return PTR_ERR(ptr);
5038
5039         return (unsigned long) ptr;
5040 }
5041
5042 #endif /* !CONFIG_MMU */
5043
5044 SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
5045                 u32, min_complete, u32, flags, const sigset_t __user *, sig,
5046                 size_t, sigsz)
5047 {
5048         struct io_ring_ctx *ctx;
5049         long ret = -EBADF;
5050         int submitted = 0;
5051         struct fd f;
5052
5053         if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
5054                 return -EINVAL;
5055
5056         f = fdget(fd);
5057         if (!f.file)
5058                 return -EBADF;
5059
5060         ret = -EOPNOTSUPP;
5061         if (f.file->f_op != &io_uring_fops)
5062                 goto out_fput;
5063
5064         ret = -ENXIO;
5065         ctx = f.file->private_data;
5066         if (!percpu_ref_tryget(&ctx->refs))
5067                 goto out_fput;
5068
5069         /*
5070          * For SQ polling, the thread will do all submissions and completions.
5071          * Just return the requested submit count, and wake the thread if
5072          * we were asked to.
5073          */
5074         ret = 0;
5075         if (ctx->flags & IORING_SETUP_SQPOLL) {
5076                 if (!list_empty_careful(&ctx->cq_overflow_list))
5077                         io_cqring_overflow_flush(ctx, false);
5078                 if (flags & IORING_ENTER_SQ_WAKEUP)
5079                         wake_up(&ctx->sqo_wait);
5080                 submitted = to_submit;
5081         } else if (to_submit) {
5082                 struct mm_struct *cur_mm;
5083
5084                 to_submit = min(to_submit, ctx->sq_entries);
5085                 mutex_lock(&ctx->uring_lock);
5086                 /* already have mm, so io_submit_sqes() won't try to grab it */
5087                 cur_mm = ctx->sqo_mm;
5088                 submitted = io_submit_sqes(ctx, to_submit, f.file, fd,
5089                                            &cur_mm, false);
5090                 mutex_unlock(&ctx->uring_lock);
5091         }
5092         if (flags & IORING_ENTER_GETEVENTS) {
5093                 unsigned nr_events = 0;
5094
5095                 min_complete = min(min_complete, ctx->cq_entries);
5096
5097                 if (ctx->flags & IORING_SETUP_IOPOLL) {
5098                         ret = io_iopoll_check(ctx, &nr_events, min_complete);
5099                 } else {
5100                         ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
5101                 }
5102         }
5103
5104         percpu_ref_put(&ctx->refs);
5105 out_fput:
5106         fdput(f);
5107         return submitted ? submitted : ret;
5108 }
5109
5110 static const struct file_operations io_uring_fops = {
5111         .release        = io_uring_release,
5112         .flush          = io_uring_flush,
5113         .mmap           = io_uring_mmap,
5114 #ifndef CONFIG_MMU
5115         .get_unmapped_area = io_uring_nommu_get_unmapped_area,
5116         .mmap_capabilities = io_uring_nommu_mmap_capabilities,
5117 #endif
5118         .poll           = io_uring_poll,
5119         .fasync         = io_uring_fasync,
5120 };
5121
5122 static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
5123                                   struct io_uring_params *p)
5124 {
5125         struct io_rings *rings;
5126         size_t size, sq_array_offset;
5127
5128         size = rings_size(p->sq_entries, p->cq_entries, &sq_array_offset);
5129         if (size == SIZE_MAX)
5130                 return -EOVERFLOW;
5131
5132         rings = io_mem_alloc(size);
5133         if (!rings)
5134                 return -ENOMEM;
5135
5136         ctx->rings = rings;
5137         ctx->sq_array = (u32 *)((char *)rings + sq_array_offset);
5138         rings->sq_ring_mask = p->sq_entries - 1;
5139         rings->cq_ring_mask = p->cq_entries - 1;
5140         rings->sq_ring_entries = p->sq_entries;
5141         rings->cq_ring_entries = p->cq_entries;
5142         ctx->sq_mask = rings->sq_ring_mask;
5143         ctx->cq_mask = rings->cq_ring_mask;
5144         ctx->sq_entries = rings->sq_ring_entries;
5145         ctx->cq_entries = rings->cq_ring_entries;
5146
5147         size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
5148         if (size == SIZE_MAX) {
5149                 io_mem_free(ctx->rings);
5150                 ctx->rings = NULL;
5151                 return -EOVERFLOW;
5152         }
5153
5154         ctx->sq_sqes = io_mem_alloc(size);
5155         if (!ctx->sq_sqes) {
5156                 io_mem_free(ctx->rings);
5157                 ctx->rings = NULL;
5158                 return -ENOMEM;
5159         }
5160
5161         return 0;
5162 }
5163
5164 /*
5165  * Allocate an anonymous fd, this is what constitutes the application
5166  * visible backing of an io_uring instance. The application mmaps this
5167  * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled,
5168  * we have to tie this fd to a socket for file garbage collection purposes.
5169  */
5170 static int io_uring_get_fd(struct io_ring_ctx *ctx)
5171 {
5172         struct file *file;
5173         int ret;
5174
5175 #if defined(CONFIG_UNIX)
5176         ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP,
5177                                 &ctx->ring_sock);
5178         if (ret)
5179                 return ret;
5180 #endif
5181
5182         ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
5183         if (ret < 0)
5184                 goto err;
5185
5186         file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx,
5187                                         O_RDWR | O_CLOEXEC);
5188         if (IS_ERR(file)) {
5189                 put_unused_fd(ret);
5190                 ret = PTR_ERR(file);
5191                 goto err;
5192         }
5193
5194 #if defined(CONFIG_UNIX)
5195         ctx->ring_sock->file = file;
5196         ctx->ring_sock->sk->sk_user_data = ctx;
5197 #endif
5198         fd_install(ret, file);
5199         return ret;
5200 err:
5201 #if defined(CONFIG_UNIX)
5202         sock_release(ctx->ring_sock);
5203         ctx->ring_sock = NULL;
5204 #endif
5205         return ret;
5206 }
5207
5208 static int io_uring_create(unsigned entries, struct io_uring_params *p)
5209 {
5210         struct user_struct *user = NULL;
5211         struct io_ring_ctx *ctx;
5212         bool account_mem;
5213         int ret;
5214
5215         if (!entries || entries > IORING_MAX_ENTRIES)
5216                 return -EINVAL;
5217
5218         /*
5219          * Use twice as many entries for the CQ ring. It's possible for the
5220          * application to drive a higher depth than the size of the SQ ring,
5221          * since the sqes are only used at submission time. This allows for
5222          * some flexibility in overcommitting a bit. If the application has
5223          * set IORING_SETUP_CQSIZE, it will have passed in the desired number
5224          * of CQ ring entries manually.
5225          */
5226         p->sq_entries = roundup_pow_of_two(entries);
5227         if (p->flags & IORING_SETUP_CQSIZE) {
5228                 /*
5229                  * If IORING_SETUP_CQSIZE is set, we do the same roundup
5230                  * to a power-of-two, if it isn't already. We do NOT impose
5231                  * any cq vs sq ring sizing.
5232                  */
5233                 if (p->cq_entries < p->sq_entries || p->cq_entries > IORING_MAX_CQ_ENTRIES)
5234                         return -EINVAL;
5235                 p->cq_entries = roundup_pow_of_two(p->cq_entries);
5236         } else {
5237                 p->cq_entries = 2 * p->sq_entries;
5238         }
5239
5240         user = get_uid(current_user());
5241         account_mem = !capable(CAP_IPC_LOCK);
5242
5243         if (account_mem) {
5244                 ret = io_account_mem(user,
5245                                 ring_pages(p->sq_entries, p->cq_entries));
5246                 if (ret) {
5247                         free_uid(user);
5248                         return ret;
5249                 }
5250         }
5251
5252         ctx = io_ring_ctx_alloc(p);
5253         if (!ctx) {
5254                 if (account_mem)
5255                         io_unaccount_mem(user, ring_pages(p->sq_entries,
5256                                                                 p->cq_entries));
5257                 free_uid(user);
5258                 return -ENOMEM;
5259         }
5260         ctx->compat = in_compat_syscall();
5261         ctx->account_mem = account_mem;
5262         ctx->user = user;
5263         ctx->creds = get_current_cred();
5264
5265         ret = io_allocate_scq_urings(ctx, p);
5266         if (ret)
5267                 goto err;
5268
5269         ret = io_sq_offload_start(ctx, p);
5270         if (ret)
5271                 goto err;
5272
5273         memset(&p->sq_off, 0, sizeof(p->sq_off));
5274         p->sq_off.head = offsetof(struct io_rings, sq.head);
5275         p->sq_off.tail = offsetof(struct io_rings, sq.tail);
5276         p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask);
5277         p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries);
5278         p->sq_off.flags = offsetof(struct io_rings, sq_flags);
5279         p->sq_off.dropped = offsetof(struct io_rings, sq_dropped);
5280         p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings;
5281
5282         memset(&p->cq_off, 0, sizeof(p->cq_off));
5283         p->cq_off.head = offsetof(struct io_rings, cq.head);
5284         p->cq_off.tail = offsetof(struct io_rings, cq.tail);
5285         p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask);
5286         p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries);
5287         p->cq_off.overflow = offsetof(struct io_rings, cq_overflow);
5288         p->cq_off.cqes = offsetof(struct io_rings, cqes);
5289
5290         /*
5291          * Install ring fd as the very last thing, so we don't risk someone
5292          * having closed it before we finish setup
5293          */
5294         ret = io_uring_get_fd(ctx);
5295         if (ret < 0)
5296                 goto err;
5297
5298         p->features = IORING_FEAT_SINGLE_MMAP | IORING_FEAT_NODROP |
5299                         IORING_FEAT_SUBMIT_STABLE;
5300         trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags);
5301         return ret;
5302 err:
5303         io_ring_ctx_wait_and_kill(ctx);
5304         return ret;
5305 }
5306
5307 /*
5308  * Sets up an aio uring context, and returns the fd. Applications asks for a
5309  * ring size, we return the actual sq/cq ring sizes (among other things) in the
5310  * params structure passed in.
5311  */
5312 static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
5313 {
5314         struct io_uring_params p;
5315         long ret;
5316         int i;
5317
5318         if (copy_from_user(&p, params, sizeof(p)))
5319                 return -EFAULT;
5320         for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
5321                 if (p.resv[i])
5322                         return -EINVAL;
5323         }
5324
5325         if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
5326                         IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE))
5327                 return -EINVAL;
5328
5329         ret = io_uring_create(entries, &p);
5330         if (ret < 0)
5331                 return ret;
5332
5333         if (copy_to_user(params, &p, sizeof(p)))
5334                 return -EFAULT;
5335
5336         return ret;
5337 }
5338
5339 SYSCALL_DEFINE2(io_uring_setup, u32, entries,
5340                 struct io_uring_params __user *, params)
5341 {
5342         return io_uring_setup(entries, params);
5343 }
5344
5345 static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
5346                                void __user *arg, unsigned nr_args)
5347         __releases(ctx->uring_lock)
5348         __acquires(ctx->uring_lock)
5349 {
5350         int ret;
5351
5352         /*
5353          * We're inside the ring mutex, if the ref is already dying, then
5354          * someone else killed the ctx or is already going through
5355          * io_uring_register().
5356          */
5357         if (percpu_ref_is_dying(&ctx->refs))
5358                 return -ENXIO;
5359
5360         percpu_ref_kill(&ctx->refs);
5361
5362         /*
5363          * Drop uring mutex before waiting for references to exit. If another
5364          * thread is currently inside io_uring_enter() it might need to grab
5365          * the uring_lock to make progress. If we hold it here across the drain
5366          * wait, then we can deadlock. It's safe to drop the mutex here, since
5367          * no new references will come in after we've killed the percpu ref.
5368          */
5369         mutex_unlock(&ctx->uring_lock);
5370         wait_for_completion(&ctx->completions[0]);
5371         mutex_lock(&ctx->uring_lock);
5372
5373         switch (opcode) {
5374         case IORING_REGISTER_BUFFERS:
5375                 ret = io_sqe_buffer_register(ctx, arg, nr_args);
5376                 break;
5377         case IORING_UNREGISTER_BUFFERS:
5378                 ret = -EINVAL;
5379                 if (arg || nr_args)
5380                         break;
5381                 ret = io_sqe_buffer_unregister(ctx);
5382                 break;
5383         case IORING_REGISTER_FILES:
5384                 ret = io_sqe_files_register(ctx, arg, nr_args);
5385                 break;
5386         case IORING_UNREGISTER_FILES:
5387                 ret = -EINVAL;
5388                 if (arg || nr_args)
5389                         break;
5390                 ret = io_sqe_files_unregister(ctx);
5391                 break;
5392         case IORING_REGISTER_FILES_UPDATE:
5393                 ret = io_sqe_files_update(ctx, arg, nr_args);
5394                 break;
5395         case IORING_REGISTER_EVENTFD:
5396                 ret = -EINVAL;
5397                 if (nr_args != 1)
5398                         break;
5399                 ret = io_eventfd_register(ctx, arg);
5400                 break;
5401         case IORING_UNREGISTER_EVENTFD:
5402                 ret = -EINVAL;
5403                 if (arg || nr_args)
5404                         break;
5405                 ret = io_eventfd_unregister(ctx);
5406                 break;
5407         default:
5408                 ret = -EINVAL;
5409                 break;
5410         }
5411
5412         /* bring the ctx back to life */
5413         reinit_completion(&ctx->completions[0]);
5414         percpu_ref_reinit(&ctx->refs);
5415         return ret;
5416 }
5417
5418 SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
5419                 void __user *, arg, unsigned int, nr_args)
5420 {
5421         struct io_ring_ctx *ctx;
5422         long ret = -EBADF;
5423         struct fd f;
5424
5425         f = fdget(fd);
5426         if (!f.file)
5427                 return -EBADF;
5428
5429         ret = -EOPNOTSUPP;
5430         if (f.file->f_op != &io_uring_fops)
5431                 goto out_fput;
5432
5433         ctx = f.file->private_data;
5434
5435         mutex_lock(&ctx->uring_lock);
5436         ret = __io_uring_register(ctx, opcode, arg, nr_args);
5437         mutex_unlock(&ctx->uring_lock);
5438         trace_io_uring_register(ctx, opcode, ctx->nr_user_files, ctx->nr_user_bufs,
5439                                                         ctx->cq_ev_fd != NULL, ret);
5440 out_fput:
5441         fdput(f);
5442         return ret;
5443 }
5444
5445 static int __init io_uring_init(void)
5446 {
5447         req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC);
5448         return 0;
5449 };
5450 __initcall(io_uring_init);