io_uring: One wqe per wq
[platform/kernel/linux-starfive.git] / io_uring / io-wq.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Basic worker thread pool for io_uring
4  *
5  * Copyright (C) 2019 Jens Axboe
6  *
7  */
8 #include <linux/kernel.h>
9 #include <linux/init.h>
10 #include <linux/errno.h>
11 #include <linux/sched/signal.h>
12 #include <linux/percpu.h>
13 #include <linux/slab.h>
14 #include <linux/rculist_nulls.h>
15 #include <linux/cpu.h>
16 #include <linux/task_work.h>
17 #include <linux/audit.h>
18 #include <linux/mmu_context.h>
19 #include <uapi/linux/io_uring.h>
20
21 #include "io-wq.h"
22 #include "slist.h"
23 #include "io_uring.h"
24
25 #define WORKER_IDLE_TIMEOUT     (5 * HZ)
26
27 enum {
28         IO_WORKER_F_UP          = 1,    /* up and active */
29         IO_WORKER_F_RUNNING     = 2,    /* account as running */
30         IO_WORKER_F_FREE        = 4,    /* worker on free list */
31         IO_WORKER_F_BOUND       = 8,    /* is doing bounded work */
32 };
33
34 enum {
35         IO_WQ_BIT_EXIT          = 0,    /* wq exiting */
36 };
37
38 enum {
39         IO_ACCT_STALLED_BIT     = 0,    /* stalled on hash */
40 };
41
42 /*
43  * One for each thread in a wqe pool
44  */
45 struct io_worker {
46         refcount_t ref;
47         unsigned flags;
48         struct hlist_nulls_node nulls_node;
49         struct list_head all_list;
50         struct task_struct *task;
51         struct io_wqe *wqe;
52
53         struct io_wq_work *cur_work;
54         struct io_wq_work *next_work;
55         raw_spinlock_t lock;
56
57         struct completion ref_done;
58
59         unsigned long create_state;
60         struct callback_head create_work;
61         int create_index;
62
63         union {
64                 struct rcu_head rcu;
65                 struct work_struct work;
66         };
67 };
68
69 #if BITS_PER_LONG == 64
70 #define IO_WQ_HASH_ORDER        6
71 #else
72 #define IO_WQ_HASH_ORDER        5
73 #endif
74
75 #define IO_WQ_NR_HASH_BUCKETS   (1u << IO_WQ_HASH_ORDER)
76
77 struct io_wqe_acct {
78         unsigned nr_workers;
79         unsigned max_workers;
80         int index;
81         atomic_t nr_running;
82         raw_spinlock_t lock;
83         struct io_wq_work_list work_list;
84         unsigned long flags;
85 };
86
87 enum {
88         IO_WQ_ACCT_BOUND,
89         IO_WQ_ACCT_UNBOUND,
90         IO_WQ_ACCT_NR,
91 };
92
93 /*
94  * Per-node worker thread pool
95  */
96 struct io_wqe {
97         raw_spinlock_t lock;
98         struct io_wqe_acct acct[IO_WQ_ACCT_NR];
99
100         struct hlist_nulls_head free_list;
101         struct list_head all_list;
102
103         struct wait_queue_entry wait;
104
105         struct io_wq *wq;
106         struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
107
108         cpumask_var_t cpu_mask;
109 };
110
111 /*
112  * Per io_wq state
113   */
114 struct io_wq {
115         unsigned long state;
116
117         free_work_fn *free_work;
118         io_wq_work_fn *do_work;
119
120         struct io_wq_hash *hash;
121
122         atomic_t worker_refs;
123         struct completion worker_done;
124
125         struct hlist_node cpuhp_node;
126
127         struct task_struct *task;
128
129         struct io_wqe wqe;
130 };
131
132 static enum cpuhp_state io_wq_online;
133
134 struct io_cb_cancel_data {
135         work_cancel_fn *fn;
136         void *data;
137         int nr_running;
138         int nr_pending;
139         bool cancel_all;
140 };
141
142 static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
143 static void io_wqe_dec_running(struct io_worker *worker);
144 static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
145                                         struct io_wqe_acct *acct,
146                                         struct io_cb_cancel_data *match);
147 static void create_worker_cb(struct callback_head *cb);
148 static void io_wq_cancel_tw_create(struct io_wq *wq);
149
150 static bool io_worker_get(struct io_worker *worker)
151 {
152         return refcount_inc_not_zero(&worker->ref);
153 }
154
155 static void io_worker_release(struct io_worker *worker)
156 {
157         if (refcount_dec_and_test(&worker->ref))
158                 complete(&worker->ref_done);
159 }
160
161 static inline struct io_wqe_acct *io_get_acct(struct io_wqe *wqe, bool bound)
162 {
163         return &wqe->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
164 }
165
166 static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
167                                                    struct io_wq_work *work)
168 {
169         return io_get_acct(wqe, !(work->flags & IO_WQ_WORK_UNBOUND));
170 }
171
172 static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
173 {
174         return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND);
175 }
176
177 static void io_worker_ref_put(struct io_wq *wq)
178 {
179         if (atomic_dec_and_test(&wq->worker_refs))
180                 complete(&wq->worker_done);
181 }
182
183 static void io_worker_cancel_cb(struct io_worker *worker)
184 {
185         struct io_wqe_acct *acct = io_wqe_get_acct(worker);
186         struct io_wqe *wqe = worker->wqe;
187         struct io_wq *wq = wqe->wq;
188
189         atomic_dec(&acct->nr_running);
190         raw_spin_lock(&worker->wqe->lock);
191         acct->nr_workers--;
192         raw_spin_unlock(&worker->wqe->lock);
193         io_worker_ref_put(wq);
194         clear_bit_unlock(0, &worker->create_state);
195         io_worker_release(worker);
196 }
197
198 static bool io_task_worker_match(struct callback_head *cb, void *data)
199 {
200         struct io_worker *worker;
201
202         if (cb->func != create_worker_cb)
203                 return false;
204         worker = container_of(cb, struct io_worker, create_work);
205         return worker == data;
206 }
207
208 static void io_worker_exit(struct io_worker *worker)
209 {
210         struct io_wqe *wqe = worker->wqe;
211         struct io_wq *wq = wqe->wq;
212
213         while (1) {
214                 struct callback_head *cb = task_work_cancel_match(wq->task,
215                                                 io_task_worker_match, worker);
216
217                 if (!cb)
218                         break;
219                 io_worker_cancel_cb(worker);
220         }
221
222         io_worker_release(worker);
223         wait_for_completion(&worker->ref_done);
224
225         raw_spin_lock(&wqe->lock);
226         if (worker->flags & IO_WORKER_F_FREE)
227                 hlist_nulls_del_rcu(&worker->nulls_node);
228         list_del_rcu(&worker->all_list);
229         raw_spin_unlock(&wqe->lock);
230         io_wqe_dec_running(worker);
231         worker->flags = 0;
232         preempt_disable();
233         current->flags &= ~PF_IO_WORKER;
234         preempt_enable();
235
236         kfree_rcu(worker, rcu);
237         io_worker_ref_put(wqe->wq);
238         do_exit(0);
239 }
240
241 static inline bool io_acct_run_queue(struct io_wqe_acct *acct)
242 {
243         bool ret = false;
244
245         raw_spin_lock(&acct->lock);
246         if (!wq_list_empty(&acct->work_list) &&
247             !test_bit(IO_ACCT_STALLED_BIT, &acct->flags))
248                 ret = true;
249         raw_spin_unlock(&acct->lock);
250
251         return ret;
252 }
253
254 /*
255  * Check head of free list for an available worker. If one isn't available,
256  * caller must create one.
257  */
258 static bool io_wqe_activate_free_worker(struct io_wqe *wqe,
259                                         struct io_wqe_acct *acct)
260         __must_hold(RCU)
261 {
262         struct hlist_nulls_node *n;
263         struct io_worker *worker;
264
265         /*
266          * Iterate free_list and see if we can find an idle worker to
267          * activate. If a given worker is on the free_list but in the process
268          * of exiting, keep trying.
269          */
270         hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) {
271                 if (!io_worker_get(worker))
272                         continue;
273                 if (io_wqe_get_acct(worker) != acct) {
274                         io_worker_release(worker);
275                         continue;
276                 }
277                 if (wake_up_process(worker->task)) {
278                         io_worker_release(worker);
279                         return true;
280                 }
281                 io_worker_release(worker);
282         }
283
284         return false;
285 }
286
287 /*
288  * We need a worker. If we find a free one, we're good. If not, and we're
289  * below the max number of workers, create one.
290  */
291 static bool io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
292 {
293         /*
294          * Most likely an attempt to queue unbounded work on an io_wq that
295          * wasn't setup with any unbounded workers.
296          */
297         if (unlikely(!acct->max_workers))
298                 pr_warn_once("io-wq is not configured for unbound workers");
299
300         raw_spin_lock(&wqe->lock);
301         if (acct->nr_workers >= acct->max_workers) {
302                 raw_spin_unlock(&wqe->lock);
303                 return true;
304         }
305         acct->nr_workers++;
306         raw_spin_unlock(&wqe->lock);
307         atomic_inc(&acct->nr_running);
308         atomic_inc(&wqe->wq->worker_refs);
309         return create_io_worker(wqe->wq, wqe, acct->index);
310 }
311
312 static void io_wqe_inc_running(struct io_worker *worker)
313 {
314         struct io_wqe_acct *acct = io_wqe_get_acct(worker);
315
316         atomic_inc(&acct->nr_running);
317 }
318
319 static void create_worker_cb(struct callback_head *cb)
320 {
321         struct io_worker *worker;
322         struct io_wq *wq;
323         struct io_wqe *wqe;
324         struct io_wqe_acct *acct;
325         bool do_create = false;
326
327         worker = container_of(cb, struct io_worker, create_work);
328         wqe = worker->wqe;
329         wq = wqe->wq;
330         acct = &wqe->acct[worker->create_index];
331         raw_spin_lock(&wqe->lock);
332         if (acct->nr_workers < acct->max_workers) {
333                 acct->nr_workers++;
334                 do_create = true;
335         }
336         raw_spin_unlock(&wqe->lock);
337         if (do_create) {
338                 create_io_worker(wq, wqe, worker->create_index);
339         } else {
340                 atomic_dec(&acct->nr_running);
341                 io_worker_ref_put(wq);
342         }
343         clear_bit_unlock(0, &worker->create_state);
344         io_worker_release(worker);
345 }
346
347 static bool io_queue_worker_create(struct io_worker *worker,
348                                    struct io_wqe_acct *acct,
349                                    task_work_func_t func)
350 {
351         struct io_wqe *wqe = worker->wqe;
352         struct io_wq *wq = wqe->wq;
353
354         /* raced with exit, just ignore create call */
355         if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
356                 goto fail;
357         if (!io_worker_get(worker))
358                 goto fail;
359         /*
360          * create_state manages ownership of create_work/index. We should
361          * only need one entry per worker, as the worker going to sleep
362          * will trigger the condition, and waking will clear it once it
363          * runs the task_work.
364          */
365         if (test_bit(0, &worker->create_state) ||
366             test_and_set_bit_lock(0, &worker->create_state))
367                 goto fail_release;
368
369         atomic_inc(&wq->worker_refs);
370         init_task_work(&worker->create_work, func);
371         worker->create_index = acct->index;
372         if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
373                 /*
374                  * EXIT may have been set after checking it above, check after
375                  * adding the task_work and remove any creation item if it is
376                  * now set. wq exit does that too, but we can have added this
377                  * work item after we canceled in io_wq_exit_workers().
378                  */
379                 if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
380                         io_wq_cancel_tw_create(wq);
381                 io_worker_ref_put(wq);
382                 return true;
383         }
384         io_worker_ref_put(wq);
385         clear_bit_unlock(0, &worker->create_state);
386 fail_release:
387         io_worker_release(worker);
388 fail:
389         atomic_dec(&acct->nr_running);
390         io_worker_ref_put(wq);
391         return false;
392 }
393
394 static void io_wqe_dec_running(struct io_worker *worker)
395 {
396         struct io_wqe_acct *acct = io_wqe_get_acct(worker);
397         struct io_wqe *wqe = worker->wqe;
398
399         if (!(worker->flags & IO_WORKER_F_UP))
400                 return;
401
402         if (!atomic_dec_and_test(&acct->nr_running))
403                 return;
404         if (!io_acct_run_queue(acct))
405                 return;
406
407         atomic_inc(&acct->nr_running);
408         atomic_inc(&wqe->wq->worker_refs);
409         io_queue_worker_create(worker, acct, create_worker_cb);
410 }
411
412 /*
413  * Worker will start processing some work. Move it to the busy list, if
414  * it's currently on the freelist
415  */
416 static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker)
417 {
418         if (worker->flags & IO_WORKER_F_FREE) {
419                 worker->flags &= ~IO_WORKER_F_FREE;
420                 raw_spin_lock(&wqe->lock);
421                 hlist_nulls_del_init_rcu(&worker->nulls_node);
422                 raw_spin_unlock(&wqe->lock);
423         }
424 }
425
426 /*
427  * No work, worker going to sleep. Move to freelist, and unuse mm if we
428  * have one attached. Dropping the mm may potentially sleep, so we drop
429  * the lock in that case and return success. Since the caller has to
430  * retry the loop in that case (we changed task state), we don't regrab
431  * the lock if we return success.
432  */
433 static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
434         __must_hold(wqe->lock)
435 {
436         if (!(worker->flags & IO_WORKER_F_FREE)) {
437                 worker->flags |= IO_WORKER_F_FREE;
438                 hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
439         }
440 }
441
442 static inline unsigned int io_get_work_hash(struct io_wq_work *work)
443 {
444         return work->flags >> IO_WQ_HASH_SHIFT;
445 }
446
447 static bool io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
448 {
449         struct io_wq *wq = wqe->wq;
450         bool ret = false;
451
452         spin_lock_irq(&wq->hash->wait.lock);
453         if (list_empty(&wqe->wait.entry)) {
454                 __add_wait_queue(&wq->hash->wait, &wqe->wait);
455                 if (!test_bit(hash, &wq->hash->map)) {
456                         __set_current_state(TASK_RUNNING);
457                         list_del_init(&wqe->wait.entry);
458                         ret = true;
459                 }
460         }
461         spin_unlock_irq(&wq->hash->wait.lock);
462         return ret;
463 }
464
465 static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
466                                            struct io_worker *worker)
467         __must_hold(acct->lock)
468 {
469         struct io_wq_work_node *node, *prev;
470         struct io_wq_work *work, *tail;
471         unsigned int stall_hash = -1U;
472         struct io_wqe *wqe = worker->wqe;
473
474         wq_list_for_each(node, prev, &acct->work_list) {
475                 unsigned int hash;
476
477                 work = container_of(node, struct io_wq_work, list);
478
479                 /* not hashed, can run anytime */
480                 if (!io_wq_is_hashed(work)) {
481                         wq_list_del(&acct->work_list, node, prev);
482                         return work;
483                 }
484
485                 hash = io_get_work_hash(work);
486                 /* all items with this hash lie in [work, tail] */
487                 tail = wqe->hash_tail[hash];
488
489                 /* hashed, can run if not already running */
490                 if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
491                         wqe->hash_tail[hash] = NULL;
492                         wq_list_cut(&acct->work_list, &tail->list, prev);
493                         return work;
494                 }
495                 if (stall_hash == -1U)
496                         stall_hash = hash;
497                 /* fast forward to a next hash, for-each will fix up @prev */
498                 node = &tail->list;
499         }
500
501         if (stall_hash != -1U) {
502                 bool unstalled;
503
504                 /*
505                  * Set this before dropping the lock to avoid racing with new
506                  * work being added and clearing the stalled bit.
507                  */
508                 set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
509                 raw_spin_unlock(&acct->lock);
510                 unstalled = io_wait_on_hash(wqe, stall_hash);
511                 raw_spin_lock(&acct->lock);
512                 if (unstalled) {
513                         clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
514                         if (wq_has_sleeper(&wqe->wq->hash->wait))
515                                 wake_up(&wqe->wq->hash->wait);
516                 }
517         }
518
519         return NULL;
520 }
521
522 static void io_assign_current_work(struct io_worker *worker,
523                                    struct io_wq_work *work)
524 {
525         if (work) {
526                 io_run_task_work();
527                 cond_resched();
528         }
529
530         raw_spin_lock(&worker->lock);
531         worker->cur_work = work;
532         worker->next_work = NULL;
533         raw_spin_unlock(&worker->lock);
534 }
535
536 static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
537
538 static void io_worker_handle_work(struct io_worker *worker)
539 {
540         struct io_wqe_acct *acct = io_wqe_get_acct(worker);
541         struct io_wqe *wqe = worker->wqe;
542         struct io_wq *wq = wqe->wq;
543         bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);
544
545         do {
546                 struct io_wq_work *work;
547
548                 /*
549                  * If we got some work, mark us as busy. If we didn't, but
550                  * the list isn't empty, it means we stalled on hashed work.
551                  * Mark us stalled so we don't keep looking for work when we
552                  * can't make progress, any work completion or insertion will
553                  * clear the stalled flag.
554                  */
555                 raw_spin_lock(&acct->lock);
556                 work = io_get_next_work(acct, worker);
557                 raw_spin_unlock(&acct->lock);
558                 if (work) {
559                         __io_worker_busy(wqe, worker);
560
561                         /*
562                          * Make sure cancelation can find this, even before
563                          * it becomes the active work. That avoids a window
564                          * where the work has been removed from our general
565                          * work list, but isn't yet discoverable as the
566                          * current work item for this worker.
567                          */
568                         raw_spin_lock(&worker->lock);
569                         worker->next_work = work;
570                         raw_spin_unlock(&worker->lock);
571                 } else {
572                         break;
573                 }
574                 io_assign_current_work(worker, work);
575                 __set_current_state(TASK_RUNNING);
576
577                 /* handle a whole dependent link */
578                 do {
579                         struct io_wq_work *next_hashed, *linked;
580                         unsigned int hash = io_get_work_hash(work);
581
582                         next_hashed = wq_next_work(work);
583
584                         if (unlikely(do_kill) && (work->flags & IO_WQ_WORK_UNBOUND))
585                                 work->flags |= IO_WQ_WORK_CANCEL;
586                         wq->do_work(work);
587                         io_assign_current_work(worker, NULL);
588
589                         linked = wq->free_work(work);
590                         work = next_hashed;
591                         if (!work && linked && !io_wq_is_hashed(linked)) {
592                                 work = linked;
593                                 linked = NULL;
594                         }
595                         io_assign_current_work(worker, work);
596                         if (linked)
597                                 io_wqe_enqueue(wqe, linked);
598
599                         if (hash != -1U && !next_hashed) {
600                                 /* serialize hash clear with wake_up() */
601                                 spin_lock_irq(&wq->hash->wait.lock);
602                                 clear_bit(hash, &wq->hash->map);
603                                 clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
604                                 spin_unlock_irq(&wq->hash->wait.lock);
605                                 if (wq_has_sleeper(&wq->hash->wait))
606                                         wake_up(&wq->hash->wait);
607                         }
608                 } while (work);
609         } while (1);
610 }
611
612 static int io_wqe_worker(void *data)
613 {
614         struct io_worker *worker = data;
615         struct io_wqe_acct *acct = io_wqe_get_acct(worker);
616         struct io_wqe *wqe = worker->wqe;
617         struct io_wq *wq = wqe->wq;
618         bool exit_mask = false, last_timeout = false;
619         char buf[TASK_COMM_LEN];
620
621         worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
622
623         snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
624         set_task_comm(current, buf);
625
626         while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
627                 long ret;
628
629                 set_current_state(TASK_INTERRUPTIBLE);
630                 while (io_acct_run_queue(acct))
631                         io_worker_handle_work(worker);
632
633                 raw_spin_lock(&wqe->lock);
634                 /*
635                  * Last sleep timed out. Exit if we're not the last worker,
636                  * or if someone modified our affinity.
637                  */
638                 if (last_timeout && (exit_mask || acct->nr_workers > 1)) {
639                         acct->nr_workers--;
640                         raw_spin_unlock(&wqe->lock);
641                         __set_current_state(TASK_RUNNING);
642                         break;
643                 }
644                 last_timeout = false;
645                 __io_worker_idle(wqe, worker);
646                 raw_spin_unlock(&wqe->lock);
647                 if (io_run_task_work())
648                         continue;
649                 ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
650                 if (signal_pending(current)) {
651                         struct ksignal ksig;
652
653                         if (!get_signal(&ksig))
654                                 continue;
655                         break;
656                 }
657                 if (!ret) {
658                         last_timeout = true;
659                         exit_mask = !cpumask_test_cpu(raw_smp_processor_id(),
660                                                         wqe->cpu_mask);
661                 }
662         }
663
664         if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
665                 io_worker_handle_work(worker);
666
667         io_worker_exit(worker);
668         return 0;
669 }
670
671 /*
672  * Called when a worker is scheduled in. Mark us as currently running.
673  */
674 void io_wq_worker_running(struct task_struct *tsk)
675 {
676         struct io_worker *worker = tsk->worker_private;
677
678         if (!worker)
679                 return;
680         if (!(worker->flags & IO_WORKER_F_UP))
681                 return;
682         if (worker->flags & IO_WORKER_F_RUNNING)
683                 return;
684         worker->flags |= IO_WORKER_F_RUNNING;
685         io_wqe_inc_running(worker);
686 }
687
688 /*
689  * Called when worker is going to sleep. If there are no workers currently
690  * running and we have work pending, wake up a free one or create a new one.
691  */
692 void io_wq_worker_sleeping(struct task_struct *tsk)
693 {
694         struct io_worker *worker = tsk->worker_private;
695
696         if (!worker)
697                 return;
698         if (!(worker->flags & IO_WORKER_F_UP))
699                 return;
700         if (!(worker->flags & IO_WORKER_F_RUNNING))
701                 return;
702
703         worker->flags &= ~IO_WORKER_F_RUNNING;
704         io_wqe_dec_running(worker);
705 }
706
707 static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker,
708                                struct task_struct *tsk)
709 {
710         tsk->worker_private = worker;
711         worker->task = tsk;
712         set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
713
714         raw_spin_lock(&wqe->lock);
715         hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
716         list_add_tail_rcu(&worker->all_list, &wqe->all_list);
717         worker->flags |= IO_WORKER_F_FREE;
718         raw_spin_unlock(&wqe->lock);
719         wake_up_new_task(tsk);
720 }
721
722 static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
723 {
724         return true;
725 }
726
727 static inline bool io_should_retry_thread(long err)
728 {
729         /*
730          * Prevent perpetual task_work retry, if the task (or its group) is
731          * exiting.
732          */
733         if (fatal_signal_pending(current))
734                 return false;
735
736         switch (err) {
737         case -EAGAIN:
738         case -ERESTARTSYS:
739         case -ERESTARTNOINTR:
740         case -ERESTARTNOHAND:
741                 return true;
742         default:
743                 return false;
744         }
745 }
746
747 static void create_worker_cont(struct callback_head *cb)
748 {
749         struct io_worker *worker;
750         struct task_struct *tsk;
751         struct io_wqe *wqe;
752
753         worker = container_of(cb, struct io_worker, create_work);
754         clear_bit_unlock(0, &worker->create_state);
755         wqe = worker->wqe;
756         tsk = create_io_thread(io_wqe_worker, worker, NUMA_NO_NODE);
757         if (!IS_ERR(tsk)) {
758                 io_init_new_worker(wqe, worker, tsk);
759                 io_worker_release(worker);
760                 return;
761         } else if (!io_should_retry_thread(PTR_ERR(tsk))) {
762                 struct io_wqe_acct *acct = io_wqe_get_acct(worker);
763
764                 atomic_dec(&acct->nr_running);
765                 raw_spin_lock(&wqe->lock);
766                 acct->nr_workers--;
767                 if (!acct->nr_workers) {
768                         struct io_cb_cancel_data match = {
769                                 .fn             = io_wq_work_match_all,
770                                 .cancel_all     = true,
771                         };
772
773                         raw_spin_unlock(&wqe->lock);
774                         while (io_acct_cancel_pending_work(wqe, acct, &match))
775                                 ;
776                 } else {
777                         raw_spin_unlock(&wqe->lock);
778                 }
779                 io_worker_ref_put(wqe->wq);
780                 kfree(worker);
781                 return;
782         }
783
784         /* re-create attempts grab a new worker ref, drop the existing one */
785         io_worker_release(worker);
786         schedule_work(&worker->work);
787 }
788
789 static void io_workqueue_create(struct work_struct *work)
790 {
791         struct io_worker *worker = container_of(work, struct io_worker, work);
792         struct io_wqe_acct *acct = io_wqe_get_acct(worker);
793
794         if (!io_queue_worker_create(worker, acct, create_worker_cont))
795                 kfree(worker);
796 }
797
798 static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
799 {
800         struct io_wqe_acct *acct = &wqe->acct[index];
801         struct io_worker *worker;
802         struct task_struct *tsk;
803
804         __set_current_state(TASK_RUNNING);
805
806         worker = kzalloc(sizeof(*worker), GFP_KERNEL);
807         if (!worker) {
808 fail:
809                 atomic_dec(&acct->nr_running);
810                 raw_spin_lock(&wqe->lock);
811                 acct->nr_workers--;
812                 raw_spin_unlock(&wqe->lock);
813                 io_worker_ref_put(wq);
814                 return false;
815         }
816
817         refcount_set(&worker->ref, 1);
818         worker->wqe = wqe;
819         raw_spin_lock_init(&worker->lock);
820         init_completion(&worker->ref_done);
821
822         if (index == IO_WQ_ACCT_BOUND)
823                 worker->flags |= IO_WORKER_F_BOUND;
824
825         tsk = create_io_thread(io_wqe_worker, worker, NUMA_NO_NODE);
826         if (!IS_ERR(tsk)) {
827                 io_init_new_worker(wqe, worker, tsk);
828         } else if (!io_should_retry_thread(PTR_ERR(tsk))) {
829                 kfree(worker);
830                 goto fail;
831         } else {
832                 INIT_WORK(&worker->work, io_workqueue_create);
833                 schedule_work(&worker->work);
834         }
835
836         return true;
837 }
838
839 /*
840  * Iterate the passed in list and call the specific function for each
841  * worker that isn't exiting
842  */
843 static bool io_wq_for_each_worker(struct io_wqe *wqe,
844                                   bool (*func)(struct io_worker *, void *),
845                                   void *data)
846 {
847         struct io_worker *worker;
848         bool ret = false;
849
850         list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
851                 if (io_worker_get(worker)) {
852                         /* no task if node is/was offline */
853                         if (worker->task)
854                                 ret = func(worker, data);
855                         io_worker_release(worker);
856                         if (ret)
857                                 break;
858                 }
859         }
860
861         return ret;
862 }
863
864 static bool io_wq_worker_wake(struct io_worker *worker, void *data)
865 {
866         __set_notify_signal(worker->task);
867         wake_up_process(worker->task);
868         return false;
869 }
870
871 static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
872 {
873         struct io_wq *wq = wqe->wq;
874
875         do {
876                 work->flags |= IO_WQ_WORK_CANCEL;
877                 wq->do_work(work);
878                 work = wq->free_work(work);
879         } while (work);
880 }
881
882 static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
883 {
884         struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
885         unsigned int hash;
886         struct io_wq_work *tail;
887
888         if (!io_wq_is_hashed(work)) {
889 append:
890                 wq_list_add_tail(&work->list, &acct->work_list);
891                 return;
892         }
893
894         hash = io_get_work_hash(work);
895         tail = wqe->hash_tail[hash];
896         wqe->hash_tail[hash] = work;
897         if (!tail)
898                 goto append;
899
900         wq_list_add_after(&work->list, &tail->list, &acct->work_list);
901 }
902
903 static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
904 {
905         return work == data;
906 }
907
908 static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
909 {
910         struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
911         struct io_cb_cancel_data match;
912         unsigned work_flags = work->flags;
913         bool do_create;
914
915         /*
916          * If io-wq is exiting for this task, or if the request has explicitly
917          * been marked as one that should not get executed, cancel it here.
918          */
919         if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state) ||
920             (work->flags & IO_WQ_WORK_CANCEL)) {
921                 io_run_cancel(work, wqe);
922                 return;
923         }
924
925         raw_spin_lock(&acct->lock);
926         io_wqe_insert_work(wqe, work);
927         clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
928         raw_spin_unlock(&acct->lock);
929
930         raw_spin_lock(&wqe->lock);
931         rcu_read_lock();
932         do_create = !io_wqe_activate_free_worker(wqe, acct);
933         rcu_read_unlock();
934
935         raw_spin_unlock(&wqe->lock);
936
937         if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
938             !atomic_read(&acct->nr_running))) {
939                 bool did_create;
940
941                 did_create = io_wqe_create_worker(wqe, acct);
942                 if (likely(did_create))
943                         return;
944
945                 raw_spin_lock(&wqe->lock);
946                 if (acct->nr_workers) {
947                         raw_spin_unlock(&wqe->lock);
948                         return;
949                 }
950                 raw_spin_unlock(&wqe->lock);
951
952                 /* fatal condition, failed to create the first worker */
953                 match.fn                = io_wq_work_match_item,
954                 match.data              = work,
955                 match.cancel_all        = false,
956
957                 io_acct_cancel_pending_work(wqe, acct, &match);
958         }
959 }
960
961 void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
962 {
963         struct io_wqe *wqe = &wq->wqe;
964
965         io_wqe_enqueue(wqe, work);
966 }
967
968 /*
969  * Work items that hash to the same value will not be done in parallel.
970  * Used to limit concurrent writes, generally hashed by inode.
971  */
972 void io_wq_hash_work(struct io_wq_work *work, void *val)
973 {
974         unsigned int bit;
975
976         bit = hash_ptr(val, IO_WQ_HASH_ORDER);
977         work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
978 }
979
980 static bool __io_wq_worker_cancel(struct io_worker *worker,
981                                   struct io_cb_cancel_data *match,
982                                   struct io_wq_work *work)
983 {
984         if (work && match->fn(work, match->data)) {
985                 work->flags |= IO_WQ_WORK_CANCEL;
986                 __set_notify_signal(worker->task);
987                 return true;
988         }
989
990         return false;
991 }
992
993 static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
994 {
995         struct io_cb_cancel_data *match = data;
996
997         /*
998          * Hold the lock to avoid ->cur_work going out of scope, caller
999          * may dereference the passed in work.
1000          */
1001         raw_spin_lock(&worker->lock);
1002         if (__io_wq_worker_cancel(worker, match, worker->cur_work) ||
1003             __io_wq_worker_cancel(worker, match, worker->next_work))
1004                 match->nr_running++;
1005         raw_spin_unlock(&worker->lock);
1006
1007         return match->nr_running && !match->cancel_all;
1008 }
1009
1010 static inline void io_wqe_remove_pending(struct io_wqe *wqe,
1011                                          struct io_wq_work *work,
1012                                          struct io_wq_work_node *prev)
1013 {
1014         struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
1015         unsigned int hash = io_get_work_hash(work);
1016         struct io_wq_work *prev_work = NULL;
1017
1018         if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) {
1019                 if (prev)
1020                         prev_work = container_of(prev, struct io_wq_work, list);
1021                 if (prev_work && io_get_work_hash(prev_work) == hash)
1022                         wqe->hash_tail[hash] = prev_work;
1023                 else
1024                         wqe->hash_tail[hash] = NULL;
1025         }
1026         wq_list_del(&acct->work_list, &work->list, prev);
1027 }
1028
1029 static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
1030                                         struct io_wqe_acct *acct,
1031                                         struct io_cb_cancel_data *match)
1032 {
1033         struct io_wq_work_node *node, *prev;
1034         struct io_wq_work *work;
1035
1036         raw_spin_lock(&acct->lock);
1037         wq_list_for_each(node, prev, &acct->work_list) {
1038                 work = container_of(node, struct io_wq_work, list);
1039                 if (!match->fn(work, match->data))
1040                         continue;
1041                 io_wqe_remove_pending(wqe, work, prev);
1042                 raw_spin_unlock(&acct->lock);
1043                 io_run_cancel(work, wqe);
1044                 match->nr_pending++;
1045                 /* not safe to continue after unlock */
1046                 return true;
1047         }
1048         raw_spin_unlock(&acct->lock);
1049
1050         return false;
1051 }
1052
1053 static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
1054                                        struct io_cb_cancel_data *match)
1055 {
1056         int i;
1057 retry:
1058         for (i = 0; i < IO_WQ_ACCT_NR; i++) {
1059                 struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);
1060
1061                 if (io_acct_cancel_pending_work(wqe, acct, match)) {
1062                         if (match->cancel_all)
1063                                 goto retry;
1064                         break;
1065                 }
1066         }
1067 }
1068
1069 static void io_wqe_cancel_running_work(struct io_wqe *wqe,
1070                                        struct io_cb_cancel_data *match)
1071 {
1072         rcu_read_lock();
1073         io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
1074         rcu_read_unlock();
1075 }
1076
1077 enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
1078                                   void *data, bool cancel_all)
1079 {
1080         struct io_cb_cancel_data match = {
1081                 .fn             = cancel,
1082                 .data           = data,
1083                 .cancel_all     = cancel_all,
1084         };
1085         struct io_wqe *wqe = &wq->wqe;
1086
1087         /*
1088          * First check pending list, if we're lucky we can just remove it
1089          * from there. CANCEL_OK means that the work is returned as-new,
1090          * no completion will be posted for it.
1091          *
1092          * Then check if a free (going busy) or busy worker has the work
1093          * currently running. If we find it there, we'll return CANCEL_RUNNING
1094          * as an indication that we attempt to signal cancellation. The
1095          * completion will run normally in this case.
1096          *
1097          * Do both of these while holding the wqe->lock, to ensure that
1098          * we'll find a work item regardless of state.
1099          */
1100         io_wqe_cancel_pending_work(wqe, &match);
1101         if (match.nr_pending && !match.cancel_all)
1102                 return IO_WQ_CANCEL_OK;
1103
1104         raw_spin_lock(&wqe->lock);
1105         io_wqe_cancel_running_work(wqe, &match);
1106         raw_spin_unlock(&wqe->lock);
1107         if (match.nr_running && !match.cancel_all)
1108                 return IO_WQ_CANCEL_RUNNING;
1109
1110         if (match.nr_running)
1111                 return IO_WQ_CANCEL_RUNNING;
1112         if (match.nr_pending)
1113                 return IO_WQ_CANCEL_OK;
1114         return IO_WQ_CANCEL_NOTFOUND;
1115 }
1116
1117 static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
1118                             int sync, void *key)
1119 {
1120         struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
1121         int i;
1122
1123         list_del_init(&wait->entry);
1124
1125         rcu_read_lock();
1126         for (i = 0; i < IO_WQ_ACCT_NR; i++) {
1127                 struct io_wqe_acct *acct = &wqe->acct[i];
1128
1129                 if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
1130                         io_wqe_activate_free_worker(wqe, acct);
1131         }
1132         rcu_read_unlock();
1133         return 1;
1134 }
1135
1136 struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
1137 {
1138         int ret, i;
1139         struct io_wq *wq;
1140         struct io_wqe *wqe;
1141
1142         if (WARN_ON_ONCE(!data->free_work || !data->do_work))
1143                 return ERR_PTR(-EINVAL);
1144         if (WARN_ON_ONCE(!bounded))
1145                 return ERR_PTR(-EINVAL);
1146
1147         wq = kzalloc(sizeof(struct io_wq), GFP_KERNEL);
1148         if (!wq)
1149                 return ERR_PTR(-ENOMEM);
1150         ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
1151         if (ret)
1152                 goto err_wq;
1153
1154         refcount_inc(&data->hash->refs);
1155         wq->hash = data->hash;
1156         wq->free_work = data->free_work;
1157         wq->do_work = data->do_work;
1158         wqe = &wq->wqe;
1159
1160         ret = -ENOMEM;
1161
1162         if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL))
1163                 goto err;
1164         cpumask_copy(wqe->cpu_mask, cpu_possible_mask);
1165         wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
1166         wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
1167                                 task_rlimit(current, RLIMIT_NPROC);
1168         INIT_LIST_HEAD(&wqe->wait.entry);
1169         wqe->wait.func = io_wqe_hash_wake;
1170         for (i = 0; i < IO_WQ_ACCT_NR; i++) {
1171                 struct io_wqe_acct *acct = &wqe->acct[i];
1172
1173                 acct->index = i;
1174                 atomic_set(&acct->nr_running, 0);
1175                 INIT_WQ_LIST(&acct->work_list);
1176                 raw_spin_lock_init(&acct->lock);
1177         }
1178         wqe->wq = wq;
1179         raw_spin_lock_init(&wqe->lock);
1180         INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
1181         INIT_LIST_HEAD(&wqe->all_list);
1182
1183         wq->task = get_task_struct(data->task);
1184         atomic_set(&wq->worker_refs, 1);
1185         init_completion(&wq->worker_done);
1186         return wq;
1187 err:
1188         io_wq_put_hash(data->hash);
1189         cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
1190
1191         free_cpumask_var(wq->wqe.cpu_mask);
1192 err_wq:
1193         kfree(wq);
1194         return ERR_PTR(ret);
1195 }
1196
1197 static bool io_task_work_match(struct callback_head *cb, void *data)
1198 {
1199         struct io_worker *worker;
1200
1201         if (cb->func != create_worker_cb && cb->func != create_worker_cont)
1202                 return false;
1203         worker = container_of(cb, struct io_worker, create_work);
1204         return worker->wqe->wq == data;
1205 }
1206
1207 void io_wq_exit_start(struct io_wq *wq)
1208 {
1209         set_bit(IO_WQ_BIT_EXIT, &wq->state);
1210 }
1211
1212 static void io_wq_cancel_tw_create(struct io_wq *wq)
1213 {
1214         struct callback_head *cb;
1215
1216         while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
1217                 struct io_worker *worker;
1218
1219                 worker = container_of(cb, struct io_worker, create_work);
1220                 io_worker_cancel_cb(worker);
1221                 /*
1222                  * Only the worker continuation helper has worker allocated and
1223                  * hence needs freeing.
1224                  */
1225                 if (cb->func == create_worker_cont)
1226                         kfree(worker);
1227         }
1228 }
1229
1230 static void io_wq_exit_workers(struct io_wq *wq)
1231 {
1232         if (!wq->task)
1233                 return;
1234
1235         io_wq_cancel_tw_create(wq);
1236
1237         rcu_read_lock();
1238         io_wq_for_each_worker(&wq->wqe, io_wq_worker_wake, NULL);
1239         rcu_read_unlock();
1240         io_worker_ref_put(wq);
1241         wait_for_completion(&wq->worker_done);
1242
1243         spin_lock_irq(&wq->hash->wait.lock);
1244         list_del_init(&wq->wqe.wait.entry);
1245         spin_unlock_irq(&wq->hash->wait.lock);
1246
1247         put_task_struct(wq->task);
1248         wq->task = NULL;
1249 }
1250
1251 static void io_wq_destroy(struct io_wq *wq)
1252 {
1253         struct io_cb_cancel_data match = {
1254                 .fn             = io_wq_work_match_all,
1255                 .cancel_all     = true,
1256         };
1257         struct io_wqe *wqe = &wq->wqe;
1258
1259         cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
1260         io_wqe_cancel_pending_work(wqe, &match);
1261         free_cpumask_var(wqe->cpu_mask);
1262         io_wq_put_hash(wq->hash);
1263         kfree(wq);
1264 }
1265
1266 void io_wq_put_and_exit(struct io_wq *wq)
1267 {
1268         WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));
1269
1270         io_wq_exit_workers(wq);
1271         io_wq_destroy(wq);
1272 }
1273
1274 struct online_data {
1275         unsigned int cpu;
1276         bool online;
1277 };
1278
1279 static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
1280 {
1281         struct online_data *od = data;
1282
1283         if (od->online)
1284                 cpumask_set_cpu(od->cpu, worker->wqe->cpu_mask);
1285         else
1286                 cpumask_clear_cpu(od->cpu, worker->wqe->cpu_mask);
1287         return false;
1288 }
1289
1290 static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
1291 {
1292         struct online_data od = {
1293                 .cpu = cpu,
1294                 .online = online
1295         };
1296
1297         rcu_read_lock();
1298         io_wq_for_each_worker(&wq->wqe, io_wq_worker_affinity, &od);
1299         rcu_read_unlock();
1300         return 0;
1301 }
1302
1303 static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
1304 {
1305         struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);
1306
1307         return __io_wq_cpu_online(wq, cpu, true);
1308 }
1309
1310 static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
1311 {
1312         struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);
1313
1314         return __io_wq_cpu_online(wq, cpu, false);
1315 }
1316
1317 int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
1318 {
1319         struct io_wqe *wqe = &wq->wqe;
1320
1321         rcu_read_lock();
1322         if (mask)
1323                 cpumask_copy(wqe->cpu_mask, mask);
1324         else
1325                 cpumask_copy(wqe->cpu_mask, cpu_possible_mask);
1326         rcu_read_unlock();
1327
1328         return 0;
1329 }
1330
1331 /*
1332  * Set max number of unbounded workers, returns old value. If new_count is 0,
1333  * then just return the old value.
1334  */
1335 int io_wq_max_workers(struct io_wq *wq, int *new_count)
1336 {
1337         struct io_wqe *wqe = &wq->wqe;
1338         struct io_wqe_acct *acct;
1339         int prev[IO_WQ_ACCT_NR];
1340         int i;
1341
1342         BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND   != (int) IO_WQ_BOUND);
1343         BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
1344         BUILD_BUG_ON((int) IO_WQ_ACCT_NR      != 2);
1345
1346         for (i = 0; i < IO_WQ_ACCT_NR; i++) {
1347                 if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
1348                         new_count[i] = task_rlimit(current, RLIMIT_NPROC);
1349         }
1350
1351         for (i = 0; i < IO_WQ_ACCT_NR; i++)
1352                 prev[i] = 0;
1353
1354         rcu_read_lock();
1355
1356         raw_spin_lock(&wqe->lock);
1357         for (i = 0; i < IO_WQ_ACCT_NR; i++) {
1358                 acct = &wqe->acct[i];
1359                 prev[i] = max_t(int, acct->max_workers, prev[i]);
1360                 if (new_count[i])
1361                         acct->max_workers = new_count[i];
1362         }
1363         raw_spin_unlock(&wqe->lock);
1364         rcu_read_unlock();
1365
1366         for (i = 0; i < IO_WQ_ACCT_NR; i++)
1367                 new_count[i] = prev[i];
1368
1369         return 0;
1370 }
1371
1372 static __init int io_wq_init(void)
1373 {
1374         int ret;
1375
1376         ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
1377                                         io_wq_cpu_online, io_wq_cpu_offline);
1378         if (ret < 0)
1379                 return ret;
1380         io_wq_online = ret;
1381         return 0;
1382 }
1383 subsys_initcall(io_wq_init);