Update To 11.40.268.0
[platform/framework/web/crosswalk.git] / src / base / threading / sequenced_worker_pool_unittest.cc
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/threading/sequenced_worker_pool.h"
6
7 #include <algorithm>
8
9 #include "base/bind.h"
10 #include "base/compiler_specific.h"
11 #include "base/memory/ref_counted.h"
12 #include "base/memory/scoped_ptr.h"
13 #include "base/message_loop/message_loop.h"
14 #include "base/message_loop/message_loop_proxy.h"
15 #include "base/synchronization/condition_variable.h"
16 #include "base/synchronization/lock.h"
17 #include "base/test/sequenced_task_runner_test_template.h"
18 #include "base/test/sequenced_worker_pool_owner.h"
19 #include "base/test/task_runner_test_template.h"
20 #include "base/test/test_timeouts.h"
21 #include "base/threading/platform_thread.h"
22 #include "base/time/time.h"
23 #include "base/tracked_objects.h"
24 #include "testing/gtest/include/gtest/gtest.h"
25
26 namespace base {
27
28 // IMPORTANT NOTE:
29 //
30 // Many of these tests have failure modes where they'll hang forever. These
31 // tests should not be flaky, and hanging indicates a type of failure. Do not
32 // mark as flaky if they're hanging, it's likely an actual bug.
33
34 namespace {
35
// Number of worker threads given to the pool that the test fixture creates.
const size_t kNumWorkerThreads = 3;
37
38 // Allows a number of threads to all be blocked on the same event, and
39 // provides a way to unblock a certain number of them.
40 class ThreadBlocker {
41  public:
42   ThreadBlocker() : lock_(), cond_var_(&lock_), unblock_counter_(0) {}
43
44   void Block() {
45     {
46       base::AutoLock lock(lock_);
47       while (unblock_counter_ == 0)
48         cond_var_.Wait();
49       unblock_counter_--;
50     }
51     cond_var_.Signal();
52   }
53
54   void Unblock(size_t count) {
55     {
56       base::AutoLock lock(lock_);
57       DCHECK(unblock_counter_ == 0);
58       unblock_counter_ = count;
59     }
60     cond_var_.Signal();
61   }
62
63  private:
64   base::Lock lock_;
65   base::ConditionVariable cond_var_;
66
67   size_t unblock_counter_;
68 };
69
70 class TestTracker : public base::RefCountedThreadSafe<TestTracker> {
71  public:
72   TestTracker()
73       : lock_(),
74         cond_var_(&lock_),
75         started_events_(0) {
76   }
77
78   // Each of these tasks appends the argument to the complete sequence vector
79   // so calling code can see what order they finished in.
80   void FastTask(int id) {
81     SignalWorkerDone(id);
82   }
83
84   void SlowTask(int id) {
85     base::PlatformThread::Sleep(base::TimeDelta::FromSeconds(1));
86     SignalWorkerDone(id);
87   }
88
89   void BlockTask(int id, ThreadBlocker* blocker) {
90     // Note that this task has started and signal anybody waiting for that
91     // to happen.
92     {
93       base::AutoLock lock(lock_);
94       started_events_++;
95     }
96     cond_var_.Signal();
97
98     blocker->Block();
99     SignalWorkerDone(id);
100   }
101
102   void PostAdditionalTasks(
103         int id, SequencedWorkerPool* pool,
104         bool expected_return_value) {
105     Closure fast_task = base::Bind(&TestTracker::FastTask, this, 100);
106     EXPECT_EQ(expected_return_value,
107               pool->PostWorkerTaskWithShutdownBehavior(
108                   FROM_HERE, fast_task,
109                   SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
110     EXPECT_EQ(expected_return_value,
111               pool->PostWorkerTaskWithShutdownBehavior(
112                   FROM_HERE, fast_task,
113                   SequencedWorkerPool::SKIP_ON_SHUTDOWN));
114     pool->PostWorkerTaskWithShutdownBehavior(
115         FROM_HERE, fast_task,
116         SequencedWorkerPool::BLOCK_SHUTDOWN);
117     SignalWorkerDone(id);
118   }
119
120   // Waits until the given number of tasks have started executing.
121   void WaitUntilTasksBlocked(size_t count) {
122     {
123       base::AutoLock lock(lock_);
124       while (started_events_ < count)
125         cond_var_.Wait();
126     }
127     cond_var_.Signal();
128   }
129
130   // Blocks the current thread until at least the given number of tasks are in
131   // the completed vector, and then returns a copy.
132   std::vector<int> WaitUntilTasksComplete(size_t num_tasks) {
133     std::vector<int> ret;
134     {
135       base::AutoLock lock(lock_);
136       while (complete_sequence_.size() < num_tasks)
137         cond_var_.Wait();
138       ret = complete_sequence_;
139     }
140     cond_var_.Signal();
141     return ret;
142   }
143
144   size_t GetTasksCompletedCount() {
145     base::AutoLock lock(lock_);
146     return complete_sequence_.size();
147   }
148
149   void ClearCompleteSequence() {
150     base::AutoLock lock(lock_);
151     complete_sequence_.clear();
152     started_events_ = 0;
153   }
154
155  private:
156   friend class base::RefCountedThreadSafe<TestTracker>;
157   ~TestTracker() {}
158
159   void SignalWorkerDone(int id) {
160     {
161       base::AutoLock lock(lock_);
162       complete_sequence_.push_back(id);
163     }
164     cond_var_.Signal();
165   }
166
167   // Protects the complete_sequence.
168   base::Lock lock_;
169
170   base::ConditionVariable cond_var_;
171
172   // Protected by lock_.
173   std::vector<int> complete_sequence_;
174
175   // Counter of the number of "block" workers that have started.
176   size_t started_events_;
177 };
178
179 class SequencedWorkerPoolTest : public testing::Test {
180  public:
181   SequencedWorkerPoolTest()
182       : tracker_(new TestTracker) {
183     ResetPool();
184   }
185
186   virtual ~SequencedWorkerPoolTest() {}
187
188   virtual void SetUp() override {}
189
190   virtual void TearDown() override {
191     pool()->Shutdown();
192   }
193
194   const scoped_refptr<SequencedWorkerPool>& pool() {
195     return pool_owner_->pool();
196   }
197   TestTracker* tracker() { return tracker_.get(); }
198
199   // Destroys the SequencedWorkerPool instance, blocking until it is fully shut
200   // down, and creates a new instance.
201   void ResetPool() {
202     pool_owner_.reset(new SequencedWorkerPoolOwner(kNumWorkerThreads, "test"));
203   }
204
205   void SetWillWaitForShutdownCallback(const Closure& callback) {
206     pool_owner_->SetWillWaitForShutdownCallback(callback);
207   }
208
209   // Ensures that the given number of worker threads is created by adding
210   // tasks and waiting until they complete. Worker thread creation is
211   // serialized, can happen on background threads asynchronously, and doesn't
212   // happen any more at shutdown. This means that if a test posts a bunch of
213   // tasks and calls shutdown, fewer workers will be created than the test may
214   // expect.
215   //
216   // This function ensures that this condition can't happen so tests can make
217   // assumptions about the number of workers active. See the comment in
218   // PrepareToStartAdditionalThreadIfNecessary in the .cc file for more
219   // details.
220   //
221   // It will post tasks to the queue with id -1. It also assumes this is the
222   // first thing called in a test since it will clear the complete_sequence_.
223   void EnsureAllWorkersCreated() {
224     // Create a bunch of threads, all waiting. This will cause that may
225     // workers to be created.
226     ThreadBlocker blocker;
227     for (size_t i = 0; i < kNumWorkerThreads; i++) {
228       pool()->PostWorkerTask(FROM_HERE,
229                              base::Bind(&TestTracker::BlockTask,
230                                         tracker(), -1, &blocker));
231     }
232     tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
233
234     // Now wake them up and wait until they're done.
235     blocker.Unblock(kNumWorkerThreads);
236     tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
237
238     // Clean up the task IDs we added.
239     tracker()->ClearCompleteSequence();
240   }
241
242   int has_work_call_count() const {
243     return pool_owner_->has_work_call_count();
244   }
245
246  private:
247   MessageLoop message_loop_;
248   scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
249   const scoped_refptr<TestTracker> tracker_;
250 };
251
252 // Checks that the given number of entries are in the tasks to complete of
253 // the given tracker, and then signals the given event the given number of
254 // times. This is used to wakt up blocked background threads before blocking
255 // on shutdown.
256 void EnsureTasksToCompleteCountAndUnblock(scoped_refptr<TestTracker> tracker,
257                                           size_t expected_tasks_to_complete,
258                                           ThreadBlocker* blocker,
259                                           size_t threads_to_awake) {
260   EXPECT_EQ(
261       expected_tasks_to_complete,
262       tracker->WaitUntilTasksComplete(expected_tasks_to_complete).size());
263
264   blocker->Unblock(threads_to_awake);
265 }
266
267 class DeletionHelper : public base::RefCountedThreadSafe<DeletionHelper> {
268  public:
269   explicit DeletionHelper(
270       const scoped_refptr<base::RefCountedData<bool> >& deleted_flag)
271       : deleted_flag_(deleted_flag) {
272   }
273
274  private:
275   friend class base::RefCountedThreadSafe<DeletionHelper>;
276   virtual ~DeletionHelper() { deleted_flag_->data = true; }
277
278   const scoped_refptr<base::RefCountedData<bool> > deleted_flag_;
279   DISALLOW_COPY_AND_ASSIGN(DeletionHelper);
280 };
281
282 void HoldPoolReference(const scoped_refptr<base::SequencedWorkerPool>& pool,
283                        const scoped_refptr<DeletionHelper>& helper) {
284   ADD_FAILURE() << "Should never run";
285 }
286
287 // Tests that delayed tasks are deleted upon shutdown of the pool.
288 TEST_F(SequencedWorkerPoolTest, DelayedTaskDuringShutdown) {
289   // Post something to verify the pool is started up.
290   EXPECT_TRUE(pool()->PostTask(
291       FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 1)));
292
293   scoped_refptr<base::RefCountedData<bool> > deleted_flag(
294       new base::RefCountedData<bool>(false));
295
296   base::Time posted_at(base::Time::Now());
297   // Post something that shouldn't run.
298   EXPECT_TRUE(pool()->PostDelayedTask(
299       FROM_HERE,
300       base::Bind(&HoldPoolReference,
301                  pool(),
302                  make_scoped_refptr(new DeletionHelper(deleted_flag))),
303       TestTimeouts::action_timeout()));
304
305   std::vector<int> completion_sequence = tracker()->WaitUntilTasksComplete(1);
306   ASSERT_EQ(1u, completion_sequence.size());
307   ASSERT_EQ(1, completion_sequence[0]);
308
309   pool()->Shutdown();
310   // Shutdown is asynchronous, so use ResetPool() to block until the pool is
311   // fully destroyed (and thus shut down).
312   ResetPool();
313
314   // Verify that we didn't block until the task was due.
315   ASSERT_LT(base::Time::Now() - posted_at, TestTimeouts::action_timeout());
316
317   // Verify that the deferred task has not only not run, but has also been
318   // destroyed.
319   ASSERT_TRUE(deleted_flag->data);
320 }
321
322 // Tests that same-named tokens have the same ID.
323 TEST_F(SequencedWorkerPoolTest, NamedTokens) {
324   const std::string name1("hello");
325   SequencedWorkerPool::SequenceToken token1 =
326       pool()->GetNamedSequenceToken(name1);
327
328   SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
329
330   const std::string name3("goodbye");
331   SequencedWorkerPool::SequenceToken token3 =
332       pool()->GetNamedSequenceToken(name3);
333
334   // All 3 tokens should be different.
335   EXPECT_FALSE(token1.Equals(token2));
336   EXPECT_FALSE(token1.Equals(token3));
337   EXPECT_FALSE(token2.Equals(token3));
338
339   // Requesting the same name again should give the same value.
340   SequencedWorkerPool::SequenceToken token1again =
341       pool()->GetNamedSequenceToken(name1);
342   EXPECT_TRUE(token1.Equals(token1again));
343
344   SequencedWorkerPool::SequenceToken token3again =
345       pool()->GetNamedSequenceToken(name3);
346   EXPECT_TRUE(token3.Equals(token3again));
347 }
348
349 // Tests that posting a bunch of tasks (many more than the number of worker
350 // threads) runs them all.
351 TEST_F(SequencedWorkerPoolTest, LotsOfTasks) {
352   pool()->PostWorkerTask(FROM_HERE,
353                          base::Bind(&TestTracker::SlowTask, tracker(), 0));
354
355   const size_t kNumTasks = 20;
356   for (size_t i = 1; i < kNumTasks; i++) {
357     pool()->PostWorkerTask(FROM_HERE,
358                            base::Bind(&TestTracker::FastTask, tracker(), i));
359   }
360
361   std::vector<int> result = tracker()->WaitUntilTasksComplete(kNumTasks);
362   EXPECT_EQ(kNumTasks, result.size());
363 }
364
365 // Tests that posting a bunch of tasks (many more than the number of
366 // worker threads) to two pools simultaneously runs them all twice.
367 // This test is meant to shake out any concurrency issues between
368 // pools (like histograms).
369 TEST_F(SequencedWorkerPoolTest, LotsOfTasksTwoPools) {
370   SequencedWorkerPoolOwner pool1(kNumWorkerThreads, "test1");
371   SequencedWorkerPoolOwner pool2(kNumWorkerThreads, "test2");
372
373   base::Closure slow_task = base::Bind(&TestTracker::SlowTask, tracker(), 0);
374   pool1.pool()->PostWorkerTask(FROM_HERE, slow_task);
375   pool2.pool()->PostWorkerTask(FROM_HERE, slow_task);
376
377   const size_t kNumTasks = 20;
378   for (size_t i = 1; i < kNumTasks; i++) {
379     base::Closure fast_task =
380         base::Bind(&TestTracker::FastTask, tracker(), i);
381     pool1.pool()->PostWorkerTask(FROM_HERE, fast_task);
382     pool2.pool()->PostWorkerTask(FROM_HERE, fast_task);
383   }
384
385   std::vector<int> result =
386       tracker()->WaitUntilTasksComplete(2*kNumTasks);
387   EXPECT_EQ(2 * kNumTasks, result.size());
388
389   pool2.pool()->Shutdown();
390   pool1.pool()->Shutdown();
391 }
392
393 // Test that tasks with the same sequence token are executed in order but don't
394 // affect other tasks.
395 TEST_F(SequencedWorkerPoolTest, Sequence) {
396   // Fill all the worker threads except one.
397   const size_t kNumBackgroundTasks = kNumWorkerThreads - 1;
398   ThreadBlocker background_blocker;
399   for (size_t i = 0; i < kNumBackgroundTasks; i++) {
400     pool()->PostWorkerTask(FROM_HERE,
401                            base::Bind(&TestTracker::BlockTask,
402                                       tracker(), i, &background_blocker));
403   }
404   tracker()->WaitUntilTasksBlocked(kNumBackgroundTasks);
405
406   // Create two tasks with the same sequence token, one that will block on the
407   // event, and one which will just complete quickly when it's run. Since there
408   // is one worker thread free, the first task will start and then block, and
409   // the second task should be waiting.
410   ThreadBlocker blocker;
411   SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken();
412   pool()->PostSequencedWorkerTask(
413       token1, FROM_HERE,
414       base::Bind(&TestTracker::BlockTask, tracker(), 100, &blocker));
415   pool()->PostSequencedWorkerTask(
416       token1, FROM_HERE,
417       base::Bind(&TestTracker::FastTask, tracker(), 101));
418   EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
419
420   // Create another two tasks as above with a different token. These will be
421   // blocked since there are no slots to run.
422   SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
423   pool()->PostSequencedWorkerTask(
424       token2, FROM_HERE,
425       base::Bind(&TestTracker::FastTask, tracker(), 200));
426   pool()->PostSequencedWorkerTask(
427       token2, FROM_HERE,
428       base::Bind(&TestTracker::FastTask, tracker(), 201));
429   EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
430
431   // Let one background task complete. This should then let both tasks of
432   // token2 run to completion in order. The second task of token1 should still
433   // be blocked.
434   background_blocker.Unblock(1);
435   std::vector<int> result = tracker()->WaitUntilTasksComplete(3);
436   ASSERT_EQ(3u, result.size());
437   EXPECT_EQ(200, result[1]);
438   EXPECT_EQ(201, result[2]);
439
440   // Finish the rest of the background tasks. This should leave some workers
441   // free with the second token1 task still blocked on the first.
442   background_blocker.Unblock(kNumBackgroundTasks - 1);
443   EXPECT_EQ(kNumBackgroundTasks + 2,
444             tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 2).size());
445
446   // Allow the first task of token1 to complete. This should run the second.
447   blocker.Unblock(1);
448   result = tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 4);
449   ASSERT_EQ(kNumBackgroundTasks + 4, result.size());
450   EXPECT_EQ(100, result[result.size() - 2]);
451   EXPECT_EQ(101, result[result.size() - 1]);
452 }
453
454 // Tests that any tasks posted after Shutdown are ignored.
455 // Disabled for flakiness.  See http://crbug.com/166451.
456 TEST_F(SequencedWorkerPoolTest, DISABLED_IgnoresAfterShutdown) {
457   // Start tasks to take all the threads and block them.
458   EnsureAllWorkersCreated();
459   ThreadBlocker blocker;
460   for (size_t i = 0; i < kNumWorkerThreads; i++) {
461     pool()->PostWorkerTask(FROM_HERE,
462                            base::Bind(&TestTracker::BlockTask,
463                                       tracker(), i, &blocker));
464   }
465   tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
466
467   SetWillWaitForShutdownCallback(
468       base::Bind(&EnsureTasksToCompleteCountAndUnblock,
469                  scoped_refptr<TestTracker>(tracker()), 0,
470                  &blocker, kNumWorkerThreads));
471
472   // Shutdown the worker pool. This should discard all non-blocking tasks.
473   const int kMaxNewBlockingTasksAfterShutdown = 100;
474   pool()->Shutdown(kMaxNewBlockingTasksAfterShutdown);
475
476   int old_has_work_call_count = has_work_call_count();
477
478   std::vector<int> result =
479       tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
480
481   // The kNumWorkerThread items should have completed, in no particular order.
482   ASSERT_EQ(kNumWorkerThreads, result.size());
483   for (size_t i = 0; i < kNumWorkerThreads; i++) {
484     EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
485                 result.end());
486   }
487
488   // No further tasks, regardless of shutdown mode, should be allowed.
489   EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
490       FROM_HERE,
491       base::Bind(&TestTracker::FastTask, tracker(), 100),
492       SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
493   EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
494       FROM_HERE,
495       base::Bind(&TestTracker::FastTask, tracker(), 101),
496       SequencedWorkerPool::SKIP_ON_SHUTDOWN));
497   EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
498       FROM_HERE,
499       base::Bind(&TestTracker::FastTask, tracker(), 102),
500       SequencedWorkerPool::BLOCK_SHUTDOWN));
501
502   ASSERT_EQ(old_has_work_call_count, has_work_call_count());
503 }
504
505 TEST_F(SequencedWorkerPoolTest, AllowsAfterShutdown) {
506   // Test that <n> new blocking tasks are allowed provided they're posted
507   // by a running tasks.
508   EnsureAllWorkersCreated();
509   ThreadBlocker blocker;
510
511   // Start tasks to take all the threads and block them.
512   const int kNumBlockTasks = static_cast<int>(kNumWorkerThreads);
513   for (int i = 0; i < kNumBlockTasks; ++i) {
514     EXPECT_TRUE(pool()->PostWorkerTask(
515         FROM_HERE,
516         base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker)));
517   }
518   tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
519
520   // Queue up shutdown blocking tasks behind those which will attempt to post
521   // additional tasks when run, PostAdditionalTasks attemtps to post 3
522   // new FastTasks, one for each shutdown_behavior.
523   const int kNumQueuedTasks = static_cast<int>(kNumWorkerThreads);
524   for (int i = 0; i < kNumQueuedTasks; ++i) {
525     EXPECT_TRUE(pool()->PostWorkerTaskWithShutdownBehavior(
526         FROM_HERE,
527         base::Bind(&TestTracker::PostAdditionalTasks, tracker(), i, pool(),
528                    false),
529         SequencedWorkerPool::BLOCK_SHUTDOWN));
530   }
531
532   // Setup to open the floodgates from within Shutdown().
533   SetWillWaitForShutdownCallback(
534       base::Bind(&EnsureTasksToCompleteCountAndUnblock,
535                  scoped_refptr<TestTracker>(tracker()),
536                  0, &blocker, kNumBlockTasks));
537
538   // Allow half of the additional blocking tasks thru.
539   const int kNumNewBlockingTasksToAllow = kNumWorkerThreads / 2;
540   pool()->Shutdown(kNumNewBlockingTasksToAllow);
541
542   // Ensure that the correct number of tasks actually got run.
543   tracker()->WaitUntilTasksComplete(static_cast<size_t>(
544       kNumBlockTasks + kNumQueuedTasks + kNumNewBlockingTasksToAllow));
545
546   // Clean up the task IDs we added and go home.
547   tracker()->ClearCompleteSequence();
548 }
549
550 // Tests that unrun tasks are discarded properly according to their shutdown
551 // mode.
552 TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) {
553   // Start tasks to take all the threads and block them.
554   EnsureAllWorkersCreated();
555   ThreadBlocker blocker;
556   for (size_t i = 0; i < kNumWorkerThreads; i++) {
557     pool()->PostWorkerTask(FROM_HERE,
558                            base::Bind(&TestTracker::BlockTask,
559                                       tracker(), i, &blocker));
560   }
561   tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
562
563   // Create some tasks with different shutdown modes.
564   pool()->PostWorkerTaskWithShutdownBehavior(
565       FROM_HERE,
566       base::Bind(&TestTracker::FastTask, tracker(), 100),
567       SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
568   pool()->PostWorkerTaskWithShutdownBehavior(
569       FROM_HERE,
570       base::Bind(&TestTracker::FastTask, tracker(), 101),
571       SequencedWorkerPool::SKIP_ON_SHUTDOWN);
572   pool()->PostWorkerTaskWithShutdownBehavior(
573       FROM_HERE,
574       base::Bind(&TestTracker::FastTask, tracker(), 102),
575       SequencedWorkerPool::BLOCK_SHUTDOWN);
576
577   // Shutdown the worker pool. This should discard all non-blocking tasks.
578   SetWillWaitForShutdownCallback(
579       base::Bind(&EnsureTasksToCompleteCountAndUnblock,
580                  scoped_refptr<TestTracker>(tracker()), 0,
581                  &blocker, kNumWorkerThreads));
582   pool()->Shutdown();
583
584   std::vector<int> result =
585       tracker()->WaitUntilTasksComplete(kNumWorkerThreads + 1);
586
587   // The kNumWorkerThread items should have completed, plus the BLOCK_SHUTDOWN
588   // one, in no particular order.
589   ASSERT_EQ(kNumWorkerThreads + 1, result.size());
590   for (size_t i = 0; i < kNumWorkerThreads; i++) {
591     EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
592                 result.end());
593   }
594   EXPECT_TRUE(std::find(result.begin(), result.end(), 102) != result.end());
595 }
596
597 // Tests that CONTINUE_ON_SHUTDOWN tasks don't block shutdown.
598 TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) {
599   scoped_refptr<TaskRunner> runner(pool()->GetTaskRunnerWithShutdownBehavior(
600       SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
601   scoped_refptr<SequencedTaskRunner> sequenced_runner(
602       pool()->GetSequencedTaskRunnerWithShutdownBehavior(
603           pool()->GetSequenceToken(),
604           SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
605   EnsureAllWorkersCreated();
606   ThreadBlocker blocker;
607   pool()->PostWorkerTaskWithShutdownBehavior(
608       FROM_HERE,
609       base::Bind(&TestTracker::BlockTask,
610                  tracker(), 0, &blocker),
611       SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
612   runner->PostTask(
613       FROM_HERE,
614       base::Bind(&TestTracker::BlockTask,
615                  tracker(), 1, &blocker));
616   sequenced_runner->PostTask(
617       FROM_HERE,
618       base::Bind(&TestTracker::BlockTask,
619                  tracker(), 2, &blocker));
620
621   tracker()->WaitUntilTasksBlocked(3);
622
623   // This should not block. If this test hangs, it means it failed.
624   pool()->Shutdown();
625
626   // The task should not have completed yet.
627   EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
628
629   // Posting more tasks should fail.
630   EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
631       FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0),
632       SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
633   EXPECT_FALSE(runner->PostTask(
634       FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0)));
635   EXPECT_FALSE(sequenced_runner->PostTask(
636       FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0)));
637
638   // Continue the background thread and make sure the tasks can complete.
639   blocker.Unblock(3);
640   std::vector<int> result = tracker()->WaitUntilTasksComplete(3);
641   EXPECT_EQ(3u, result.size());
642 }
643
644 // Tests that SKIP_ON_SHUTDOWN tasks that have been started block Shutdown
645 // until they stop, but tasks not yet started do not.
646 TEST_F(SequencedWorkerPoolTest, SkipOnShutdown) {
647   // Start tasks to take all the threads and block them.
648   EnsureAllWorkersCreated();
649   ThreadBlocker blocker;
650
651   // Now block all the threads with SKIP_ON_SHUTDOWN. Shutdown() should not
652   // return until these tasks have completed.
653   for (size_t i = 0; i < kNumWorkerThreads; i++) {
654     pool()->PostWorkerTaskWithShutdownBehavior(
655         FROM_HERE,
656         base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker),
657         SequencedWorkerPool::SKIP_ON_SHUTDOWN);
658   }
659   tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
660
661   // Now post an additional task as SKIP_ON_SHUTDOWN, which should not be
662   // executed once Shutdown() has been called.
663   pool()->PostWorkerTaskWithShutdownBehavior(
664       FROM_HERE,
665       base::Bind(&TestTracker::BlockTask,
666                  tracker(), 0, &blocker),
667       SequencedWorkerPool::SKIP_ON_SHUTDOWN);
668
669   // This callback will only be invoked if SKIP_ON_SHUTDOWN tasks that have
670   // been started block shutdown.
671   SetWillWaitForShutdownCallback(
672       base::Bind(&EnsureTasksToCompleteCountAndUnblock,
673                  scoped_refptr<TestTracker>(tracker()), 0,
674                  &blocker, kNumWorkerThreads));
675
676   // No tasks should have completed yet.
677   EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
678
679   // This should not block. If this test hangs, it means it failed.
680   pool()->Shutdown();
681
682   // Shutdown should not return until all of the tasks have completed.
683   std::vector<int> result =
684       tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
685
686   // Only tasks marked SKIP_ON_SHUTDOWN that were already started should be
687   // allowed to complete. No additional non-blocking tasks should have been
688   // started.
689   ASSERT_EQ(kNumWorkerThreads, result.size());
690   for (size_t i = 0; i < kNumWorkerThreads; i++) {
691     EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
692                 result.end());
693   }
694 }
695
696 // Ensure all worker threads are created, and then trigger a spurious
697 // work signal. This shouldn't cause any other work signals to be
698 // triggered. This is a regression test for http://crbug.com/117469.
699 TEST_F(SequencedWorkerPoolTest, SpuriousWorkSignal) {
700   EnsureAllWorkersCreated();
701   int old_has_work_call_count = has_work_call_count();
702   pool()->SignalHasWorkForTesting();
703   // This is inherently racy, but can only produce false positives.
704   base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(100));
705   EXPECT_EQ(old_has_work_call_count + 1, has_work_call_count());
706 }
707
708 void IsRunningOnCurrentThreadTask(
709     SequencedWorkerPool::SequenceToken test_positive_token,
710     SequencedWorkerPool::SequenceToken test_negative_token,
711     SequencedWorkerPool* pool,
712     SequencedWorkerPool* unused_pool) {
713   EXPECT_TRUE(pool->RunsTasksOnCurrentThread());
714   EXPECT_TRUE(pool->IsRunningSequenceOnCurrentThread(test_positive_token));
715   EXPECT_FALSE(pool->IsRunningSequenceOnCurrentThread(test_negative_token));
716   EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread());
717   EXPECT_FALSE(
718       unused_pool->IsRunningSequenceOnCurrentThread(test_positive_token));
719   EXPECT_FALSE(
720       unused_pool->IsRunningSequenceOnCurrentThread(test_negative_token));
721 }
722
723 // Verify correctness of the IsRunningSequenceOnCurrentThread method.
724 TEST_F(SequencedWorkerPoolTest, IsRunningOnCurrentThread) {
725   SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken();
726   SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
727   SequencedWorkerPool::SequenceToken unsequenced_token;
728
729   scoped_refptr<SequencedWorkerPool> unused_pool =
730       new SequencedWorkerPool(2, "unused_pool");
731
732   EXPECT_FALSE(pool()->RunsTasksOnCurrentThread());
733   EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token1));
734   EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token2));
735   EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(unsequenced_token));
736   EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread());
737   EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token1));
738   EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token2));
739   EXPECT_FALSE(
740       unused_pool->IsRunningSequenceOnCurrentThread(unsequenced_token));
741
742   pool()->PostSequencedWorkerTask(
743       token1, FROM_HERE,
744       base::Bind(&IsRunningOnCurrentThreadTask,
745                  token1, token2, pool(), unused_pool));
746   pool()->PostSequencedWorkerTask(
747       token2, FROM_HERE,
748       base::Bind(&IsRunningOnCurrentThreadTask,
749                  token2, unsequenced_token, pool(), unused_pool));
750   pool()->PostWorkerTask(
751       FROM_HERE,
752       base::Bind(&IsRunningOnCurrentThreadTask,
753                  unsequenced_token, token1, pool(), unused_pool));
754   pool()->Shutdown();
755   unused_pool->Shutdown();
756 }
757
758 // Verify that FlushForTesting works as intended.
759 TEST_F(SequencedWorkerPoolTest, FlushForTesting) {
760   // Should be fine to call on a new instance.
761   pool()->FlushForTesting();
762
763   // Queue up a bunch of work, including  a long delayed task and
764   // a task that produces additional tasks as an artifact.
765   pool()->PostDelayedWorkerTask(
766       FROM_HERE,
767       base::Bind(&TestTracker::FastTask, tracker(), 0),
768       TimeDelta::FromMinutes(5));
769   pool()->PostWorkerTask(FROM_HERE,
770                          base::Bind(&TestTracker::SlowTask, tracker(), 0));
771   const size_t kNumFastTasks = 20;
772   for (size_t i = 0; i < kNumFastTasks; i++) {
773     pool()->PostWorkerTask(FROM_HERE,
774                            base::Bind(&TestTracker::FastTask, tracker(), 0));
775   }
776   pool()->PostWorkerTask(
777       FROM_HERE,
778       base::Bind(&TestTracker::PostAdditionalTasks, tracker(), 0, pool(),
779                  true));
780
781   // We expect all except the delayed task to have been run. We verify all
782   // closures have been deleted by looking at the refcount of the
783   // tracker.
784   EXPECT_FALSE(tracker()->HasOneRef());
785   pool()->FlushForTesting();
786   EXPECT_TRUE(tracker()->HasOneRef());
787   EXPECT_EQ(1 + kNumFastTasks + 1 + 3, tracker()->GetTasksCompletedCount());
788
789   // Should be fine to call on an idle instance with all threads created, and
790   // spamming the method shouldn't deadlock or confuse the class.
791   pool()->FlushForTesting();
792   pool()->FlushForTesting();
793
794   // Should be fine to call after shutdown too.
795   pool()->Shutdown();
796   pool()->FlushForTesting();
797 }
798
799 TEST(SequencedWorkerPoolRefPtrTest, ShutsDownCleanWithContinueOnShutdown) {
800   MessageLoop loop;
801   scoped_refptr<SequencedWorkerPool> pool(new SequencedWorkerPool(3, "Pool"));
802   scoped_refptr<SequencedTaskRunner> task_runner =
803       pool->GetSequencedTaskRunnerWithShutdownBehavior(
804           pool->GetSequenceToken(),
805           base::SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
806
807   // Upon test exit, should shut down without hanging.
808   pool->Shutdown();
809 }
810
811 class SequencedWorkerPoolTaskRunnerTestDelegate {
812  public:
813   SequencedWorkerPoolTaskRunnerTestDelegate() {}
814
815   ~SequencedWorkerPoolTaskRunnerTestDelegate() {}
816
817   void StartTaskRunner() {
818     pool_owner_.reset(
819         new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
820   }
821
822   scoped_refptr<SequencedWorkerPool> GetTaskRunner() {
823     return pool_owner_->pool();
824   }
825
826   void StopTaskRunner() {
827     // Make sure all tasks are run before shutting down. Delayed tasks are
828     // not run, they're simply deleted.
829     pool_owner_->pool()->FlushForTesting();
830     pool_owner_->pool()->Shutdown();
831     // Don't reset |pool_owner_| here, as the test may still hold a
832     // reference to the pool.
833   }
834
835  private:
836   MessageLoop message_loop_;
837   scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
838 };
839
840 INSTANTIATE_TYPED_TEST_CASE_P(
841     SequencedWorkerPool, TaskRunnerTest,
842     SequencedWorkerPoolTaskRunnerTestDelegate);
843
844 class SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate {
845  public:
846   SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {}
847
848   ~SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {
849   }
850
851   void StartTaskRunner() {
852     pool_owner_.reset(
853         new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
854     task_runner_ = pool_owner_->pool()->GetTaskRunnerWithShutdownBehavior(
855         SequencedWorkerPool::BLOCK_SHUTDOWN);
856   }
857
858   scoped_refptr<TaskRunner> GetTaskRunner() {
859     return task_runner_;
860   }
861
862   void StopTaskRunner() {
863     // Make sure all tasks are run before shutting down. Delayed tasks are
864     // not run, they're simply deleted.
865     pool_owner_->pool()->FlushForTesting();
866     pool_owner_->pool()->Shutdown();
867     // Don't reset |pool_owner_| here, as the test may still hold a
868     // reference to the pool.
869   }
870
871  private:
872   MessageLoop message_loop_;
873   scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
874   scoped_refptr<TaskRunner> task_runner_;
875 };
876
877 INSTANTIATE_TYPED_TEST_CASE_P(
878     SequencedWorkerPoolTaskRunner, TaskRunnerTest,
879     SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate);
880
881 class SequencedWorkerPoolSequencedTaskRunnerTestDelegate {
882  public:
883   SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {}
884
885   ~SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {
886   }
887
888   void StartTaskRunner() {
889     pool_owner_.reset(new SequencedWorkerPoolOwner(
890         10, "SequencedWorkerPoolSequencedTaskRunnerTest"));
891     task_runner_ = pool_owner_->pool()->GetSequencedTaskRunner(
892         pool_owner_->pool()->GetSequenceToken());
893   }
894
895   scoped_refptr<SequencedTaskRunner> GetTaskRunner() {
896     return task_runner_;
897   }
898
899   void StopTaskRunner() {
900     // Make sure all tasks are run before shutting down. Delayed tasks are
901     // not run, they're simply deleted.
902     pool_owner_->pool()->FlushForTesting();
903     pool_owner_->pool()->Shutdown();
904     // Don't reset |pool_owner_| here, as the test may still hold a
905     // reference to the pool.
906   }
907
908  private:
909   MessageLoop message_loop_;
910   scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
911   scoped_refptr<SequencedTaskRunner> task_runner_;
912 };
913
914 INSTANTIATE_TYPED_TEST_CASE_P(
915     SequencedWorkerPoolSequencedTaskRunner, TaskRunnerTest,
916     SequencedWorkerPoolSequencedTaskRunnerTestDelegate);
917
918 INSTANTIATE_TYPED_TEST_CASE_P(
919     SequencedWorkerPoolSequencedTaskRunner, SequencedTaskRunnerTest,
920     SequencedWorkerPoolSequencedTaskRunnerTestDelegate);
921
922 }  // namespace
923
924 }  // namespace base