1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "cc/resources/task_graph_runner.h"
9 #include "base/debug/trace_event.h"
10 #include "base/strings/stringprintf.h"
11 #include "base/threading/thread_restrictions.h"
17 // Helper class for iterating over all dependents of a task.
18 class DependentIterator {
20 DependentIterator(TaskGraph* graph, const Task* task)
21 : graph_(graph), task_(task), current_index_(-1), current_node_(NULL) {
25 TaskGraph::Node& operator->() const {
26 DCHECK_LT(current_index_, graph_->edges.size());
27 DCHECK_EQ(graph_->edges[current_index_].task, task_);
28 DCHECK(current_node_);
29 return *current_node_;
32 TaskGraph::Node& operator*() const {
33 DCHECK_LT(current_index_, graph_->edges.size());
34 DCHECK_EQ(graph_->edges[current_index_].task, task_);
35 DCHECK(current_node_);
36 return *current_node_;
39 // Note: Performance can be improved by keeping edges sorted.
40 DependentIterator& operator++() {
41 // Find next dependency edge for |task_|.
44 if (current_index_ == graph_->edges.size())
46 } while (graph_->edges[current_index_].task != task_);
48 // Now find the node for the dependent of this edge.
49 TaskGraph::Node::Vector::iterator it =
50 std::find_if(graph_->nodes.begin(),
52 TaskGraph::Node::TaskComparator(
53 graph_->edges[current_index_].dependent));
54 DCHECK(it != graph_->nodes.end());
55 current_node_ = &(*it);
60 operator bool() const { return current_index_ < graph_->edges.size(); }
65 size_t current_index_;
66 TaskGraph::Node* current_node_;
69 class DependencyMismatchComparator {
71 explicit DependencyMismatchComparator(const TaskGraph* graph)
74 bool operator()(const TaskGraph::Node& node) const {
75 return static_cast<size_t>(std::count_if(graph_->edges.begin(),
77 DependentComparator(node.task))) !=
82 class DependentComparator {
84 explicit DependentComparator(const Task* dependent)
85 : dependent_(dependent) {}
87 bool operator()(const TaskGraph::Edge& edge) const {
88 return edge.dependent == dependent_;
92 const Task* dependent_;
95 const TaskGraph* graph_;
100 Task::Task() : did_run_(false) {}
104 void Task::WillRun() {
108 void Task::DidRun() { did_run_ = true; }
110 bool Task::HasFinishedRunning() const { return did_run_; }
112 TaskGraph::TaskGraph() {}
114 TaskGraph::~TaskGraph() {}
116 void TaskGraph::Swap(TaskGraph* other) {
117 nodes.swap(other->nodes);
118 edges.swap(other->edges);
121 void TaskGraph::Reset() {
126 TaskGraphRunner::TaskNamespace::TaskNamespace() : num_running_tasks(0u) {}
128 TaskGraphRunner::TaskNamespace::~TaskNamespace() {}
130 TaskGraphRunner::TaskGraphRunner(size_t num_threads,
131 const std::string& thread_name_prefix)
133 has_ready_to_run_tasks_cv_(&lock_),
134 has_namespaces_with_finished_running_tasks_cv_(&lock_),
135 next_namespace_id_(1),
136 next_thread_index_(0u),
137 // |num_threads| can be 0 for test.
138 running_tasks_(std::max(num_threads, static_cast<size_t>(1)), NULL),
140 base::AutoLock lock(lock_);
142 while (workers_.size() < num_threads) {
143 scoped_ptr<base::DelegateSimpleThread> worker =
144 make_scoped_ptr(new base::DelegateSimpleThread(
147 base::StringPrintf("Worker%u",
148 static_cast<unsigned>(workers_.size() + 1))
151 #if defined(OS_ANDROID) || defined(OS_LINUX)
152 worker->SetThreadPriority(base::kThreadPriority_Background);
154 workers_.push_back(worker.Pass());
158 TaskGraphRunner::~TaskGraphRunner() {
160 base::AutoLock lock(lock_);
162 DCHECK_EQ(0u, ready_to_run_namespaces_.size());
163 DCHECK_EQ(0u, namespaces_.size());
168 // Wake up a worker so it knows it should exit. This will cause all workers
169 // to exit as each will wake up another worker before exiting.
170 has_ready_to_run_tasks_cv_.Signal();
173 while (workers_.size()) {
174 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front();
175 // Join() is considered IO and will block this thread.
176 base::ThreadRestrictions::ScopedAllowIO allow_io;
181 NamespaceToken TaskGraphRunner::GetNamespaceToken() {
182 base::AutoLock lock(lock_);
184 NamespaceToken token(next_namespace_id_++);
185 DCHECK(namespaces_.find(token.id_) == namespaces_.end());
189 void TaskGraphRunner::WaitForTasksToFinishRunning(NamespaceToken token) {
190 TRACE_EVENT0("cc", "TaskGraphRunner::WaitForTasksToFinishRunning");
192 DCHECK(token.IsValid());
195 base::AutoLock lock(lock_);
197 TaskNamespaceMap::const_iterator it = namespaces_.find(token.id_);
198 if (it == namespaces_.end())
201 const TaskNamespace& task_namespace = it->second;
203 while (!HasFinishedRunningTasksInNamespace(&task_namespace))
204 has_namespaces_with_finished_running_tasks_cv_.Wait();
206 // There may be other namespaces that have finished running
207 // tasks, so wake up another origin thread.
208 has_namespaces_with_finished_running_tasks_cv_.Signal();
212 void TaskGraphRunner::SetTaskGraph(NamespaceToken token, TaskGraph* graph) {
214 "TaskGraphRunner::SetTaskGraph",
218 graph->edges.size());
220 DCHECK(token.IsValid());
221 DCHECK(std::find_if(graph->nodes.begin(),
223 DependencyMismatchComparator(graph)) ==
227 base::AutoLock lock(lock_);
231 TaskNamespace& task_namespace = namespaces_[token.id_];
233 // First adjust number of dependencies to reflect completed tasks.
234 for (Task::Vector::iterator it = task_namespace.completed_tasks.begin();
235 it != task_namespace.completed_tasks.end();
237 for (DependentIterator node_it(graph, it->get()); node_it; ++node_it) {
238 TaskGraph::Node& node = *node_it;
239 DCHECK_LT(0u, node.dependencies);
244 // Build new "ready to run" queue and remove nodes from old graph.
245 task_namespace.ready_to_run_tasks.clear();
246 for (TaskGraph::Node::Vector::iterator it = graph->nodes.begin();
247 it != graph->nodes.end();
249 TaskGraph::Node& node = *it;
251 // Remove any old nodes that are associated with this task. The result is
252 // that the old graph is left all nodes not present in this graph, which
253 // we use below to determine what tasks need to be canceled.
254 TaskGraph::Node::Vector::iterator old_it =
255 std::find_if(task_namespace.graph.nodes.begin(),
256 task_namespace.graph.nodes.end(),
257 TaskGraph::Node::TaskComparator(node.task));
258 if (old_it != task_namespace.graph.nodes.end()) {
259 std::swap(*old_it, task_namespace.graph.nodes.back());
260 task_namespace.graph.nodes.pop_back();
263 // Task is not ready to run if dependencies are not yet satisfied.
264 if (node.dependencies)
267 // Skip if already finished running task.
268 if (node.task->HasFinishedRunning())
271 // Skip if already running.
272 if (std::find(running_tasks_.begin(), running_tasks_.end(), node.task) !=
273 running_tasks_.end())
276 task_namespace.ready_to_run_tasks.push_back(
277 PrioritizedTask(node.task, node.priority));
280 // Rearrange the elements in |ready_to_run_tasks| in such a way that
282 std::make_heap(task_namespace.ready_to_run_tasks.begin(),
283 task_namespace.ready_to_run_tasks.end(),
284 CompareTaskPriority);
287 task_namespace.graph.Swap(graph);
289 // Determine what tasks in old graph need to be canceled.
290 for (TaskGraph::Node::Vector::iterator it = graph->nodes.begin();
291 it != graph->nodes.end();
293 TaskGraph::Node& node = *it;
295 // Skip if already finished running task.
296 if (node.task->HasFinishedRunning())
299 // Skip if already running.
300 if (std::find(running_tasks_.begin(), running_tasks_.end(), node.task) !=
301 running_tasks_.end())
304 DCHECK(std::find(task_namespace.completed_tasks.begin(),
305 task_namespace.completed_tasks.end(),
306 node.task) == task_namespace.completed_tasks.end());
307 task_namespace.completed_tasks.push_back(node.task);
310 // Build new "ready to run" task namespaces queue.
311 ready_to_run_namespaces_.clear();
312 for (TaskNamespaceMap::iterator it = namespaces_.begin();
313 it != namespaces_.end();
315 if (!it->second.ready_to_run_tasks.empty())
316 ready_to_run_namespaces_.push_back(&it->second);
319 // Rearrange the task namespaces in |ready_to_run_namespaces_|
320 // in such a way that they form a heap.
321 std::make_heap(ready_to_run_namespaces_.begin(),
322 ready_to_run_namespaces_.end(),
323 CompareTaskNamespacePriority);
325 // If there is more work available, wake up worker thread.
326 if (!ready_to_run_namespaces_.empty())
327 has_ready_to_run_tasks_cv_.Signal();
331 void TaskGraphRunner::CollectCompletedTasks(NamespaceToken token,
332 Task::Vector* completed_tasks) {
333 TRACE_EVENT0("cc", "TaskGraphRunner::CollectCompletedTasks");
335 DCHECK(token.IsValid());
338 base::AutoLock lock(lock_);
340 TaskNamespaceMap::iterator it = namespaces_.find(token.id_);
341 if (it == namespaces_.end())
344 TaskNamespace& task_namespace = it->second;
346 DCHECK_EQ(0u, completed_tasks->size());
347 completed_tasks->swap(task_namespace.completed_tasks);
348 if (!HasFinishedRunningTasksInNamespace(&task_namespace))
351 // Remove namespace if finished running tasks.
352 DCHECK_EQ(0u, task_namespace.completed_tasks.size());
353 DCHECK_EQ(0u, task_namespace.ready_to_run_tasks.size());
354 DCHECK_EQ(0u, task_namespace.num_running_tasks);
355 namespaces_.erase(it);
359 bool TaskGraphRunner::RunTaskForTesting() {
360 base::AutoLock lock(lock_);
362 if (ready_to_run_namespaces_.empty())
365 RunTaskWithLockAcquired(0);
369 void TaskGraphRunner::Run() {
370 base::AutoLock lock(lock_);
372 // Get a unique thread index.
373 int thread_index = next_thread_index_++;
376 if (ready_to_run_namespaces_.empty()) {
377 // Exit when shutdown is set and no more tasks are pending.
381 // Wait for more tasks.
382 has_ready_to_run_tasks_cv_.Wait();
386 RunTaskWithLockAcquired(thread_index);
389 // We noticed we should exit. Wake up the next worker so it knows it should
390 // exit as well (because the Shutdown() code only signals once).
391 has_ready_to_run_tasks_cv_.Signal();
394 void TaskGraphRunner::RunTaskWithLockAcquired(int thread_index) {
395 TRACE_EVENT1("cc", "TaskGraphRunner::RunTask", "thread_index", thread_index);
397 lock_.AssertAcquired();
398 DCHECK(!ready_to_run_namespaces_.empty());
400 // Take top priority TaskNamespace from |ready_to_run_namespaces_|.
401 std::pop_heap(ready_to_run_namespaces_.begin(),
402 ready_to_run_namespaces_.end(),
403 CompareTaskNamespacePriority);
404 TaskNamespace* task_namespace = ready_to_run_namespaces_.back();
405 ready_to_run_namespaces_.pop_back();
406 DCHECK(!task_namespace->ready_to_run_tasks.empty());
408 // Take top priority task from |ready_to_run_tasks|.
409 std::pop_heap(task_namespace->ready_to_run_tasks.begin(),
410 task_namespace->ready_to_run_tasks.end(),
411 CompareTaskPriority);
412 scoped_refptr<Task> task(task_namespace->ready_to_run_tasks.back().task);
413 task_namespace->ready_to_run_tasks.pop_back();
415 // Add task namespace back to |ready_to_run_namespaces_| if not
416 // empty after taking top priority task.
417 if (!task_namespace->ready_to_run_tasks.empty()) {
418 ready_to_run_namespaces_.push_back(task_namespace);
419 std::push_heap(ready_to_run_namespaces_.begin(),
420 ready_to_run_namespaces_.end(),
421 CompareTaskNamespacePriority);
424 // Add task to |running_tasks_|.
425 DCHECK_LT(static_cast<size_t>(thread_index), running_tasks_.size());
426 DCHECK(!running_tasks_[thread_index]);
427 running_tasks_[thread_index] = task.get();
429 // Increment running task count for task namespace.
430 task_namespace->num_running_tasks++;
432 // There may be more work available, so wake up another worker thread.
433 has_ready_to_run_tasks_cv_.Signal();
435 // Call WillRun() before releasing |lock_| and running task.
439 base::AutoUnlock unlock(lock_);
441 task->RunOnWorkerThread(thread_index);
444 // This will mark task as finished running.
447 // Decrement running task count for task namespace.
448 DCHECK_LT(0u, task_namespace->num_running_tasks);
449 task_namespace->num_running_tasks--;
451 // Remove task from |running_tasks_|.
452 running_tasks_[thread_index] = NULL;
454 // Now iterate over all dependents to decrement dependencies and check if they
456 bool ready_to_run_namespaces_has_heap_properties = true;
457 for (DependentIterator it(&task_namespace->graph, task.get()); it; ++it) {
458 TaskGraph::Node& dependent_node = *it;
460 DCHECK_LT(0u, dependent_node.dependencies);
461 dependent_node.dependencies--;
462 // Task is ready if it has no dependencies. Add it to |ready_to_run_tasks_|.
463 if (!dependent_node.dependencies) {
464 bool was_empty = task_namespace->ready_to_run_tasks.empty();
465 task_namespace->ready_to_run_tasks.push_back(
466 PrioritizedTask(dependent_node.task, dependent_node.priority));
467 std::push_heap(task_namespace->ready_to_run_tasks.begin(),
468 task_namespace->ready_to_run_tasks.end(),
469 CompareTaskPriority);
470 // Task namespace is ready if it has at least one ready to run task. Add
471 // it to |ready_to_run_namespaces_| if it just become ready.
473 DCHECK(std::find(ready_to_run_namespaces_.begin(),
474 ready_to_run_namespaces_.end(),
475 task_namespace) == ready_to_run_namespaces_.end());
476 ready_to_run_namespaces_.push_back(task_namespace);
478 ready_to_run_namespaces_has_heap_properties = false;
482 // Rearrange the task namespaces in |ready_to_run_namespaces_| in such a way
483 // that they yet again form a heap.
484 if (!ready_to_run_namespaces_has_heap_properties) {
485 std::make_heap(ready_to_run_namespaces_.begin(),
486 ready_to_run_namespaces_.end(),
487 CompareTaskNamespacePriority);
490 // Finally add task to |completed_tasks_|.
491 task_namespace->completed_tasks.push_back(task);
493 // If namespace has finished running all tasks, wake up origin thread.
494 if (HasFinishedRunningTasksInNamespace(task_namespace))
495 has_namespaces_with_finished_running_tasks_cv_.Signal();
498 } // namespace internal