static void
util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads,
- bool finish_locked);
+ bool locked);
/****************************************************************************
* Wait for all queues to assert idle when exit() is called.
}
void
-util_queue_adjust_num_threads(struct util_queue *queue, unsigned num_threads)
+util_queue_adjust_num_threads(struct util_queue *queue, unsigned num_threads,
+ bool locked)
{
num_threads = MIN2(num_threads, queue->max_threads);
num_threads = MAX2(num_threads, 1);
- simple_mtx_lock(&queue->finish_lock);
+ if (!locked)
+ mtx_lock(&queue->lock);
+
unsigned old_num_threads = queue->num_threads;
if (num_threads == old_num_threads) {
- simple_mtx_unlock(&queue->finish_lock);
+ if (!locked)
+ mtx_unlock(&queue->lock);
return;
}
if (num_threads < old_num_threads) {
util_queue_kill_threads(queue, num_threads, true);
- simple_mtx_unlock(&queue->finish_lock);
+ if (!locked)
+ mtx_unlock(&queue->lock);
return;
}
break;
}
}
- simple_mtx_unlock(&queue->finish_lock);
+
+ if (!locked)
+ mtx_unlock(&queue->lock);
}
bool
queue->global_data = global_data;
(void) mtx_init(&queue->lock, mtx_plain);
- (void) simple_mtx_init(&queue->finish_lock, mtx_plain);
queue->num_queued = 0;
cnd_init(&queue->has_queued_cond);
static void
util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads,
- bool finish_locked)
+ bool locked)
{
- unsigned i;
-
/* Signal all threads to terminate. */
- if (!finish_locked)
- simple_mtx_lock(&queue->finish_lock);
+ if (!locked)
+ mtx_lock(&queue->lock);
if (keep_num_threads >= queue->num_threads) {
- simple_mtx_unlock(&queue->finish_lock);
+ if (!locked)
+ mtx_unlock(&queue->lock);
return;
}
- mtx_lock(&queue->lock);
unsigned old_num_threads = queue->num_threads;
/* Setting num_threads is what causes the threads to terminate.
* Then cnd_broadcast wakes them up and they will exit their function.
*/
queue->num_threads = keep_num_threads;
cnd_broadcast(&queue->has_queued_cond);
- mtx_unlock(&queue->lock);
- for (i = keep_num_threads; i < old_num_threads; i++)
- thrd_join(queue->threads[i], NULL);
-
- if (!finish_locked)
- simple_mtx_unlock(&queue->finish_lock);
+ /* Wait for threads to terminate. */
+ if (keep_num_threads < old_num_threads) {
+ /* We need to unlock the mutex to allow threads to terminate. */
+ mtx_unlock(&queue->lock);
+ for (unsigned i = keep_num_threads; i < old_num_threads; i++)
+ thrd_join(queue->threads[i], NULL);
+ if (locked)
+ mtx_lock(&queue->lock);
+ } else {
+ if (!locked)
+ mtx_unlock(&queue->lock);
+ }
}
static void
cnd_destroy(&queue->has_space_cond);
cnd_destroy(&queue->has_queued_cond);
- simple_mtx_destroy(&queue->finish_lock);
mtx_destroy(&queue->lock);
free(queue->jobs);
free(queue->threads);
}
-void
-util_queue_add_job(struct util_queue *queue,
- void *job,
- struct util_queue_fence *fence,
- util_queue_execute_func execute,
- util_queue_execute_func cleanup,
- const size_t job_size)
+static void
+util_queue_add_job_locked(struct util_queue *queue,
+ void *job,
+ struct util_queue_fence *fence,
+ util_queue_execute_func execute,
+ util_queue_execute_func cleanup,
+ const size_t job_size,
+ bool locked)
{
struct util_queue_job *ptr;
- mtx_lock(&queue->lock);
+ if (!locked)
+ mtx_lock(&queue->lock);
if (queue->num_threads == 0) {
- mtx_unlock(&queue->lock);
+ if (!locked)
+ mtx_unlock(&queue->lock);
/* well no good option here, but any leaks will be
* short-lived as things are shutting down..
*/
queue->flags & UTIL_QUEUE_INIT_SCALE_THREADS &&
execute != util_queue_finish_execute &&
queue->num_threads < queue->max_threads) {
- util_queue_adjust_num_threads(queue, queue->num_threads + 1);
+ util_queue_adjust_num_threads(queue, queue->num_threads + 1, true);
}
if (queue->num_queued == queue->max_jobs) {
queue->num_queued++;
cnd_signal(&queue->has_queued_cond);
- mtx_unlock(&queue->lock);
+ if (!locked)
+ mtx_unlock(&queue->lock);
+}
+
+void
+util_queue_add_job(struct util_queue *queue,
+ void *job,
+ struct util_queue_fence *fence,
+ util_queue_execute_func execute,
+ util_queue_execute_func cleanup,
+ const size_t job_size)
+{
+ util_queue_add_job_locked(queue, job, fence, execute, cleanup, job_size,
+ false);
}
/**
* a deadlock would happen, because 1 barrier requires that all threads
* wait for it exclusively.
*/
- simple_mtx_lock(&queue->finish_lock);
+ mtx_lock(&queue->lock);
/* The number of threads can be changed to 0, e.g. by the atexit handler. */
if (!queue->num_threads) {
- simple_mtx_unlock(&queue->finish_lock);
+ mtx_unlock(&queue->lock);
return;
}
+ /* We need to disable adding new threads in util_queue_add_job because
+ * the finish operation requires a fixed number of threads.
+ *
+ * Also note that util_queue_add_job can unlock the mutex if there is not
+ * enough space in the queue and wait for space.
+ */
+ unsigned saved_flags = queue->flags;
+ queue->flags &= ~UTIL_QUEUE_INIT_SCALE_THREADS;
+
fences = malloc(queue->num_threads * sizeof(*fences));
util_barrier_init(&barrier, queue->num_threads);
for (unsigned i = 0; i < queue->num_threads; ++i) {
util_queue_fence_init(&fences[i]);
- util_queue_add_job(queue, &barrier, &fences[i],
- util_queue_finish_execute, NULL, 0);
+ util_queue_add_job_locked(queue, &barrier, &fences[i],
+ util_queue_finish_execute, NULL, 0, true);
}
+ queue->flags = saved_flags;
+ mtx_unlock(&queue->lock);
for (unsigned i = 0; i < queue->num_threads; ++i) {
util_queue_fence_wait(&fences[i]);
util_queue_fence_destroy(&fences[i]);
}
- simple_mtx_unlock(&queue->finish_lock);
free(fences);
}