unsigned getThreadCount() const { return ThreadCount; }
private:
+ bool workCompletedUnlocked() { return !ActiveThreads && Tasks.empty(); }
+
/// Asynchronous submission of a task to the pool. The returned future can be
/// used to wait for the task to finish and is *non-blocking* on destruction.
std::shared_future<void> asyncImpl(TaskTy F);
std::mutex QueueLock;
std::condition_variable QueueCondition;
- /// Locking and signaling for job completion
- std::mutex CompletionLock;
+ /// Signaling for job completion
std::condition_variable CompletionCondition;
/// Keep track of the number of thread actually busy
- std::atomic<unsigned> ActiveThreads;
+ unsigned ActiveThreads = 0;
#if LLVM_ENABLE_THREADS // avoids warning for unused variable
/// Signal for the destruction of the pool, asking thread to exit.
- bool EnableFlag;
+ bool EnableFlag = true;
#endif
unsigned ThreadCount;
#if LLVM_ENABLE_THREADS
ThreadPool::ThreadPool(ThreadPoolStrategy S)
- : ActiveThreads(0), EnableFlag(true),
- ThreadCount(S.compute_thread_count()) {
+ : ThreadCount(S.compute_thread_count()) {
// Create ThreadCount threads that will loop forever, wait on QueueCondition
// for tasks to be queued or the Pool to be destroyed.
Threads.reserve(ThreadCount);
// We first need to signal that we are active before popping the queue
// in order for wait() to properly detect that even if the queue is
// empty, there is still a task in flight.
- {
- std::unique_lock<std::mutex> LockGuard(CompletionLock);
- ++ActiveThreads;
- }
+ ++ActiveThreads;
Task = std::move(Tasks.front());
Tasks.pop();
}
// Run the task we just grabbed
Task();
+ bool Notify;
{
// Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait()
- std::unique_lock<std::mutex> LockGuard(CompletionLock);
+ std::lock_guard<std::mutex> LockGuard(QueueLock);
--ActiveThreads;
+ Notify = workCompletedUnlocked();
}
-
- // Notify task completion, in case someone waits on ThreadPool::wait()
- CompletionCondition.notify_all();
+ // Notify task completion if this is the last active thread, in case
+ // someone waits on ThreadPool::wait().
+ if (Notify)
+ CompletionCondition.notify_all();
}
});
}
void ThreadPool::wait() {
// Wait for all threads to complete and the queue to be empty
- std::unique_lock<std::mutex> LockGuard(CompletionLock);
- // The order of the checks for ActiveThreads and Tasks.empty() matters because
- // any active threads might be modifying the Tasks queue, and this would be a
- // race.
- CompletionCondition.wait(LockGuard,
- [&] { return !ActiveThreads && Tasks.empty(); });
+ std::unique_lock<std::mutex> LockGuard(QueueLock);
+ CompletionCondition.wait(LockGuard, [&] { return workCompletedUnlocked(); });
}
std::shared_future<void> ThreadPool::asyncImpl(TaskTy Task) {
// No threads are launched, issue a warning if ThreadCount is not 0
ThreadPool::ThreadPool(ThreadPoolStrategy S)
- : ActiveThreads(0), ThreadCount(S.compute_thread_count()) {
+ : ThreadCount(S.compute_thread_count()) {
if (ThreadCount != 1) {
errs() << "Warning: request a ThreadPool with " << ThreadCount
<< " threads, but LLVM_ENABLE_THREADS has been turned off\n";