[ThreadPool] Rollback recent changes until I figure out the breakage.
authorDavide Italiano <davide@freebsd.org>
Mon, 28 Nov 2016 09:17:12 +0000 (09:17 +0000)
committerDavide Italiano <davide@freebsd.org>
Mon, 28 Nov 2016 09:17:12 +0000 (09:17 +0000)
llvm-svn: 288018

llvm/include/llvm/Support/ThreadPool.h
llvm/lib/Support/ThreadPool.cpp
llvm/unittests/Support/ThreadPool.cpp

index da1834d..665cec2 100644 (file)
@@ -72,27 +72,32 @@ public:
   /// Blocking destructor: the pool will wait for all the threads to complete.
   ~ThreadPool();
 
-  /// Asynchronous submission of a task to the pool.
+  /// Asynchronous submission of a task to the pool. The returned future can be
+  /// used to wait for the task to finish and is *non-blocking* on destruction.
   template <typename Function, typename... Args>
-  inline void async(Function &&F, Args &&... ArgList) {
+  inline std::shared_future<VoidTy> async(Function &&F, Args &&... ArgList) {
     auto Task =
         std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
 #ifndef _MSC_VER
-    asyncImpl(std::move(Task));
+    return asyncImpl(std::move(Task));
 #else
     // This lambda has to be marked mutable because MSVC 2013's std::bind call
     // operator isn't const qualified.
-    asyncImpl([Task](VoidTy) mutable { Task(); });
+    return asyncImpl([Task](VoidTy) mutable -> VoidTy {
+      Task();
+      return VoidTy();
+    });
 #endif
   }
 
-  /// Asynchronous submission of a task to the pool.
+  /// Asynchronous submission of a task to the pool. The returned future can be
+  /// used to wait for the task to finish and is *non-blocking* on destruction.
   template <typename Function>
-  inline void async(Function &&F) {
+  inline std::shared_future<VoidTy> async(Function &&F) {
 #ifndef _MSC_VER
-    asyncImpl(std::forward<Function>(F));
+    return asyncImpl(std::forward<Function>(F));
 #else
-    asyncImpl([F] (VoidTy) { F(); });
+    return asyncImpl([F] (VoidTy) -> VoidTy { F(); return VoidTy(); });
 #endif
   }
 
@@ -101,8 +106,9 @@ public:
   void wait();
 
 private:
-  /// Asynchronous submission of a task to the pool.
-  void asyncImpl(TaskTy F);
+  /// Asynchronous submission of a task to the pool. The returned future can be
+  /// used to wait for the task to finish and is *non-blocking* on destruction.
+  std::shared_future<VoidTy> asyncImpl(TaskTy F);
 
   /// Threads in flight
   std::vector<llvm::thread> Threads;
index 022f32a..db03a4d 100644 (file)
@@ -82,7 +82,7 @@ void ThreadPool::wait() {
                            [&] { return !ActiveThreads && Tasks.empty(); });
 }
 
-void ThreadPool::asyncImpl(TaskTy Task) {
+std::shared_future<ThreadPool::VoidTy> ThreadPool::asyncImpl(TaskTy Task) {
   /// Wrap the Task in a packaged_task to return a future object.
   PackagedTaskTy PackagedTask(std::move(Task));
   auto Future = PackagedTask.get_future();
@@ -96,6 +96,7 @@ void ThreadPool::asyncImpl(TaskTy Task) {
     Tasks.push(std::move(PackagedTask));
   }
   QueueCondition.notify_one();
+  return Future.share();
 }
 
 // The destructor joins all threads, waiting for completion.
@@ -135,7 +136,7 @@ void ThreadPool::wait() {
   }
 }
 
-void ThreadPool::asyncImpl(TaskTy Task) {
+std::shared_future<ThreadPool::VoidTy> ThreadPool::asyncImpl(TaskTy Task) {
 #ifndef _MSC_VER
   // Get a Future with launch::deferred execution using std::async
   auto Future = std::async(std::launch::deferred, std::move(Task)).share();
@@ -147,6 +148,7 @@ void ThreadPool::asyncImpl(TaskTy Task) {
   PackagedTaskTy PackagedTask([Future](bool) -> bool { Future.get(); return false; });
 #endif
   Tasks.push(std::move(PackagedTask));
+  return Future;
 }
 
 ThreadPool::~ThreadPool() {
index bb972f7..8e03aac 100644 (file)
@@ -131,6 +131,22 @@ TEST_F(ThreadPoolTest, Async) {
   ASSERT_EQ(2, i.load());
 }
 
+TEST_F(ThreadPoolTest, GetFuture) {
+  CHECK_UNSUPPORTED();
+  ThreadPool Pool{2};
+  std::atomic_int i{0};
+  Pool.async([this, &i] {
+    waitForMainThread();
+    ++i;
+  });
+  // Force the future using get()
+  Pool.async([&i] { ++i; }).get();
+  ASSERT_NE(2, i.load());
+  setMainThreadReady();
+  Pool.wait();
+  ASSERT_EQ(2, i.load());
+}
+
 TEST_F(ThreadPoolTest, PoolDestruction) {
   CHECK_UNSUPPORTED();
   // Test that we are waiting on destruction