};
#define IO_POLL_CANCEL_FLAG BIT(31)
-#define IO_POLL_REF_MASK GENMASK(30, 0)
+#define IO_POLL_RETRY_FLAG BIT(30)
+#define IO_POLL_REF_MASK GENMASK(29, 0)
+
+/*
+ * We usually have 1-2 refs taken, 128 is more than enough and we want to
+ * maximise the margin between this amount and the moment when it overflows.
+ */
+#define IO_POLL_REF_BIAS 128
#define IO_WQE_F_DOUBLE 1
return priv & IO_WQE_F_DOUBLE;
}
+static bool io_poll_get_ownership_slowpath(struct io_kiocb *req)
+{
+ int v;
+
+ /*
+ * poll_refs are already elevated and we don't have much hope for
+ * grabbing the ownership. Instead of incrementing set a retry flag
+ * to notify the loop that there might have been some change.
+ */
+ v = atomic_fetch_or(IO_POLL_RETRY_FLAG, &req->poll_refs);
+ if (v & IO_POLL_REF_MASK)
+ return false;
+ return !(atomic_fetch_inc(&req->poll_refs) & IO_POLL_REF_MASK);
+}
+
/*
* If refs part of ->poll_refs (see IO_POLL_REF_MASK) is 0, it's free. We can
* bump it and acquire ownership. It's disallowed to modify requests while not
*/
static inline bool io_poll_get_ownership(struct io_kiocb *req)
{
+ if (unlikely(atomic_read(&req->poll_refs) >= IO_POLL_REF_BIAS))
+ return io_poll_get_ownership_slowpath(req);
return !(atomic_fetch_inc(&req->poll_refs) & IO_POLL_REF_MASK);
}
IOU_POLL_DONE = 0,
IOU_POLL_NO_ACTION = 1,
IOU_POLL_REMOVE_POLL_USE_RES = 2,
+ IOU_POLL_REISSUE = 3,
};
/*
* All poll tw should go through this. Checks for poll events, manages
* references, does rewait, etc.
*
- * Returns a negative error on failure. IOU_POLL_NO_ACTION when no action require,
- * which is either spurious wakeup or multishot CQE is served.
- * IOU_POLL_DONE when it's done with the request, then the mask is stored in req->cqe.res.
- * IOU_POLL_REMOVE_POLL_USE_RES indicates to remove multishot poll and that the result
- * is stored in req->cqe.
+ * Returns a negative error on failure. IOU_POLL_NO_ACTION when no action
+ * require, which is either spurious wakeup or multishot CQE is served.
+ * IOU_POLL_DONE when it's done with the request, then the mask is stored in
+ * req->cqe.res. IOU_POLL_REMOVE_POLL_USE_RES indicates to remove multishot
+ * poll and that the result is stored in req->cqe.
*/
static int io_poll_check_events(struct io_kiocb *req, bool *locked)
{
struct io_ring_ctx *ctx = req->ctx;
- int v, ret;
+ int v;
/* req->task == current here, checking PF_EXITING is safe */
if (unlikely(req->task->flags & PF_EXITING))
*/
if ((v & IO_POLL_REF_MASK) != 1)
req->cqe.res = 0;
+ if (v & IO_POLL_RETRY_FLAG) {
+ req->cqe.res = 0;
+ /*
+ * We won't find new events that came in between
+ * vfs_poll and the ref put unless we clear the flag
+ * in advance.
+ */
+ atomic_andnot(IO_POLL_RETRY_FLAG, &req->poll_refs);
+ v &= ~IO_POLL_RETRY_FLAG;
+ }
/* the mask was stashed in __io_poll_execute */
if (!req->cqe.res) {
struct poll_table_struct pt = { ._key = req->apoll_events };
req->cqe.res = vfs_poll(req->file, &pt) & req->apoll_events;
+ /*
+ * We got woken with a mask, but someone else got to
+ * it first. The above vfs_poll() doesn't add us back
+ * to the waitqueue, so if we get nothing back, we
+ * should be safe and attempt a reissue.
+ */
+ if (unlikely(!req->cqe.res)) {
+ /* Multishot armed need not reissue */
+ if (!(req->apoll_events & EPOLLONESHOT))
+ continue;
+ return IOU_POLL_REISSUE;
+ }
}
-
- if ((unlikely(!req->cqe.res)))
- continue;
if (req->apoll_events & EPOLLONESHOT)
return IOU_POLL_DONE;
if (io_is_uring_fops(req->file))
return IOU_POLL_REMOVE_POLL_USE_RES;
}
} else {
- ret = io_poll_issue(req, locked);
+ int ret = io_poll_issue(req, locked);
if (ret == IOU_STOP_MULTISHOT)
return IOU_POLL_REMOVE_POLL_USE_RES;
if (ret < 0)
* Release all references, retry if someone tried to restart
* task_work while we were executing it.
*/
- } while (atomic_sub_return(v & IO_POLL_REF_MASK, &req->poll_refs));
+ } while (atomic_sub_return(v & IO_POLL_REF_MASK, &req->poll_refs) &
+ IO_POLL_REF_MASK);
return IOU_POLL_NO_ACTION;
}
if (ret == IOU_POLL_DONE) {
struct io_poll *poll = io_kiocb_to_cmd(req, struct io_poll);
req->cqe.res = mangle_poll(req->cqe.res & poll->events);
+ } else if (ret == IOU_POLL_REISSUE) {
+ io_poll_remove_entries(req);
+ io_poll_tw_hash_eject(req, locked);
+ io_req_task_submit(req, locked);
+ return;
} else if (ret != IOU_POLL_REMOVE_POLL_USE_RES) {
req->cqe.res = ret;
req_set_fail(req);
if (ret == IOU_POLL_REMOVE_POLL_USE_RES)
io_req_complete_post(req);
- else if (ret == IOU_POLL_DONE)
+ else if (ret == IOU_POLL_DONE || ret == IOU_POLL_REISSUE)
io_req_task_submit(req, locked);
else
io_req_complete_failed(req, ret);
return 0;
if (io_poll_get_ownership(req)) {
+ /*
+ * If we trigger a multishot poll off our own wakeup path,
+ * disable multishot as there is a circular dependency between
+ * CQ posting and triggering the event.
+ */
+ if (mask & EPOLL_URING_WAKE)
+ poll->events |= EPOLLONESHOT;
+
/* optional, saves extra locking for removal in tw handler */
if (mask && poll->events & EPOLLONESHOT) {
list_del_init(&poll->wait.entry);
return pt->owning || io_poll_get_ownership(req);
}
+static void io_poll_add_hash(struct io_kiocb *req)
+{
+ if (req->flags & REQ_F_HASH_LOCKED)
+ io_poll_req_insert_locked(req);
+ else
+ io_poll_req_insert(req);
+}
+
/*
* Returns 0 when it's handed over for polling. The caller owns the requests if
* it returns non-zero, but otherwise should not touch it. Negative values
unsigned issue_flags)
{
struct io_ring_ctx *ctx = req->ctx;
- int v;
INIT_HLIST_NODE(&req->hash_node);
req->work.cancel_seq = atomic_read(&ctx->cancel_seq);
if (mask &&
((poll->events & (EPOLLET|EPOLLONESHOT)) == (EPOLLET|EPOLLONESHOT))) {
- if (!io_poll_can_finish_inline(req, ipt))
+ if (!io_poll_can_finish_inline(req, ipt)) {
+ io_poll_add_hash(req);
return 0;
+ }
io_poll_remove_entries(req);
ipt->result_mask = mask;
/* no one else has access to the req, forget about the ref */
return 1;
}
- if (req->flags & REQ_F_HASH_LOCKED)
- io_poll_req_insert_locked(req);
- else
- io_poll_req_insert(req);
+ io_poll_add_hash(req);
if (mask && (poll->events & EPOLLET) &&
io_poll_can_finish_inline(req, ipt)) {
if (ipt->owning) {
/*
- * Release ownership. If someone tried to queue a tw while it was
- * locked, kick it off for them.
+ * Try to release ownership. If we see a change of state, e.g.
+ * poll was waken up, queue up a tw, it'll deal with it.
*/
- v = atomic_dec_return(&req->poll_refs);
- if (unlikely(v & IO_POLL_REF_MASK))
+ if (atomic_cmpxchg(&req->poll_refs, 1, 0) != 1)
__io_poll_execute(req, 0);
}
return 0;
__io_queue_proc(&apoll->poll, pt, head, &apoll->double_poll);
}
+/*
+ * We can't reliably detect loops in repeated poll triggers and issue
+ * subsequently failing. But rather than fail these immediately, allow a
+ * certain amount of retries before we give up. Given that this condition
+ * should _rarely_ trigger even once, we should be fine with a larger value.
+ */
+#define APOLL_MAX_RETRY 128
+
static struct async_poll *io_req_alloc_apoll(struct io_kiocb *req,
unsigned issue_flags)
{
if (req->flags & REQ_F_POLLED) {
apoll = req->apoll;
kfree(apoll->double_poll);
- } else if (!(issue_flags & IO_URING_F_UNLOCKED) &&
- (entry = io_alloc_cache_get(&ctx->apoll_cache)) != NULL) {
+ } else if (!(issue_flags & IO_URING_F_UNLOCKED)) {
+ entry = io_alloc_cache_get(&ctx->apoll_cache);
+ if (entry == NULL)
+ goto alloc_apoll;
apoll = container_of(entry, struct async_poll, cache);
+ apoll->poll.retries = APOLL_MAX_RETRY;
} else {
+alloc_apoll:
apoll = kmalloc(sizeof(*apoll), GFP_ATOMIC);
if (unlikely(!apoll))
return NULL;
+ apoll->poll.retries = APOLL_MAX_RETRY;
}
apoll->double_poll = NULL;
req->apoll = apoll;
+ if (unlikely(!--apoll->poll.retries))
+ return NULL;
return apoll;
}
return IO_APOLL_ABORTED;
if (!file_can_poll(req->file))
return IO_APOLL_ABORTED;
- if ((req->flags & (REQ_F_POLLED|REQ_F_PARTIAL_IO)) == REQ_F_POLLED)
- return IO_APOLL_ABORTED;
if (!(req->flags & REQ_F_APOLL_MULTISHOT))
mask |= EPOLLONESHOT;
apoll = io_req_alloc_apoll(req, issue_flags);
if (!apoll)
return IO_APOLL_ABORTED;
+ req->flags &= ~(REQ_F_SINGLE_POLL | REQ_F_DOUBLE_POLL);
req->flags |= REQ_F_POLLED;
ipt.pt._qproc = io_async_queue_proc;
struct io_hash_bucket *bucket;
struct io_kiocb *preq;
int ret2, ret = 0;
- bool locked;
+ bool locked = true;
+ io_ring_submit_lock(ctx, issue_flags);
preq = io_poll_find(ctx, true, &cd, &ctx->cancel_table, &bucket);
ret2 = io_poll_disarm(preq);
if (bucket)
goto out;
}
- io_ring_submit_lock(ctx, issue_flags);
preq = io_poll_find(ctx, true, &cd, &ctx->cancel_table_locked, &bucket);
ret2 = io_poll_disarm(preq);
if (bucket)
spin_unlock(&bucket->lock);
- io_ring_submit_unlock(ctx, issue_flags);
if (ret2) {
ret = ret2;
goto out;
if (poll_update->update_user_data)
preq->cqe.user_data = poll_update->new_user_data;
- ret2 = io_poll_add(preq, issue_flags);
+ ret2 = io_poll_add(preq, issue_flags & ~IO_URING_F_UNLOCKED);
/* successfully updated, don't complete poll request */
if (!ret2 || ret2 == -EIOCBQUEUED)
goto out;
req_set_fail(preq);
io_req_set_res(preq, -ECANCELED, 0);
- locked = !(issue_flags & IO_URING_F_UNLOCKED);
io_req_task_complete(preq, &locked);
out:
+ io_ring_submit_unlock(ctx, issue_flags);
if (ret < 0) {
req_set_fail(req);
return ret;