#include "base/threading/thread_restrictions.h"
namespace cc {
-namespace internal {
namespace {
// Helper class for iterating over all dependents of a task.
} // namespace
-Task::Task() : did_run_(false) {}
+Task::Task() : will_run_(false), did_run_(false) {
+}
-Task::~Task() {}
+Task::~Task() {
+ DCHECK(!will_run_);
+}
void Task::WillRun() {
+ DCHECK(!will_run_);
DCHECK(!did_run_);
+ will_run_ = true;
}
-void Task::DidRun() { did_run_ = true; }
+void Task::DidRun() {
+ DCHECK(will_run_);
+ will_run_ = false;
+ did_run_ = true;
+}
bool Task::HasFinishedRunning() const { return did_run_; }
edges.clear();
}
-TaskGraphRunner::TaskNamespace::TaskNamespace() : num_running_tasks(0u) {}
+TaskGraphRunner::TaskNamespace::TaskNamespace() {}
TaskGraphRunner::TaskNamespace::~TaskNamespace() {}
-TaskGraphRunner::TaskGraphRunner(size_t num_threads,
- const std::string& thread_name_prefix)
+TaskGraphRunner::TaskGraphRunner()
: lock_(),
has_ready_to_run_tasks_cv_(&lock_),
has_namespaces_with_finished_running_tasks_cv_(&lock_),
next_namespace_id_(1),
- next_thread_index_(0u),
- // |num_threads| can be 0 for test.
- running_tasks_(std::max(num_threads, static_cast<size_t>(1)), NULL),
- shutdown_(false) {
- base::AutoLock lock(lock_);
-
- while (workers_.size() < num_threads) {
- scoped_ptr<base::DelegateSimpleThread> worker =
- make_scoped_ptr(new base::DelegateSimpleThread(
- this,
- thread_name_prefix +
- base::StringPrintf("Worker%u",
- static_cast<unsigned>(workers_.size() + 1))
- .c_str()));
- worker->Start();
-#if defined(OS_ANDROID) || defined(OS_LINUX)
- worker->SetThreadPriority(base::kThreadPriority_Background);
-#endif
- workers_.push_back(worker.Pass());
- }
-}
+ shutdown_(false) {}
TaskGraphRunner::~TaskGraphRunner() {
{
DCHECK_EQ(0u, ready_to_run_namespaces_.size());
DCHECK_EQ(0u, namespaces_.size());
-
- DCHECK(!shutdown_);
- shutdown_ = true;
-
- // Wake up a worker so it knows it should exit. This will cause all workers
- // to exit as each will wake up another worker before exiting.
- has_ready_to_run_tasks_cv_.Signal();
- }
-
- while (workers_.size()) {
- scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front();
- // Join() is considered IO and will block this thread.
- base::ThreadRestrictions::ScopedAllowIO allow_io;
- worker->Join();
}
}
return token;
}
-void TaskGraphRunner::WaitForTasksToFinishRunning(NamespaceToken token) {
- TRACE_EVENT0("cc", "TaskGraphRunner::WaitForTasksToFinishRunning");
-
- DCHECK(token.IsValid());
-
- {
- base::AutoLock lock(lock_);
-
- TaskNamespaceMap::const_iterator it = namespaces_.find(token.id_);
- if (it == namespaces_.end())
- return;
-
- const TaskNamespace& task_namespace = it->second;
-
- while (!HasFinishedRunningTasksInNamespace(&task_namespace))
- has_namespaces_with_finished_running_tasks_cv_.Wait();
-
- // There may be other namespaces that have finished running
- // tasks, so wake up another origin thread.
- has_namespaces_with_finished_running_tasks_cv_.Signal();
- }
-}
-
-void TaskGraphRunner::SetTaskGraph(NamespaceToken token, TaskGraph* graph) {
+void TaskGraphRunner::ScheduleTasks(NamespaceToken token, TaskGraph* graph) {
TRACE_EVENT2("cc",
- "TaskGraphRunner::SetTaskGraph",
+ "TaskGraphRunner::ScheduleTasks",
"num_nodes",
graph->nodes.size(),
"num_edges",
TaskGraph::Node& node = *it;
// Remove any old nodes that are associated with this task. The result is
- // that the old graph is left all nodes not present in this graph, which
- // we use below to determine what tasks need to be canceled.
+ // that the old graph is left with all nodes not present in this graph,
+ // which we use below to determine what tasks need to be canceled.
TaskGraph::Node::Vector::iterator old_it =
std::find_if(task_namespace.graph.nodes.begin(),
task_namespace.graph.nodes.end(),
continue;
// Skip if already running.
- if (std::find(running_tasks_.begin(), running_tasks_.end(), node.task) !=
- running_tasks_.end())
+ if (std::find(task_namespace.running_tasks.begin(),
+ task_namespace.running_tasks.end(),
+ node.task) != task_namespace.running_tasks.end())
continue;
task_namespace.ready_to_run_tasks.push_back(
PrioritizedTask(node.task, node.priority));
}
- // Rearrange the elements in |ready_to_run_tasks| in such a way that
- // they form a heap.
+ // Rearrange the elements in |ready_to_run_tasks| in such a way that they
+ // form a heap.
std::make_heap(task_namespace.ready_to_run_tasks.begin(),
task_namespace.ready_to_run_tasks.end(),
CompareTaskPriority);
continue;
// Skip if already running.
- if (std::find(running_tasks_.begin(), running_tasks_.end(), node.task) !=
- running_tasks_.end())
+ if (std::find(task_namespace.running_tasks.begin(),
+ task_namespace.running_tasks.end(),
+ node.task) != task_namespace.running_tasks.end())
continue;
DCHECK(std::find(task_namespace.completed_tasks.begin(),
ready_to_run_namespaces_.push_back(&it->second);
}
- // Rearrange the task namespaces in |ready_to_run_namespaces_|
- // in such a way that they form a heap.
+ // Rearrange the task namespaces in |ready_to_run_namespaces_| in such a way
+ // that they form a heap.
std::make_heap(ready_to_run_namespaces_.begin(),
ready_to_run_namespaces_.end(),
CompareTaskNamespacePriority);
}
}
+void TaskGraphRunner::WaitForTasksToFinishRunning(NamespaceToken token) {
+ TRACE_EVENT0("cc", "TaskGraphRunner::WaitForTasksToFinishRunning");
+
+ DCHECK(token.IsValid());
+
+ {
+ base::AutoLock lock(lock_);
+
+ TaskNamespaceMap::const_iterator it = namespaces_.find(token.id_);
+ if (it == namespaces_.end())
+ return;
+
+ const TaskNamespace& task_namespace = it->second;
+
+ while (!HasFinishedRunningTasksInNamespace(&task_namespace))
+ has_namespaces_with_finished_running_tasks_cv_.Wait();
+
+ // There may be other namespaces that have finished running tasks, so wake
+ // up another origin thread.
+ has_namespaces_with_finished_running_tasks_cv_.Signal();
+ }
+}
+
void TaskGraphRunner::CollectCompletedTasks(NamespaceToken token,
Task::Vector* completed_tasks) {
TRACE_EVENT0("cc", "TaskGraphRunner::CollectCompletedTasks");
// Remove namespace if finished running tasks.
DCHECK_EQ(0u, task_namespace.completed_tasks.size());
DCHECK_EQ(0u, task_namespace.ready_to_run_tasks.size());
- DCHECK_EQ(0u, task_namespace.num_running_tasks);
+ DCHECK_EQ(0u, task_namespace.running_tasks.size());
namespaces_.erase(it);
}
}
-bool TaskGraphRunner::RunTaskForTesting() {
+void TaskGraphRunner::Shutdown() {
base::AutoLock lock(lock_);
- if (ready_to_run_namespaces_.empty())
- return false;
+ DCHECK_EQ(0u, ready_to_run_namespaces_.size());
+ DCHECK_EQ(0u, namespaces_.size());
- RunTaskWithLockAcquired(0);
- return true;
+ DCHECK(!shutdown_);
+ shutdown_ = true;
+
+ // Wake up a worker so it knows it should exit. This will cause all workers
+ // to exit as each will wake up another worker before exiting.
+ has_ready_to_run_tasks_cv_.Signal();
}
void TaskGraphRunner::Run() {
base::AutoLock lock(lock_);
- // Get a unique thread index.
- int thread_index = next_thread_index_++;
-
while (true) {
if (ready_to_run_namespaces_.empty()) {
// Exit when shutdown is set and no more tasks are pending.
continue;
}
- RunTaskWithLockAcquired(thread_index);
+ RunTaskWithLockAcquired();
}
// We noticed we should exit. Wake up the next worker so it knows it should
has_ready_to_run_tasks_cv_.Signal();
}
-void TaskGraphRunner::RunTaskWithLockAcquired(int thread_index) {
- TRACE_EVENT1("cc", "TaskGraphRunner::RunTask", "thread_index", thread_index);
+void TaskGraphRunner::RunUntilIdle() {
+ base::AutoLock lock(lock_);
+
+ while (!ready_to_run_namespaces_.empty())
+ RunTaskWithLockAcquired();
+}
+
+void TaskGraphRunner::RunTaskWithLockAcquired() {
+ TRACE_EVENT0("toplevel", "TaskGraphRunner::RunTask");
lock_.AssertAcquired();
DCHECK(!ready_to_run_namespaces_.empty());
scoped_refptr<Task> task(task_namespace->ready_to_run_tasks.back().task);
task_namespace->ready_to_run_tasks.pop_back();
- // Add task namespace back to |ready_to_run_namespaces_| if not
- // empty after taking top priority task.
+ // Add task namespace back to |ready_to_run_namespaces_| if not empty after
+ // taking top priority task.
if (!task_namespace->ready_to_run_tasks.empty()) {
ready_to_run_namespaces_.push_back(task_namespace);
std::push_heap(ready_to_run_namespaces_.begin(),
CompareTaskNamespacePriority);
}
- // Add task to |running_tasks_|.
- DCHECK_LT(static_cast<size_t>(thread_index), running_tasks_.size());
- DCHECK(!running_tasks_[thread_index]);
- running_tasks_[thread_index] = task.get();
-
- // Increment running task count for task namespace.
- task_namespace->num_running_tasks++;
+ // Add task to |running_tasks|.
+ task_namespace->running_tasks.push_back(task.get());
// There may be more work available, so wake up another worker thread.
has_ready_to_run_tasks_cv_.Signal();
{
base::AutoUnlock unlock(lock_);
- task->RunOnWorkerThread(thread_index);
+ task->RunOnWorkerThread();
}
// This will mark task as finished running.
task->DidRun();
- // Decrement running task count for task namespace.
- DCHECK_LT(0u, task_namespace->num_running_tasks);
- task_namespace->num_running_tasks--;
-
- // Remove task from |running_tasks_|.
- running_tasks_[thread_index] = NULL;
+ // Remove task from |running_tasks|.
+ TaskVector::iterator it = std::find(task_namespace->running_tasks.begin(),
+ task_namespace->running_tasks.end(),
+ task.get());
+ DCHECK(it != task_namespace->running_tasks.end());
+ std::swap(*it, task_namespace->running_tasks.back());
+ task_namespace->running_tasks.pop_back();
// Now iterate over all dependents to decrement dependencies and check if they
// are ready to run.
has_namespaces_with_finished_running_tasks_cv_.Signal();
}
-} // namespace internal
} // namespace cc