1 // Copyright 2018 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "base/observer_list_threadsafe.h"
10 #include "base/bind.h"
11 #include "base/compiler_specific.h"
12 #include "base/location.h"
13 #include "base/logging.h"
14 #include "base/memory/weak_ptr.h"
15 #include "base/run_loop.h"
16 #include "base/sequenced_task_runner.h"
17 #include "base/single_thread_task_runner.h"
18 #include "base/synchronization/waitable_event.h"
19 #include "base/task/post_task.h"
20 #include "base/task/thread_pool.h"
21 #include "base/task/thread_pool/thread_pool_instance.h"
22 #include "base/test/bind.h"
23 #include "base/test/task_environment.h"
24 #include "base/threading/platform_thread.h"
25 #include "base/threading/thread_restrictions.h"
26 #include "base/threading/thread_task_runner_handle.h"
27 #include "build/build_config.h"
28 #include "testing/gtest/include/gtest/gtest.h"
29 #include "third_party/abseil-cpp/absl/types/optional.h"
34 constexpr int kThreadRunTime = 2000; // ms to run the multi-threaded test.
38 virtual void Observe(int x) = 0;
39 virtual ~Foo() = default;
40 virtual int GetValue() const { return 0; }
43 class Adder : public Foo {
45 explicit Adder(int scaler) : total(0), scaler_(scaler) {}
46 ~Adder() override = default;
48 void Observe(int x) override { total += x * scaler_; }
49 int GetValue() const override { return total; }
57 class AddInObserve : public Foo {
59 explicit AddInObserve(ObserverListThreadSafe<Foo>* observer_list)
60 : observer_list(observer_list), to_add_() {}
62 void SetToAdd(Foo* to_add) { to_add_ = to_add; }
64 void Observe(int x) override {
66 observer_list->AddObserver(to_add_);
71 ObserverListThreadSafe<Foo>* observer_list;
75 // A task for use in the ThreadSafeObserver test which will add and remove
76 // itself from the notification list repeatedly.
77 class AddRemoveThread : public Foo {
79 AddRemoveThread(ObserverListThreadSafe<Foo>* list, bool notify)
81 task_runner_(ThreadPool::CreateSingleThreadTaskRunner(
83 SingleThreadTaskRunnerThreadMode::DEDICATED)),
86 do_notifies_(notify) {
87 task_runner_->PostTask(
89 base::BindOnce(&AddRemoveThread::AddTask, weak_factory_.GetWeakPtr()));
92 ~AddRemoveThread() override = default;
94 // This task just keeps posting to itself in an attempt to race with the
97 if ((Time::Now() - start_).InMilliseconds() > kThreadRunTime) {
103 list_->AddObserver(this);
108 list_->Notify(FROM_HERE, &Foo::Observe, 10);
111 ThreadTaskRunnerHandle::Get()->PostTask(
113 base::BindOnce(&AddRemoveThread::AddTask, weak_factory_.GetWeakPtr()));
116 void Observe(int x) override {
117 // If we're getting called after we removed ourselves from the list, that is
119 EXPECT_TRUE(in_list_);
121 // This callback should fire on the appropriate thread
122 EXPECT_TRUE(task_runner_->BelongsToCurrentThread());
124 list_->RemoveObserver(this);
129 ObserverListThreadSafe<Foo>* list_;
130 scoped_refptr<SingleThreadTaskRunner> task_runner_;
131 bool in_list_; // Are we currently registered for notifications.
132 // in_list_ is only used on |this| thread.
133 Time start_; // The time we started the test.
135 bool do_notifies_; // Whether these threads should do notifications.
137 base::WeakPtrFactory<AddRemoveThread> weak_factory_{this};
142 TEST(ObserverListThreadSafeTest, BasicTest) {
143 using List = ObserverListThreadSafe<Foo>;
144 test::TaskEnvironment task_environment;
146 scoped_refptr<List> observer_list(new List);
152 List::AddObserverResult result;
154 result = observer_list->AddObserver(&a);
155 EXPECT_EQ(result, List::AddObserverResult::kBecameNonEmpty);
156 result = observer_list->AddObserver(&b);
157 EXPECT_EQ(result, List::AddObserverResult::kWasAlreadyNonEmpty);
159 observer_list->Notify(FROM_HERE, &Foo::Observe, 10);
160 RunLoop().RunUntilIdle();
162 result = observer_list->AddObserver(&c);
163 EXPECT_EQ(result, List::AddObserverResult::kWasAlreadyNonEmpty);
164 result = observer_list->AddObserver(&d);
165 EXPECT_EQ(result, List::AddObserverResult::kWasAlreadyNonEmpty);
167 observer_list->Notify(FROM_HERE, &Foo::Observe, 10);
168 observer_list->RemoveObserver(&c);
169 RunLoop().RunUntilIdle();
171 EXPECT_EQ(20, a.total);
172 EXPECT_EQ(-20, b.total);
173 EXPECT_EQ(0, c.total);
174 EXPECT_EQ(-10, d.total);
177 TEST(ObserverListThreadSafeTest, RemoveObserver) {
178 using List = ObserverListThreadSafe<Foo>;
179 test::TaskEnvironment task_environment;
181 scoped_refptr<List> observer_list(new List);
184 // A workaround for the compiler bug. See http://crbug.com/121960.
187 List::RemoveObserverResult result;
189 // Should do nothing.
190 result = observer_list->RemoveObserver(&a);
191 EXPECT_EQ(result, List::RemoveObserverResult::kWasOrBecameEmpty);
192 result = observer_list->RemoveObserver(&b);
193 EXPECT_EQ(result, List::RemoveObserverResult::kWasOrBecameEmpty);
195 observer_list->Notify(FROM_HERE, &Foo::Observe, 10);
196 RunLoop().RunUntilIdle();
198 EXPECT_EQ(0, a.total);
199 EXPECT_EQ(0, b.total);
201 observer_list->AddObserver(&a);
203 // Should also do nothing.
204 result = observer_list->RemoveObserver(&b);
205 EXPECT_EQ(result, List::RemoveObserverResult::kRemainsNonEmpty);
207 observer_list->Notify(FROM_HERE, &Foo::Observe, 10);
208 RunLoop().RunUntilIdle();
210 EXPECT_EQ(10, a.total);
211 EXPECT_EQ(0, b.total);
213 result = observer_list->RemoveObserver(&a);
214 EXPECT_EQ(result, List::RemoveObserverResult::kWasOrBecameEmpty);
217 class FooRemover : public Foo {
219 explicit FooRemover(ObserverListThreadSafe<Foo>* list) : list_(list) {}
220 ~FooRemover() override = default;
222 void AddFooToRemove(Foo* foo) { foos_.push_back(foo); }
224 void Observe(int x) override {
225 std::vector<Foo*> tmp;
227 for (auto* it : tmp) {
228 list_->RemoveObserver(it);
233 const scoped_refptr<ObserverListThreadSafe<Foo>> list_;
234 std::vector<Foo*> foos_;
237 TEST(ObserverListThreadSafeTest, RemoveMultipleObservers) {
238 test::TaskEnvironment task_environment;
239 scoped_refptr<ObserverListThreadSafe<Foo>> observer_list(
240 new ObserverListThreadSafe<Foo>);
242 FooRemover a(observer_list.get());
245 observer_list->AddObserver(&a);
246 observer_list->AddObserver(&b);
248 a.AddFooToRemove(&a);
249 a.AddFooToRemove(&b);
251 observer_list->Notify(FROM_HERE, &Foo::Observe, 1);
252 RunLoop().RunUntilIdle();
255 // A test driver for a multi-threaded notification loop. Runs a number of
256 // observer threads, each of which constantly adds/removes itself from the
257 // observer list. Optionally, if cross_thread_notifies is set to true, the
258 // observer threads will also trigger notifications to all observers.
259 static void ThreadSafeObserverHarness(int num_threads,
260 bool cross_thread_notifies) {
261 test::TaskEnvironment task_environment;
263 scoped_refptr<ObserverListThreadSafe<Foo>> observer_list(
264 new ObserverListThreadSafe<Foo>);
268 observer_list->AddObserver(&a);
269 observer_list->AddObserver(&b);
271 std::vector<std::unique_ptr<AddRemoveThread>> threaded_observer;
272 threaded_observer.reserve(num_threads);
273 for (int index = 0; index < num_threads; index++) {
274 threaded_observer.push_back(std::make_unique<AddRemoveThread>(
275 observer_list.get(), cross_thread_notifies));
277 ASSERT_EQ(static_cast<size_t>(num_threads), threaded_observer.size());
279 Time start = Time::Now();
281 if ((Time::Now() - start).InMilliseconds() > kThreadRunTime)
284 observer_list->Notify(FROM_HERE, &Foo::Observe, 10);
286 RunLoop().RunUntilIdle();
289 task_environment.RunUntilIdle();
292 TEST(ObserverListThreadSafeTest, CrossThreadObserver) {
293 // Use 7 observer threads. Notifications only come from the main thread.
294 ThreadSafeObserverHarness(7, false);
297 TEST(ObserverListThreadSafeTest, CrossThreadNotifications) {
298 // Use 3 observer threads. Notifications will fire from the main thread and
299 // all 3 observer threads.
300 ThreadSafeObserverHarness(3, true);
303 TEST(ObserverListThreadSafeTest, OutlivesTaskEnvironment) {
304 absl::optional<test::TaskEnvironment> task_environment(absl::in_place);
305 scoped_refptr<ObserverListThreadSafe<Foo>> observer_list(
306 new ObserverListThreadSafe<Foo>);
309 observer_list->AddObserver(&a);
310 task_environment.reset();
311 // Test passes if we don't crash here.
312 observer_list->Notify(FROM_HERE, &Foo::Observe, 1);
317 class SequenceVerificationObserver : public Foo {
319 explicit SequenceVerificationObserver(
320 scoped_refptr<SequencedTaskRunner> task_runner)
321 : task_runner_(std::move(task_runner)) {}
322 SequenceVerificationObserver(const SequenceVerificationObserver&) = delete;
323 SequenceVerificationObserver& operator=(const SequenceVerificationObserver&) =
325 ~SequenceVerificationObserver() override = default;
327 void Observe(int x) override {
328 called_on_valid_sequence_ = task_runner_->RunsTasksInCurrentSequence();
331 bool called_on_valid_sequence() const { return called_on_valid_sequence_; }
334 const scoped_refptr<SequencedTaskRunner> task_runner_;
335 bool called_on_valid_sequence_ = false;
340 // Verify that observers are notified on the correct sequence.
341 TEST(ObserverListThreadSafeTest, NotificationOnValidSequence) {
342 test::TaskEnvironment task_environment;
344 auto task_runner_1 = ThreadPool::CreateSequencedTaskRunner({});
345 auto task_runner_2 = ThreadPool::ThreadPool::CreateSequencedTaskRunner({});
347 auto observer_list = MakeRefCounted<ObserverListThreadSafe<Foo>>();
349 SequenceVerificationObserver observer_1(task_runner_1);
350 SequenceVerificationObserver observer_2(task_runner_2);
352 task_runner_1->PostTask(
354 BindOnce(base::IgnoreResult(&ObserverListThreadSafe<Foo>::AddObserver),
355 observer_list, Unretained(&observer_1)));
356 task_runner_2->PostTask(
358 BindOnce(base::IgnoreResult(&ObserverListThreadSafe<Foo>::AddObserver),
359 observer_list, Unretained(&observer_2)));
361 ThreadPoolInstance::Get()->FlushForTesting();
363 observer_list->Notify(FROM_HERE, &Foo::Observe, 1);
365 ThreadPoolInstance::Get()->FlushForTesting();
367 EXPECT_TRUE(observer_1.called_on_valid_sequence());
368 EXPECT_TRUE(observer_2.called_on_valid_sequence());
371 // Verify that when an observer is added to a NOTIFY_ALL ObserverListThreadSafe
372 // from a notification, it is itself notified.
373 TEST(ObserverListThreadSafeTest, AddObserverFromNotificationNotifyAll) {
374 test::TaskEnvironment task_environment;
375 auto observer_list = MakeRefCounted<ObserverListThreadSafe<Foo>>();
377 Adder observer_added_from_notification(1);
379 AddInObserve initial_observer(observer_list.get());
380 initial_observer.SetToAdd(&observer_added_from_notification);
381 observer_list->AddObserver(&initial_observer);
383 observer_list->Notify(FROM_HERE, &Foo::Observe, 1);
385 base::RunLoop().RunUntilIdle();
387 EXPECT_EQ(1, observer_added_from_notification.GetValue());
392 class RemoveWhileNotificationIsRunningObserver : public Foo {
394 RemoveWhileNotificationIsRunningObserver()
395 : notification_running_(WaitableEvent::ResetPolicy::AUTOMATIC,
396 WaitableEvent::InitialState::NOT_SIGNALED),
397 barrier_(WaitableEvent::ResetPolicy::AUTOMATIC,
398 WaitableEvent::InitialState::NOT_SIGNALED) {}
399 RemoveWhileNotificationIsRunningObserver(
400 const RemoveWhileNotificationIsRunningObserver&) = delete;
401 RemoveWhileNotificationIsRunningObserver& operator=(
402 const RemoveWhileNotificationIsRunningObserver&) = delete;
403 ~RemoveWhileNotificationIsRunningObserver() override = default;
405 void Observe(int x) override {
406 notification_running_.Signal();
407 ScopedAllowBaseSyncPrimitivesForTesting allow_base_sync_primitives;
411 void WaitForNotificationRunning() { notification_running_.Wait(); }
412 void Unblock() { barrier_.Signal(); }
415 WaitableEvent notification_running_;
416 WaitableEvent barrier_;
421 // Verify that there is no crash when an observer is removed while it is being
423 TEST(ObserverListThreadSafeTest, RemoveWhileNotificationIsRunning) {
424 auto observer_list = MakeRefCounted<ObserverListThreadSafe<Foo>>();
425 RemoveWhileNotificationIsRunningObserver observer;
427 WaitableEvent task_running(WaitableEvent::ResetPolicy::AUTOMATIC,
428 WaitableEvent::InitialState::NOT_SIGNALED);
429 WaitableEvent barrier(WaitableEvent::ResetPolicy::AUTOMATIC,
430 WaitableEvent::InitialState::NOT_SIGNALED);
432 // This must be after the declaration of |barrier| so that tasks posted to
433 // ThreadPool can safely use |barrier|.
434 test::TaskEnvironment task_environment;
436 ThreadPool::CreateSequencedTaskRunner({MayBlock()})
437 ->PostTask(FROM_HERE,
438 base::BindOnce(base::IgnoreResult(
439 &ObserverListThreadSafe<Foo>::AddObserver),
440 observer_list, Unretained(&observer)));
441 ThreadPoolInstance::Get()->FlushForTesting();
443 observer_list->Notify(FROM_HERE, &Foo::Observe, 1);
444 observer.WaitForNotificationRunning();
445 observer_list->RemoveObserver(&observer);
450 TEST(ObserverListThreadSafeTest, AddRemoveWithPendingNotifications) {
451 test::TaskEnvironment task_environment;
453 scoped_refptr<ObserverListThreadSafe<Foo>> observer_list(
454 new ObserverListThreadSafe<Foo>);
458 observer_list->AddObserver(&a);
459 observer_list->AddObserver(&b);
461 // Remove observer `a` while there is a pending notification for observer `a`.
462 observer_list->Notify(FROM_HERE, &Foo::Observe, 10);
463 observer_list->RemoveObserver(&a);
464 RunLoop().RunUntilIdle();
465 observer_list->AddObserver(&a);
467 EXPECT_EQ(0, a.total);
468 EXPECT_EQ(10, b.total);
470 // Remove and re-adding observer `a` while there is a pending notification for
471 // observer `a`. The notification to `a` must not be executed since it was
472 // sent before the removal of `a`.
473 observer_list->Notify(FROM_HERE, &Foo::Observe, 10);
474 observer_list->RemoveObserver(&a);
475 observer_list->AddObserver(&a);
476 RunLoop().RunUntilIdle();
478 EXPECT_EQ(0, a.total);
479 EXPECT_EQ(20, b.total);
481 // Observer `a` and `b` are present and should both receive a notification.
482 observer_list->RemoveObserver(&a);
483 observer_list->AddObserver(&a);
484 observer_list->Notify(FROM_HERE, &Foo::Observe, 10);
485 RunLoop().RunUntilIdle();
487 EXPECT_EQ(10, a.total);
488 EXPECT_EQ(30, b.total);
491 // Same as ObserverListTest.Existing, but for ObserverListThreadSafe
492 TEST(ObserverListThreadSafeTest, Existing) {
493 test::TaskEnvironment task_environment;
494 scoped_refptr<ObserverListThreadSafe<Foo>> observer_list(
495 new ObserverListThreadSafe<Foo>(ObserverListPolicy::EXISTING_ONLY));
497 AddInObserve b(observer_list.get());
501 observer_list->AddObserver(&a);
502 observer_list->AddObserver(&b);
504 observer_list->Notify(FROM_HERE, &Foo::Observe, 1);
505 RunLoop().RunUntilIdle();
507 EXPECT_FALSE(b.to_add_);
508 // B's adder should not have been notified because it was added during
510 EXPECT_EQ(0, c.total);
512 // Notify again to make sure b's adder is notified.
513 observer_list->Notify(FROM_HERE, &Foo::Observe, 1);
514 RunLoop().RunUntilIdle();
515 EXPECT_EQ(1, c.total);