1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #ifndef BASE_OBSERVER_LIST_THREADSAFE_H_
6 #define BASE_OBSERVER_LIST_THREADSAFE_H_
8 #include <unordered_map>
11 #include "base/base_export.h"
12 #include "base/bind.h"
13 #include "base/check_op.h"
14 #include "base/containers/contains.h"
15 #include "base/lazy_instance.h"
16 #include "base/location.h"
17 #include "base/memory/ref_counted.h"
18 #include "base/observer_list.h"
19 #include "base/sequenced_task_runner.h"
20 #include "base/synchronization/lock.h"
21 #include "base/threading/sequenced_task_runner_handle.h"
22 #include "base/threading/thread_local.h"
23 #include "build/build_config.h"
25 ///////////////////////////////////////////////////////////////////////////////
29 // A thread-safe container for a list of observers. This is similar to the
30 // observer_list (see observer_list.h), but it is more robust for multi-
31 // threaded situations.
33 // The following use cases are supported:
34 // * Observers can register for notifications from any sequence. They are
35 // always notified on the sequence from which they were registered.
36 // * Any sequence may trigger a notification via Notify().
37 // * Observers can remove themselves from the observer list inside of a
39 // * If one sequence is notifying observers concurrently with an observer
40 // removing itself from the observer list, the notifications will be
43 // The drawback of the threadsafe observer list is that notifications are not
44 // as real-time as the non-threadsafe version of this class. Notifications
45 // will always be done via PostTask() to another sequence, whereas with the
46 // non-thread-safe ObserverList, notifications happen synchronously.
48 // Note: this class previously supported synchronous notifications for
49 // same-sequence observers, but it was error-prone and removed in
50 // crbug.com/1193750, think twice before re-considering this paradigm.
52 ///////////////////////////////////////////////////////////////////////////////
57 class BASE_EXPORT ObserverListThreadSafeBase
58 : public RefCountedThreadSafe<ObserverListThreadSafeBase> {
60 ObserverListThreadSafeBase() = default;
61 ObserverListThreadSafeBase(const ObserverListThreadSafeBase&) = delete;
62 ObserverListThreadSafeBase& operator=(const ObserverListThreadSafeBase&) =
66 template <typename ObserverType, typename Method>
69 template <typename ObserverType, typename ReceiverType, typename... Params>
70 struct Dispatcher<ObserverType, void (ReceiverType::*)(Params...)> {
71 static void Run(void (ReceiverType::*m)(Params...),
74 (obj->*m)(std::forward<Params>(params)...);
78 struct NotificationDataBase {
79 NotificationDataBase(void* observer_list_in, const Location& from_here_in)
80 : observer_list(observer_list_in), from_here(from_here_in) {}
86 virtual ~ObserverListThreadSafeBase() = default;
88 static LazyInstance<ThreadLocalPointer<const NotificationDataBase>>::Leaky
89 tls_current_notification_;
92 friend class RefCountedThreadSafe<ObserverListThreadSafeBase>;
95 } // namespace internal
97 template <class ObserverType>
98 class ObserverListThreadSafe : public internal::ObserverListThreadSafeBase {
100 enum class AddObserverResult {
104 enum class RemoveObserverResult {
109 ObserverListThreadSafe() = default;
110 explicit ObserverListThreadSafe(ObserverListPolicy policy)
112 ObserverListThreadSafe(const ObserverListThreadSafe&) = delete;
113 ObserverListThreadSafe& operator=(const ObserverListThreadSafe&) = delete;
115 // Adds |observer| to the list. |observer| must not already be in the list.
116 AddObserverResult AddObserver(ObserverType* observer) {
117 DCHECK(SequencedTaskRunnerHandle::IsSet())
118 << "An observer can only be registered when SequencedTaskRunnerHandle "
119 "is set. If this is in a unit test, you're likely merely missing a "
120 "base::test::(SingleThread)TaskEnvironment in your fixture. "
121 "Otherwise, try running this code on a named thread (main/UI/IO) or "
122 "from a task posted to a base::SequencedTaskRunner or "
123 "base::SingleThreadTaskRunner.";
125 AutoLock auto_lock(lock_);
127 bool was_empty = observers_.empty();
129 // Add |observer| to the list of observers.
130 DCHECK(!Contains(observers_, observer));
131 const scoped_refptr<SequencedTaskRunner> task_runner =
132 SequencedTaskRunnerHandle::Get();
133 // Each observer gets a unique identifier. These unique identifiers are used
134 // to avoid execution of pending posted-tasks over removed or released
136 const size_t observer_id = ++observer_id_counter_;
137 ObserverTaskRunnerInfo task_info = {task_runner, observer_id};
138 observers_[observer] = std::move(task_info);
140 // If this is called while a notification is being dispatched on this thread
141 // and |policy_| is ALL, |observer| must be notified (if a notification is
142 // being dispatched on another thread in parallel, the notification may or
143 // may not make it to |observer| depending on the outcome of the race to
145 if (policy_ == ObserverListPolicy::ALL) {
146 const NotificationDataBase* current_notification =
147 tls_current_notification_.Get().Get();
148 if (current_notification && current_notification->observer_list == this) {
149 const NotificationData* notification_data =
150 static_cast<const NotificationData*>(current_notification);
151 task_runner->PostTask(
152 current_notification->from_here,
153 BindOnce(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, this,
155 NotificationData(this, observer_id,
156 current_notification->from_here,
157 notification_data->method)));
161 return was_empty ? AddObserverResult::kBecameNonEmpty
162 : AddObserverResult::kWasAlreadyNonEmpty;
165 // Remove an observer from the list if it is in the list.
167 // If a notification was sent to the observer but hasn't started to run yet,
168 // it will be aborted. If a notification has started to run, removing the
169 // observer won't stop it.
170 RemoveObserverResult RemoveObserver(ObserverType* observer) {
171 AutoLock auto_lock(lock_);
172 observers_.erase(observer);
173 return observers_.empty() ? RemoveObserverResult::kWasOrBecameEmpty
174 : RemoveObserverResult::kRemainsNonEmpty;
177 // Verifies that the list is currently empty (i.e. there are no observers).
178 void AssertEmpty() const {
180 AutoLock auto_lock(lock_);
181 DCHECK(observers_.empty());
185 // Asynchronously invokes a callback on all observers, on their registration
186 // sequence. You cannot assume that at the completion of the Notify call that
187 // all Observers have been Notified. The notification may still be pending
189 template <typename Method, typename... Params>
190 void Notify(const Location& from_here, Method m, Params&&... params) {
191 RepeatingCallback<void(ObserverType*)> method =
192 BindRepeating(&Dispatcher<ObserverType, Method>::Run, m,
193 std::forward<Params>(params)...);
195 AutoLock lock(lock_);
196 for (const auto& observer : observers_) {
197 observer.second.task_runner->PostTask(
199 BindOnce(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, this,
201 NotificationData(this, observer.second.observer_id,
202 from_here, method)));
207 friend class RefCountedThreadSafe<ObserverListThreadSafeBase>;
209 struct NotificationData : public NotificationDataBase {
210 NotificationData(ObserverListThreadSafe* observer_list_in,
211 size_t observer_id_in,
212 const Location& from_here_in,
213 const RepeatingCallback<void(ObserverType*)>& method_in)
214 : NotificationDataBase(observer_list_in, from_here_in),
216 observer_id(observer_id_in) {}
218 RepeatingCallback<void(ObserverType*)> method;
222 ~ObserverListThreadSafe() override = default;
224 void NotifyWrapper(ObserverType* observer,
225 const NotificationData& notification) {
227 AutoLock auto_lock(lock_);
229 // Check whether the observer still needs a notification.
230 DCHECK_EQ(notification.observer_list, this);
231 auto it = observers_.find(observer);
232 if (it == observers_.end() ||
233 it->second.observer_id != notification.observer_id) {
236 DCHECK(it->second.task_runner->RunsTasksInCurrentSequence());
239 // Keep track of the notification being dispatched on the current thread.
240 // This will be used if the callback below calls AddObserver().
242 // Note: |tls_current_notification_| may not be nullptr if this runs in a
243 // nested loop started by a notification callback. In that case, it is
244 // important to save the previous value to restore it later.
245 auto& tls_current_notification = tls_current_notification_.Get();
246 const NotificationDataBase* const previous_notification =
247 tls_current_notification.Get();
248 tls_current_notification.Set(¬ification);
250 // Invoke the callback.
251 notification.method.Run(observer);
253 // Reset the notification being dispatched on the current thread to its
255 tls_current_notification.Set(previous_notification);
258 const ObserverListPolicy policy_ = ObserverListPolicy::ALL;
262 size_t observer_id_counter_ GUARDED_BY(lock_) = 0;
264 struct ObserverTaskRunnerInfo {
265 scoped_refptr<SequencedTaskRunner> task_runner;
266 size_t observer_id = 0;
269 // Keys are observers. Values are the SequencedTaskRunners on which they must
271 std::unordered_map<ObserverType*, ObserverTaskRunnerInfo> observers_
277 #endif // BASE_OBSERVER_LIST_THREADSAFE_H_