[clangd] The new threading implementation
authorIlya Biryukov <ibiryukov@google.com>
Tue, 6 Feb 2018 15:53:42 +0000 (15:53 +0000)
committerIlya Biryukov <ibiryukov@google.com>
Tue, 6 Feb 2018 15:53:42 +0000 (15:53 +0000)
Summary:
In the new threading model clangd creates one thread per file to manage
the AST and one thread to process each of the incoming requests.
The number of actively running threads is bounded by the semaphore to
avoid overloading the system.

Reviewers: sammccall

Reviewed By: sammccall

Subscribers: klimek, mgorny, jkorous-apple, ioeric, hintonda, cfe-commits

Differential Revision: https://reviews.llvm.org/D42573

llvm-svn: 324356

clang-tools-extra/clangd/CMakeLists.txt
clang-tools-extra/clangd/ClangdServer.h
clang-tools-extra/clangd/ClangdUnit.h
clang-tools-extra/clangd/ClangdUnitStore.cpp [deleted file]
clang-tools-extra/clangd/ClangdUnitStore.h [deleted file]
clang-tools-extra/clangd/TUScheduler.cpp
clang-tools-extra/clangd/TUScheduler.h
clang-tools-extra/clangd/Threading.cpp
clang-tools-extra/clangd/Threading.h
clang-tools-extra/unittests/clangd/CMakeLists.txt
clang-tools-extra/unittests/clangd/ThreadingTests.cpp [new file with mode: 0644]

index 9c42439..6753027 100644 (file)
@@ -6,7 +6,6 @@ add_clang_library(clangDaemon
   ClangdLSPServer.cpp
   ClangdServer.cpp
   ClangdUnit.cpp
-  ClangdUnitStore.cpp
   CodeComplete.cpp
   CodeCompletionStrings.cpp
   CompileArgsCache.cpp
index 79dcf27..fffd46f 100644 (file)
@@ -11,7 +11,6 @@
 #define LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDSERVER_H
 
 #include "ClangdUnit.h"
-#include "ClangdUnitStore.h"
 #include "CodeComplete.h"
 #include "CompileArgsCache.h"
 #include "DraftStore.h"
index bf1aced..12228bc 100644 (file)
@@ -151,6 +151,8 @@ using ASTParsedCallback = std::function<void(PathRef Path, ParsedAST *)>;
 
 /// Manages resources, required by clangd. Allows to rebuild file with new
 /// contents, and provides AST and Preamble for it.
+/// NOTE: Threading-related bits of CppFile are now deprecated and will be
+/// removed soon.
 class CppFile : public std::enable_shared_from_this<CppFile> {
 public:
   // We only allow to create CppFile as shared_ptr, because a future returned by
@@ -178,6 +180,7 @@ public:
   /// that will wait for any ongoing rebuilds to finish and actually set the AST
   /// and Preamble to nulls. It can be run on a different thread. This function
   /// is useful to cancel ongoing rebuilds, if any, before removing CppFile.
+  /// DEPRECATED. This function will be removed soon, please do not use it.
   UniqueFunction<void()> deferCancelRebuild();
 
   /// Rebuild AST and Preamble synchronously on the calling thread.
@@ -200,6 +203,7 @@ public:
   /// The future to finish rebuild returns a list of diagnostics built during
   /// reparse, or None, if another deferRebuild was called before this
   /// rebuild was finished.
+  /// DEPRECATED. This function will be removed soon, please do not use it.
   UniqueFunction<llvm::Optional<std::vector<DiagWithFixIts>>()>
   deferRebuild(ParseInputs &&Inputs);
 
diff --git a/clang-tools-extra/clangd/ClangdUnitStore.cpp b/clang-tools-extra/clangd/ClangdUnitStore.cpp
deleted file mode 100644 (file)
index bc2479d..0000000
+++ /dev/null
@@ -1,37 +0,0 @@
-//===--- ClangdUnitStore.cpp - A ClangdUnits container -----------*-C++-*-===//
-//
-//                     The LLVM Compiler Infrastructure
-//
-// This file is distributed under the University of Illinois Open Source
-// License. See LICENSE.TXT for details.
-//
-//===----------------------------------------------------------------------===//
-
-#include "ClangdUnitStore.h"
-#include "llvm/Support/Path.h"
-#include <algorithm>
-
-using namespace clang::clangd;
-using namespace clang;
-
-std::shared_ptr<CppFile> CppFileCollection::removeIfPresent(PathRef File) {
-  std::lock_guard<std::mutex> Lock(Mutex);
-
-  auto It = OpenedFiles.find(File);
-  if (It == OpenedFiles.end())
-    return nullptr;
-
-  std::shared_ptr<CppFile> Result = It->second;
-  OpenedFiles.erase(It);
-  return Result;
-}
-std::vector<std::pair<Path, std::size_t>>
-CppFileCollection::getUsedBytesPerFile() const {
-  std::lock_guard<std::mutex> Lock(Mutex);
-  std::vector<std::pair<Path, std::size_t>> Result;
-  Result.reserve(OpenedFiles.size());
-  for (auto &&PathAndFile : OpenedFiles)
-    Result.push_back(
-        {PathAndFile.first().str(), PathAndFile.second->getUsedBytes()});
-  return Result;
-}
diff --git a/clang-tools-extra/clangd/ClangdUnitStore.h b/clang-tools-extra/clangd/ClangdUnitStore.h
deleted file mode 100644 (file)
index 6ec0302..0000000
+++ /dev/null
@@ -1,73 +0,0 @@
-//===--- ClangdUnitStore.h - A container of CppFiles -------------*-C++-*-===//
-//
-//                     The LLVM Compiler Infrastructure
-//
-// This file is distributed under the University of Illinois Open Source
-// License. See LICENSE.TXT for details.
-//
-//===---------------------------------------------------------------------===//
-
-#ifndef LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDUNITSTORE_H
-#define LLVM_CLANG_TOOLS_EXTRA_CLANGD_CLANGDUNITSTORE_H
-
-#include "ClangdUnit.h"
-#include "GlobalCompilationDatabase.h"
-#include "Logger.h"
-#include "Path.h"
-#include "clang/Tooling/CompilationDatabase.h"
-#include <mutex>
-
-namespace clang {
-namespace clangd {
-
-class Logger;
-
-/// Thread-safe mapping from FileNames to CppFile.
-class CppFileCollection {
-public:
-  /// \p ASTCallback is called when a file is parsed synchronously. This should
-  /// not be expensive since it blocks diagnostics.
-  explicit CppFileCollection(bool StorePreamblesInMemory,
-                             std::shared_ptr<PCHContainerOperations> PCHs,
-                             ASTParsedCallback ASTCallback)
-      : ASTCallback(std::move(ASTCallback)), PCHs(std::move(PCHs)),
-        StorePreamblesInMemory(StorePreamblesInMemory) {}
-
-  std::shared_ptr<CppFile> getOrCreateFile(PathRef File) {
-    std::lock_guard<std::mutex> Lock(Mutex);
-    auto It = OpenedFiles.find(File);
-    if (It == OpenedFiles.end()) {
-      It = OpenedFiles
-               .try_emplace(File, CppFile::Create(File, StorePreamblesInMemory,
-                                                  PCHs, ASTCallback))
-               .first;
-    }
-    return It->second;
-  }
-
-  std::shared_ptr<CppFile> getFile(PathRef File) const {
-    std::lock_guard<std::mutex> Lock(Mutex);
-    auto It = OpenedFiles.find(File);
-    if (It == OpenedFiles.end())
-      return nullptr;
-    return It->second;
-  }
-
-  /// Removes a CppFile, stored for \p File, if it's inside collection and
-  /// returns it.
-  std::shared_ptr<CppFile> removeIfPresent(PathRef File);
-
-  /// Gets used memory for each of the stored files.
-  std::vector<std::pair<Path, std::size_t>> getUsedBytesPerFile() const;
-
-private:
-  mutable std::mutex Mutex;
-  llvm::StringMap<std::shared_ptr<CppFile>> OpenedFiles;
-  ASTParsedCallback ASTCallback;
-  std::shared_ptr<PCHContainerOperations> PCHs;
-  bool StorePreamblesInMemory;
-};
-} // namespace clangd
-} // namespace clang
-
-#endif
index 4c18dcd..e239365 100644 (file)
@@ -1,9 +1,303 @@
+//===--- TUScheduler.cpp -----------------------------------------*-C++-*-===//
+//
+//                     The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+// For each file, managed by TUScheduler, we create a single ASTWorker that
+// manages an AST for that file. All operations that modify or read the AST are
+// run on a separate dedicated thread asynchronously in FIFO order.
+//
+// We start processing each update immediately after we receive it. If two or
+// more updates come subsequently without reads in-between, we attempt to drop
+// an older one to not waste time building the ASTs we don't need.
+//
+// The processing thread of the ASTWorker is also responsible for building the
+// preamble. However, unlike AST, the same preamble can be read concurrently, so
+// we run each of async preamble reads on its own thread.
+//
+// To limit the concurrent load that clangd produces we mantain a semaphore that
+// keeps more than a fixed number of threads from running concurrently.
+//
+// Rationale for cancelling updates.
+// LSP clients can send updates to clangd on each keystroke. Some files take
+// significant time to parse (e.g. a few seconds) and clangd can get starved by
+// the updates to those files. Therefore we try to process only the last update,
+// if possible.
+// Our current strategy to do that is the following:
+// - For each update we immediately schedule rebuild of the AST.
+// - Rebuild of the AST checks if it was cancelled before doing any actual work.
+//   If it was, it does not do an actual rebuild, only reports llvm::None to the
+//   callback
+// - When adding an update, we cancel the last update in the queue if it didn't
+//   have any reads.
+// There is probably a optimal ways to do that. One approach we might take is
+// the following:
+// - For each update we remember the pending inputs, but delay rebuild of the
+//   AST for some timeout.
+// - If subsequent updates come before rebuild was started, we replace the
+//   pending inputs and reset the timer.
+// - If any reads of the AST are scheduled, we start building the AST
+//   immediately.
+
 #include "TUScheduler.h"
 #include "clang/Frontend/PCHContainerOperations.h"
 #include "llvm/Support/Errc.h"
+#include <memory>
+#include <queue>
 
 namespace clang {
 namespace clangd {
+namespace {
+class ASTWorkerHandle;
+
+/// Owns one instance of the AST, schedules updates and reads of it.
+/// Also responsible for building and providing access to the preamble.
+/// Each ASTWorker processes the async requests sent to it on a separate
+/// dedicated thread.
+/// The ASTWorker that manages the AST is shared by both the processing thread
+/// and the TUScheduler. The TUScheduler should discard an ASTWorker when
+/// remove() is called, but its thread may be busy and we don't want to block.
+/// So the workers are accessed via an ASTWorkerHandle. Destroying the handle
+/// signals the worker to exit its run loop and gives up shared ownership of the
+/// worker.
+class ASTWorker {
+  friend class ASTWorkerHandle;
+  ASTWorker(Semaphore &Barrier, std::shared_ptr<CppFile> AST, bool RunSync);
+
+public:
+  /// Create a new ASTWorker and return a handle to it.
+  /// The processing thread is spawned using \p Tasks. However, when \p Tasks
+  /// is null, all requests will be processed on the calling thread
+  /// synchronously instead. \p Barrier is acquired when processing each
+  /// request, it is be used to limit the number of actively running threads.
+  static ASTWorkerHandle Create(AsyncTaskRunner *Tasks, Semaphore &Barrier,
+                                std::shared_ptr<CppFile> AST);
+  ~ASTWorker();
+
+  void update(ParseInputs Inputs,
+              UniqueFunction<void(llvm::Optional<std::vector<DiagWithFixIts>>)>
+                  OnUpdated);
+  void runWithAST(UniqueFunction<void(llvm::Expected<InputsAndAST>)> Action);
+
+  std::shared_ptr<const PreambleData> getPossiblyStalePreamble() const;
+  std::size_t getUsedBytes() const;
+
+private:
+  // Must be called exactly once on processing thread. Will return after
+  // stop() is called on a separate thread and all pending requests are
+  // processed.
+  void run();
+  /// Signal that run() should finish processing pending requests and exit.
+  void stop();
+  /// Adds a new task to the end of the request queue.
+  void startTask(UniqueFunction<void()> Task, bool isUpdate,
+                 llvm::Optional<CancellationFlag> CF);
+
+  using RequestWithCtx = std::pair<UniqueFunction<void()>, Context>;
+
+  const bool RunSync;
+  Semaphore &Barrier;
+  // AST and FileInputs are only accessed on the processing thread from run().
+  const std::shared_ptr<CppFile> AST;
+  // Inputs, corresponding to the current state of AST.
+  ParseInputs FileInputs;
+  // Guards members used by both TUScheduler and the worker thread.
+  mutable std::mutex Mutex;
+  // Set to true to signal run() to finish processing.
+  bool Done;                           /* GUARDED_BY(Mutex) */
+  std::queue<RequestWithCtx> Requests; /* GUARDED_BY(Mutex) */
+  // Only set when last request is an update. This allows us to cancel an update
+  // that was never read, if a subsequent update comes in.
+  llvm::Optional<CancellationFlag> LastUpdateCF; /* GUARDED_BY(Mutex) */
+  std::condition_variable RequestsCV;
+};
+
+/// A smart-pointer-like class that points to an active ASTWorker.
+/// In destructor, signals to the underlying ASTWorker that no new requests will
+/// be sent and the processing loop may exit (after running all pending
+/// requests).
+class ASTWorkerHandle {
+  friend class ASTWorker;
+  ASTWorkerHandle(std::shared_ptr<ASTWorker> Worker)
+      : Worker(std::move(Worker)) {
+    assert(this->Worker);
+  }
+
+public:
+  ASTWorkerHandle(const ASTWorkerHandle &) = delete;
+  ASTWorkerHandle &operator=(const ASTWorkerHandle &) = delete;
+  ASTWorkerHandle(ASTWorkerHandle &&) = default;
+  ASTWorkerHandle &operator=(ASTWorkerHandle &&) = default;
+
+  ~ASTWorkerHandle() {
+    if (Worker)
+      Worker->stop();
+  }
+
+  ASTWorker &operator*() {
+    assert(Worker && "Handle was moved from");
+    return *Worker;
+  }
+
+  ASTWorker *operator->() {
+    assert(Worker && "Handle was moved from");
+    return Worker.get();
+  }
+
+  /// Returns an owning reference to the underlying ASTWorker that can outlive
+  /// the ASTWorkerHandle. However, no new requests to an active ASTWorker can
+  /// be schedule via the returned reference, i.e. only reads of the preamble
+  /// are possible.
+  std::shared_ptr<const ASTWorker> lock() { return Worker; }
+
+private:
+  std::shared_ptr<ASTWorker> Worker;
+};
+
+ASTWorkerHandle ASTWorker::Create(AsyncTaskRunner *Tasks, Semaphore &Barrier,
+                                  std::shared_ptr<CppFile> AST) {
+  std::shared_ptr<ASTWorker> Worker(
+      new ASTWorker(Barrier, std::move(AST), /*RunSync=*/!Tasks));
+  if (Tasks)
+    Tasks->runAsync([Worker]() { Worker->run(); });
+
+  return ASTWorkerHandle(std::move(Worker));
+}
+
+ASTWorker::ASTWorker(Semaphore &Barrier, std::shared_ptr<CppFile> AST,
+                     bool RunSync)
+    : RunSync(RunSync), Barrier(Barrier), AST(std::move(AST)), Done(false) {
+  if (RunSync)
+    return;
+}
+
+ASTWorker::~ASTWorker() {
+#ifndef NDEBUG
+  std::lock_guard<std::mutex> Lock(Mutex);
+  assert(Done && "handle was not destroyed");
+  assert(Requests.empty() && "unprocessed requests when destroying ASTWorker");
+#endif
+}
+
+void ASTWorker::update(
+    ParseInputs Inputs,
+    UniqueFunction<void(llvm::Optional<std::vector<DiagWithFixIts>>)>
+        OnUpdated) {
+  auto Task = [=](CancellationFlag CF, decltype(OnUpdated) OnUpdated) mutable {
+    if (CF.isCancelled()) {
+      OnUpdated(llvm::None);
+      return;
+    }
+    FileInputs = Inputs;
+    auto Diags = AST->rebuild(std::move(Inputs));
+    // We want to report the diagnostics even if this update was cancelled.
+    // It seems more useful than making the clients wait indefinitely if they
+    // spam us with updates.
+    OnUpdated(std::move(Diags));
+  };
+
+  CancellationFlag UpdateCF;
+  startTask(BindWithForward(Task, UpdateCF, std::move(OnUpdated)),
+            /*isUpdate=*/true, UpdateCF);
+}
+
+void ASTWorker::runWithAST(
+    UniqueFunction<void(llvm::Expected<InputsAndAST>)> Action) {
+  auto Task = [=](decltype(Action) Action) {
+    auto ASTWrapper = this->AST->getAST().get();
+    // FIXME: no need to lock here, cleanup the CppFile interface to get rid of
+    // them.
+    ASTWrapper->runUnderLock([&](ParsedAST *AST) {
+      if (!AST) {
+        Action(llvm::make_error<llvm::StringError>(
+            "invalid AST", llvm::errc::invalid_argument));
+        return;
+      }
+      Action(InputsAndAST{FileInputs, *AST});
+    });
+  };
+
+  startTask(BindWithForward(Task, std::move(Action)), /*isUpdate=*/false,
+            llvm::None);
+}
+
+std::shared_ptr<const PreambleData>
+ASTWorker::getPossiblyStalePreamble() const {
+  return AST->getPossiblyStalePreamble();
+}
+
+std::size_t ASTWorker::getUsedBytes() const {
+  // FIXME(ibiryukov): we'll need to take locks here after we remove
+  // thread-safety from CppFile. For now, CppFile is thread-safe and we can
+  // safely call methods on it without acquiring a lock.
+  return AST->getUsedBytes();
+}
+
+void ASTWorker::stop() {
+  {
+    std::lock_guard<std::mutex> Lock(Mutex);
+    assert(!Done && "stop() called twice");
+    Done = true;
+  }
+  RequestsCV.notify_one();
+}
+
+void ASTWorker::startTask(UniqueFunction<void()> Task, bool isUpdate,
+                          llvm::Optional<CancellationFlag> CF) {
+  assert(isUpdate == CF.hasValue() &&
+         "Only updates are expected to pass CancellationFlag");
+
+  if (RunSync) {
+    assert(!Done && "running a task after stop()");
+    Task();
+    return;
+  }
+
+  {
+    std::lock_guard<std::mutex> Lock(Mutex);
+    assert(!Done && "running a task after stop()");
+    if (isUpdate) {
+      if (!Requests.empty() && LastUpdateCF) {
+        // There were no reads for the last unprocessed update, let's cancel it
+        // to not waste time on it.
+        LastUpdateCF->cancel();
+      }
+      LastUpdateCF = std::move(*CF);
+    } else {
+      LastUpdateCF = llvm::None;
+    }
+    Requests.emplace(std::move(Task), Context::current().clone());
+  } // unlock Mutex.
+  RequestsCV.notify_one();
+}
+
+void ASTWorker::run() {
+  while (true) {
+    RequestWithCtx Req;
+    {
+      std::unique_lock<std::mutex> Lock(Mutex);
+      RequestsCV.wait(Lock, [&]() { return Done || !Requests.empty(); });
+      if (Requests.empty()) {
+        assert(Done);
+        return;
+      }
+      // Even when Done is true, we finish processing all pending requests
+      // before exiting the processing loop.
+
+      Req = std::move(Requests.front());
+      Requests.pop();
+    } // unlock Mutex
+
+    std::lock_guard<Semaphore> BarrierLock(Barrier);
+    WithContext Guard(std::move(Req.second));
+    Req.first();
+  }
+}
+} // namespace
+
 unsigned getDefaultAsyncThreadsCount() {
   unsigned HardwareConcurrency = std::thread::hardware_concurrency();
   // C++ standard says that hardware_concurrency()
@@ -14,110 +308,114 @@ unsigned getDefaultAsyncThreadsCount() {
   return HardwareConcurrency;
 }
 
+struct TUScheduler::FileData {
+  /// Latest inputs, passed to TUScheduler::update().
+  ParseInputs Inputs;
+  ASTWorkerHandle Worker;
+};
+
 TUScheduler::TUScheduler(unsigned AsyncThreadsCount,
                          bool StorePreamblesInMemory,
                          ASTParsedCallback ASTCallback)
+    : StorePreamblesInMemory(StorePreamblesInMemory),
+      PCHOps(std::make_shared<PCHContainerOperations>()),
+      ASTCallback(std::move(ASTCallback)), Barrier(AsyncThreadsCount) {
+  if (0 < AsyncThreadsCount)
+    Tasks.emplace();
+}
+
+TUScheduler::~TUScheduler() {
+  // Notify all workers that they need to stop.
+  Files.clear();
 
-    : Files(StorePreamblesInMemory, std::make_shared<PCHContainerOperations>(),
-            std::move(ASTCallback)),
-      Threads(AsyncThreadsCount) {}
+  // Wait for all in-flight tasks to finish.
+  if (Tasks)
+    Tasks->waitForAll();
+}
 
 void TUScheduler::update(
     PathRef File, ParseInputs Inputs,
     UniqueFunction<void(llvm::Optional<std::vector<DiagWithFixIts>>)>
         OnUpdated) {
-  CachedInputs[File] = Inputs;
-
-  auto Resources = Files.getOrCreateFile(File);
-  auto DeferredRebuild = Resources->deferRebuild(std::move(Inputs));
-
-  Threads.addToFront(
-      [](decltype(OnUpdated) OnUpdated,
-         decltype(DeferredRebuild) DeferredRebuild) {
-        auto Diags = DeferredRebuild();
-        OnUpdated(Diags);
-      },
-      std::move(OnUpdated), std::move(DeferredRebuild));
+  std::unique_ptr<FileData> &FD = Files[File];
+  if (!FD) {
+    // Create a new worker to process the AST-related tasks.
+    ASTWorkerHandle Worker = ASTWorker::Create(
+        Tasks ? Tasks.getPointer() : nullptr, Barrier,
+        CppFile::Create(File, StorePreamblesInMemory, PCHOps, ASTCallback));
+    FD = std::unique_ptr<FileData>(new FileData{Inputs, std::move(Worker)});
+  } else {
+    FD->Inputs = Inputs;
+  }
+  FD->Worker->update(std::move(Inputs), std::move(OnUpdated));
 }
 
 void TUScheduler::remove(PathRef File,
                          UniqueFunction<void(llvm::Error)> Action) {
-  CachedInputs.erase(File);
-
-  auto Resources = Files.removeIfPresent(File);
-  if (!Resources) {
+  auto It = Files.find(File);
+  if (It == Files.end()) {
     Action(llvm::make_error<llvm::StringError>(
         "trying to remove non-added document", llvm::errc::invalid_argument));
     return;
   }
-
-  auto DeferredCancel = Resources->deferCancelRebuild();
-  Threads.addToFront(
-      [](decltype(Action) Action, decltype(DeferredCancel) DeferredCancel) {
-        DeferredCancel();
-        Action(llvm::Error::success());
-      },
-      std::move(Action), std::move(DeferredCancel));
+  Files.erase(It);
 }
 
 void TUScheduler::runWithAST(
     PathRef File, UniqueFunction<void(llvm::Expected<InputsAndAST>)> Action) {
-  auto Resources = Files.getFile(File);
-  if (!Resources) {
+  auto It = Files.find(File);
+  if (It == Files.end()) {
     Action(llvm::make_error<llvm::StringError>(
         "trying to get AST for non-added document",
         llvm::errc::invalid_argument));
     return;
   }
 
-  const ParseInputs &Inputs = getInputs(File);
-  // We currently block the calling thread until AST is available and run the
-  // action on the calling thread to avoid inconsistent states coming from
-  // subsequent updates.
-  // FIXME(ibiryukov): this should be moved to the worker threads.
-  Resources->getAST().get()->runUnderLock([&](ParsedAST *AST) {
-    if (AST)
-      Action(InputsAndAST{Inputs, *AST});
-    else
-      Action(llvm::make_error<llvm::StringError>(
-          "Could not build AST for the latest file update",
-          llvm::errc::invalid_argument));
-  });
+  It->second->Worker->runWithAST(std::move(Action));
 }
 
 void TUScheduler::runWithPreamble(
     PathRef File,
     UniqueFunction<void(llvm::Expected<InputsAndPreamble>)> Action) {
-  std::shared_ptr<CppFile> Resources = Files.getFile(File);
-  if (!Resources) {
+  auto It = Files.find(File);
+  if (It == Files.end()) {
     Action(llvm::make_error<llvm::StringError>(
         "trying to get preamble for non-added document",
         llvm::errc::invalid_argument));
     return;
   }
 
-  const ParseInputs &Inputs = getInputs(File);
-  std::shared_ptr<const PreambleData> Preamble =
-      Resources->getPossiblyStalePreamble();
-  Threads.addToFront(
-      [Resources, Preamble, Inputs](decltype(Action) Action) mutable {
-        if (!Preamble)
-          Preamble = Resources->getPossiblyStalePreamble();
+  if (!Tasks) {
+    std::shared_ptr<const PreambleData> Preamble =
+        It->second->Worker->getPossiblyStalePreamble();
+    Action(InputsAndPreamble{It->second->Inputs, Preamble.get()});
+    return;
+  }
 
-        Action(InputsAndPreamble{Inputs, Preamble.get()});
-      },
-      std::move(Action));
-}
+  ParseInputs InputsCopy = It->second->Inputs;
+  std::shared_ptr<const ASTWorker> Worker = It->second->Worker.lock();
+  auto Task = [InputsCopy, Worker, this](Context Ctx,
+                                         decltype(Action) Action) mutable {
+    std::lock_guard<Semaphore> BarrierLock(Barrier);
+    WithContext Guard(std::move(Ctx));
+    std::shared_ptr<const PreambleData> Preamble =
+        Worker->getPossiblyStalePreamble();
+    Action(InputsAndPreamble{InputsCopy, Preamble.get()});
+  };
 
-const ParseInputs &TUScheduler::getInputs(PathRef File) {
-  auto It = CachedInputs.find(File);
-  assert(It != CachedInputs.end());
-  return It->second;
+  Tasks->runAsync(
+      BindWithForward(Task, Context::current().clone(), std::move(Action)));
 }
 
 std::vector<std::pair<Path, std::size_t>>
 TUScheduler::getUsedBytesPerFile() const {
-  return Files.getUsedBytesPerFile();
+  std::vector<std::pair<Path, std::size_t>> Result;
+  Result.reserve(Files.size());
+  for (auto &&PathAndFile : Files)
+    Result.push_back(
+        {PathAndFile.first(), PathAndFile.second->Worker->getUsedBytes()});
+  return Result;
 }
+
 } // namespace clangd
 } // namespace clang
index c7df8c4..41562eb 100644 (file)
@@ -11,9 +11,9 @@
 #define LLVM_CLANG_TOOLS_EXTRA_CLANGD_TUSCHEDULER_H
 
 #include "ClangdUnit.h"
-#include "ClangdUnitStore.h"
 #include "Function.h"
 #include "Threading.h"
+#include "llvm/ADT/StringMap.h"
 
 namespace clang {
 namespace clangd {
@@ -42,6 +42,7 @@ class TUScheduler {
 public:
   TUScheduler(unsigned AsyncThreadsCount, bool StorePreamblesInMemory,
               ASTParsedCallback ASTCallback);
+  ~TUScheduler();
 
   /// Returns estimated memory usage for each of the currently open files.
   /// The order of results is unspecified.
@@ -81,11 +82,17 @@ public:
       UniqueFunction<void(llvm::Expected<InputsAndPreamble>)> Action);
 
 private:
-  const ParseInputs &getInputs(PathRef File);
+  /// This class stores per-file data in the Files map.
+  struct FileData;
 
-  llvm::StringMap<ParseInputs> CachedInputs;
-  CppFileCollection Files;
-  ThreadPool Threads;
+  const bool StorePreamblesInMemory;
+  const std::shared_ptr<PCHContainerOperations> PCHOps;
+  const ASTParsedCallback ASTCallback;
+  Semaphore Barrier;
+  llvm::StringMap<std::unique_ptr<FileData>> Files;
+  // None when running tasks synchronously and non-None when running tasks
+  // asynchronously.
+  llvm::Optional<AsyncTaskRunner> Tasks;
 };
 } // namespace clangd
 } // namespace clang
index 3c0c74b..94bb76c 100644 (file)
@@ -1,63 +1,62 @@
 #include "Threading.h"
+#include "llvm/ADT/ScopeExit.h"
 #include "llvm/Support/FormatVariadic.h"
 #include "llvm/Support/Threading.h"
+#include <thread>
 
 namespace clang {
 namespace clangd {
-ThreadPool::ThreadPool(unsigned AsyncThreadsCount)
-    : RunSynchronously(AsyncThreadsCount == 0) {
-  if (RunSynchronously) {
-    // Don't start the worker thread if we're running synchronously
-    return;
-  }
 
-  Workers.reserve(AsyncThreadsCount);
-  for (unsigned I = 0; I < AsyncThreadsCount; ++I) {
-    Workers.push_back(std::thread([this, I]() {
-      llvm::set_thread_name(llvm::formatv("scheduler/{0}", I));
-      while (true) {
-        UniqueFunction<void()> Request;
-        Context Ctx;
-
-        // Pick request from the queue
-        {
-          std::unique_lock<std::mutex> Lock(Mutex);
-          // Wait for more requests.
-          RequestCV.wait(Lock,
-                         [this] { return !RequestQueue.empty() || Done; });
-          if (RequestQueue.empty()) {
-            assert(Done);
-            return;
-          }
-
-          // We process requests starting from the front of the queue. Users of
-          // ThreadPool have a way to prioritise their requests by putting
-          // them to the either side of the queue (using either addToEnd or
-          // addToFront).
-          std::tie(Request, Ctx) = std::move(RequestQueue.front());
-          RequestQueue.pop_front();
-        } // unlock Mutex
-
-        WithContext WithCtx(std::move(Ctx));
-        Request();
-      }
-    }));
-  }
+CancellationFlag::CancellationFlag()
+    : WasCancelled(std::make_shared<std::atomic<bool>>(false)) {}
+
+Semaphore::Semaphore(std::size_t MaxLocks) : FreeSlots(MaxLocks) {}
+
+void Semaphore::lock() {
+  std::unique_lock<std::mutex> Lock(Mutex);
+  SlotsChanged.wait(Lock, [&]() { return FreeSlots > 0; });
+  --FreeSlots;
 }
 
-ThreadPool::~ThreadPool() {
-  if (RunSynchronously)
-    return; // no worker thread is running in that case
+void Semaphore::unlock() {
+  std::unique_lock<std::mutex> Lock(Mutex);
+  ++FreeSlots;
+  Lock.unlock();
+
+  SlotsChanged.notify_one();
+}
 
+AsyncTaskRunner::~AsyncTaskRunner() { waitForAll(); }
+
+void AsyncTaskRunner::waitForAll() {
+  std::unique_lock<std::mutex> Lock(Mutex);
+  TasksReachedZero.wait(Lock, [&]() { return InFlightTasks == 0; });
+}
+
+void AsyncTaskRunner::runAsync(UniqueFunction<void()> Action) {
   {
-    std::lock_guard<std::mutex> Lock(Mutex);
-    // Wake up the worker thread
-    Done = true;
-  } // unlock Mutex
-  RequestCV.notify_all();
-
-  for (auto &Worker : Workers)
-    Worker.join();
+    std::unique_lock<std::mutex> Lock(Mutex);
+    ++InFlightTasks;
+  }
+
+  auto CleanupTask = llvm::make_scope_exit([this]() {
+    std::unique_lock<std::mutex> Lock(Mutex);
+    int NewTasksCnt = --InFlightTasks;
+    Lock.unlock();
+
+    if (NewTasksCnt == 0)
+      TasksReachedZero.notify_one();
+  });
+
+  std::thread(
+      [](decltype(Action) Action, decltype(CleanupTask)) {
+        Action();
+        // Make sure function stored by Action is destroyed before CleanupTask
+        // is run.
+        Action = nullptr;
+      },
+      std::move(Action), std::move(CleanupTask))
+      .detach();
 }
 } // namespace clangd
 } // namespace clang
index 123d179..a24eed7 100644 (file)
 
 #include "Context.h"
 #include "Function.h"
+#include <atomic>
+#include <cassert>
 #include <condition_variable>
-#include <deque>
+#include <memory>
 #include <mutex>
-#include <thread>
 #include <vector>
 
 namespace clang {
 namespace clangd {
-/// A simple fixed-size thread pool implementation.
-class ThreadPool {
+
+/// A shared boolean flag indicating if the computation was cancelled.
+/// Once cancelled, cannot be returned to the previous state.
+class CancellationFlag {
 public:
-  /// If \p AsyncThreadsCount is 0, requests added using addToFront and addToEnd
-  /// will be processed synchronously on the calling thread.
-  // Otherwise, \p AsyncThreadsCount threads will be created to schedule the
-  // requests.
-  ThreadPool(unsigned AsyncThreadsCount);
-  /// Destructor blocks until all requests are processed and worker threads are
-  /// terminated.
-  ~ThreadPool();
+  CancellationFlag();
 
-  /// Add a new request to run function \p F with args \p As to the start of the
-  /// queue. The request will be run on a separate thread.
-  template <class Func, class... Args>
-  void addToFront(Func &&F, Args &&... As) {
-    if (RunSynchronously) {
-      std::forward<Func>(F)(std::forward<Args>(As)...);
-      return;
-    }
+  void cancel() {
+    assert(WasCancelled && "the object was moved");
+    WasCancelled->store(true);
+  }
 
-    {
-      std::lock_guard<std::mutex> Lock(Mutex);
-      RequestQueue.emplace_front(
-          BindWithForward(std::forward<Func>(F), std::forward<Args>(As)...),
-          Context::current().clone());
-    }
-    RequestCV.notify_one();
+  bool isCancelled() const {
+    assert(WasCancelled && "the object was moved");
+    return WasCancelled->load();
   }
 
-  /// Add a new request to run function \p F with args \p As to the end of the
-  /// queue. The request will be run on a separate thread.
-  template <class Func, class... Args> void addToEnd(Func &&F, Args &&... As) {
-    if (RunSynchronously) {
-      std::forward<Func>(F)(std::forward<Args>(As)...);
-      return;
-    }
+private:
+  std::shared_ptr<std::atomic<bool>> WasCancelled;
+};
 
-    {
-      std::lock_guard<std::mutex> Lock(Mutex);
-      RequestQueue.emplace_back(
-          BindWithForward(std::forward<Func>(F), std::forward<Args>(As)...),
-          Context::current().clone());
-    }
-    RequestCV.notify_one();
-  }
+/// Limits the number of threads that can acquire the lock at the same time.
+class Semaphore {
+public:
+  Semaphore(std::size_t MaxLocks);
+
+  void lock();
+  void unlock();
+
+private:
+  std::mutex Mutex;
+  std::condition_variable SlotsChanged;
+  std::size_t FreeSlots;
+};
+
+/// Runs tasks on separate (detached) threads and wait for all tasks to finish.
+/// Objects that need to spawn threads can own an AsyncTaskRunner to ensure they
+/// all complete on destruction.
+class AsyncTaskRunner {
+public:
+  /// Destructor waits for all pending tasks to finish.
+  ~AsyncTaskRunner();
+
+  void waitForAll();
+  void runAsync(UniqueFunction<void()> Action);
 
 private:
-  bool RunSynchronously;
-  mutable std::mutex Mutex;
-  /// We run some tasks on separate threads(parsing, CppFile cleanup).
-  /// These threads looks into RequestQueue to find requests to handle and
-  /// terminate when Done is set to true.
-  std::vector<std::thread> Workers;
-  /// Setting Done to true will make the worker threads terminate.
-  bool Done = false;
-  /// A queue of requests.
-  std::deque<std::pair<UniqueFunction<void()>, Context>> RequestQueue;
-  /// Condition variable to wake up worker threads.
-  std::condition_variable RequestCV;
+  std::mutex Mutex;
+  std::condition_variable TasksReachedZero;
+  std::size_t InFlightTasks = 0;
 };
 } // namespace clangd
 } // namespace clang
index 8f6125e..c0cba6c 100644 (file)
@@ -21,6 +21,7 @@ add_extra_unittest(ClangdTests
   JSONExprTests.cpp
   URITests.cpp
   TestFS.cpp
+  ThreadingTests.cpp
   TraceTests.cpp
   TUSchedulerTests.cpp
   SourceCodeTests.cpp
diff --git a/clang-tools-extra/unittests/clangd/ThreadingTests.cpp b/clang-tools-extra/unittests/clangd/ThreadingTests.cpp
new file mode 100644 (file)
index 0000000..ffa1288
--- /dev/null
@@ -0,0 +1,61 @@
+//===-- ThreadingTests.cpp --------------------------------------*- C++ -*-===//
+//
+//                     The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+
+#include "Threading.h"
+#include "gtest/gtest.h"
+#include <mutex>
+
+namespace clang {
+namespace clangd {
+class ThreadingTest : public ::testing::Test {};
+
+TEST_F(ThreadingTest, TaskRunner) {
+  const int TasksCnt = 100;
+  const int IncrementsPerTask = 1000;
+
+  std::mutex Mutex;
+  int Counter(0); /* GUARDED_BY(Mutex) */
+  {
+    AsyncTaskRunner Tasks;
+    auto scheduleIncrements = [&]() {
+      for (int TaskI = 0; TaskI < TasksCnt; ++TaskI) {
+        Tasks.runAsync([&Counter, &Mutex]() {
+          for (int Increment = 0; Increment < IncrementsPerTask; ++Increment) {
+            std::lock_guard<std::mutex> Lock(Mutex);
+            ++Counter;
+          }
+        });
+      }
+    };
+
+    {
+      // Make sure runAsync is not running tasks synchronously on the same
+      // thread by locking the Mutex used for increments.
+      std::lock_guard<std::mutex> Lock(Mutex);
+      scheduleIncrements();
+    }
+
+    Tasks.waitForAll();
+    {
+      std::lock_guard<std::mutex> Lock(Mutex);
+      ASSERT_EQ(Counter, TasksCnt * IncrementsPerTask);
+    }
+
+    {
+      std::lock_guard<std::mutex> Lock(Mutex);
+      Counter = 0;
+      scheduleIncrements();
+    }
+  }
+  // Check that destructor has waited for tasks to finish.
+  std::lock_guard<std::mutex> Lock(Mutex);
+  ASSERT_EQ(Counter, TasksCnt * IncrementsPerTask);
+}
+} // namespace clangd
+} // namespace clang