io-wq: fix races around manager/worker creation and task exit
authorJens Axboe <axboe@kernel.dk>
Tue, 23 Feb 2021 22:34:06 +0000 (15:34 -0700)
committerJens Axboe <axboe@kernel.dk>
Wed, 24 Feb 2021 03:33:38 +0000 (20:33 -0700)
These races have always been there, they are just more apparent now that
we do early cancel of io-wq when the task exits.

Ensure that the io-wq manager sets task state correctly to not miss
wakeups for task creation. This is important if we get a wakeup after
having marked ourselves as TASK_INTERRUPTIBLE. If we do end up creating
workers, then we flip the state back to running, making the subsequent
schedule() a no-op. Also increment the wq ref count before forking the
thread, to avoid a use-after-free.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
fs/io-wq.c

index b5ae808..0ce5057 100644 (file)
@@ -605,6 +605,8 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
        struct io_worker *worker;
        pid_t pid;
 
+       __set_current_state(TASK_RUNNING);
+
        worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
        if (!worker)
                return false;
@@ -614,15 +616,18 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
        worker->wqe = wqe;
        spin_lock_init(&worker->lock);
 
+       refcount_inc(&wq->refs);
+
        if (index == IO_WQ_ACCT_BOUND)
                pid = io_wq_fork_thread(task_thread_bound, worker);
        else
                pid = io_wq_fork_thread(task_thread_unbound, worker);
        if (pid < 0) {
+               if (refcount_dec_and_test(&wq->refs))
+                       complete(&wq->done);
                kfree(worker);
                return false;
        }
-       refcount_inc(&wq->refs);
        return true;
 }
 
@@ -668,6 +673,30 @@ static bool io_wq_worker_wake(struct io_worker *worker, void *data)
        return false;
 }
 
+static void io_wq_check_workers(struct io_wq *wq)
+{
+       int node;
+
+       for_each_node(node) {
+               struct io_wqe *wqe = wq->wqes[node];
+               bool fork_worker[2] = { false, false };
+
+               if (!node_online(node))
+                       continue;
+
+               raw_spin_lock_irq(&wqe->lock);
+               if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
+                       fork_worker[IO_WQ_ACCT_BOUND] = true;
+               if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
+                       fork_worker[IO_WQ_ACCT_UNBOUND] = true;
+               raw_spin_unlock_irq(&wqe->lock);
+               if (fork_worker[IO_WQ_ACCT_BOUND])
+                       create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
+               if (fork_worker[IO_WQ_ACCT_UNBOUND])
+                       create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
+       }
+}
+
 /*
  * Manager thread. Tasked with creating new workers, if we need them.
  */
@@ -684,30 +713,15 @@ static int io_wq_manager(void *data)
 
        complete(&wq->done);
 
-       while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
-               for_each_node(node) {
-                       struct io_wqe *wqe = wq->wqes[node];
-                       bool fork_worker[2] = { false, false };
-
-                       if (!node_online(node))
-                               continue;
-
-                       raw_spin_lock_irq(&wqe->lock);
-                       if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
-                               fork_worker[IO_WQ_ACCT_BOUND] = true;
-                       if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
-                               fork_worker[IO_WQ_ACCT_UNBOUND] = true;
-                       raw_spin_unlock_irq(&wqe->lock);
-                       if (fork_worker[IO_WQ_ACCT_BOUND])
-                               create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
-                       if (fork_worker[IO_WQ_ACCT_UNBOUND])
-                               create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
-               }
+       do {
                set_current_state(TASK_INTERRUPTIBLE);
+               io_wq_check_workers(wq);
                schedule_timeout(HZ);
                if (fatal_signal_pending(current))
                        set_bit(IO_WQ_BIT_EXIT, &wq->state);
-       }
+       } while (!test_bit(IO_WQ_BIT_EXIT, &wq->state));
+
+       io_wq_check_workers(wq);
 
        if (refcount_dec_and_test(&wq->refs)) {
                complete(&wq->done);
@@ -970,7 +984,6 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
        current->flags &= ~PF_IO_WORKER;
        if (ret >= 0) {
                wait_for_completion(&wq->done);
-               reinit_completion(&wq->done);
                return wq;
        }