+++ /dev/null
-//===-- llvm/Support/TaskQueue.h - A TaskQueue implementation ---*- C++ -*-===//
-//
-// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
-// See https://llvm.org/LICENSE.txt for license information.
-// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
-//
-//===----------------------------------------------------------------------===//
-//
-// This file defines a crude C++11 based task queue.
-//
-//===----------------------------------------------------------------------===//
-
-#ifndef LLVM_SUPPORT_TASKQUEUE_H
-#define LLVM_SUPPORT_TASKQUEUE_H
-
-#include "llvm/Config/llvm-config.h"
-#include "llvm/Support/ThreadPool.h"
-#include "llvm/Support/thread.h"
-
-#include <atomic>
-#include <cassert>
-#include <condition_variable>
-#include <deque>
-#include <functional>
-#include <future>
-#include <memory>
-#include <mutex>
-#include <utility>
-
-namespace llvm {
-/// TaskQueue executes serialized work on a user-defined Thread Pool. It
-/// guarantees that if task B is enqueued after task A, task B begins after
-/// task A completes and there is no overlap between the two.
-class TaskQueue {
- // Because we don't have init capture to use move-only local variables that
- // are captured into a lambda, we create the promise inside an explicit
- // callable struct. We want to do as much of the wrapping in the
- // type-specialized domain (before type erasure) and then erase this into a
- // std::function.
- template <typename Callable> struct Task {
- using ResultTy = std::invoke_result_t<Callable>;
- explicit Task(Callable C, TaskQueue &Parent)
- : C(std::move(C)), P(std::make_shared<std::promise<ResultTy>>()),
- Parent(&Parent) {}
-
- template<typename T>
- void invokeCallbackAndSetPromise(T*) {
- P->set_value(C());
- }
-
- void invokeCallbackAndSetPromise(void*) {
- C();
- P->set_value();
- }
-
- void operator()() noexcept {
- ResultTy *Dummy = nullptr;
- invokeCallbackAndSetPromise(Dummy);
- Parent->completeTask();
- }
-
- Callable C;
- std::shared_ptr<std::promise<ResultTy>> P;
- TaskQueue *Parent;
- };
-
-public:
- /// Construct a task queue with no work.
- TaskQueue(ThreadPool &Scheduler) : Scheduler(Scheduler) { (void)Scheduler; }
-
- /// Blocking destructor: the queue will wait for all work to complete.
- ~TaskQueue() {
- Scheduler.wait();
- assert(Tasks.empty());
- }
-
- /// Asynchronous submission of a task to the queue. The returned future can be
- /// used to wait for the task (and all previous tasks that have not yet
- /// completed) to finish.
- template <typename Callable>
- std::future<std::invoke_result_t<Callable>> async(Callable &&C) {
-#if !LLVM_ENABLE_THREADS
- static_assert(false,
- "TaskQueue requires building with LLVM_ENABLE_THREADS!");
-#endif
- Task<Callable> T{std::move(C), *this};
- using ResultTy = std::invoke_result_t<Callable>;
- std::future<ResultTy> F = T.P->get_future();
- {
- std::lock_guard<std::mutex> Lock(QueueLock);
- // If there's already a task in flight, just queue this one up. If
- // there is not a task in flight, bypass the queue and schedule this
- // task immediately.
- if (IsTaskInFlight)
- Tasks.push_back(std::move(T));
- else {
- Scheduler.async(std::move(T));
- IsTaskInFlight = true;
- }
- }
- return F;
- }
-
-private:
- void completeTask() {
- // We just completed a task. If there are no more tasks in the queue,
- // update IsTaskInFlight to false and stop doing work. Otherwise
- // schedule the next task (while not holding the lock).
- std::function<void()> Continuation;
- {
- std::lock_guard<std::mutex> Lock(QueueLock);
- if (Tasks.empty()) {
- IsTaskInFlight = false;
- return;
- }
-
- Continuation = std::move(Tasks.front());
- Tasks.pop_front();
- }
- Scheduler.async(std::move(Continuation));
- }
-
- /// The thread pool on which to run the work.
- ThreadPool &Scheduler;
-
- /// State which indicates whether the queue currently is currently processing
- /// any work.
- bool IsTaskInFlight = false;
-
- /// Mutex for synchronizing access to the Tasks array.
- std::mutex QueueLock;
-
- /// Tasks waiting for execution in the queue.
- std::deque<std::function<void()>> Tasks;
-};
-} // namespace llvm
-
-#endif // LLVM_SUPPORT_TASKQUEUE_H
+++ /dev/null
-//========- unittests/Support/TaskQueue.cpp - TaskQueue.h tests ------========//
-//
-// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
-// See https://llvm.org/LICENSE.txt for license information.
-// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
-//
-//===----------------------------------------------------------------------===//
-
-#include "llvm/Config/llvm-config.h"
-
-#if LLVM_ENABLE_THREADS
-
-#include "llvm/Support/TaskQueue.h"
-
-#include "gtest/gtest.h"
-
-using namespace llvm;
-
-class TaskQueueTest : public testing::Test {
-protected:
- TaskQueueTest() {}
-};
-
-TEST_F(TaskQueueTest, OrderedFutures) {
- ThreadPool TP(hardware_concurrency(1));
- TaskQueue TQ(TP);
- std::atomic<int> X{ 0 };
- std::atomic<int> Y{ 0 };
- std::atomic<int> Z{ 0 };
-
- std::mutex M1, M2, M3;
- std::unique_lock<std::mutex> L1(M1);
- std::unique_lock<std::mutex> L2(M2);
- std::unique_lock<std::mutex> L3(M3);
-
- std::future<void> F1 = TQ.async([&] {
- std::unique_lock<std::mutex> Lock(M1);
- ++X;
- });
- std::future<void> F2 = TQ.async([&] {
- std::unique_lock<std::mutex> Lock(M2);
- ++Y;
- });
- std::future<void> F3 = TQ.async([&] {
- std::unique_lock<std::mutex> Lock(M3);
- ++Z;
- });
-
- L1.unlock();
- F1.wait();
- ASSERT_EQ(1, X);
- ASSERT_EQ(0, Y);
- ASSERT_EQ(0, Z);
-
- L2.unlock();
- F2.wait();
- ASSERT_EQ(1, X);
- ASSERT_EQ(1, Y);
- ASSERT_EQ(0, Z);
-
- L3.unlock();
- F3.wait();
- ASSERT_EQ(1, X);
- ASSERT_EQ(1, Y);
- ASSERT_EQ(1, Z);
-}
-
-TEST_F(TaskQueueTest, UnOrderedFutures) {
- ThreadPool TP(hardware_concurrency(1));
- TaskQueue TQ(TP);
- std::atomic<int> X{ 0 };
- std::atomic<int> Y{ 0 };
- std::atomic<int> Z{ 0 };
- std::mutex M;
-
- std::unique_lock<std::mutex> Lock(M);
-
- std::future<void> F1 = TQ.async([&] { ++X; });
- std::future<void> F2 = TQ.async([&] { ++Y; });
- std::future<void> F3 = TQ.async([&M, &Z] {
- std::unique_lock<std::mutex> Lock(M);
- ++Z;
- });
-
- F2.wait();
- ASSERT_EQ(1, X);
- ASSERT_EQ(1, Y);
- ASSERT_EQ(0, Z);
-
- Lock.unlock();
-
- F3.wait();
- ASSERT_EQ(1, X);
- ASSERT_EQ(1, Y);
- ASSERT_EQ(1, Z);
-}
-
-TEST_F(TaskQueueTest, FutureWithReturnValue) {
- ThreadPool TP(hardware_concurrency(1));
- TaskQueue TQ(TP);
- std::future<std::string> F1 = TQ.async([&] { return std::string("Hello"); });
- std::future<int> F2 = TQ.async([&] { return 42; });
-
- ASSERT_EQ(42, F2.get());
- ASSERT_EQ("Hello", F1.get());
-}
-#endif