#define IO_PLUG_THRESHOLD 2
#define IO_IOPOLL_BATCH 8
+struct io_submit_state {
+ struct blk_plug plug;
+
+ /*
+ * File reference cache
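+ *
+ * 'file' and 'fd' cache the most recently looked up file; 'has_refs' is
+ * how many references io_file_get() took on it up front, 'used_refs' how
+ * many of those have been handed out to requests, and 'ios_left' how
+ * many SQEs remain in the current submission batch.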
+ */
+ struct file *file;
+ unsigned int fd;
+ unsigned int has_refs;
+ unsigned int used_refs;
+ unsigned int ios_left;
+};
+
static struct kmem_cache *req_cachep;
static const struct file_operations io_uring_fops;
static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
 struct list_head *done)
{
void *reqs[IO_IOPOLL_BATCH];
+ int file_count, to_free;
+ struct file *file = NULL;
struct io_kiocb *req;
- int to_free = 0;
+ file_count = to_free = 0;
while (!list_empty(done)) {
req = list_first_entry(done, struct io_kiocb, list);
list_del(&req->list);
reqs[to_free++] = req;
(*nr_events)++;
- fput(req->rw.ki_filp);
+ /*
+ * Batched puts of the same file, to avoid dirtying the
+ * file usage count multiple times, if avoidable.
+ */
+ if (!file) {
+ file = req->rw.ki_filp;
+ file_count = 1;
+ } else if (file == req->rw.ki_filp) {
+ file_count++;
+ } else {
+ fput_many(file, file_count);
+ file = req->rw.ki_filp;
+ file_count = 1;
+ }
+
if (to_free == ARRAY_SIZE(reqs))
io_free_req_many(ctx, reqs, &to_free);
}
io_commit_cqring(ctx);
+ if (file)
+ fput_many(file, file_count);
io_free_req_many(ctx, reqs, &to_free);
}
list_add_tail(&req->list, &ctx->poll_list);
}
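+/*
+ * Drop file references. Without a submission state this is just fput().
+ * With one, flush the cached file: any references taken via fget_many()
+ * that were not consumed by requests are released in a single
+ * fput_many() call and the cache is cleared.
+ */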
+static void io_file_put(struct io_submit_state *state, struct file *file)
+{
+ if (!state) {
+ fput(file);
+ } else if (state->file) {
+ int diff = state->has_refs - state->used_refs;
+
+ if (diff)
+ fput_many(state->file, diff);
+ state->file = NULL;
+ }
+}
+
+/*
+ * Get as many references to a file as we have IOs left in this submission,
+ * assuming most submissions are for one file, or at least that each file
+ * has more than one submission.
+ */
+static struct file *io_file_get(struct io_submit_state *state, int fd)
+{
+ if (!state)
+ return fget(fd);
+
+ if (state->file) {
+ if (state->fd == fd) {
+ state->used_refs++;
+ state->ios_left--;
+ return state->file;
+ }
+ io_file_put(state, NULL);
+ }
+ state->file = fget_many(fd, state->ios_left);
+ if (!state->file)
+ return NULL;
+
+ state->fd = fd;
+ state->has_refs = state->ios_left;
+ state->used_refs = 1;
+ state->ios_left--;
+ return state->file;
+}
+
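+/*
+ * Worked example of the accounting above (hypothetical batch of three
+ * READV SQEs on the same fd): the first io_file_get() does
+ * fget_many(fd, 3), so has_refs = 3 and used_refs = 1; the next two
+ * lookups only bump used_refs. By io_submit_state_end() every reference
+ * has been consumed, so nothing is dropped there, and each request still
+ * puts its own reference on completion as before.
+ */
+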
/*
* If we tracked the file through the SCM inflight mechanism, we could support
* any file. For now, just ensure that anything potentially problematic is done
}
static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe,
- bool force_nonblock)
+ bool force_nonblock, struct io_submit_state *state)
{
struct io_ring_ctx *ctx = req->ctx;
struct kiocb *kiocb = &req->rw;
return 0;
fd = READ_ONCE(sqe->fd);
- kiocb->ki_filp = fget(fd);
+ kiocb->ki_filp = io_file_get(state, fd);
if (unlikely(!kiocb->ki_filp))
return -EBADF;
if (force_nonblock && !io_file_supports_async(kiocb->ki_filp))
}
return 0;
out_fput:
- fput(kiocb->ki_filp);
+ /* in case of error, we didn't use this file reference. drop it. */
+ if (state)
+ state->used_refs--;
+ io_file_put(state, kiocb->ki_filp);
return ret;
}
}
static ssize_t io_read(struct io_kiocb *req, const struct sqe_submit *s,
- bool force_nonblock)
+ bool force_nonblock, struct io_submit_state *state)
{
struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
struct kiocb *kiocb = &req->rw;
struct file *file;
ssize_t ret;
- ret = io_prep_rw(req, s->sqe, force_nonblock);
+ ret = io_prep_rw(req, s->sqe, force_nonblock, state);
if (ret)
return ret;
file = kiocb->ki_filp;
}
static ssize_t io_write(struct io_kiocb *req, const struct sqe_submit *s,
- bool force_nonblock)
+ bool force_nonblock, struct io_submit_state *state)
{
struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
struct kiocb *kiocb = &req->rw;
struct file *file;
ssize_t ret;
- ret = io_prep_rw(req, s->sqe, force_nonblock);
+ ret = io_prep_rw(req, s->sqe, force_nonblock, state);
if (ret)
return ret;
/* Hold on to the file for -EAGAIN */
}
static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
- const struct sqe_submit *s, bool force_nonblock)
+ const struct sqe_submit *s, bool force_nonblock,
+ struct io_submit_state *state)
{
ssize_t ret;
int opcode;
ret = io_nop(req, req->user_data);
break;
case IORING_OP_READV:
- ret = io_read(req, s, force_nonblock);
+ ret = io_read(req, s, force_nonblock, state);
break;
case IORING_OP_WRITEV:
- ret = io_write(req, s, force_nonblock);
+ ret = io_write(req, s, force_nonblock, state);
break;
case IORING_OP_FSYNC:
ret = io_fsync(req, s->sqe, force_nonblock);
s->needs_lock = true;
do {
- ret = __io_submit_sqe(ctx, req, s, false);
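+ /*
+ * Async punt: there is no submission-state batch in this context, so
+ * pass NULL and let io_file_get() fall back to a plain fget().
+ */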
+ ret = __io_submit_sqe(ctx, req, s, false, NULL);
/*
* We can get EAGAIN for polled IO even though we're forcing
* a sync submission from here, since we can't wait for
kfree(sqe);
}
-static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s)
+static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
+ struct io_submit_state *state)
{
struct io_kiocb *req;
ssize_t ret;
req->rw.ki_filp = NULL;
- ret = __io_submit_sqe(ctx, req, s, true);
+ ret = __io_submit_sqe(ctx, req, s, true, state);
if (ret == -EAGAIN) {
struct io_uring_sqe *sqe_copy;
return ret;
}
+/*
+ * Batched submission is done, ensure local IO is flushed out.
+ */
+static void io_submit_state_end(struct io_submit_state *state)
+{
+ blk_finish_plug(&state->plug);
+ io_file_put(state, NULL);
+}
+
+/*
+ * Start submission side cache.
+ */
+static void io_submit_state_start(struct io_submit_state *state,
+ struct io_ring_ctx *ctx, unsigned max_ios)
+{
+ blk_start_plug(&state->plug);
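+ /*
+ * fd, has_refs and used_refs are only meaningful while ->file is
+ * non-NULL, so clearing the file pointer is all the reset required.
+ */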
+ state->file = NULL;
+ state->ios_left = max_ios;
+}
+
static void io_commit_sqring(struct io_ring_ctx *ctx)
{
struct io_sq_ring *ring = ctx->sq_ring;
static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
{
+ struct io_submit_state state, *statep = NULL;
int i, ret = 0, submit = 0;
- struct blk_plug plug;
- if (to_submit > IO_PLUG_THRESHOLD)
- blk_start_plug(&plug);
+ if (to_submit > IO_PLUG_THRESHOLD) {
+ io_submit_state_start(&state, ctx, to_submit);
+ statep = &state;
+ }
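+ /*
+ * At or below IO_PLUG_THRESHOLD, statep stays NULL: no plug is started
+ * and io_file_get() falls back to a plain fget() per request.
+ */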
for (i = 0; i < to_submit; i++) {
struct sqe_submit s;
s.has_user = true;
s.needs_lock = false;
- ret = io_submit_sqe(ctx, &s);
+ ret = io_submit_sqe(ctx, &s, statep);
if (ret) {
io_drop_sqring(ctx);
break;
}
io_commit_sqring(ctx);
- if (to_submit > IO_PLUG_THRESHOLD)
- blk_finish_plug(&plug);
+ if (statep)
+ io_submit_state_end(statep);
return submit ? submit : ret;
}