struct page *internal_pages[AIO_RING_PAGES];
};
-static inline unsigned aio_ring_avail(struct aio_ring_info *info,
- struct aio_ring *ring)
-{
- return (ring->head + info->nr - 1 - ring->tail) % info->nr;
-}
-
struct kioctx {
atomic_t users;
atomic_t dead;
atomic_t reqs_active;
struct list_head active_reqs; /* used for cancellation */
- /* sys_io_setup currently limits this to an unsigned int */
+ /*
+ * This is what userspace passed to io_setup(), it's not used for
+ * anything but counting against the global max_reqs quota.
+ *
+ * The real limit is ring->nr - 1, which will be larger (see
+ * aio_setup_ring())
+ */
unsigned max_reqs;
struct aio_ring_info ring_info;
*/
static void free_ioctx(struct kioctx *ctx)
{
+ struct aio_ring_info *info = &ctx->ring_info;
+ struct aio_ring *ring;
struct io_event res;
struct kiocb *req;
+ unsigned head, avail;
spin_lock_irq(&ctx->ctx_lock);
spin_unlock_irq(&ctx->ctx_lock);
- wait_event(ctx->wait, !atomic_read(&ctx->reqs_active));
+ ring = kmap_atomic(info->ring_pages[0]);
+ head = ring->head;
+ kunmap_atomic(ring);
+
+ while (atomic_read(&ctx->reqs_active) > 0) {
+ wait_event(ctx->wait, head != info->tail);
+
+ avail = (head <= info->tail ? info->tail : info->nr) - head;
+
+ atomic_sub(avail, &ctx->reqs_active);
+ head += avail;
+ head %= info->nr;
+ }
+
+ WARN_ON(atomic_read(&ctx->reqs_active) < 0);
aio_free_ring(ctx);
unsigned short allocated, to_alloc;
long avail;
struct kiocb *req, *n;
- struct aio_ring *ring;
to_alloc = min(batch->count, KIOCB_BATCH_SIZE);
for (allocated = 0; allocated < to_alloc; allocated++) {
goto out;
spin_lock_irq(&ctx->ctx_lock);
- ring = kmap_atomic(ctx->ring_info.ring_pages[0]);
- avail = aio_ring_avail(&ctx->ring_info, ring) -
- atomic_read(&ctx->reqs_active);
+ avail = ctx->ring_info.nr - atomic_read(&ctx->reqs_active) - 1;
BUG_ON(avail < 0);
if (avail < allocated) {
/* Trim back the number of requests. */
batch->count -= allocated;
atomic_add(allocated, &ctx->reqs_active);
- kunmap_atomic(ring);
spin_unlock_irq(&ctx->ctx_lock);
out:
* when the event got cancelled.
*/
if (unlikely(xchg(&iocb->ki_cancel,
- KIOCB_CANCELLED) == KIOCB_CANCELLED))
+ KIOCB_CANCELLED) == KIOCB_CANCELLED)) {
+ atomic_dec(&ctx->reqs_active);
+ /* Still need the wake_up in case free_ioctx is waiting */
goto put_rq;
+ }
/*
* Add a completion event to the ring buffer. Must be done holding
put_rq:
/* everything turned out well, dispose of the aiocb. */
aio_put_req(iocb);
- atomic_dec(&ctx->reqs_active);
/*
* We have to order our ring_info tail store above and test
flush_dcache_page(info->ring_pages[0]);
pr_debug("%li h%u t%u\n", ret, head, info->tail);
+
+ atomic_sub(ret, &ctx->reqs_active);
out:
mutex_unlock(&info->ring_lock);