Upstream version 7.36.149.0
[platform/framework/web/crosswalk.git] / src / mojo / common / handle_watcher.cc
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/common/handle_watcher.h"
6
7 #include <map>
8
9 #include "base/atomic_sequence_num.h"
10 #include "base/bind.h"
11 #include "base/lazy_instance.h"
12 #include "base/memory/weak_ptr.h"
13 #include "base/message_loop/message_loop.h"
14 #include "base/message_loop/message_loop_proxy.h"
15 #include "base/synchronization/lock.h"
16 #include "base/threading/thread.h"
17 #include "base/time/time.h"
18 #include "mojo/common/environment_data.h"
19 #include "mojo/common/message_pump_mojo.h"
20 #include "mojo/common/message_pump_mojo_handler.h"
21 #include "mojo/common/time_helper.h"
22
23 namespace mojo {
24 namespace common {
25
26 typedef int WatcherID;
27
28 namespace {
29
30 const char kWatcherThreadName[] = "handle-watcher-thread";
31
32 const char kWatcherThreadManagerKey[] = "watcher-thread-manager";
33
34 // TODO(sky): this should be unnecessary once MessageLoop has been refactored.
35 MessagePumpMojo* message_pump_mojo = NULL;
36
37 scoped_ptr<base::MessagePump> CreateMessagePumpMojo() {
38   message_pump_mojo = new MessagePumpMojo;
39   return scoped_ptr<base::MessagePump>(message_pump_mojo).Pass();
40 }
41
42 base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) {
43   return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() :
44       internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline);
45 }
46
47 // Tracks the data for a single call to Start().
48 struct WatchData {
49   WatchData()
50       : id(0),
51         wait_flags(MOJO_WAIT_FLAG_NONE),
52         message_loop(NULL) {}
53
54   WatcherID id;
55   Handle handle;
56   MojoWaitFlags wait_flags;
57   base::TimeTicks deadline;
58   base::Callback<void(MojoResult)> callback;
59   scoped_refptr<base::MessageLoopProxy> message_loop;
60 };
61
62 // WatcherBackend --------------------------------------------------------------
63
64 // WatcherBackend is responsible for managing the requests and interacting with
65 // MessagePumpMojo. All access (outside of creation/destruction) is done on the
66 // thread WatcherThreadManager creates.
67 class WatcherBackend : public MessagePumpMojoHandler {
68  public:
69   WatcherBackend();
70   virtual ~WatcherBackend();
71
72   void StartWatching(const WatchData& data);
73   void StopWatching(WatcherID watcher_id);
74
75  private:
76   typedef std::map<Handle, WatchData> HandleToWatchDataMap;
77
78   // Invoked when a handle needs to be removed and notified.
79   void RemoveAndNotify(const Handle& handle, MojoResult result);
80
81   // Searches through |handle_to_data_| for |watcher_id|. Returns true if found
82   // and sets |handle| to the Handle. Returns false if not a known id.
83   bool GetMojoHandleByWatcherID(WatcherID watcher_id, Handle* handle) const;
84
85   // MessagePumpMojoHandler overrides:
86   virtual void OnHandleReady(const Handle& handle) OVERRIDE;
87   virtual void OnHandleError(const Handle& handle, MojoResult result) OVERRIDE;
88
89   // Maps from assigned id to WatchData.
90   HandleToWatchDataMap handle_to_data_;
91
92   DISALLOW_COPY_AND_ASSIGN(WatcherBackend);
93 };
94
95 WatcherBackend::WatcherBackend() {
96 }
97
98 WatcherBackend::~WatcherBackend() {
99 }
100
101 void WatcherBackend::StartWatching(const WatchData& data) {
102   RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED);
103
104   DCHECK_EQ(0u, handle_to_data_.count(data.handle));
105
106   handle_to_data_[data.handle] = data;
107   message_pump_mojo->AddHandler(this, data.handle,
108                                 data.wait_flags,
109                                 data.deadline);
110 }
111
112 void WatcherBackend::StopWatching(WatcherID watcher_id) {
113   // Because of the thread hop it is entirely possible to get here and not
114   // have a valid handle registered for |watcher_id|.
115   Handle handle;
116   if (!GetMojoHandleByWatcherID(watcher_id, &handle))
117     return;
118
119   handle_to_data_.erase(handle);
120   message_pump_mojo->RemoveHandler(handle);
121 }
122
123 void WatcherBackend::RemoveAndNotify(const Handle& handle,
124                                      MojoResult result) {
125   if (handle_to_data_.count(handle) == 0)
126     return;
127
128   const WatchData data(handle_to_data_[handle]);
129   handle_to_data_.erase(handle);
130   message_pump_mojo->RemoveHandler(handle);
131   data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result));
132 }
133
134 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id,
135                                               Handle* handle) const {
136   for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin();
137        i != handle_to_data_.end(); ++i) {
138     if (i->second.id == watcher_id) {
139       *handle = i->second.handle;
140       return true;
141     }
142   }
143   return false;
144 }
145
146 void WatcherBackend::OnHandleReady(const Handle& handle) {
147   RemoveAndNotify(handle, MOJO_RESULT_OK);
148 }
149
150 void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) {
151   RemoveAndNotify(handle, result);
152 }
153
154 // WatcherThreadManager --------------------------------------------------------
155
156 // WatcherThreadManager manages the background thread that listens for handles
157 // to be ready. All requests are handled by WatcherBackend.
158 class WatcherThreadManager {
159  public:
160   ~WatcherThreadManager();
161
162   // Returns the shared instance.
163   static WatcherThreadManager* GetInstance();
164
165   // Starts watching the requested handle. Returns a unique ID that is used to
166   // stop watching the handle. When the handle is ready |callback| is notified
167   // on the thread StartWatching() was invoked on.
168   // This may be invoked on any thread.
169   WatcherID StartWatching(const Handle& handle,
170                           MojoWaitFlags wait_flags,
171                           base::TimeTicks deadline,
172                           const base::Callback<void(MojoResult)>& callback);
173
174   // Stops watching a handle.
175   // This may be invoked on any thread.
176   void StopWatching(WatcherID watcher_id);
177
178  private:
179   WatcherThreadManager();
180
181   base::Thread thread_;
182
183   base::AtomicSequenceNumber watcher_id_generator_;
184
185   WatcherBackend backend_;
186
187   DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager);
188 };
189
190 struct WatcherThreadManagerData : EnvironmentData::Data {
191   scoped_ptr<WatcherThreadManager> thread_manager;
192 };
193
194 WatcherThreadManager::~WatcherThreadManager() {
195   thread_.Stop();
196 }
197
198 static base::LazyInstance<base::Lock> thread_lookup_lock =
199     LAZY_INSTANCE_INITIALIZER;
200
201 WatcherThreadManager* WatcherThreadManager::GetInstance() {
202   base::AutoLock auto_lock(thread_lookup_lock.Get());
203   WatcherThreadManagerData* data = static_cast<WatcherThreadManagerData*>(
204       EnvironmentData::GetInstance()->GetData(kWatcherThreadManagerKey));
205   if (!data) {
206     data = new WatcherThreadManagerData;
207     data->thread_manager.reset(new WatcherThreadManager);
208     EnvironmentData::GetInstance()->SetData(
209         kWatcherThreadManagerKey,
210         scoped_ptr<EnvironmentData::Data>(data));
211   }
212   return data->thread_manager.get();
213 }
214
215 WatcherID WatcherThreadManager::StartWatching(
216     const Handle& handle,
217     MojoWaitFlags wait_flags,
218     base::TimeTicks deadline,
219     const base::Callback<void(MojoResult)>& callback) {
220   WatchData data;
221   data.id = watcher_id_generator_.GetNext();
222   data.handle = handle;
223   data.callback = callback;
224   data.wait_flags = wait_flags;
225   data.deadline = deadline;
226   data.message_loop = base::MessageLoopProxy::current();
227   DCHECK_NE(static_cast<base::MessageLoopProxy*>(NULL),
228             data.message_loop.get());
229   // We outlive |thread_|, so it's safe to use Unretained() here.
230   thread_.message_loop()->PostTask(
231       FROM_HERE,
232       base::Bind(&WatcherBackend::StartWatching,
233                  base::Unretained(&backend_),
234                  data));
235   return data.id;
236 }
237
238 void WatcherThreadManager::StopWatching(WatcherID watcher_id) {
239   // We outlive |thread_|, so it's safe to use Unretained() here.
240   thread_.message_loop()->PostTask(
241       FROM_HERE,
242       base::Bind(&WatcherBackend::StopWatching,
243                  base::Unretained(&backend_),
244                  watcher_id));
245 }
246
247 WatcherThreadManager::WatcherThreadManager()
248     : thread_(kWatcherThreadName) {
249   base::Thread::Options thread_options;
250   thread_options.message_pump_factory = base::Bind(&CreateMessagePumpMojo);
251   thread_.StartWithOptions(thread_options);
252 }
253
254 }  // namespace
255
256 // HandleWatcher::StartState ---------------------------------------------------
257
258 // Contains the information passed to Start().
259 struct HandleWatcher::StartState {
260   explicit StartState(HandleWatcher* watcher) : weak_factory(watcher) {
261   }
262
263   ~StartState() {
264   }
265
266   // ID assigned by WatcherThreadManager.
267   WatcherID watcher_id;
268
269   // Callback to notify when done.
270   base::Callback<void(MojoResult)> callback;
271
272   // When Start() is invoked a callback is passed to WatcherThreadManager
273   // using a WeakRef from |weak_refactory_|. The callback invokes
274   // OnHandleReady() (on the thread Start() is invoked from) which in turn
275   // notifies |callback_|. Doing this allows us to reset state when the handle
276   // is ready, and then notify the callback. Doing this also means Stop()
277   // cancels any pending callbacks that may be inflight.
278   base::WeakPtrFactory<HandleWatcher> weak_factory;
279 };
280
281 // HandleWatcher ---------------------------------------------------------------
282
283 HandleWatcher::HandleWatcher() {
284 }
285
286 HandleWatcher::~HandleWatcher() {
287   Stop();
288 }
289
290 void HandleWatcher::Start(const Handle& handle,
291                           MojoWaitFlags wait_flags,
292                           MojoDeadline deadline,
293                           const base::Callback<void(MojoResult)>& callback) {
294   DCHECK(handle.is_valid());
295   DCHECK_NE(MOJO_WAIT_FLAG_NONE, wait_flags);
296
297   Stop();
298
299   start_state_.reset(new StartState(this));
300   start_state_->callback = callback;
301   start_state_->watcher_id =
302       WatcherThreadManager::GetInstance()->StartWatching(
303           handle,
304           wait_flags,
305           MojoDeadlineToTimeTicks(deadline),
306           base::Bind(&HandleWatcher::OnHandleReady,
307                      start_state_->weak_factory.GetWeakPtr()));
308 }
309
310 void HandleWatcher::Stop() {
311   if (!start_state_.get())
312     return;
313
314   scoped_ptr<StartState> old_state(start_state_.Pass());
315   WatcherThreadManager::GetInstance()->StopWatching(old_state->watcher_id);
316 }
317
318 void HandleWatcher::OnHandleReady(MojoResult result) {
319   DCHECK(start_state_.get());
320   scoped_ptr<StartState> old_state(start_state_.Pass());
321   old_state->callback.Run(result);
322
323   // NOTE: We may have been deleted during callback execution.
324 }
325
326 }  // namespace common
327 }  // namespace mojo