static void AddTaskImpl(std::function<void()> &&task_fn);
};
-// Wrapper class around the global TaskPool implementation to make it possible
-// to create a set of
-// tasks and then wait for the tasks to be completed by the
-// WaitForNextCompletedTask call. This
-// class should be used when WaitForNextCompletedTask is needed because this
-// class add no other
-// extra functionality to the TaskPool class and it have a very minor
-// performance overhead.
-template <typename T> // The return type of the tasks what will be added to this
- // task runner
- class TaskRunner {
-public:
- // Add a task to the task runner what will also add the task to the global
- // TaskPool. The
- // function doesn't return the std::future for the task because it will be
- // supplied by the
- // WaitForNextCompletedTask after the task is completed.
- template <typename F, typename... Args> void AddTask(F &&f, Args &&... args);
-
- // Wait for the next task in this task runner to finish and then return the
- // std::future what
- // belongs to the finished task. If there is no task in this task runner
- // (neither pending nor
- // comleted) then this function will return an invalid future. Usually this
- // function should be
- // called in a loop processing the results of the tasks until it returns an
- // invalid std::future
- // what means that all task in this task runner is completed.
- std::future<T> WaitForNextCompletedTask();
-
- // Convenience method to wait for all task in this TaskRunner to finish. Do
- // NOT use this class
- // just because of this method. Use TaskPool instead and wait for each
- // std::future returned by
- // AddTask in a loop.
- void WaitForAllTasks();
-
-private:
- std::list<std::future<T>> m_ready;
- std::list<std::future<T>> m_pending;
- std::mutex m_mutex;
- std::condition_variable m_cv;
-};
-
template <typename F, typename... Args>
std::future<typename std::result_of<F(Args...)>::type>
TaskPool::AddTask(F &&f, Args &&... args) {
static void Run() {}
};
-template <typename T>
-template <typename F, typename... Args>
-void TaskRunner<T>::AddTask(F &&f, Args &&... args) {
- std::unique_lock<std::mutex> lock(m_mutex);
- auto it = m_pending.emplace(m_pending.end());
- *it = std::move(TaskPool::AddTask(
- [this, it](F f, Args... args) {
- T &&r = f(std::forward<Args>(args)...);
-
- std::unique_lock<std::mutex> lock(this->m_mutex);
- this->m_ready.splice(this->m_ready.end(), this->m_pending, it);
- lock.unlock();
-
- this->m_cv.notify_one();
- return r;
- },
- std::forward<F>(f), std::forward<Args>(args)...));
-}
-
-template <>
-template <typename F, typename... Args>
-void TaskRunner<void>::AddTask(F &&f, Args &&... args) {
- std::unique_lock<std::mutex> lock(m_mutex);
- auto it = m_pending.emplace(m_pending.end());
- *it = std::move(TaskPool::AddTask(
- [this, it](F f, Args... args) {
- f(std::forward<Args>(args)...);
-
- std::unique_lock<std::mutex> lock(this->m_mutex);
- this->m_ready.emplace_back(std::move(*it));
- this->m_pending.erase(it);
- lock.unlock();
-
- this->m_cv.notify_one();
- },
- std::forward<F>(f), std::forward<Args>(args)...));
-}
-
-template <typename T> std::future<T> TaskRunner<T>::WaitForNextCompletedTask() {
- std::unique_lock<std::mutex> lock(m_mutex);
- if (m_ready.empty() && m_pending.empty())
- return std::future<T>(); // No more tasks
-
- if (m_ready.empty())
- m_cv.wait(lock, [this]() { return !this->m_ready.empty(); });
-
- std::future<T> res = std::move(m_ready.front());
- m_ready.pop_front();
-
- lock.unlock();
- res.wait();
-
- return std::move(res);
-}
-
-template <typename T> void TaskRunner<T>::WaitForAllTasks() {
- while (WaitForNextCompletedTask().valid())
- ;
-}
+// Run 'func' on every value from begin .. end-1 and return once every call
+// has finished. Workers take one index at a time from a shared atomic
+// counter, so 'func' should do enough work per call to outweigh the
+// contention on that counter.
+void TaskMapOverInt(size_t begin, size_t end,
+ std::function<void(size_t)> const &func);
#endif // #ifndef utility_TaskPool_h_
std::vector<NameToDIE> type_index(num_compile_units);
std::vector<NameToDIE> namespace_index(num_compile_units);
- std::vector<bool> clear_cu_dies(num_compile_units, false);
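+  // std::vector<bool> may pack several elements into a single byte, so
+  // concurrent writes to different elements from the worker threads could
+  // race. Use uint8_t so that every element is its own memory location.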
+ std::vector<uint8_t> clear_cu_dies(num_compile_units, false);
auto parser_fn = [debug_info, &function_basename_index,
&function_fullname_index, &function_method_index,
&function_selector_index, &objc_class_selectors_index,
return cu_idx;
};
- auto extract_fn = [debug_info](uint32_t cu_idx) {
+ auto extract_fn = [debug_info, &clear_cu_dies](uint32_t cu_idx) {
DWARFCompileUnit *dwarf_cu = debug_info->GetCompileUnitAtIndex(cu_idx);
if (dwarf_cu) {
// dwarf_cu->ExtractDIEsIfNeeded(false) will return zero if the
// DIEs for a compile unit have already been parsed.
- return std::make_pair(cu_idx, dwarf_cu->ExtractDIEsIfNeeded(false) > 1);
+ if (dwarf_cu->ExtractDIEsIfNeeded(false) > 1)
+ clear_cu_dies[cu_idx] = true;
}
- return std::make_pair(cu_idx, false);
};
// Create a task runner that extracts dies for each DWARF compile unit in a
// separate thread
- TaskRunner<std::pair<uint32_t, bool>> task_runner_extract;
- for (uint32_t cu_idx = 0; cu_idx < num_compile_units; ++cu_idx)
- task_runner_extract.AddTask(extract_fn, cu_idx);
-
//----------------------------------------------------------------------
// First figure out which compile units didn't have their DIEs already
// parsed and remember this. If no DIEs were parsed prior to this index
// a DIE in one compile unit refers to another and the indexes accesses
// those DIEs.
//----------------------------------------------------------------------
- while (true) {
- auto f = task_runner_extract.WaitForNextCompletedTask();
- if (!f.valid())
- break;
- unsigned cu_idx;
- bool clear;
- std::tie(cu_idx, clear) = f.get();
- clear_cu_dies[cu_idx] = clear;
- }
+ TaskMapOverInt(0, num_compile_units, extract_fn);
// Now create a task runner that can index each DWARF compile unit in a
// separate
// thread so we can index quickly.
- TaskRunner<uint32_t> task_runner;
- for (uint32_t cu_idx = 0; cu_idx < num_compile_units; ++cu_idx)
- task_runner.AddTask(parser_fn, cu_idx);
+ TaskMapOverInt(0, num_compile_units, parser_fn);
- while (true) {
- std::future<uint32_t> f = task_runner.WaitForNextCompletedTask();
- if (!f.valid())
- break;
- uint32_t cu_idx = f.get();
-
- m_function_basename_index.Append(function_basename_index[cu_idx]);
- m_function_fullname_index.Append(function_fullname_index[cu_idx]);
- m_function_method_index.Append(function_method_index[cu_idx]);
- m_function_selector_index.Append(function_selector_index[cu_idx]);
- m_objc_class_selectors_index.Append(objc_class_selectors_index[cu_idx]);
- m_global_index.Append(global_index[cu_idx]);
- m_type_index.Append(type_index[cu_idx]);
- m_namespace_index.Append(namespace_index[cu_idx]);
- }
+ auto finalize_fn = [](NameToDIE &index, std::vector<NameToDIE> &srcs) {
+ for (auto &src : srcs)
+ index.Append(src);
+ index.Finalize();
+ };
- TaskPool::RunTasks([&]() { m_function_basename_index.Finalize(); },
- [&]() { m_function_fullname_index.Finalize(); },
- [&]() { m_function_method_index.Finalize(); },
- [&]() { m_function_selector_index.Finalize(); },
- [&]() { m_objc_class_selectors_index.Finalize(); },
- [&]() { m_global_index.Finalize(); },
- [&]() { m_type_index.Finalize(); },
- [&]() { m_namespace_index.Finalize(); });
+ TaskPool::RunTasks(
+ [&]() {
+ finalize_fn(m_function_basename_index, function_basename_index);
+ },
+ [&]() {
+ finalize_fn(m_function_fullname_index, function_fullname_index);
+ },
+ [&]() { finalize_fn(m_function_method_index, function_method_index); },
+ [&]() {
+ finalize_fn(m_function_selector_index, function_selector_index);
+ },
+ [&]() {
+ finalize_fn(m_objc_class_selectors_index, objc_class_selectors_index);
+ },
+ [&]() { finalize_fn(m_global_index, global_index); },
+ [&]() { finalize_fn(m_type_index, type_index); },
+ [&]() { finalize_fn(m_namespace_index, namespace_index); });
//----------------------------------------------------------------------
// Keep memory down by clearing DIEs for any compile units if indexing
f();
}
}
+
+// Calls 'func' once for every index in [begin, end), spreading the calls over
+// tasks queued on the TaskPool, and returns only after all of those tasks
+// have finished. Indices are handed out one at a time through a shared atomic
+// counter, so the order in which 'func' sees them is unspecified.
+//
+// 'func' is invoked from several tasks at once and must therefore be safe to
+// call concurrently with distinct arguments.
+void TaskMapOverInt(size_t begin, size_t end,
+                    std::function<void(size_t)> const &func) {
+  // Nothing to do for an empty (or inverted) range; this also keeps
+  // 'end - begin' below from wrapping around.
+  if (begin >= end)
+    return;
+
+  std::atomic<size_t> idx{begin};
+
+  // Size the worker set by the number of items rather than by 'end', so a
+  // non-zero 'begin' does not start more workers than there are items.
+  // hardware_concurrency() is allowed to return 0 when the value cannot be
+  // determined; always start at least one worker or 'func' would never run.
+  size_t num_workers = std::min<size_t>(
+      end - begin, std::max<size_t>(1, std::thread::hardware_concurrency()));
+
+  // Each worker keeps claiming the next unprocessed index until the range is
+  // exhausted. The captured references stay valid because this function does
+  // not return before every worker has finished.
+  auto wrapper = [&idx, end, &func]() {
+    while (true) {
+      size_t i = idx.fetch_add(1);
+      if (i >= end)
+        break;
+      func(i);
+    }
+  };
+
+  std::vector<std::future<void>> futures;
+  futures.reserve(num_workers);
+  for (size_t i = 0; i < num_workers; i++)
+    futures.push_back(TaskPool::AddTask(wrapper));
+  for (size_t i = 0; i < num_workers; i++)
+    futures[i].wait();
+}
ASSERT_EQ(17, r[3]);
}
-TEST(TaskPoolTest, TaskRunner) {
- auto fn = [](int x) { return std::make_pair(x, x * x); };
-
- TaskRunner<std::pair<int, int>> tr;
- tr.AddTask(fn, 1);
- tr.AddTask(fn, 2);
- tr.AddTask(fn, 3);
- tr.AddTask(fn, 4);
-
- int count = 0;
- while (true) {
- auto f = tr.WaitForNextCompletedTask();
- if (!f.valid())
- break;
-
- ++count;
- std::pair<int, int> v = f.get();
- ASSERT_EQ(v.first * v.first, v.second);
- }
-
- ASSERT_EQ(4, count);
+// Checks that TaskMapOverInt invokes the callback exactly for the indices
+// 0..3 by having each call store the square of its index.
+TEST(TaskPoolTest, TaskMap) {
+  // Seed with a sentinel that is not a valid square so that an index that is
+  // never visited fails the assertions below deterministically instead of
+  // reading an uninitialised value.
+  int data[4] = {-1, -1, -1, -1};
+  auto fn = [&data](int x) { data[x] = x * x; };
+
+  TaskMapOverInt(0, 4, fn);
+
+  ASSERT_EQ(0, data[0]);
+  ASSERT_EQ(1, data[1]);
+  ASSERT_EQ(4, data[2]);
+  ASSERT_EQ(9, data[3]);
}