1 // Copyright 2018 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "base/observer_list_threadsafe.h"
11 #include "base/compiler_specific.h"
12 #include "base/functional/bind.h"
13 #include "base/location.h"
14 #include "base/logging.h"
15 #include "base/memory/raw_ptr.h"
16 #include "base/memory/weak_ptr.h"
17 #include "base/run_loop.h"
18 #include "base/synchronization/waitable_event.h"
19 #include "base/task/sequenced_task_runner.h"
20 #include "base/task/single_thread_task_runner.h"
21 #include "base/task/thread_pool.h"
22 #include "base/task/thread_pool/thread_pool_instance.h"
23 #include "base/test/bind.h"
24 #include "base/test/gtest_util.h"
25 #include "base/test/task_environment.h"
26 #include "base/test/test_waitable_event.h"
27 #include "base/threading/platform_thread.h"
28 #include "base/threading/thread_restrictions.h"
29 #include "build/build_config.h"
30 #include "testing/gtest/include/gtest/gtest.h"
31 #include "third_party/abseil-cpp/absl/types/optional.h"
// Time budget for the multi-threaded tests, in milliseconds. Code in this file
// compares it against an elapsed TimeDelta converted with InMilliseconds().
36 constexpr int kThreadRunTime = 1000; // ms to run the multi-threaded test.
// Observer interface members. NOTE(review): the enclosing class header is not
// visible here; presumably this is the Foo interface that the observers in
// this file derive from.
// Notification entry point; receives an integer payload.
40 virtual void Observe(int x) = 0;
41 virtual ~Foo() = default;
// Value exposed for test inspection. The base implementation returns 0.
42 virtual int GetValue() const { return 0; }
// Observer that accumulates each notified value, multiplied by a fixed factor,
// into |total|.
45 class Adder : public Foo {
// |scaler| is the factor applied to every notified value. |total| starts at 0.
47 explicit Adder(int scaler) : total(0), scaler_(scaler) {}
48 ~Adder() override = default;
// Adds x * scaler_ to the running total.
50 void Observe(int x) override { total += x * scaler_; }
// Returns the accumulated total.
51 int GetValue() const override { return total; }
// Observer that, from inside its own notification, registers another observer
// (|to_add_|) on the list it was constructed with.
59 class AddInObserve : public Foo {
// Stores |observer_list| and value-initializes |to_add_|.
61 explicit AddInObserve(ObserverListThreadSafe<Foo>* observer_list)
62 : observer_list(observer_list), to_add_() {}
// Sets the observer that Observe() will register.
64 void SetToAdd(Foo* to_add) { to_add_ = to_add; }
66 void Observe(int x) override {
// Registers |to_add_| on the list.
// NOTE(review): any guard or reset of |to_add_| around this call is not
// visible here - confirm before assuming the add is unconditional.
68 observer_list->AddObserver(to_add_.get());
// The list that Observe() adds to.
73 raw_ptr<ObserverListThreadSafe<Foo>> observer_list;
77 // A task for use in the ThreadSafeObserver test which will add and remove
78 // itself from the notification list repeatedly.
// |RemovePolicy| is forwarded to the ObserverListThreadSafe type this observer
// works with and defaults to kAnySequence.
79 template <RemoveObserverPolicy RemovePolicy =
80 RemoveObserverPolicy::kAnySequence>
81 class AddRemoveThread : public Foo {
82 using Self = AddRemoveThread<RemovePolicy>;
83 using ObserverList = ObserverListThreadSafe<Foo, RemovePolicy>;
// Creates a dedicated single-thread task runner for this observer and
// immediately posts the first AddTask to it.
// |list| is the observer list to add to and remove from.
// |removal_task_runner| may be null; when set, Observe() removes this observer
// through that runner instead of on its own thread.
// NOTE(review): the |notify| parameter declaration is not visible here; it
// initializes |do_notifies_|.
86 AddRemoveThread(ObserverList* list,
88 scoped_refptr<SingleThreadTaskRunner> removal_task_runner)
90 task_runner_(ThreadPool::CreateSingleThreadTaskRunner(
92 SingleThreadTaskRunnerThreadMode::DEDICATED)),
93 removal_task_runner_(std::move(removal_task_runner)),
96 do_notifies_(notify) {
// The task is bound to a WeakPtr rather than to a raw |this|.
97 task_runner_->PostTask(
98 FROM_HERE, base::BindOnce(&Self::AddTask, weak_factory_.GetWeakPtr()));
101 ~AddRemoveThread() override = default;
103 // This task just keeps posting to itself in an attempt to race with the
// notifications delivered through |list_|.
// The elapsed-time check below compares against the kThreadRunTime budget.
106 if ((Time::Now() - start_).InMilliseconds() > kThreadRunTime) {
112 list_->AddObserver(this);
// Broadcasts 10 to the observers of |list_|.
// NOTE(review): presumably only done when |do_notifies_| is set; the guard is
// not visible here.
117 list_->Notify(FROM_HERE, &Foo::Observe, 10);
// Re-posts AddTask to the current thread's default task runner.
120 SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
121 FROM_HERE, base::BindOnce(&Self::AddTask, weak_factory_.GetWeakPtr()));
125 list_->RemoveObserver(this);
129 void Observe(int x) override {
130 // If we're getting called after we removed ourselves from the list, that is
// a failure; the expectation below checks for it.
132 EXPECT_TRUE(in_list_);
134 // This callback should fire on the appropriate thread
135 EXPECT_TRUE(task_runner_->BelongsToCurrentThread());
137 if (removal_task_runner_) {
138 // Remove the observer on a different thread, blocking the current thread
139 // until it's removed. Unretained is safe since the pointers are valid
140 // until the thread is unblocked.
141 base::TestWaitableEvent event;
// RemoveTask runs on |removal_task_runner_|; the chained callback then signals
// |event|.
142 removal_task_runner_->PostTask(
143 FROM_HERE, base::BindOnce(&Self::RemoveTask, base::Unretained(this))
144 .Then(base::BindOnce(&base::TestWaitableEvent::Signal,
145 base::Unretained(&event))));
148 // Remove the observer on the same thread.
// Accessor for this observer's task runner.
// NOTE(review): body not visible here; presumably returns |task_runner_|.
153 scoped_refptr<SingleThreadTaskRunner> task_runner() const {
// The observer list this object adds itself to and removes itself from.
158 raw_ptr<ObserverList> list_;
// Dedicated runner created in the constructor; Observe() expects to run on it.
159 scoped_refptr<SingleThreadTaskRunner> task_runner_;
160 // Optional task runner used to remove observers. This will be the main task
161 // runner of a different AddRemoveThread.
162 scoped_refptr<SingleThreadTaskRunner> removal_task_runner_;
163 bool in_list_; // Are we currently registered for notifications.
164 // in_list_ is only used on |this| thread.
165 Time start_; // The time we started the test.
167 bool do_notifies_; // Whether these threads should do notifications.
// Produces the WeakPtrs bound into the posted AddTask callbacks.
169 base::WeakPtrFactory<Self> weak_factory_{this};
// Checks AddObserver() result codes and that a notification reaches only the
// observers still registered when it is delivered.
// NOTE(review): the declarations of observers a, b, c and d are not visible
// here; the expected totals imply their scale factors.
174 TEST(ObserverListThreadSafeTest, BasicTest) {
175 using List = ObserverListThreadSafe<Foo>;
176 test::TaskEnvironment task_environment;
178 scoped_refptr<List> observer_list(new List);
184 List::AddObserverResult result;
// The first add makes the list non-empty; later adds report that it already
// was.
186 result = observer_list->AddObserver(&a);
187 EXPECT_EQ(result, List::AddObserverResult::kBecameNonEmpty);
188 result = observer_list->AddObserver(&b);
189 EXPECT_EQ(result, List::AddObserverResult::kWasAlreadyNonEmpty);
// First notification: a and b are the observers added so far.
191 observer_list->Notify(FROM_HERE, &Foo::Observe, 10);
192 RunLoop().RunUntilIdle();
194 result = observer_list->AddObserver(&c);
195 EXPECT_EQ(result, List::AddObserverResult::kWasAlreadyNonEmpty);
196 result = observer_list->AddObserver(&d);
197 EXPECT_EQ(result, List::AddObserverResult::kWasAlreadyNonEmpty);
// Second notification. c is removed before the run loop runs, so c is expected
// to receive nothing.
199 observer_list->Notify(FROM_HERE, &Foo::Observe, 10);
200 observer_list->RemoveObserver(&c);
201 RunLoop().RunUntilIdle();
// a and b saw both notifications, d only the second, c none.
203 EXPECT_EQ(20, a.total);
204 EXPECT_EQ(-20, b.total);
205 EXPECT_EQ(0, c.total);
206 EXPECT_EQ(-10, d.total);
// Checks RemoveObserver() result codes, including removal of observers that
// were never added.
// NOTE(review): the declarations of observers a and b are not visible here.
209 TEST(ObserverListThreadSafeTest, RemoveObserver) {
210 using List = ObserverListThreadSafe<Foo>;
211 test::TaskEnvironment task_environment;
213 scoped_refptr<List> observer_list(new List);
216 // A workaround for the compiler bug. See http://crbug.com/121960.
219 List::RemoveObserverResult result;
221 // Should do nothing.
// Removing from a list that holds no observers reports kWasOrBecameEmpty.
222 result = observer_list->RemoveObserver(&a);
223 EXPECT_EQ(result, List::RemoveObserverResult::kWasOrBecameEmpty);
224 result = observer_list->RemoveObserver(&b);
225 EXPECT_EQ(result, List::RemoveObserverResult::kWasOrBecameEmpty);
// Nothing is registered, so nobody is notified.
227 observer_list->Notify(FROM_HERE, &Foo::Observe, 10);
228 RunLoop().RunUntilIdle();
230 EXPECT_EQ(0, a.total);
231 EXPECT_EQ(0, b.total);
233 observer_list->AddObserver(&a);
235 // Should also do nothing.
// b was never added; a stays registered, hence kRemainsNonEmpty.
236 result = observer_list->RemoveObserver(&b);
237 EXPECT_EQ(result, List::RemoveObserverResult::kRemainsNonEmpty);
239 observer_list->Notify(FROM_HERE, &Foo::Observe, 10);
240 RunLoop().RunUntilIdle();
242 EXPECT_EQ(10, a.total);
243 EXPECT_EQ(0, b.total);
// Removing the last registered observer empties the list.
245 result = observer_list->RemoveObserver(&a);
246 EXPECT_EQ(result, List::RemoveObserverResult::kWasOrBecameEmpty);
// Observer that removes a configured set of observers from the list when it is
// notified.
249 class FooRemover : public Foo {
// |list| is retained through a scoped_refptr member.
251 explicit FooRemover(ObserverListThreadSafe<Foo>* list) : list_(list) {}
252 ~FooRemover() override = default;
// Queues |foo| for removal on the next notification.
254 void AddFooToRemove(Foo* foo) { foos_.push_back(foo); }
256 void Observe(int x) override {
// NOTE(review): |tmp| is presumably filled from |foos_| on a line not visible
// here - confirm.
257 std::vector<Foo*> tmp;
// Removes every observer held in |tmp| from the list.
259 for (auto* it : tmp) {
260 list_->RemoveObserver(it);
// The list to remove observers from.
265 const scoped_refptr<ObserverListThreadSafe<Foo>> list_;
// Observers queued through AddFooToRemove().
266 std::vector<Foo*> foos_;
// Notifies a FooRemover that has been told to remove both itself and another
// observer. No expectation is visible here, so the test presumably passes by
// completing without a crash.
// NOTE(review): the declaration of observer b is not visible here.
269 TEST(ObserverListThreadSafeTest, RemoveMultipleObservers) {
270 test::TaskEnvironment task_environment;
271 scoped_refptr<ObserverListThreadSafe<Foo>> observer_list(
272 new ObserverListThreadSafe<Foo>);
274 FooRemover a(observer_list.get());
277 observer_list->AddObserver(&a);
278 observer_list->AddObserver(&b);
// a is told to remove both itself and b.
280 a.AddFooToRemove(&a);
281 a.AddFooToRemove(&b);
283 observer_list->Notify(FROM_HERE, &Foo::Observe, 1);
284 RunLoop().RunUntilIdle();
287 // A test driver for a multi-threaded notification loop. Runs a number of
288 // observer threads, each of which constantly adds/removes itself from the
289 // observer list. Optionally, if `cross_thread_notifies` is set to true, the
290 // observer threads will also trigger notifications to all observers, and if
291 // `cross_thread_removes` is set to true, the observer threads will also remove
292 // observers added by other threads.
// `num_threads` is the number of AddRemoveThread observers to create.
// `RemovePolicy` selects the removal policy of the list under test.
294 RemoveObserverPolicy RemovePolicy = RemoveObserverPolicy::kAnySequence>
295 static void ThreadSafeObserverHarness(int num_threads,
296 bool cross_thread_notifies = false,
297 bool cross_thread_removes = false) {
298 test::TaskEnvironment task_environment;
301 base::MakeRefCounted<ObserverListThreadSafe<Foo, RemovePolicy>>();
// NOTE(review): the declarations of observers a and b are not visible here.
306 observer_list->AddObserver(&a);
307 observer_list->AddObserver(&b);
309 using TestThread = AddRemoveThread<RemovePolicy>;
310 std::vector<std::unique_ptr<TestThread>> threaded_observers;
311 threaded_observers.reserve(num_threads);
// Starts out null. When cross_thread_removes is set, each observer is given
// the task runner of the observer created just before it.
312 scoped_refptr<SingleThreadTaskRunner> removal_task_runner;
313 for (int index = 0; index < num_threads; index++) {
314 auto add_remove_thread =
315 std::make_unique<TestThread>(observer_list.get(), cross_thread_notifies,
316 std::move(removal_task_runner));
317 if (cross_thread_removes) {
318 // Save the task runner to pass to the next thread.
319 removal_task_runner = add_remove_thread->task_runner();
321 threaded_observers.push_back(std::move(add_remove_thread));
323 ASSERT_EQ(static_cast<size_t>(num_threads), threaded_observers.size());
// Notification loop on this thread, bounded by the kThreadRunTime budget.
// NOTE(review): the loop header and the exit statement are not visible here.
325 Time start = Time::Now();
327 if ((Time::Now() - start).InMilliseconds() > kThreadRunTime)
330 observer_list->Notify(FROM_HERE, &Foo::Observe, 10);
332 RunLoop().RunUntilIdle();
// Runs the task environment until idle before the harness returns.
335 task_environment.RunUntilIdle();
// Runs the harness with its defaults: no cross-thread notifications and no
// cross-thread removals.
338 TEST(ObserverListThreadSafeTest, CrossThreadObserver) {
339 // Use 7 observer threads. Notifications only come from the main thread.
340 ThreadSafeObserverHarness(7);
// Runs the harness with cross-thread notifications enabled and cross-thread
// removals left at their default (disabled).
343 TEST(ObserverListThreadSafeTest, CrossThreadNotifications) {
344 // Use 3 observer threads. Notifications will fire from the main thread and
345 // all 3 observer threads.
346 ThreadSafeObserverHarness(3, /*cross_thread_notifies=*/true);
// Runs the harness with both cross-thread notifications and cross-thread
// removals enabled, using the default removal policy (kAnySequence).
349 TEST(ObserverListThreadSafeTest, CrossThreadRemoval) {
350 // Use 3 observer threads. Observers can be removed from any thread.
351 ThreadSafeObserverHarness(3, /*cross_thread_notifies=*/true,
352 /*cross_thread_removes=*/true);
// Runs the harness under the kAddingSequenceOnly policy without cross-thread
// removals.
355 TEST(ObserverListThreadSafeTest, CrossThreadRemovalRestricted) {
356 // Use 3 observer threads. Observers must be removed from the thread that
357 // added them. This should succeed because the test doesn't break that
// restriction.
359 ThreadSafeObserverHarness<RemoveObserverPolicy::kAddingSequenceOnly>(
360 3, /*cross_thread_notifies=*/true, /*cross_thread_removes=*/false);
// Death-test variant: the kAddingSequenceOnly policy combined with
// cross-thread removals.
// NOTE(review): the death-expectation macro wrapping the harness call is not
// visible here; the trailing "));" indicates the call is its argument.
363 TEST(ObserverListThreadSafeDeathTest, CrossThreadRemovalRestricted) {
364 // Use 3 observer threads. Observers must be removed from the thread that
365 // added them. This should CHECK because the test breaks that restriction.
367 ThreadSafeObserverHarness<RemoveObserverPolicy::kAddingSequenceOnly>(
368 3, /*cross_thread_notifies=*/true, /*cross_thread_removes=*/true));
// Uses the observer list after the TaskEnvironment it was populated under has
// been destroyed.
// NOTE(review): the declaration of observer a is not visible here.
371 TEST(ObserverListThreadSafeTest, OutlivesTaskEnvironment) {
// Held in an optional so it can be destroyed mid-test with reset().
372 absl::optional<test::TaskEnvironment> task_environment(absl::in_place);
373 auto observer_list = base::MakeRefCounted<ObserverListThreadSafe<Foo>>();
376 observer_list->AddObserver(&a);
377 task_environment.reset();
378 // Test passes if we don't crash here.
379 observer_list->Notify(FROM_HERE, &Foo::Observe, 1);
380 observer_list->RemoveObserver(&a);
// Uses a kAddingSequenceOnly observer list after the TaskEnvironment it was
// populated under has been destroyed.
// NOTE(review): the declaration of observer a is not visible here.
383 TEST(ObserverListThreadSafeTest, OutlivesTaskEnvironmentRemovalRestricted) {
// Held in an optional so it can be destroyed mid-test with reset().
384 absl::optional<test::TaskEnvironment> task_environment(absl::in_place);
385 auto observer_list = base::MakeRefCounted<
386 ObserverListThreadSafe<Foo, RemoveObserverPolicy::kAddingSequenceOnly>>();
389 observer_list->AddObserver(&a);
390 task_environment.reset();
391 // Test passes if we don't crash here.
392 observer_list->Notify(FROM_HERE, &Foo::Observe, 1);
393 observer_list->RemoveObserver(&a);
// Observer that records whether its notification ran on the sequence of the
// task runner it was constructed with.
398 class SequenceVerificationObserver : public Foo {
// |task_runner| is the runner whose sequence notifications are checked against.
400 explicit SequenceVerificationObserver(
401 scoped_refptr<SequencedTaskRunner> task_runner)
402 : task_runner_(std::move(task_runner)) {}
// Not copyable.
403 SequenceVerificationObserver(const SequenceVerificationObserver&) = delete;
404 SequenceVerificationObserver& operator=(const SequenceVerificationObserver&) =
406 ~SequenceVerificationObserver() override = default;
// Stores whether the caller is running on |task_runner_|'s sequence. |x| is
// not used in the visible code.
408 void Observe(int x) override {
409 called_on_valid_sequence_ = task_runner_->RunsTasksInCurrentSequence();
// Result of the most recent Observe() call; false if never notified.
412 bool called_on_valid_sequence() const { return called_on_valid_sequence_; }
415 const scoped_refptr<SequencedTaskRunner> task_runner_;
416 bool called_on_valid_sequence_ = false;
421 // Verify that observers are notified on the correct sequence.
// Each observer is added from a task posted to its own sequenced task runner.
// After a notification it must report having run on that runner's sequence.
422 TEST(ObserverListThreadSafeTest, NotificationOnValidSequence) {
423 test::TaskEnvironment task_environment;
425 auto task_runner_1 = ThreadPool::CreateSequencedTaskRunner({});
// NOTE(review): the doubled qualifier "ThreadPool::ThreadPool::" presumably
// resolves through the injected-class-name and is equivalent to the single
// "ThreadPool::" spelling used for task_runner_1; it can be simplified.
426 auto task_runner_2 = ThreadPool::ThreadPool::CreateSequencedTaskRunner({});
428 auto observer_list = MakeRefCounted<ObserverListThreadSafe<Foo>>();
430 SequenceVerificationObserver observer_1(task_runner_1);
431 SequenceVerificationObserver observer_2(task_runner_2);
// AddObserver() returns a result, which IgnoreResult discards so the call can
// be bound as a task.
433 task_runner_1->PostTask(
435 BindOnce(base::IgnoreResult(&ObserverListThreadSafe<Foo>::AddObserver),
436 observer_list, Unretained(&observer_1)));
437 task_runner_2->PostTask(
439 BindOnce(base::IgnoreResult(&ObserverListThreadSafe<Foo>::AddObserver),
440 observer_list, Unretained(&observer_2)));
// Flush the thread pool so both AddObserver tasks have run before notifying.
442 ThreadPoolInstance::Get()->FlushForTesting();
444 observer_list->Notify(FROM_HERE, &Foo::Observe, 1);
// Flush again so the notification tasks have run before checking results.
446 ThreadPoolInstance::Get()->FlushForTesting();
448 EXPECT_TRUE(observer_1.called_on_valid_sequence());
449 EXPECT_TRUE(observer_2.called_on_valid_sequence());
452 // Verify that when an observer is added to a NOTIFY_ALL ObserverListThreadSafe
453 // from a notification, it is itself notified.
// The list is default-constructed here; no policy argument is passed.
454 TEST(ObserverListThreadSafeTest, AddObserverFromNotificationNotifyAll) {
455 test::TaskEnvironment task_environment;
456 auto observer_list = MakeRefCounted<ObserverListThreadSafe<Foo>>();
458 Adder observer_added_from_notification(1);
// |initial_observer| registers |observer_added_from_notification| when it is
// notified.
460 AddInObserve initial_observer(observer_list.get());
461 initial_observer.SetToAdd(&observer_added_from_notification);
462 observer_list->AddObserver(&initial_observer);
464 observer_list->Notify(FROM_HERE, &Foo::Observe, 1);
466 base::RunLoop().RunUntilIdle();
// The observer added during the notification received the value 1.
468 EXPECT_EQ(1, observer_added_from_notification.GetValue());
// Observer whose Observe() signals that the notification has started. Exposes
// WaitForNotificationRunning() and Unblock() so a test can coordinate with the
// running notification.
473 class RemoveWhileNotificationIsRunningObserver : public Foo {
// Both events are automatic-reset and start out not signaled.
475 RemoveWhileNotificationIsRunningObserver()
476 : notification_running_(WaitableEvent::ResetPolicy::AUTOMATIC,
477 WaitableEvent::InitialState::NOT_SIGNALED),
478 barrier_(WaitableEvent::ResetPolicy::AUTOMATIC,
479 WaitableEvent::InitialState::NOT_SIGNALED) {}
// Not copyable.
480 RemoveWhileNotificationIsRunningObserver(
481 const RemoveWhileNotificationIsRunningObserver&) = delete;
482 RemoveWhileNotificationIsRunningObserver& operator=(
483 const RemoveWhileNotificationIsRunningObserver&) = delete;
484 ~RemoveWhileNotificationIsRunningObserver() override = default;
486 void Observe(int x) override {
// Tell the test that the notification is now running.
487 notification_running_.Signal();
// NOTE(review): presumably followed by a wait on |barrier_| that is not
// visible here, which would be why sync primitives are allowed - confirm.
488 ScopedAllowBaseSyncPrimitivesForTesting allow_base_sync_primitives;
// Blocks until Observe() has signaled |notification_running_|.
492 void WaitForNotificationRunning() { notification_running_.Wait(); }
// Signals |barrier_|.
493 void Unblock() { barrier_.Signal(); }
496 WaitableEvent notification_running_;
497 WaitableEvent barrier_;
502 // Verify that there is no crash when an observer is removed while it is being
// notified.
504 TEST(ObserverListThreadSafeTest, RemoveWhileNotificationIsRunning) {
505 auto observer_list = MakeRefCounted<ObserverListThreadSafe<Foo>>();
506 RemoveWhileNotificationIsRunningObserver observer;
// NOTE(review): the visible lines of this test do not use |task_running| or
// |barrier|; any use is on lines not visible here.
508 WaitableEvent task_running(WaitableEvent::ResetPolicy::AUTOMATIC,
509 WaitableEvent::InitialState::NOT_SIGNALED);
510 WaitableEvent barrier(WaitableEvent::ResetPolicy::AUTOMATIC,
511 WaitableEvent::InitialState::NOT_SIGNALED);
513 // This must be after the declaration of |barrier| so that tasks posted to
514 // ThreadPool can safely use |barrier|.
515 test::TaskEnvironment task_environment;
// Add the observer from a thread pool sequence, then flush so the add has
// completed before notifying.
517 ThreadPool::CreateSequencedTaskRunner({MayBlock()})
518 ->PostTask(FROM_HERE,
519 base::BindOnce(base::IgnoreResult(
520 &ObserverListThreadSafe<Foo>::AddObserver),
521 observer_list, Unretained(&observer)));
522 ThreadPoolInstance::Get()->FlushForTesting();
// Wait until the observer's Observe() has started, then remove the observer
// from this thread.
524 observer_list->Notify(FROM_HERE, &Foo::Observe, 1);
525 observer.WaitForNotificationRunning();
526 observer_list->RemoveObserver(&observer);
// Checks that a notification posted before an observer is removed is not
// delivered to that observer, even if it is re-added before the notification
// runs.
// NOTE(review): the declarations of observers a and b are not visible here.
531 TEST(ObserverListThreadSafeTest, AddRemoveWithPendingNotifications) {
532 test::TaskEnvironment task_environment;
534 scoped_refptr<ObserverListThreadSafe<Foo>> observer_list(
535 new ObserverListThreadSafe<Foo>);
539 observer_list->AddObserver(&a);
540 observer_list->AddObserver(&b);
542 // Remove observer `a` while there is a pending notification for observer `a`.
543 observer_list->Notify(FROM_HERE, &Foo::Observe, 10);
544 observer_list->RemoveObserver(&a);
545 RunLoop().RunUntilIdle();
// Re-register `a` for the next phase, after the pending notification has run.
546 observer_list->AddObserver(&a);
548 EXPECT_EQ(0, a.total);
549 EXPECT_EQ(10, b.total);
551 // Remove and re-adding observer `a` while there is a pending notification for
552 // observer `a`. The notification to `a` must not be executed since it was
553 // sent before the removal of `a`.
554 observer_list->Notify(FROM_HERE, &Foo::Observe, 10);
555 observer_list->RemoveObserver(&a);
556 observer_list->AddObserver(&a);
557 RunLoop().RunUntilIdle();
559 EXPECT_EQ(0, a.total);
560 EXPECT_EQ(20, b.total);
562 // Observer `a` and `b` are present and should both receive a notification.
// Here the remove and re-add happen before Notify(), so `a` is notified too.
563 observer_list->RemoveObserver(&a);
564 observer_list->AddObserver(&a);
565 observer_list->Notify(FROM_HERE, &Foo::Observe, 10);
566 RunLoop().RunUntilIdle();
568 EXPECT_EQ(10, a.total);
569 EXPECT_EQ(30, b.total);
572 // Same as ObserverListTest.Existing, but for ObserverListThreadSafe
// With the EXISTING_ONLY policy, an observer added during a notification is
// expected to be skipped by that notification and reached by the next one.
// NOTE(review): the declarations of observers a and c, and the call that tells
// b which observer to add, are not visible here.
573 TEST(ObserverListThreadSafeTest, Existing) {
574 test::TaskEnvironment task_environment;
575 scoped_refptr<ObserverListThreadSafe<Foo>> observer_list(
576 new ObserverListThreadSafe<Foo>(ObserverListPolicy::EXISTING_ONLY));
578 AddInObserve b(observer_list.get());
582 observer_list->AddObserver(&a);
583 observer_list->AddObserver(&b);
585 observer_list->Notify(FROM_HERE, &Foo::Observe, 1);
586 RunLoop().RunUntilIdle();
// b's pending observer pointer is expected to be null after the notification.
588 EXPECT_FALSE(b.to_add_);
589 // B's adder should not have been notified because it was added during
// notification.
591 EXPECT_EQ(0, c.total);
593 // Notify again to make sure b's adder is notified.
594 observer_list->Notify(FROM_HERE, &Foo::Observe, 1);
595 RunLoop().RunUntilIdle();
596 EXPECT_EQ(1, c.total);