io_uring: speedup provided buffer handling
authorJens Axboe <axboe@kernel.dk>
Wed, 9 Mar 2022 00:46:52 +0000 (17:46 -0700)
committerJens Axboe <axboe@kernel.dk>
Thu, 10 Mar 2022 13:33:14 +0000 (06:33 -0700)
In testing high frequency workloads with provided buffers, we spend a
lot of time in allocating and freeing the buffer units themselves.
Rather than repeatedly free and alloc them, add a recycling cache
instead. There are two caches:

- ctx->io_buffers_cache. This is the one we grab from in the submission
  path, and it's protected by ctx->uring_lock. For inline completions,
  we can recycle straight back to this cache and not need any extra
  locking.

- ctx->io_buffers_comp. If we're not under uring_lock, then we use this
  list to recycle buffers. It's protected by the completion_lock.

On adding a new buffer, check io_buffers_cache. If it's empty, check if
we can splice entries from the io_buffers_comp_cache.

This reduces about 5-10% of overhead from provided buffers, bringing it
pretty close to the non-provided path.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
fs/io_uring.c

index daa3609b742f28e326a534ebd2493ba081e60132..0f5e999e569f8eab106e00825be6ace4113b6052 100644 (file)
@@ -384,6 +384,7 @@ struct io_ring_ctx {
                struct list_head        ltimeout_list;
                struct list_head        cq_overflow_list;
                struct xarray           io_buffers;
+               struct list_head        io_buffers_cache;
                struct xarray           personalities;
                u32                     pers_next;
                unsigned                sq_thread_idle;
@@ -426,6 +427,8 @@ struct io_ring_ctx {
                struct hlist_head       *cancel_hash;
                unsigned                cancel_hash_bits;
                bool                    poll_multi_queue;
+
+               struct list_head        io_buffers_comp;
        } ____cacheline_aligned_in_smp;
 
        struct io_restriction           restrictions;
@@ -441,6 +444,8 @@ struct io_ring_ctx {
                struct llist_head               rsrc_put_llist;
                struct list_head                rsrc_ref_list;
                spinlock_t                      rsrc_ref_lock;
+
+               struct list_head        io_buffers_pages;
        };
 
        /* Keep this last, we don't need it for the fast path */
@@ -1278,24 +1283,56 @@ static inline void io_req_set_rsrc_node(struct io_kiocb *req,
        }
 }
 
-static unsigned int __io_put_kbuf(struct io_kiocb *req)
+static unsigned int __io_put_kbuf(struct io_kiocb *req, struct list_head *list)
 {
        struct io_buffer *kbuf = req->kbuf;
        unsigned int cflags;
 
-       cflags = kbuf->bid << IORING_CQE_BUFFER_SHIFT;
-       cflags |= IORING_CQE_F_BUFFER;
+       cflags = IORING_CQE_F_BUFFER | (kbuf->bid << IORING_CQE_BUFFER_SHIFT);
        req->flags &= ~REQ_F_BUFFER_SELECTED;
-       kfree(kbuf);
+       list_add(&kbuf->list, list);
        req->kbuf = NULL;
        return cflags;
 }
 
-static inline unsigned int io_put_kbuf(struct io_kiocb *req)
+static inline unsigned int io_put_kbuf_comp(struct io_kiocb *req)
 {
        if (likely(!(req->flags & REQ_F_BUFFER_SELECTED)))
                return 0;
-       return __io_put_kbuf(req);
+       return __io_put_kbuf(req, &req->ctx->io_buffers_comp);
+}
+
+static inline unsigned int io_put_kbuf(struct io_kiocb *req,
+                                      unsigned issue_flags)
+{
+       unsigned int cflags;
+
+       if (likely(!(req->flags & REQ_F_BUFFER_SELECTED)))
+               return 0;
+
+       /*
+        * We can add this buffer back to two lists:
+        *
+        * 1) The io_buffers_cache list. This one is protected by the
+        *    ctx->uring_lock. If we already hold this lock, add back to this
+        *    list as we can grab it from issue as well.
+        * 2) The io_buffers_comp list. This one is protected by the
+        *    ctx->completion_lock.
+        *
+        * We migrate buffers from the comp_list to the issue cache list
+        * when we need one.
+        */
+       if (issue_flags & IO_URING_F_UNLOCKED) {
+               struct io_ring_ctx *ctx = req->ctx;
+
+               spin_lock(&ctx->completion_lock);
+               cflags = __io_put_kbuf(req, &ctx->io_buffers_comp);
+               spin_unlock(&ctx->completion_lock);
+       } else {
+               cflags = __io_put_kbuf(req, &req->ctx->io_buffers_cache);
+       }
+
+       return cflags;
 }
 
 static bool io_match_task(struct io_kiocb *head, struct task_struct *task,
@@ -1443,6 +1480,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
        init_waitqueue_head(&ctx->sqo_sq_wait);
        INIT_LIST_HEAD(&ctx->sqd_list);
        INIT_LIST_HEAD(&ctx->cq_overflow_list);
+       INIT_LIST_HEAD(&ctx->io_buffers_cache);
        init_completion(&ctx->ref_comp);
        xa_init_flags(&ctx->io_buffers, XA_FLAGS_ALLOC1);
        xa_init_flags(&ctx->personalities, XA_FLAGS_ALLOC1);
@@ -1451,6 +1489,8 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
        spin_lock_init(&ctx->completion_lock);
        spin_lock_init(&ctx->timeout_lock);
        INIT_WQ_LIST(&ctx->iopoll_list);
+       INIT_LIST_HEAD(&ctx->io_buffers_pages);
+       INIT_LIST_HEAD(&ctx->io_buffers_comp);
        INIT_LIST_HEAD(&ctx->defer_list);
        INIT_LIST_HEAD(&ctx->timeout_list);
        INIT_LIST_HEAD(&ctx->ltimeout_list);
@@ -2329,7 +2369,8 @@ static void handle_prev_tw_list(struct io_wq_work_node *node,
                if (likely(*uring_locked))
                        req->io_task_work.func(req, uring_locked);
                else
-                       __io_req_complete_post(req, req->result, io_put_kbuf(req));
+                       __io_req_complete_post(req, req->result,
+                                               io_put_kbuf_comp(req));
                node = next;
        } while (node);
 
@@ -2679,7 +2720,7 @@ static int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin)
                if (unlikely(req->flags & REQ_F_CQE_SKIP))
                        continue;
 
-               __io_fill_cqe(req, req->result, io_put_kbuf(req));
+               __io_fill_cqe(req, req->result, io_put_kbuf(req, 0));
                nr_events++;
        }
 
@@ -2855,14 +2896,14 @@ static bool __io_complete_rw_common(struct io_kiocb *req, long res)
 
 static inline void io_req_task_complete(struct io_kiocb *req, bool *locked)
 {
-       unsigned int cflags = io_put_kbuf(req);
        int res = req->result;
 
        if (*locked) {
-               io_req_complete_state(req, res, cflags);
+               io_req_complete_state(req, res, io_put_kbuf(req, 0));
                io_req_add_compl_list(req);
        } else {
-               io_req_complete_post(req, res, cflags);
+               io_req_complete_post(req, res,
+                                       io_put_kbuf(req, IO_URING_F_UNLOCKED));
        }
 }
 
@@ -2871,7 +2912,8 @@ static void __io_complete_rw(struct io_kiocb *req, long res,
 {
        if (__io_complete_rw_common(req, res))
                return;
-       __io_req_complete(req, issue_flags, req->result, io_put_kbuf(req));
+       __io_req_complete(req, issue_flags, req->result,
+                               io_put_kbuf(req, issue_flags));
 }
 
 static void io_complete_rw(struct kiocb *kiocb, long res)
@@ -4518,13 +4560,11 @@ static int __io_remove_buffers(struct io_ring_ctx *ctx, struct io_buffer *buf,
 
                nxt = list_first_entry(&buf->list, struct io_buffer, list);
                list_del(&nxt->list);
-               kfree(nxt);
                if (++i == nbufs)
                        return i;
                cond_resched();
        }
        i++;
-       kfree(buf);
        xa_erase(&ctx->io_buffers, bgid);
 
        return i;
@@ -4590,17 +4630,63 @@ static int io_provide_buffers_prep(struct io_kiocb *req,
        return 0;
 }
 
-static int io_add_buffers(struct io_provide_buf *pbuf, struct io_buffer **head)
+static int io_refill_buffer_cache(struct io_ring_ctx *ctx)
+{
+       struct io_buffer *buf;
+       struct page *page;
+       int bufs_in_page;
+
+       /*
+        * Completions that don't happen inline (eg not under uring_lock) will
+        * add to ->io_buffers_comp. If we don't have any free buffers, check
+        * the completion list and splice those entries first.
+        */
+       if (!list_empty_careful(&ctx->io_buffers_comp)) {
+               spin_lock(&ctx->completion_lock);
+               if (!list_empty(&ctx->io_buffers_comp)) {
+                       list_splice_init(&ctx->io_buffers_comp,
+                                               &ctx->io_buffers_cache);
+                       spin_unlock(&ctx->completion_lock);
+                       return 0;
+               }
+               spin_unlock(&ctx->completion_lock);
+       }
+
+       /*
+        * No free buffers and no completion entries either. Allocate a new
+        * page worth of buffer entries and add those to our freelist.
+        */
+       page = alloc_page(GFP_KERNEL_ACCOUNT);
+       if (!page)
+               return -ENOMEM;
+
+       list_add(&page->lru, &ctx->io_buffers_pages);
+
+       buf = page_address(page);
+       bufs_in_page = PAGE_SIZE / sizeof(*buf);
+       while (bufs_in_page) {
+               list_add_tail(&buf->list, &ctx->io_buffers_cache);
+               buf++;
+               bufs_in_page--;
+       }
+
+       return 0;
+}
+
+static int io_add_buffers(struct io_ring_ctx *ctx, struct io_provide_buf *pbuf,
+                         struct io_buffer **head)
 {
        struct io_buffer *buf;
        u64 addr = pbuf->addr;
        int i, bid = pbuf->bid;
 
        for (i = 0; i < pbuf->nbufs; i++) {
-               buf = kmalloc(sizeof(*buf), GFP_KERNEL_ACCOUNT);
-               if (!buf)
+               if (list_empty(&ctx->io_buffers_cache) &&
+                   io_refill_buffer_cache(ctx))
                        break;
-
+               buf = list_first_entry(&ctx->io_buffers_cache, struct io_buffer,
+                                       list);
+               list_del(&buf->list);
                buf->addr = addr;
                buf->len = min_t(__u32, pbuf->len, MAX_RW_COUNT);
                buf->bid = bid;
@@ -4632,7 +4718,7 @@ static int io_provide_buffers(struct io_kiocb *req, unsigned int issue_flags)
 
        list = head = xa_load(&ctx->io_buffers, p->bgid);
 
-       ret = io_add_buffers(p, &head);
+       ret = io_add_buffers(ctx, p, &head);
        if (ret >= 0 && !list) {
                ret = xa_insert(&ctx->io_buffers, p->bgid, head, GFP_KERNEL);
                if (ret < 0)
@@ -5229,7 +5315,7 @@ static int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags)
        if (kmsg->free_iov)
                kfree(kmsg->free_iov);
        req->flags &= ~REQ_F_NEED_CLEANUP;
-       __io_req_complete(req, issue_flags, ret, io_put_kbuf(req));
+       __io_req_complete(req, issue_flags, ret, io_put_kbuf(req, issue_flags));
        return 0;
 }
 
@@ -5284,7 +5370,8 @@ static int io_recv(struct io_kiocb *req, unsigned int issue_flags)
 out_free:
                req_set_fail(req);
        }
-       __io_req_complete(req, issue_flags, ret, io_put_kbuf(req));
+
+       __io_req_complete(req, issue_flags, ret, io_put_kbuf(req, issue_flags));
        return 0;
 }
 
@@ -6704,7 +6791,7 @@ fail:
 static void io_clean_op(struct io_kiocb *req)
 {
        if (req->flags & REQ_F_BUFFER_SELECTED)
-               io_put_kbuf(req);
+               io_put_kbuf_comp(req);
 
        if (req->flags & REQ_F_NEED_CLEANUP) {
                switch (req->opcode) {
@@ -9475,6 +9562,14 @@ static void io_destroy_buffers(struct io_ring_ctx *ctx)
 
        xa_for_each(&ctx->io_buffers, index, buf)
                __io_remove_buffers(ctx, buf, index, -1U);
+
+       while (!list_empty(&ctx->io_buffers_pages)) {
+               struct page *page;
+
+               page = list_first_entry(&ctx->io_buffers_pages, struct page, lru);
+               list_del_init(&page->lru);
+               __free_page(page);
+       }
 }
 
 static void io_req_caches_free(struct io_ring_ctx *ctx)