Add TaskMap for iterating a function over a set of integers
authorPavel Labath <labath@google.com>
Fri, 5 May 2017 11:16:59 +0000 (11:16 +0000)
committerPavel Labath <labath@google.com>
Fri, 5 May 2017 11:16:59 +0000 (11:16 +0000)
Summary:
Many parallel tasks just want to iterate over all the possible numbers from 0 to N-1.  Rather than enqueue N work items, instead just "map" the function across the requested integer space.

Reviewers: clayborg, labath, tberghammer, zturner

Reviewed By: clayborg, zturner

Subscribers: zturner, lldb-commits

Differential Revision: https://reviews.llvm.org/D32757
Patch by Scott Smith <scott.smith@purestorage.com>.

llvm-svn: 302223

lldb/include/lldb/Utility/TaskPool.h
lldb/source/Plugins/SymbolFile/DWARF/SymbolFileDWARF.cpp
lldb/source/Utility/TaskPool.cpp
lldb/unittests/Utility/TaskPoolTest.cpp

index fb936bb..87b8824 100644 (file)
@@ -53,50 +53,6 @@ private:
   static void AddTaskImpl(std::function<void()> &&task_fn);
 };
 
-// Wrapper class around the global TaskPool implementation to make it possible
-// to create a set of
-// tasks and then wait for the tasks to be completed by the
-// WaitForNextCompletedTask call. This
-// class should be used when WaitForNextCompletedTask is needed because this
-// class add no other
-// extra functionality to the TaskPool class and it have a very minor
-// performance overhead.
-template <typename T> // The return type of the tasks what will be added to this
-                      // task runner
-                      class TaskRunner {
-public:
-  // Add a task to the task runner what will also add the task to the global
-  // TaskPool. The
-  // function doesn't return the std::future for the task because it will be
-  // supplied by the
-  // WaitForNextCompletedTask after the task is completed.
-  template <typename F, typename... Args> void AddTask(F &&f, Args &&... args);
-
-  // Wait for the next task in this task runner to finish and then return the
-  // std::future what
-  // belongs to the finished task. If there is no task in this task runner
-  // (neither pending nor
-  // comleted) then this function will return an invalid future. Usually this
-  // function should be
-  // called in a loop processing the results of the tasks until it returns an
-  // invalid std::future
-  // what means that all task in this task runner is completed.
-  std::future<T> WaitForNextCompletedTask();
-
-  // Convenience method to wait for all task in this TaskRunner to finish. Do
-  // NOT use this class
-  // just because of this method. Use TaskPool instead and wait for each
-  // std::future returned by
-  // AddTask in a loop.
-  void WaitForAllTasks();
-
-private:
-  std::list<std::future<T>> m_ready;
-  std::list<std::future<T>> m_pending;
-  std::mutex m_mutex;
-  std::condition_variable m_cv;
-};
-
 template <typename F, typename... Args>
 std::future<typename std::result_of<F(Args...)>::type>
 TaskPool::AddTask(F &&f, Args &&... args) {
@@ -126,64 +82,10 @@ template <> struct TaskPool::RunTaskImpl<> {
   static void Run() {}
 };
 
-template <typename T>
-template <typename F, typename... Args>
-void TaskRunner<T>::AddTask(F &&f, Args &&... args) {
-  std::unique_lock<std::mutex> lock(m_mutex);
-  auto it = m_pending.emplace(m_pending.end());
-  *it = std::move(TaskPool::AddTask(
-      [this, it](F f, Args... args) {
-        T &&r = f(std::forward<Args>(args)...);
-
-        std::unique_lock<std::mutex> lock(this->m_mutex);
-        this->m_ready.splice(this->m_ready.end(), this->m_pending, it);
-        lock.unlock();
-
-        this->m_cv.notify_one();
-        return r;
-      },
-      std::forward<F>(f), std::forward<Args>(args)...));
-}
-
-template <>
-template <typename F, typename... Args>
-void TaskRunner<void>::AddTask(F &&f, Args &&... args) {
-  std::unique_lock<std::mutex> lock(m_mutex);
-  auto it = m_pending.emplace(m_pending.end());
-  *it = std::move(TaskPool::AddTask(
-      [this, it](F f, Args... args) {
-        f(std::forward<Args>(args)...);
-
-        std::unique_lock<std::mutex> lock(this->m_mutex);
-        this->m_ready.emplace_back(std::move(*it));
-        this->m_pending.erase(it);
-        lock.unlock();
-
-        this->m_cv.notify_one();
-      },
-      std::forward<F>(f), std::forward<Args>(args)...));
-}
-
-template <typename T> std::future<T> TaskRunner<T>::WaitForNextCompletedTask() {
-  std::unique_lock<std::mutex> lock(m_mutex);
-  if (m_ready.empty() && m_pending.empty())
-    return std::future<T>(); // No more tasks
-
-  if (m_ready.empty())
-    m_cv.wait(lock, [this]() { return !this->m_ready.empty(); });
-
-  std::future<T> res = std::move(m_ready.front());
-  m_ready.pop_front();
-
-  lock.unlock();
-  res.wait();
-
-  return std::move(res);
-}
-
-template <typename T> void TaskRunner<T>::WaitForAllTasks() {
-  while (WaitForNextCompletedTask().valid())
-    ;
-}
+// Run 'func' on every value from begin .. end-1.  Each worker will grab
+// 'batch_size' numbers at a time to work on, so for very fast functions, batch
+// should be large enough to avoid too much cache line contention.
+void TaskMapOverInt(size_t begin, size_t end,
+                    std::function<void(size_t)> const &func);
 
 #endif // #ifndef utility_TaskPool_h_
index 8c2fc3d..ad6af8d 100644 (file)
@@ -1946,7 +1946,9 @@ void SymbolFileDWARF::Index() {
     std::vector<NameToDIE> type_index(num_compile_units);
     std::vector<NameToDIE> namespace_index(num_compile_units);
 
-    std::vector<bool> clear_cu_dies(num_compile_units, false);
+    // std::vector<bool> might be implemented using bit test-and-set, so use
+    // uint8_t instead.
+    std::vector<uint8_t> clear_cu_dies(num_compile_units, false);
     auto parser_fn = [debug_info, &function_basename_index,
                       &function_fullname_index, &function_method_index,
                       &function_selector_index, &objc_class_selectors_index,
@@ -1963,22 +1965,18 @@ void SymbolFileDWARF::Index() {
       return cu_idx;
     };
 
-    auto extract_fn = [debug_info](uint32_t cu_idx) {
+    auto extract_fn = [debug_info, &clear_cu_dies](uint32_t cu_idx) {
       DWARFCompileUnit *dwarf_cu = debug_info->GetCompileUnitAtIndex(cu_idx);
       if (dwarf_cu) {
         // dwarf_cu->ExtractDIEsIfNeeded(false) will return zero if the
         // DIEs for a compile unit have already been parsed.
-        return std::make_pair(cu_idx, dwarf_cu->ExtractDIEsIfNeeded(false) > 1);
+        if (dwarf_cu->ExtractDIEsIfNeeded(false) > 1)
+          clear_cu_dies[cu_idx] = true;
       }
-      return std::make_pair(cu_idx, false);
     };
 
     // Create a task runner that extracts dies for each DWARF compile unit in a
     // separate thread
-    TaskRunner<std::pair<uint32_t, bool>> task_runner_extract;
-    for (uint32_t cu_idx = 0; cu_idx < num_compile_units; ++cu_idx)
-      task_runner_extract.AddTask(extract_fn, cu_idx);
-
     //----------------------------------------------------------------------
     // First figure out which compile units didn't have their DIEs already
     // parsed and remember this.  If no DIEs were parsed prior to this index
@@ -1988,48 +1986,37 @@ void SymbolFileDWARF::Index() {
     // a DIE in one compile unit refers to another and the indexes accesses
     // those DIEs.
     //----------------------------------------------------------------------
-    while (true) {
-      auto f = task_runner_extract.WaitForNextCompletedTask();
-      if (!f.valid())
-        break;
-      unsigned cu_idx;
-      bool clear;
-      std::tie(cu_idx, clear) = f.get();
-      clear_cu_dies[cu_idx] = clear;
-    }
+    TaskMapOverInt(0, num_compile_units, extract_fn);
 
     // Now create a task runner that can index each DWARF compile unit in a
     // separate
     // thread so we can index quickly.
 
-    TaskRunner<uint32_t> task_runner;
-    for (uint32_t cu_idx = 0; cu_idx < num_compile_units; ++cu_idx)
-      task_runner.AddTask(parser_fn, cu_idx);
+    TaskMapOverInt(0, num_compile_units, parser_fn);
 
-    while (true) {
-      std::future<uint32_t> f = task_runner.WaitForNextCompletedTask();
-      if (!f.valid())
-        break;
-      uint32_t cu_idx = f.get();
-
-      m_function_basename_index.Append(function_basename_index[cu_idx]);
-      m_function_fullname_index.Append(function_fullname_index[cu_idx]);
-      m_function_method_index.Append(function_method_index[cu_idx]);
-      m_function_selector_index.Append(function_selector_index[cu_idx]);
-      m_objc_class_selectors_index.Append(objc_class_selectors_index[cu_idx]);
-      m_global_index.Append(global_index[cu_idx]);
-      m_type_index.Append(type_index[cu_idx]);
-      m_namespace_index.Append(namespace_index[cu_idx]);
-    }
+    auto finalize_fn = [](NameToDIE &index, std::vector<NameToDIE> &srcs) {
+      for (auto &src : srcs)
+        index.Append(src);
+      index.Finalize();
+    };
 
-    TaskPool::RunTasks([&]() { m_function_basename_index.Finalize(); },
-                       [&]() { m_function_fullname_index.Finalize(); },
-                       [&]() { m_function_method_index.Finalize(); },
-                       [&]() { m_function_selector_index.Finalize(); },
-                       [&]() { m_objc_class_selectors_index.Finalize(); },
-                       [&]() { m_global_index.Finalize(); },
-                       [&]() { m_type_index.Finalize(); },
-                       [&]() { m_namespace_index.Finalize(); });
+    TaskPool::RunTasks(
+        [&]() {
+          finalize_fn(m_function_basename_index, function_basename_index);
+        },
+        [&]() {
+          finalize_fn(m_function_fullname_index, function_fullname_index);
+        },
+        [&]() { finalize_fn(m_function_method_index, function_method_index); },
+        [&]() {
+          finalize_fn(m_function_selector_index, function_selector_index);
+        },
+        [&]() {
+          finalize_fn(m_objc_class_selectors_index, objc_class_selectors_index);
+        },
+        [&]() { finalize_fn(m_global_index, global_index); },
+        [&]() { finalize_fn(m_type_index, type_index); },
+        [&]() { finalize_fn(m_namespace_index, namespace_index); });
 
     //----------------------------------------------------------------------
     // Keep memory down by clearing DIEs for any compile units if indexing
index 244e64f..d8306dc 100644 (file)
@@ -73,3 +73,26 @@ void TaskPoolImpl::Worker(TaskPoolImpl *pool) {
     f();
   }
 }
+
+void TaskMapOverInt(size_t begin, size_t end,
+                    std::function<void(size_t)> const &func) {
+  std::atomic<size_t> idx{begin};
+  size_t num_workers =
+      std::min<size_t>(end, std::thread::hardware_concurrency());
+
+  auto wrapper = [&idx, end, &func]() {
+    while (true) {
+      size_t i = idx.fetch_add(1);
+      if (i >= end)
+        break;
+      func(i);
+    }
+  };
+
+  std::vector<std::future<void>> futures;
+  futures.reserve(num_workers);
+  for (size_t i = 0; i < num_workers; i++)
+    futures.push_back(TaskPool::AddTask(wrapper));
+  for (size_t i = 0; i < num_workers; i++)
+    futures[i].wait();
+}
index 172e32a..e340a81 100644 (file)
@@ -30,25 +30,14 @@ TEST(TaskPoolTest, RunTasks) {
   ASSERT_EQ(17, r[3]);
 }
 
-TEST(TaskPoolTest, TaskRunner) {
-  auto fn = [](int x) { return std::make_pair(x, x * x); };
-
-  TaskRunner<std::pair<int, int>> tr;
-  tr.AddTask(fn, 1);
-  tr.AddTask(fn, 2);
-  tr.AddTask(fn, 3);
-  tr.AddTask(fn, 4);
-
-  int count = 0;
-  while (true) {
-    auto f = tr.WaitForNextCompletedTask();
-    if (!f.valid())
-      break;
-
-    ++count;
-    std::pair<int, int> v = f.get();
-    ASSERT_EQ(v.first * v.first, v.second);
-  }
-
-  ASSERT_EQ(4, count);
+TEST(TaskPoolTest, TaskMap) {
+  int data[4];
+  auto fn = [&data](int x) { data[x] = x * x; };
+
+  TaskMapOverInt(0, 4, fn);
+
+  ASSERT_EQ(data[0], 0);
+  ASSERT_EQ(data[1], 1);
+  ASSERT_EQ(data[2], 4);
+  ASSERT_EQ(data[3], 9);
 }