};
/*
+ * All cpumasks are assumed to be always set on UP and thus can't be
+ * used to determine whether there's something to be done.
+ */
+#ifdef CONFIG_SMP
+typedef cpumask_var_t mayday_mask_t;
+#define mayday_test_and_set_cpu(cpu, mask) \
+ cpumask_test_and_set_cpu((cpu), (mask))
+#define mayday_clear_cpu(cpu, mask) cpumask_clear_cpu((cpu), (mask))
+#define for_each_mayday_cpu(cpu, mask) for_each_cpu((cpu), (mask))
+#define alloc_mayday_mask(maskp, gfp) alloc_cpumask_var((maskp), (gfp))
+#define free_mayday_mask(mask) free_cpumask_var((mask))
+#else
+typedef unsigned long mayday_mask_t;
+#define mayday_test_and_set_cpu(cpu, mask) test_and_set_bit(0, &(mask))
+#define mayday_clear_cpu(cpu, mask) clear_bit(0, &(mask))
+#define for_each_mayday_cpu(cpu, mask) if ((cpu) = 0, (mask))
+#define alloc_mayday_mask(maskp, gfp) true
+#define free_mayday_mask(mask) do { } while (0)
+#endif
+
+/*
* The externally visible workqueue abstraction is an array of
* per-CPU workqueues:
*/
struct list_head flusher_queue; /* F: flush waiters */
struct list_head flusher_overflow; /* F: flush overflow list */
- unsigned long single_cpu; /* cpu for single cpu wq */
-
- cpumask_var_t mayday_mask; /* cpus requesting rescue */
+ mayday_mask_t mayday_mask; /* cpus requesting rescue */
struct worker *rescuer; /* I: rescue worker */
int saved_max_active; /* W: saved cwq max_active */
return __next_gcwq_cpu(cpu, mask, !(wq->flags & WQ_UNBOUND) ? 1 : 2);
}
+/*
+ * CPU iterators
+ *
+ * An extra gcwq is defined for an invalid cpu number
+ * (WORK_CPU_UNBOUND) to host workqueues which are not bound to any
+ * specific CPU. The following iterators are similar to
+ * for_each_*_cpu() iterators but also considers the unbound gcwq.
+ *
+ * for_each_gcwq_cpu() : possible CPUs + WORK_CPU_UNBOUND
+ * for_each_online_gcwq_cpu() : online CPUs + WORK_CPU_UNBOUND
+ * for_each_cwq_cpu() : possible CPUs for bound workqueues,
+ * WORK_CPU_UNBOUND for unbound workqueues
+ */
#define for_each_gcwq_cpu(cpu) \
for ((cpu) = __next_gcwq_cpu(-1, cpu_possible_mask, 3); \
(cpu) < WORK_CPU_NONE; \
(cpu) < WORK_CPU_NONE; \
(cpu) = __next_wq_cpu((cpu), cpu_possible_mask, (wq)))
+#ifdef CONFIG_LOCKDEP
+/**
+ * in_workqueue_context() - in context of specified workqueue?
+ * @wq: the workqueue of interest
+ *
+ * Checks lockdep state to see if the current task is executing from
+ * within a workqueue item. This function exists only if lockdep is
+ * enabled.
+ */
+int in_workqueue_context(struct workqueue_struct *wq)
+{
+ return lock_is_held(&wq->lockdep_map);
+}
+#endif
+
#ifdef CONFIG_DEBUG_OBJECTS_WORK
static struct debug_obj_descr work_debug_descr;
}
/*
- * Work data points to the cwq while a work is on queue. Once
- * execution starts, it points to the cpu the work was last on. This
- * can be distinguished by comparing the data value against
- * PAGE_OFFSET.
+ * A work's data points to the cwq with WORK_STRUCT_CWQ set while the
+ * work is on queue. Once execution starts, WORK_STRUCT_CWQ is
+ * cleared and the work data contains the cpu number it was last on.
*
* set_work_{cwq|cpu}() and clear_work_data() can be used to set the
* cwq, cpu or clear work->data. These functions should only be
unsigned long extra_flags)
{
set_work_data(work, (unsigned long)cwq,
- WORK_STRUCT_PENDING | extra_flags);
+ WORK_STRUCT_PENDING | WORK_STRUCT_CWQ | extra_flags);
}
static void set_work_cpu(struct work_struct *work, unsigned int cpu)
set_work_data(work, WORK_STRUCT_NO_CPU, 0);
}
-static inline unsigned long get_work_data(struct work_struct *work)
-{
- return atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK;
-}
-
static struct cpu_workqueue_struct *get_work_cwq(struct work_struct *work)
{
- unsigned long data = get_work_data(work);
+ unsigned long data = atomic_long_read(&work->data);
- return data >= PAGE_OFFSET ? (void *)data : NULL;
+ if (data & WORK_STRUCT_CWQ)
+ return (void *)(data & WORK_STRUCT_WQ_DATA_MASK);
+ else
+ return NULL;
}
static struct global_cwq *get_work_gcwq(struct work_struct *work)
{
- unsigned long data = get_work_data(work);
+ unsigned long data = atomic_long_read(&work->data);
unsigned int cpu;
- if (data >= PAGE_OFFSET)
- return ((struct cpu_workqueue_struct *)data)->gcwq;
+ if (data & WORK_STRUCT_CWQ)
+ return ((struct cpu_workqueue_struct *)
+ (data & WORK_STRUCT_WQ_DATA_MASK))->gcwq;
cpu = data >> WORK_STRUCT_FLAG_BITS;
if (cpu == WORK_CPU_NONE)
wake_up_worker(gcwq);
}
-/**
- * cwq_unbind_single_cpu - unbind cwq from single cpu workqueue processing
- * @cwq: cwq to unbind
- *
- * Try to unbind @cwq from single cpu workqueue processing. If
- * @cwq->wq is frozen, unbind is delayed till the workqueue is thawed.
- *
- * CONTEXT:
- * spin_lock_irq(gcwq->lock).
- */
-static void cwq_unbind_single_cpu(struct cpu_workqueue_struct *cwq)
-{
- struct workqueue_struct *wq = cwq->wq;
- struct global_cwq *gcwq = cwq->gcwq;
-
- BUG_ON(wq->single_cpu != gcwq->cpu);
- /*
- * Unbind from workqueue if @cwq is not frozen. If frozen,
- * thaw_workqueues() will either restart processing on this
- * cpu or unbind if empty. This keeps works queued while
- * frozen fully ordered and flushable.
- */
- if (likely(!(gcwq->flags & GCWQ_FREEZING))) {
- smp_wmb(); /* paired with cmpxchg() in __queue_work() */
- wq->single_cpu = WORK_CPU_NONE;
- }
-}
-
static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
struct work_struct *work)
{
struct cpu_workqueue_struct *cwq;
struct list_head *worklist;
unsigned long flags;
- bool arbitrate;
debug_work_activate(work);
- if (unlikely(cpu == WORK_CPU_UNBOUND))
- cpu = raw_smp_processor_id();
-
- /*
- * Determine gcwq to use. SINGLE_CPU is inherently
- * NON_REENTRANT, so test it first.
- */
- if (!(wq->flags & (WQ_SINGLE_CPU | WQ_UNBOUND))) {
+ /* determine gcwq to use */
+ if (!(wq->flags & WQ_UNBOUND)) {
struct global_cwq *last_gcwq;
+ if (unlikely(cpu == WORK_CPU_UNBOUND))
+ cpu = raw_smp_processor_id();
+
/*
* It's multi cpu. If @wq is non-reentrant and @work
* was previously on a different cpu, it might still
}
} else
spin_lock_irqsave(&gcwq->lock, flags);
- } else if (!(wq->flags & WQ_UNBOUND)) {
- unsigned int req_cpu = cpu;
-
- /*
- * It's a bit more complex for single cpu workqueues.
- * We first need to determine which cpu is going to be
- * used. If no cpu is currently serving this
- * workqueue, arbitrate using atomic accesses to
- * wq->single_cpu; otherwise, use the current one.
- */
- retry:
- cpu = wq->single_cpu;
- arbitrate = cpu == WORK_CPU_NONE;
- if (arbitrate)
- cpu = req_cpu;
-
- gcwq = get_gcwq(cpu);
- spin_lock_irqsave(&gcwq->lock, flags);
-
- /*
- * The following cmpxchg() is a full barrier paired
- * with smp_wmb() in cwq_unbind_single_cpu() and
- * guarantees that all changes to wq->st_* fields are
- * visible on the new cpu after this point.
- */
- if (arbitrate)
- cmpxchg(&wq->single_cpu, WORK_CPU_NONE, cpu);
-
- if (unlikely(wq->single_cpu != cpu)) {
- spin_unlock_irqrestore(&gcwq->lock, flags);
- goto retry;
- }
} else {
gcwq = get_gcwq(WORK_CPU_UNBOUND);
spin_lock_irqsave(&gcwq->lock, flags);
struct work_struct *work = &dwork->work;
if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
- struct global_cwq *gcwq = get_work_gcwq(work);
- unsigned int lcpu = gcwq ? gcwq->cpu : raw_smp_processor_id();
+ unsigned int lcpu;
BUG_ON(timer_pending(timer));
BUG_ON(!list_empty(&work->entry));
timer_stats_timer_set_start_info(&dwork->timer);
+
/*
* This stores cwq for the moment, for the timer_fn.
* Note that the work's gcwq is preserved to allow
* reentrance detection for delayed works.
*/
+ if (!(wq->flags & WQ_UNBOUND)) {
+ struct global_cwq *gcwq = get_work_gcwq(work);
+
+ if (gcwq && gcwq->cpu != WORK_CPU_UNBOUND)
+ lcpu = gcwq->cpu;
+ else
+ lcpu = raw_smp_processor_id();
+ } else
+ lcpu = WORK_CPU_UNBOUND;
+
set_work_cwq(work, get_cwq(lcpu, wq), 0);
+
timer->expires = jiffies + delay;
timer->data = (unsigned long)dwork;
timer->function = delayed_work_timer_fn;
/* WORK_CPU_UNBOUND can't be set in cpumask, use cpu 0 instead */
if (cpu == WORK_CPU_UNBOUND)
cpu = 0;
- if (!cpumask_test_and_set_cpu(cpu, wq->mayday_mask))
+ if (!mayday_test_and_set_cpu(cpu, wq->mayday_mask))
wake_up_process(wq->rescuer->task);
return true;
}
if (!need_to_create_worker(gcwq))
return false;
restart:
+ spin_unlock_irq(&gcwq->lock);
+
/* if we don't make progress in MAYDAY_INITIAL_TIMEOUT, call for help */
mod_timer(&gcwq->mayday_timer, jiffies + MAYDAY_INITIAL_TIMEOUT);
while (true) {
struct worker *worker;
- spin_unlock_irq(&gcwq->lock);
-
worker = create_worker(gcwq, true);
if (worker) {
del_timer_sync(&gcwq->mayday_timer);
if (!need_to_create_worker(gcwq))
break;
- spin_unlock_irq(&gcwq->lock);
__set_current_state(TASK_INTERRUPTIBLE);
schedule_timeout(CREATE_COOLDOWN);
- spin_lock_irq(&gcwq->lock);
+
if (!need_to_create_worker(gcwq))
break;
}
- spin_unlock_irq(&gcwq->lock);
del_timer_sync(&gcwq->mayday_timer);
spin_lock_irq(&gcwq->lock);
if (need_to_create_worker(gcwq))
/* one down, submit a delayed one */
if (cwq->nr_active < cwq->max_active)
cwq_activate_first_delayed(cwq);
- } else if (!cwq->nr_active && cwq->wq->flags & WQ_SINGLE_CPU) {
- /* this was the last work, unbind from single cpu */
- cwq_unbind_single_cpu(cwq);
}
/* is flush in progress and are we at the flushing tip? */
* See whether any cpu is asking for help. Unbounded
* workqueues use cpu 0 in mayday_mask for CPU_UNBOUND.
*/
- for_each_cpu(cpu, wq->mayday_mask) {
+ for_each_mayday_cpu(cpu, wq->mayday_mask) {
unsigned int tcpu = is_unbound ? WORK_CPU_UNBOUND : cpu;
struct cpu_workqueue_struct *cwq = get_cwq(tcpu, wq);
struct global_cwq *gcwq = cwq->gcwq;
struct work_struct *work, *n;
__set_current_state(TASK_RUNNING);
- cpumask_clear_cpu(cpu, wq->mayday_mask);
+ mayday_clear_cpu(cpu, wq->mayday_mask);
/* migrate to the target cpu if possible */
rescuer->gcwq = gcwq;
int schedule_on_each_cpu(work_func_t func)
{
int cpu;
- struct work_struct *works;
+ struct work_struct __percpu *works;
works = alloc_percpu(struct work_struct);
if (!works)
const size_t size = sizeof(struct cpu_workqueue_struct);
const size_t align = max_t(size_t, 1 << WORK_STRUCT_FLAG_BITS,
__alignof__(unsigned long long));
+#ifdef CONFIG_SMP
+ bool percpu = !(wq->flags & WQ_UNBOUND);
+#else
+ bool percpu = false;
+#endif
- if (CONFIG_SMP && !(wq->flags & WQ_UNBOUND)) {
- /* on SMP, percpu allocator can align itself */
+ if (percpu)
wq->cpu_wq.pcpu = __alloc_percpu(size, align);
- } else {
+ else {
void *ptr;
/*
static void free_cwqs(struct workqueue_struct *wq)
{
- if (CONFIG_SMP && !(wq->flags & WQ_UNBOUND))
+#ifdef CONFIG_SMP
+ bool percpu = !(wq->flags & WQ_UNBOUND);
+#else
+ bool percpu = false;
+#endif
+
+ if (percpu)
free_percpu(wq->cpu_wq.pcpu);
else if (wq->cpu_wq.single) {
/* the pointer to free is stored right after the cwq */
atomic_set(&wq->nr_cwqs_to_flush, 0);
INIT_LIST_HEAD(&wq->flusher_queue);
INIT_LIST_HEAD(&wq->flusher_overflow);
- wq->single_cpu = WORK_CPU_NONE;
wq->name = name;
lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
if (flags & WQ_RESCUER) {
struct worker *rescuer;
- if (!alloc_cpumask_var(&wq->mayday_mask, GFP_KERNEL))
+ if (!alloc_mayday_mask(&wq->mayday_mask, GFP_KERNEL))
goto err;
wq->rescuer = rescuer = alloc_worker();
err:
if (wq) {
free_cwqs(wq);
- free_cpumask_var(wq->mayday_mask);
+ free_mayday_mask(wq->mayday_mask);
kfree(wq->rescuer);
kfree(wq);
}
if (wq->flags & WQ_RESCUER) {
kthread_stop(wq->rescuer->task);
- free_cpumask_var(wq->mayday_mask);
+ free_mayday_mask(wq->mayday_mask);
}
free_cwqs(wq);
while (!list_empty(&cwq->delayed_works) &&
cwq->nr_active < cwq->max_active)
cwq_activate_first_delayed(cwq);
-
- /* perform delayed unbind from single cpu if empty */
- if (wq->single_cpu == gcwq->cpu &&
- !cwq->nr_active && list_empty(&cwq->delayed_works))
- cwq_unbind_single_cpu(cwq);
}
wake_up_worker(gcwq);
}
#endif /* CONFIG_FREEZER */
-void __init init_workqueues(void)
+static int __init init_workqueues(void)
{
unsigned int cpu;
int i;
- /*
- * The pointer part of work->data is either pointing to the
- * cwq or contains the cpu number the work ran last on. Make
- * sure cpu number won't overflow into kernel pointer area so
- * that they can be distinguished.
- */
- BUILD_BUG_ON(WORK_CPU_LAST << WORK_STRUCT_FLAG_BITS >= PAGE_OFFSET);
-
- hotcpu_notifier(workqueue_cpu_callback, CPU_PRI_WORKQUEUE);
+ cpu_notifier(workqueue_cpu_callback, CPU_PRI_WORKQUEUE);
/* initialize gcwqs */
for_each_gcwq_cpu(cpu) {
system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
WQ_UNBOUND_MAX_ACTIVE);
BUG_ON(!system_wq || !system_long_wq || !system_nrt_wq);
+ return 0;
}
+early_initcall(init_workqueues);