#ifndef LLVM_SUPPORT_THREAD_POOL_H
#define LLVM_SUPPORT_THREAD_POOL_H
-#include "llvm/ADT/STLExtras.h"
#include "llvm/Config/llvm-config.h"
#include "llvm/Support/thread.h"
#include <future>
#include <atomic>
-#include <cassert>
#include <condition_variable>
#include <functional>
#include <memory>
/// The pool keeps a vector of threads alive, waiting on a condition variable
/// for some work to become available.
class ThreadPool {
- struct TaskBase {
- virtual ~TaskBase() {}
- virtual void execute() = 0;
- };
-
- template <typename ReturnType> struct TypedTask : public TaskBase {
- explicit TypedTask(std::packaged_task<ReturnType()> Task)
- : Task(std::move(Task)) {}
-
- void execute() override { Task(); }
-
- std::packaged_task<ReturnType()> Task;
- };
-
public:
+ using TaskTy = std::function<void()>;
+ using PackagedTaskTy = std::packaged_task<void()>;
+
/// Construct a pool with the number of threads found by
/// hardware_concurrency().
ThreadPool();
/// Asynchronous submission of a task to the pool. The returned future can be
/// used to wait for the task to finish and is *non-blocking* on destruction.
template <typename Function, typename... Args>
- inline std::shared_future<typename std::result_of<Function(Args...)>::type>
- async(Function &&F, Args &&... ArgList) {
+ inline std::shared_future<void> async(Function &&F, Args &&... ArgList) {
auto Task =
std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
return asyncImpl(std::move(Task));
/// Asynchronous submission of a task to the pool. The returned future can be
/// used to wait for the task to finish and is *non-blocking* on destruction.
template <typename Function>
- inline std::shared_future<typename std::result_of<Function()>::type>
- async(Function &&F) {
+ inline std::shared_future<void> async(Function &&F) {
return asyncImpl(std::forward<Function>(F));
}
private:
/// Asynchronous submission of a task to the pool. The returned future can be
/// used to wait for the task to finish and is *non-blocking* on destruction.
- template <typename TaskTy>
- std::shared_future<typename std::result_of<TaskTy()>::type>
- asyncImpl(TaskTy &&Task) {
- typedef decltype(Task()) ResultTy;
-
- /// Wrap the Task in a packaged_task to return a future object.
- std::packaged_task<ResultTy()> PackagedTask(std::move(Task));
- auto Future = PackagedTask.get_future();
- std::unique_ptr<TaskBase> TB =
- llvm::make_unique<TypedTask<ResultTy>>(std::move(PackagedTask));
-
- {
- // Lock the queue and push the new task
- std::unique_lock<std::mutex> LockGuard(QueueLock);
-
- // Don't allow enqueueing after disabling the pool
- assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
-
- Tasks.push(std::move(TB));
- }
- QueueCondition.notify_one();
- return Future.share();
- }
+ std::shared_future<void> asyncImpl(TaskTy F);
/// Threads in flight
std::vector<llvm::thread> Threads;
/// Tasks waiting for execution in the pool.
- std::queue<std::unique_ptr<TaskBase>> Tasks;
+ std::queue<PackagedTaskTy> Tasks;
/// Locking and signaling for accessing the Tasks queue.
std::mutex QueueLock;
for (unsigned ThreadID = 0; ThreadID < ThreadCount; ++ThreadID) {
Threads.emplace_back([&] {
while (true) {
- std::unique_ptr<TaskBase> Task;
+ PackagedTaskTy Task;
{
std::unique_lock<std::mutex> LockGuard(QueueLock);
// Wait for tasks to be pushed in the queue
Tasks.pop();
}
// Run the task we just grabbed
- Task->execute();
+ Task();
{
// Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait()
[&] { return !ActiveThreads && Tasks.empty(); });
}
+std::shared_future<void> ThreadPool::asyncImpl(TaskTy Task) {
+ // Wrap the Task in a packaged_task to return a future object.
+ PackagedTaskTy PackagedTask(std::move(Task));
+ auto Future = PackagedTask.get_future();
+ {
+ // Lock the queue and push the new task
+ std::unique_lock<std::mutex> LockGuard(QueueLock);
+
+ // Don't allow enqueueing after disabling the pool
+ assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
+
+ Tasks.push(std::move(PackagedTask));
+ }
+ QueueCondition.notify_one();
+ return Future.share();
+}
+
// The destructor joins all threads, waiting for completion.
ThreadPool::~ThreadPool() {
{
ASSERT_EQ(2, i.load());
}
-TEST_F(ThreadPoolTest, TaskWithResult) {
- CHECK_UNSUPPORTED();
- // By making only 1 thread in the pool the two tasks are serialized with
- // respect to each other, which means that the second one must return 2.
- ThreadPool Pool{1};
- std::atomic_int i{0};
- Pool.async([this, &i] {
- waitForMainThread();
- ++i;
- });
- // Force the future using get()
- std::shared_future<int> Future = Pool.async([&i] { return ++i; });
- ASSERT_EQ(0, i.load());
- setMainThreadReady();
- int Result = Future.get();
- ASSERT_EQ(2, i.load());
- ASSERT_EQ(2, Result);
-}
-
TEST_F(ThreadPoolTest, PoolDestruction) {
CHECK_UNSUPPORTED();
// Test that we are waiting on destruction