/*
 * Copyright (c) 2011, Joakim Johansson <jocke@tbricks.com>
 * Copyright (c) 2010, Mark Heily <mark@heily.com>
 * Copyright (c) 2009, Stacey Son <sson@freebsd.org>
 * Copyright (c) 2000-2008, Apple Inc.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice unmodified, this list of conditions, and the following
 *    disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#include "pthread_workqueue.h"
#include "thread_info.h"
#include "thread_rt.h"
/* Environment settings */
unsigned int PWQ_RT_THREADS = 0;
unsigned int PWQ_ACTIVE_CPU = 0;
time_t PWQ_SPIN_USEC = 10000; // Number of microseconds to busy-spin waiting for new work, if spinning is enabled
unsigned int PWQ_SPIN_THREADS = 0; // Maximum number of threads that should be kept spinning
volatile unsigned int current_threads_spinning = 0; // Number of threads currently spinning
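
/*
 * Sketch (not part of the original file): how these tunables might be
 * picked up from the environment during library init. The helper name
 * pwq_tunables_from_env() is hypothetical; only getenv(3) and
 * strtoul(3) are assumed.
 */
#if 0
#include <stdlib.h>

static void
pwq_tunables_from_env(void)
{
    const char *s;

    if ((s = getenv("PWQ_RT_THREADS")) != NULL)
        PWQ_RT_THREADS = (unsigned int) strtoul(s, NULL, 10);
    if ((s = getenv("PWQ_SPIN_USEC")) != NULL)
        PWQ_SPIN_USEC = (time_t) strtoul(s, NULL, 10);
    if ((s = getenv("PWQ_SPIN_THREADS")) != NULL)
        PWQ_SPIN_THREADS = (unsigned int) strtoul(s, NULL, 10);
}
#endif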
/* Tunable constants */

#define WORKER_IDLE_SECONDS_THRESHOLD 5

/* Function prototypes */
static unsigned int get_load_average(void);
static void * worker_main(void *arg);
static void * overcommit_worker_main(void *arg);
static unsigned int get_process_limit(void);
static void manager_start(void);
static unsigned int cpu_count;
static unsigned int worker_min;
static unsigned int worker_idle_threshold; // dynamic low watermark: we never ramp down below this once workers have been added

/* Overcommit workqueues and their shared state */
static struct _pthread_workqueue *ocwq[PTHREAD_WORKQUEUE_MAX];
static unsigned int ocwq_mask; // mask of overcommit workqueues with pending items; protected by ocwq_mtx
static pthread_mutex_t ocwq_mtx;
static pthread_cond_t ocwq_has_work;
static unsigned int ocwq_idle_threads;
/* Regular (non-overcommit) workqueues and their shared state */
static struct _pthread_workqueue *wqlist[PTHREAD_WORKQUEUE_MAX];
static volatile unsigned int wqlist_mask; // mask of currently pending workqueues; manipulated with atomics
static pthread_mutex_t wqlist_mtx;
static pthread_cond_t wqlist_has_work;
static int wqlist_has_manager;
static pthread_attr_t detached_attr;
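
/*
 * Illustration only (never called): bit i of wqlist_mask is set iff
 * wqlist[i] has pending items. Assuming, as the scan code below implies,
 * that higher-priority queues occupy lower indices, ffs(3), which returns
 * the 1-based index of the lowest set bit (0 if none are set), yields the
 * highest-priority non-empty workqueue.
 */
#if 0
static struct _pthread_workqueue *
highest_priority_pending_wq(void)
{
    int idx = ffs((int) wqlist_mask);
    return (idx == 0) ? NULL : wqlist[idx - 1];
}
#endif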
/* Scoreboard used by the manager thread to track the worker pool */
static struct {
    volatile unsigned int load,
                          count,
                          idle;
    unsigned int sb_wake_pending;
    pthread_mutex_t sb_wake_mtx;
    pthread_cond_t sb_wake_cond;
} scoreboard;
static unsigned int
worker_idle_threshold_per_cpu(void)
{
    return cpu_count / 4;
}
    if (manager_init() < 0)
        return (-1);
int
manager_init(void)
{
    wqlist_has_manager = 0;
    pthread_cond_init(&wqlist_has_work, NULL);
    pthread_mutex_init(&wqlist_mtx, NULL);

    pthread_cond_init(&ocwq_has_work, NULL);
    pthread_mutex_init(&ocwq_mtx, NULL);
    ocwq_mask = 0;
    ocwq_idle_threads = 0;

    cpu_count = (PWQ_ACTIVE_CPU > 0) ? (PWQ_ACTIVE_CPU) : (unsigned int) sysconf(_SC_NPROCESSORS_ONLN);

    pthread_attr_init(&detached_attr);
    pthread_attr_setdetachstate(&detached_attr, PTHREAD_CREATE_DETACHED);

    /* Initialize the scoreboard */
    pthread_cond_init(&scoreboard.sb_wake_cond, NULL);
    pthread_mutex_init(&scoreboard.sb_wake_mtx, NULL);

    /* Determine the initial thread pool constraints */
    worker_min = 2; // start with a small pool; worker_idle_threshold serves as the dynamic low watermark
    worker_idle_threshold = worker_idle_threshold_per_cpu();

    if (pthread_atfork(NULL, NULL, manager_reinit) != 0) {
        dbg_perror("pthread_atfork()");
        return (-1);
    }

    return (0);
}
void
manager_workqueue_create(struct _pthread_workqueue *workq)
{
    pthread_mutex_lock(&wqlist_mtx);
    if (!workq->overcommit && !wqlist_has_manager)
        manager_start();

    if (workq->overcommit) {
        if (ocwq[workq->queueprio] == NULL) {
            ocwq[workq->queueprio] = workq;
            workq->wqlist_index = workq->queueprio;
        } else {
            puts("queue already exists");
        }
    } else {
        if (wqlist[workq->queueprio] == NULL) {
            wqlist[workq->queueprio] = workq; // FIXME: sort by priority
            workq->wqlist_index = workq->queueprio;
        } else {
            puts("queue already exists");
        }
    }

    pthread_mutex_unlock(&wqlist_mtx);
}
static struct work *
wqlist_scan(int *queue_priority)
{
    pthread_workqueue_t workq;
    struct work *witem;
    int idx;

    idx = ffs(wqlist_mask);
    if (idx == 0)
        return (NULL);

    workq = wqlist[idx - 1];

    pthread_spin_lock(&workq->mtx);

    witem = STAILQ_FIRST(&workq->item_listhead);
    if (witem != NULL) {
        STAILQ_REMOVE_HEAD(&workq->item_listhead, item_entry);
        if (STAILQ_EMPTY(&workq->item_listhead)) {
            unsigned int wqlist_index_bit = (0x1 << workq->wqlist_index);
            unsigned int new_mask;
            // Remove this now empty wq from the mask. The only contention here is with
            // threads performing the same operation on another workqueue, so we will not spin for long.
            // The 'bit' for this queue is protected by the spin lock, so we only clear a bit we have
            // ownership of (see additem() below for the corresponding part on the producer side).
            do {
                new_mask = atomic_and(&wqlist_mask, ~(wqlist_index_bit));
            } while (new_mask & wqlist_index_bit);
        }

        if (queue_priority != NULL)
            *queue_priority = workq->queueprio;

        pthread_spin_unlock(&workq->mtx);
        return (witem);
    }

    // This can happen if multiple threads raced, found the same bit with ffs(),
    // and emptied the queue completely; just bail out.
    pthread_spin_unlock(&workq->mtx);
    return (NULL);
}
static void _wakeup_manager(void)
{
    dbg_puts("asking manager to wake up");

    pthread_mutex_lock(&scoreboard.sb_wake_mtx);
    scoreboard.sb_wake_pending = 1;
    pthread_cond_signal(&scoreboard.sb_wake_cond);
    pthread_mutex_unlock(&scoreboard.sb_wake_mtx);
}
static void *
overcommit_worker_main(void *arg)
{
    struct timespec ts;
    pthread_workqueue_t workq;
    void (*func)(void *);
    void *func_arg;
    struct work *witem;
    int rv, idx;

    pthread_mutex_lock(&ocwq_mtx);

    for (;;) {
        /* Find the highest priority workqueue that is non-empty */
        idx = ffs(ocwq_mask);
        if (idx > 0) {
            workq = ocwq[idx - 1];
            witem = STAILQ_FIRST(&workq->item_listhead);
            if (witem != NULL) {
                /* Remove the first work item */
                STAILQ_REMOVE_HEAD(&workq->item_listhead, item_entry);
                if (STAILQ_EMPTY(&workq->item_listhead))
                    ocwq_mask &= ~(0x1 << workq->wqlist_index);
                /* Execute the work item */
                pthread_mutex_unlock(&ocwq_mtx);
                func = witem->func;
                func_arg = witem->func_arg;
                witem_free(witem);
                func(func_arg);
                pthread_mutex_lock(&ocwq_mtx);
                continue;
            }
        }

        /* Wait for more work to be available. */
        clock_gettime(CLOCK_REALTIME, &ts);
        ts.tv_sec += WORKER_IDLE_SECONDS_THRESHOLD;
        ocwq_idle_threads++;
        dbg_printf("waiting for work (idle=%u)", ocwq_idle_threads);
        rv = pthread_cond_timedwait(&ocwq_has_work, &ocwq_mtx, &ts);
        if (rv != 0) {
            /* Normally, the signaler will decrement the idle counter,
               but this path is not taken in response to a signaler. */
            ocwq_idle_threads--;
            pthread_mutex_unlock(&ocwq_mtx);
            if (rv == ETIMEDOUT) {
                dbg_puts("timeout, no work available");
                break;
            } else {
                dbg_perror("pthread_cond_timedwait");
                //TODO: some kind of crash mechanism
                break;
            }
        }
    }

    dbg_printf("worker exiting (idle=%u)", ocwq_idle_threads);
    return (NULL);
}
static void *
worker_main(void *arg)
{
    struct work *witem;
    void (*func)(void *);
    void *func_arg;
    int queue_priority = 0;
    struct timespec ts_start, ts_now;

    dbg_puts("worker thread started");

    if (PWQ_RT_THREADS)
        ptwq_set_current_thread_priority(WORKQ_HIGH_PRIOQUEUE); // start at the highest possible priority

    for (;;) {
        witem = wqlist_scan(&queue_priority);

        // Only take the overhead of sleeping and/or spinning if we
        // could not get a witem cheaply using the spinlock above
        if (slowpath(!witem)) {
            // Optional busy loop: poll for the next item for a while if so configured.
            // We only let a limited number of threads spin at a time (this is mostly
            // useful in low latency configurations using dedicated processor sets)
            if ((PWQ_SPIN_THREADS > 0) && (current_threads_spinning <= PWQ_SPIN_THREADS)) {
                atomic_inc(&current_threads_spinning);

                // If we are racing with another thread, skip
                // spinning and instead go through the slowpath below
                if (current_threads_spinning <= PWQ_SPIN_THREADS) {
                    clock_gettime(CLOCK_REALTIME, &ts_start);
                    ts_now.tv_sec = ts_start.tv_sec;
                    ts_now.tv_nsec = ts_start.tv_nsec;

                    // Spin until we get an item or until PWQ_SPIN_USEC microseconds have passed
                    while (!witem &&
                           ((((ts_now.tv_sec - ts_start.tv_sec) * 1000000) +
                             ((ts_now.tv_nsec - ts_start.tv_nsec) / 1000)) <= PWQ_SPIN_USEC)) {
                        witem = wqlist_scan(&queue_priority);
                        // Perhaps a hardware pause instruction (e.g. 'pause' on x86) could be
                        // used here to keep the pace down; probably not needed though
                        clock_gettime(CLOCK_REALTIME, &ts_now);
                    }
                }

                atomic_dec(&current_threads_spinning);
            }

            // No witem from the busy loop; wait for a wakeup instead
            if (!witem) {
                pthread_mutex_lock(&wqlist_mtx);

                /*
                  TODO: Consider using pthread_cond_timedwait() so that
                  workers can self-terminate if they are idle too long.
                  This would also be a failsafe in case there are bugs
                  with the scoreboard that cause us to "leak" workers.
                 */
                while ((witem = wqlist_scan(&queue_priority)) == NULL)
                    pthread_cond_wait(&wqlist_has_work, &wqlist_mtx);

                pthread_mutex_unlock(&wqlist_mtx);
            }
        }

        atomic_dec(&scoreboard.idle);

        if (slowpath(witem->func == NULL)) {
            dbg_puts("worker exiting..");
            atomic_dec(&scoreboard.count);
            witem_free(witem);
            pthread_exit(NULL);
        }

        dbg_printf("count=%u idle=%u wake_pending=%u",
                   scoreboard.count, scoreboard.idle, scoreboard.sb_wake_pending);

        /* Force the manager thread to wake up if all workers are busy */
        if (slowpath(scoreboard.idle == 0 && !scoreboard.sb_wake_pending))
            _wakeup_manager();

        // If using RT threads, lower the thread priority unless we are servicing a high priority queue
        if (PWQ_RT_THREADS && (queue_priority != WORKQ_HIGH_PRIOQUEUE))
            ptwq_set_current_thread_priority(queue_priority);

        /* Invoke the callback function; free the witem first so it can be reused */
        func = witem->func;
        func_arg = witem->func_arg;
        witem_free(witem);
        func(func_arg);

        atomic_inc(&scoreboard.idle); // the initial increment was done in worker_start(); re-incrementing here avoids a race

        // Only take the overhead of restoring the RT priority if we were not already servicing a high priority queue
        if (PWQ_RT_THREADS && (queue_priority != WORKQ_HIGH_PRIOQUEUE))
            ptwq_set_current_thread_priority(WORKQ_HIGH_PRIOQUEUE);
    }

    return (NULL);
}
static void
worker_start(void)
{
    pthread_t tid;

    dbg_puts("Spawning another worker");
    atomic_inc(&scoreboard.idle);
    atomic_inc(&scoreboard.count);

    if (pthread_create(&tid, &detached_attr, worker_main, NULL) != 0) {
        dbg_perror("pthread_create(3)");
        atomic_dec(&scoreboard.idle);
        atomic_dec(&scoreboard.count);
    }
}
/* Stop one worker by enqueueing a poison work item (func == NULL);
   see the matching check in worker_main() above */
static void
worker_stop(void)
{
    struct work *witem;
    pthread_workqueue_t workq;
    int i;
    unsigned int wqlist_index_bit, new_mask;

    witem = witem_alloc(NULL, NULL);

    pthread_mutex_lock(&wqlist_mtx);
    for (i = 0; i < PTHREAD_WORKQUEUE_MAX; i++) {
        workq = wqlist[i];
        if (workq == NULL)
            continue;

        wqlist_index_bit = (0x1 << workq->wqlist_index);

        pthread_spin_lock(&workq->mtx);
        do {
            new_mask = atomic_or(&wqlist_mask, wqlist_index_bit);
        } while (!(new_mask & wqlist_index_bit));

        STAILQ_INSERT_TAIL(&workq->item_listhead, witem, item_entry);
        pthread_spin_unlock(&workq->mtx);

        pthread_cond_signal(&wqlist_has_work);
        pthread_mutex_unlock(&wqlist_mtx);
        return;
    }

    /* FIXME: this means there are no workqueues.. should never happen */
    dbg_puts("Attempting to add a workitem without a workqueue");
    pthread_mutex_unlock(&wqlist_mtx);
}
static void *
manager_main(void *unused __attribute__ ((unused)))
{
    unsigned int load_max = cpu_count;
    unsigned int worker_max, current_thread_count = 0;
    unsigned int worker_idle_seconds_accumulated = 0;
    unsigned int max_threads_to_stop = 0;
    unsigned int i;
    int cond_wait_rv = 0;
    sigset_t sigmask;
    struct timeval tp;
    struct timespec ts;

    worker_max = get_process_limit();
    scoreboard.load = get_load_average();

    /* Block all signals */
    sigfillset(&sigmask);
    pthread_sigmask(SIG_BLOCK, &sigmask, NULL);

    /* Create the minimum number of workers */
    scoreboard.count = 0;
    for (i = 0; i < worker_min; i++)
        worker_start();

    for (;;) {
        pthread_mutex_lock(&scoreboard.sb_wake_mtx);

        dbg_puts("manager is sleeping");

        (void) gettimeofday(&tp, NULL); // TODO: error checking

        /* Convert from timeval to timespec */
        ts.tv_sec = tp.tv_sec;
        ts.tv_nsec = tp.tv_usec * 1000;
        ts.tv_sec += 1; // wake up once per second and check whether we have too many idle threads

        // Only sleep on the condition if there is no pending wakeup; a spurious wakeup is also OK
        if (scoreboard.sb_wake_pending == 0)
            cond_wait_rv = pthread_cond_timedwait(&scoreboard.sb_wake_cond, &scoreboard.sb_wake_mtx, &ts);

        scoreboard.sb_wake_pending = 0; // must be cleared before spawning any new threads below, or we race

        dbg_puts("manager is awake");

        dbg_printf("load=%u idle=%u workers=%u max_workers=%u worker_min=%u",
                   scoreboard.load, scoreboard.idle, scoreboard.count, worker_max, worker_min);

        // If no workers are available, check whether we should create a new one
        if (scoreboard.idle == 0 && (scoreboard.count > 0)) // last clause guards against an extremely unlikely race at startup
        {
            scoreboard.load = get_load_average();

            if ((scoreboard.load < load_max) && (scoreboard.count < worker_max))
            {
                if (scoreboard.count < worker_idle_threshold) // allow cheap rampup up to worker_idle_threshold without going to /proc
                {
                    worker_start();
                }
                else if (threads_runnable(&current_thread_count) == 0) // check through /proc; a bit more expensive in terms of latency
                {
                    // only start a new thread if we have fewer runnable threads than cpus
                    if (current_thread_count >= cpu_count)
                    {
                        dbg_printf("Not spawning worker thread, thread_runnable = %u >= cpu_count = %u",
                                   current_thread_count, cpu_count);
                    }
                    else
                    {
                        worker_start();
                    }
                }
                else // always start a new thread if we can't get the runnable count
                {
                    worker_start();
                }
            }
            else // high load; still allow rampup up to worker_idle_threshold regardless
            {
                if (scoreboard.count < worker_idle_threshold)
                    worker_start();
            }
        }

        if (cond_wait_rv == ETIMEDOUT) // Only check for ramp down on the 'timer tick'
        {
            if (scoreboard.idle > worker_idle_threshold) // only accumulate if there are 'too many' idle threads
            {
                worker_idle_seconds_accumulated += scoreboard.idle; // track how many idle 'thread seconds' we have

                dbg_printf("worker_idle_seconds_accumulated = %u, scoreboard.idle = %u, scoreboard.count = %u",
                           worker_idle_seconds_accumulated, scoreboard.idle, scoreboard.count);
            }

            // Only consider ramp down once we have accumulated enough idle 'thread seconds';
            // this ensures that a large number of idle threads ramps down faster
            max_threads_to_stop = worker_idle_seconds_accumulated / WORKER_IDLE_SECONDS_THRESHOLD;
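
            /*
             * Worked example (illustrative numbers only): with
             * worker_idle_threshold = 2 and 12 idle workers, each one-second
             * tick adds 12 idle thread-seconds, so after a single tick
             * 12 / WORKER_IDLE_SECONDS_THRESHOLD (= 5) = 2 workers become
             * eligible to stop; the cap below further limits this to
             * idle - threshold.
             */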
            if (max_threads_to_stop > 0)
            {
                worker_idle_seconds_accumulated = 0;
                if (max_threads_to_stop > (scoreboard.idle - worker_idle_threshold))
                    max_threads_to_stop = (scoreboard.idle - worker_idle_threshold);

                // Only stop threads if we actually have 'too many' idle ones in the pool
                if (scoreboard.idle > worker_idle_threshold)
                {
                    for (i = 0; i < max_threads_to_stop; i++)
                    {
                        dbg_puts("Removing one thread from the thread pool");
                        worker_stop();
                    }
                }
            }
        }

        pthread_mutex_unlock(&scoreboard.sb_wake_mtx);
    }
}
static void
manager_start(void)
{
    pthread_t tid;
    int rv;

    dbg_puts("starting the manager thread");

    do {
        rv = pthread_create(&tid, &detached_attr, manager_main, NULL);
        if (rv == EAGAIN) {
            sleep(1);
        } else if (rv != 0) {
            /* FIXME: not nice */
            dbg_printf("thread creation failed, rv=%d", rv);
            abort();
        }
    } while (rv != 0);

    wqlist_has_manager = 1;
}
void
manager_workqueue_additem(struct _pthread_workqueue *workq, struct work *witem)
{
    unsigned int wqlist_index_bit = (0x1 << workq->wqlist_index);

    if (workq->overcommit) {
        pthread_t tid;

        pthread_mutex_lock(&ocwq_mtx);
        pthread_spin_lock(&workq->mtx);
        STAILQ_INSERT_TAIL(&workq->item_listhead, witem, item_entry);
        pthread_spin_unlock(&workq->mtx);
        ocwq_mask |= wqlist_index_bit;
        if (ocwq_idle_threads > 0) {
            dbg_puts("signaling an idle worker");
            pthread_cond_signal(&ocwq_has_work);
            ocwq_idle_threads--;
        } else {
            (void) pthread_create(&tid, &detached_attr, overcommit_worker_main, NULL);
        }
        pthread_mutex_unlock(&ocwq_mtx);
    } else {
        pthread_spin_lock(&workq->mtx);

        // Only set the mask for the first item added to the workqueue.
        if (STAILQ_EMPTY(&workq->item_listhead))
        {
            unsigned int new_mask;

            // The only possible contention here is with threads performing the same
            // operation on another workqueue, so we will not be blocked for long...
            // Threads operating on the same workqueue are serialized by the spinlock, so contention is very unlikely.
            do {
                new_mask = atomic_or(&wqlist_mask, wqlist_index_bit);
            } while (!(new_mask & wqlist_index_bit));
        }

        STAILQ_INSERT_TAIL(&workq->item_listhead, witem, item_entry);
        pthread_spin_unlock(&workq->mtx);

        // Only signal a thread wakeup if there are idle threads available,
        // and no other thread has already raced us and emptied the wqlist on our behalf
        if (scoreboard.idle > 0) // && ((wqlist_mask & wqlist_index_bit) != 0)) // disabling this fringe optimization for now
        {
            pthread_mutex_lock(&wqlist_mtx);
            pthread_cond_signal(&wqlist_has_work);
            pthread_mutex_unlock(&wqlist_mtx);
        }
    }
}
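
/*
 * Usage sketch (illustrative only): work reaches manager_workqueue_additem()
 * through the public API declared in pthread_workqueue.h. The snippet below
 * is an assumption-level example, not code from this file.
 */
#if 0
static void
my_task(void *arg)
{
    /* runs on one of the pool's worker threads */
}

static void
enqueue_example(pthread_workqueue_t wq)
{
    pthread_workitem_handle_t handle;
    unsigned int gencount;

    pthread_workqueue_additem_np(wq, my_task, NULL, &handle, &gencount);
}
#endif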
static unsigned int
get_process_limit(void)
{
#if defined(__linux__)
    struct rlimit rlim;

    if (getrlimit(RLIMIT_NPROC, &rlim) < 0) {
        dbg_perror("getrlimit(2)");
        return (50); /* fall back to a fixed default */
    }
    return (rlim.rlim_max);
#else
    /* Solaris doesn't define this limit anywhere I can see.. */
    return (64);
#endif
}
static unsigned int
get_load_average(void)
{
    double loadavg;

    /* TODO: proper error handling */
    if (getloadavg(&loadavg, 1) != 1) {
        dbg_perror("getloadavg(3)");
        return (1);
    }
    if (loadavg > INT_MAX || loadavg < 0)
        loadavg = 1;

    return ((unsigned int) loadavg);
}
unsigned long
manager_peek(const char *key)
{
    unsigned long rv;

    if (strcmp(key, "combined_idle") == 0) {
        rv = scoreboard.idle;
        if (scoreboard.idle > worker_min)
            rv -= worker_min;
        rv += ocwq_idle_threads;
    } else if (strcmp(key, "idle") == 0) {
        rv = scoreboard.idle;
        if (scoreboard.idle > worker_min)
            rv -= worker_min;
    } else if (strcmp(key, "ocomm_idle") == 0) {
        rv = ocwq_idle_threads;
    } else {
        dbg_printf("invalid key: %s", key);
        abort();
    }

    return (rv);
}
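
/*
 * Usage sketch: the keys accepted above can be polled for monitoring, e.g.
 *
 *   unsigned long n_idle = manager_peek("combined_idle");
 *
 * which reports idle regular workers (in excess of worker_min) plus idle
 * overcommit workers.
 */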