- add sources.
[platform/framework/web/crosswalk.git] / src / cc / resources / worker_pool.cc
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "cc/resources/worker_pool.h"
6
7 #include <algorithm>
8 #include <queue>
9
10 #include "base/bind.h"
11 #include "base/containers/hash_tables.h"
12 #include "base/debug/trace_event.h"
13 #include "base/strings/stringprintf.h"
14 #include "base/synchronization/condition_variable.h"
15 #include "base/threading/simple_thread.h"
16 #include "base/threading/thread_restrictions.h"
17 #include "cc/base/scoped_ptr_deque.h"
18
19 namespace cc {
20
21 namespace internal {
22
23 WorkerPoolTask::WorkerPoolTask()
24     : did_schedule_(false),
25       did_run_(false),
26       did_complete_(false) {
27 }
28
29 WorkerPoolTask::~WorkerPoolTask() {
30   DCHECK_EQ(did_schedule_, did_complete_);
31   DCHECK(!did_run_ || did_schedule_);
32   DCHECK(!did_run_ || did_complete_);
33 }
34
35 void WorkerPoolTask::DidSchedule() {
36   DCHECK(!did_complete_);
37   did_schedule_ = true;
38 }
39
40 void WorkerPoolTask::WillRun() {
41   DCHECK(did_schedule_);
42   DCHECK(!did_complete_);
43   DCHECK(!did_run_);
44 }
45
46 void WorkerPoolTask::DidRun() {
47   did_run_ = true;
48 }
49
50 void WorkerPoolTask::WillComplete() {
51   DCHECK(!did_complete_);
52 }
53
54 void WorkerPoolTask::DidComplete() {
55   DCHECK(did_schedule_);
56   DCHECK(!did_complete_);
57   did_complete_ = true;
58 }
59
60 bool WorkerPoolTask::HasFinishedRunning() const {
61   return did_run_;
62 }
63
64 bool WorkerPoolTask::HasCompleted() const {
65   return did_complete_;
66 }
67
68 GraphNode::GraphNode(internal::WorkerPoolTask* task, unsigned priority)
69     : task_(task),
70       priority_(priority),
71       num_dependencies_(0) {
72 }
73
74 GraphNode::~GraphNode() {
75 }
76
77 }  // namespace internal
78
79 // Internal to the worker pool. Any data or logic that needs to be
80 // shared between threads lives in this class. All members are guarded
81 // by |lock_|.
82 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate {
83  public:
84   Inner(size_t num_threads, const std::string& thread_name_prefix);
85   virtual ~Inner();
86
87   void Shutdown();
88
89   // Schedule running of tasks in |graph|. Tasks previously scheduled but
90   // no longer needed will be canceled unless already running. Canceled
91   // tasks are moved to |completed_tasks_| without being run. The result
92   // is that once scheduled, a task is guaranteed to end up in the
93   // |completed_tasks_| queue even if they later get canceled by another
94   // call to SetTaskGraph().
95   void SetTaskGraph(TaskGraph* graph);
96
97   // Collect all completed tasks in |completed_tasks|.
98   void CollectCompletedTasks(TaskVector* completed_tasks);
99
100  private:
101   class PriorityComparator {
102    public:
103     bool operator()(const internal::GraphNode* a,
104                     const internal::GraphNode* b) {
105       // In this system, numerically lower priority is run first.
106       if (a->priority() != b->priority())
107         return a->priority() > b->priority();
108
109       // Run task with most dependents first when priority is the same.
110       return a->dependents().size() < b->dependents().size();
111     }
112   };
113
114   // Overridden from base::DelegateSimpleThread:
115   virtual void Run() OVERRIDE;
116
117   // This lock protects all members of this class except
118   // |worker_pool_on_origin_thread_|. Do not read or modify anything
119   // without holding this lock. Do not block while holding this lock.
120   mutable base::Lock lock_;
121
122   // Condition variable that is waited on by worker threads until new
123   // tasks are ready to run or shutdown starts.
124   base::ConditionVariable has_ready_to_run_tasks_cv_;
125
126   // Provides each running thread loop with a unique index. First thread
127   // loop index is 0.
128   unsigned next_thread_index_;
129
130   // Set during shutdown. Tells workers to exit when no more tasks
131   // are pending.
132   bool shutdown_;
133
134   // This set contains all pending tasks.
135   GraphNodeMap pending_tasks_;
136
137   // Ordered set of tasks that are ready to run.
138   typedef std::priority_queue<internal::GraphNode*,
139                               std::vector<internal::GraphNode*>,
140                               PriorityComparator> TaskQueue;
141   TaskQueue ready_to_run_tasks_;
142
143   // This set contains all currently running tasks.
144   GraphNodeMap running_tasks_;
145
146   // Completed tasks not yet collected by origin thread.
147   TaskVector completed_tasks_;
148
149   ScopedPtrDeque<base::DelegateSimpleThread> workers_;
150
151   DISALLOW_COPY_AND_ASSIGN(Inner);
152 };
153
154 WorkerPool::Inner::Inner(
155     size_t num_threads, const std::string& thread_name_prefix)
156     : lock_(),
157       has_ready_to_run_tasks_cv_(&lock_),
158       next_thread_index_(0),
159       shutdown_(false) {
160   base::AutoLock lock(lock_);
161
162   while (workers_.size() < num_threads) {
163     scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr(
164         new base::DelegateSimpleThread(
165             this,
166             thread_name_prefix +
167             base::StringPrintf(
168                 "Worker%u",
169                 static_cast<unsigned>(workers_.size() + 1)).c_str()));
170     worker->Start();
171 #if defined(OS_ANDROID) || defined(OS_LINUX)
172     worker->SetThreadPriority(base::kThreadPriority_Background);
173 #endif
174     workers_.push_back(worker.Pass());
175   }
176 }
177
178 WorkerPool::Inner::~Inner() {
179   base::AutoLock lock(lock_);
180
181   DCHECK(shutdown_);
182
183   DCHECK_EQ(0u, pending_tasks_.size());
184   DCHECK_EQ(0u, ready_to_run_tasks_.size());
185   DCHECK_EQ(0u, running_tasks_.size());
186   DCHECK_EQ(0u, completed_tasks_.size());
187 }
188
189 void WorkerPool::Inner::Shutdown() {
190   {
191     base::AutoLock lock(lock_);
192
193     DCHECK(!shutdown_);
194     shutdown_ = true;
195
196     // Wake up a worker so it knows it should exit. This will cause all workers
197     // to exit as each will wake up another worker before exiting.
198     has_ready_to_run_tasks_cv_.Signal();
199   }
200
201   while (workers_.size()) {
202     scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front();
203     // http://crbug.com/240453 - Join() is considered IO and will block this
204     // thread. See also http://crbug.com/239423 for further ideas.
205     base::ThreadRestrictions::ScopedAllowIO allow_io;
206     worker->Join();
207   }
208 }
209
210 void WorkerPool::Inner::SetTaskGraph(TaskGraph* graph) {
211   // It is OK to call SetTaskGraph() after shutdown if |graph| is empty.
212   DCHECK(graph->empty() || !shutdown_);
213
214   GraphNodeMap new_pending_tasks;
215   GraphNodeMap new_running_tasks;
216   TaskQueue new_ready_to_run_tasks;
217
218   new_pending_tasks.swap(*graph);
219
220   {
221     base::AutoLock lock(lock_);
222
223     // First remove all completed tasks from |new_pending_tasks| and
224     // adjust number of dependencies.
225     for (TaskVector::iterator it = completed_tasks_.begin();
226          it != completed_tasks_.end(); ++it) {
227       internal::WorkerPoolTask* task = it->get();
228
229       scoped_ptr<internal::GraphNode> node = new_pending_tasks.take_and_erase(
230           task);
231       if (node) {
232         for (internal::GraphNode::Vector::const_iterator it =
233                  node->dependents().begin();
234              it != node->dependents().end(); ++it) {
235           internal::GraphNode* dependent_node = *it;
236           dependent_node->remove_dependency();
237         }
238       }
239     }
240
241     // Build new running task set.
242     for (GraphNodeMap::iterator it = running_tasks_.begin();
243          it != running_tasks_.end(); ++it) {
244       internal::WorkerPoolTask* task = it->first;
245       // Transfer scheduled task value from |new_pending_tasks| to
246       // |new_running_tasks| if currently running. Value must be set to
247       // NULL if |new_pending_tasks| doesn't contain task. This does
248       // the right in both cases.
249       new_running_tasks.set(task, new_pending_tasks.take_and_erase(task));
250     }
251
252     // Build new "ready to run" tasks queue.
253     // TODO(reveman): Create this queue when building the task graph instead.
254     for (GraphNodeMap::iterator it = new_pending_tasks.begin();
255          it != new_pending_tasks.end(); ++it) {
256       internal::WorkerPoolTask* task = it->first;
257       DCHECK(task);
258       internal::GraphNode* node = it->second;
259
260       // Completed tasks should not exist in |new_pending_tasks|.
261       DCHECK(!task->HasFinishedRunning());
262
263       // Call DidSchedule() to indicate that this task has been scheduled.
264       // Note: This is only for debugging purposes.
265       task->DidSchedule();
266
267       if (!node->num_dependencies())
268         new_ready_to_run_tasks.push(node);
269
270       // Erase the task from old pending tasks.
271       pending_tasks_.erase(task);
272     }
273
274     completed_tasks_.reserve(completed_tasks_.size() + pending_tasks_.size());
275
276     // The items left in |pending_tasks_| need to be canceled.
277     for (GraphNodeMap::const_iterator it = pending_tasks_.begin();
278          it != pending_tasks_.end();
279          ++it) {
280       completed_tasks_.push_back(it->first);
281     }
282
283     // Swap task sets.
284     // Note: old tasks are intentionally destroyed after releasing |lock_|.
285     pending_tasks_.swap(new_pending_tasks);
286     running_tasks_.swap(new_running_tasks);
287     std::swap(ready_to_run_tasks_, new_ready_to_run_tasks);
288
289     // If |ready_to_run_tasks_| is empty, it means we either have
290     // running tasks, or we have no pending tasks.
291     DCHECK(!ready_to_run_tasks_.empty() ||
292            (pending_tasks_.empty() || !running_tasks_.empty()));
293
294     // If there is more work available, wake up worker thread.
295     if (!ready_to_run_tasks_.empty())
296       has_ready_to_run_tasks_cv_.Signal();
297   }
298 }
299
300 void WorkerPool::Inner::CollectCompletedTasks(TaskVector* completed_tasks) {
301   base::AutoLock lock(lock_);
302
303   DCHECK_EQ(0u, completed_tasks->size());
304   completed_tasks->swap(completed_tasks_);
305 }
306
307 void WorkerPool::Inner::Run() {
308   base::AutoLock lock(lock_);
309
310   // Get a unique thread index.
311   int thread_index = next_thread_index_++;
312
313   while (true) {
314     if (ready_to_run_tasks_.empty()) {
315       // Exit when shutdown is set and no more tasks are pending.
316       if (shutdown_ && pending_tasks_.empty())
317         break;
318
319       // Wait for more tasks.
320       has_ready_to_run_tasks_cv_.Wait();
321       continue;
322     }
323
324     // Take top priority task from |ready_to_run_tasks_|.
325     scoped_refptr<internal::WorkerPoolTask> task(
326         ready_to_run_tasks_.top()->task());
327     ready_to_run_tasks_.pop();
328
329     // Move task from |pending_tasks_| to |running_tasks_|.
330     DCHECK(pending_tasks_.contains(task.get()));
331     DCHECK(!running_tasks_.contains(task.get()));
332     running_tasks_.set(task.get(), pending_tasks_.take_and_erase(task.get()));
333
334     // There may be more work available, so wake up another worker thread.
335     has_ready_to_run_tasks_cv_.Signal();
336
337     // Call WillRun() before releasing |lock_| and running task.
338     task->WillRun();
339
340     {
341       base::AutoUnlock unlock(lock_);
342
343       task->RunOnWorkerThread(thread_index);
344     }
345
346     // This will mark task as finished running.
347     task->DidRun();
348
349     // Now iterate over all dependents to remove dependency and check
350     // if they are ready to run.
351     scoped_ptr<internal::GraphNode> node = running_tasks_.take_and_erase(
352         task.get());
353     if (node) {
354       for (internal::GraphNode::Vector::const_iterator it =
355                node->dependents().begin();
356            it != node->dependents().end(); ++it) {
357         internal::GraphNode* dependent_node = *it;
358
359         dependent_node->remove_dependency();
360         // Task is ready if it has no dependencies. Add it to
361         // |ready_to_run_tasks_|.
362         if (!dependent_node->num_dependencies())
363           ready_to_run_tasks_.push(dependent_node);
364       }
365     }
366
367     // Finally add task to |completed_tasks_|.
368     completed_tasks_.push_back(task);
369   }
370
371   // We noticed we should exit. Wake up the next worker so it knows it should
372   // exit as well (because the Shutdown() code only signals once).
373   has_ready_to_run_tasks_cv_.Signal();
374 }
375
376 WorkerPool::WorkerPool(size_t num_threads,
377                        const std::string& thread_name_prefix)
378     : in_dispatch_completion_callbacks_(false),
379       inner_(make_scoped_ptr(new Inner(num_threads, thread_name_prefix))) {
380 }
381
382 WorkerPool::~WorkerPool() {
383 }
384
385 void WorkerPool::Shutdown() {
386   TRACE_EVENT0("cc", "WorkerPool::Shutdown");
387
388   DCHECK(!in_dispatch_completion_callbacks_);
389
390   inner_->Shutdown();
391 }
392
393 void WorkerPool::CheckForCompletedTasks() {
394   TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks");
395
396   DCHECK(!in_dispatch_completion_callbacks_);
397
398   TaskVector completed_tasks;
399   inner_->CollectCompletedTasks(&completed_tasks);
400   ProcessCompletedTasks(completed_tasks);
401 }
402
403 void WorkerPool::ProcessCompletedTasks(
404     const TaskVector& completed_tasks) {
405   TRACE_EVENT1("cc", "WorkerPool::ProcessCompletedTasks",
406                "completed_task_count", completed_tasks.size());
407
408   // Worker pool instance is not reentrant while processing completed tasks.
409   in_dispatch_completion_callbacks_ = true;
410
411   for (TaskVector::const_iterator it = completed_tasks.begin();
412        it != completed_tasks.end();
413        ++it) {
414     internal::WorkerPoolTask* task = it->get();
415
416     task->WillComplete();
417     task->CompleteOnOriginThread();
418     task->DidComplete();
419   }
420
421   in_dispatch_completion_callbacks_ = false;
422 }
423
424 void WorkerPool::SetTaskGraph(TaskGraph* graph) {
425   TRACE_EVENT1("cc", "WorkerPool::SetTaskGraph",
426                "num_tasks", graph->size());
427
428   DCHECK(!in_dispatch_completion_callbacks_);
429
430   inner_->SetTaskGraph(graph);
431 }
432
433 }  // namespace cc