struct io_worker *worker;
pid_t pid;
+ __set_current_state(TASK_RUNNING);
+
worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
if (!worker)
return false;
worker->wqe = wqe;
spin_lock_init(&worker->lock);
+ refcount_inc(&wq->refs);
+
if (index == IO_WQ_ACCT_BOUND)
pid = io_wq_fork_thread(task_thread_bound, worker);
else
pid = io_wq_fork_thread(task_thread_unbound, worker);
if (pid < 0) {
+ if (refcount_dec_and_test(&wq->refs))
+ complete(&wq->done);
kfree(worker);
return false;
}
- refcount_inc(&wq->refs);
return true;
}
return false;
}
+static void io_wq_check_workers(struct io_wq *wq)
+{
+ int node;
+
+ for_each_node(node) {
+ struct io_wqe *wqe = wq->wqes[node];
+ bool fork_worker[2] = { false, false };
+
+ if (!node_online(node))
+ continue;
+
+ raw_spin_lock_irq(&wqe->lock);
+ if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
+ fork_worker[IO_WQ_ACCT_BOUND] = true;
+ if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
+ fork_worker[IO_WQ_ACCT_UNBOUND] = true;
+ raw_spin_unlock_irq(&wqe->lock);
+ if (fork_worker[IO_WQ_ACCT_BOUND])
+ create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
+ if (fork_worker[IO_WQ_ACCT_UNBOUND])
+ create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
+ }
+}
+
/*
* Manager thread. Tasked with creating new workers, if we need them.
*/
complete(&wq->done);
- while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
- for_each_node(node) {
- struct io_wqe *wqe = wq->wqes[node];
- bool fork_worker[2] = { false, false };
-
- if (!node_online(node))
- continue;
-
- raw_spin_lock_irq(&wqe->lock);
- if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
- fork_worker[IO_WQ_ACCT_BOUND] = true;
- if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
- fork_worker[IO_WQ_ACCT_UNBOUND] = true;
- raw_spin_unlock_irq(&wqe->lock);
- if (fork_worker[IO_WQ_ACCT_BOUND])
- create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
- if (fork_worker[IO_WQ_ACCT_UNBOUND])
- create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
- }
+ do {
set_current_state(TASK_INTERRUPTIBLE);
+ io_wq_check_workers(wq);
schedule_timeout(HZ);
if (fatal_signal_pending(current))
set_bit(IO_WQ_BIT_EXIT, &wq->state);
- }
+ } while (!test_bit(IO_WQ_BIT_EXIT, &wq->state));
+
+ io_wq_check_workers(wq);
if (refcount_dec_and_test(&wq->refs)) {
complete(&wq->done);
current->flags &= ~PF_IO_WORKER;
if (ret >= 0) {
wait_for_completion(&wq->done);
- reinit_completion(&wq->done);
return wq;
}