Upstream version 8.37.180.0
[platform/framework/web/crosswalk.git] / src / mojo / common / handle_watcher.cc
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/common/handle_watcher.h"
6
7 #include <map>
8
9 #include "base/atomic_sequence_num.h"
10 #include "base/bind.h"
11 #include "base/lazy_instance.h"
12 #include "base/memory/singleton.h"
13 #include "base/memory/weak_ptr.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/message_loop/message_loop_proxy.h"
16 #include "base/synchronization/lock.h"
17 #include "base/threading/thread.h"
18 #include "base/time/time.h"
19 #include "mojo/common/message_pump_mojo.h"
20 #include "mojo/common/message_pump_mojo_handler.h"
21 #include "mojo/common/time_helper.h"
22
23 namespace mojo {
24 namespace common {
25
26 typedef int WatcherID;
27
28 namespace {
29
30 const char kWatcherThreadName[] = "handle-watcher-thread";
31
32 // TODO(sky): this should be unnecessary once MessageLoop has been refactored.
33 MessagePumpMojo* message_pump_mojo = NULL;
34
35 scoped_ptr<base::MessagePump> CreateMessagePumpMojo() {
36   message_pump_mojo = new MessagePumpMojo;
37   return scoped_ptr<base::MessagePump>(message_pump_mojo).Pass();
38 }
39
40 base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) {
41   return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() :
42       internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline);
43 }
44
45 // Tracks the data for a single call to Start().
46 struct WatchData {
47   WatchData()
48       : id(0),
49         handle_signals(MOJO_HANDLE_SIGNAL_NONE),
50         message_loop(NULL) {}
51
52   WatcherID id;
53   Handle handle;
54   MojoHandleSignals handle_signals;
55   base::TimeTicks deadline;
56   base::Callback<void(MojoResult)> callback;
57   scoped_refptr<base::MessageLoopProxy> message_loop;
58 };
59
60 // WatcherBackend --------------------------------------------------------------
61
62 // WatcherBackend is responsible for managing the requests and interacting with
63 // MessagePumpMojo. All access (outside of creation/destruction) is done on the
64 // thread WatcherThreadManager creates.
65 class WatcherBackend : public MessagePumpMojoHandler {
66  public:
67   WatcherBackend();
68   virtual ~WatcherBackend();
69
70   void StartWatching(const WatchData& data);
71   void StopWatching(WatcherID watcher_id);
72
73  private:
74   typedef std::map<Handle, WatchData> HandleToWatchDataMap;
75
76   // Invoked when a handle needs to be removed and notified.
77   void RemoveAndNotify(const Handle& handle, MojoResult result);
78
79   // Searches through |handle_to_data_| for |watcher_id|. Returns true if found
80   // and sets |handle| to the Handle. Returns false if not a known id.
81   bool GetMojoHandleByWatcherID(WatcherID watcher_id, Handle* handle) const;
82
83   // MessagePumpMojoHandler overrides:
84   virtual void OnHandleReady(const Handle& handle) OVERRIDE;
85   virtual void OnHandleError(const Handle& handle, MojoResult result) OVERRIDE;
86
87   // Maps from assigned id to WatchData.
88   HandleToWatchDataMap handle_to_data_;
89
90   DISALLOW_COPY_AND_ASSIGN(WatcherBackend);
91 };
92
93 WatcherBackend::WatcherBackend() {
94 }
95
96 WatcherBackend::~WatcherBackend() {
97 }
98
99 void WatcherBackend::StartWatching(const WatchData& data) {
100   RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED);
101
102   DCHECK_EQ(0u, handle_to_data_.count(data.handle));
103
104   handle_to_data_[data.handle] = data;
105   message_pump_mojo->AddHandler(this, data.handle,
106                                 data.handle_signals,
107                                 data.deadline);
108 }
109
110 void WatcherBackend::StopWatching(WatcherID watcher_id) {
111   // Because of the thread hop it is entirely possible to get here and not
112   // have a valid handle registered for |watcher_id|.
113   Handle handle;
114   if (!GetMojoHandleByWatcherID(watcher_id, &handle))
115     return;
116
117   handle_to_data_.erase(handle);
118   message_pump_mojo->RemoveHandler(handle);
119 }
120
121 void WatcherBackend::RemoveAndNotify(const Handle& handle,
122                                      MojoResult result) {
123   if (handle_to_data_.count(handle) == 0)
124     return;
125
126   const WatchData data(handle_to_data_[handle]);
127   handle_to_data_.erase(handle);
128   message_pump_mojo->RemoveHandler(handle);
129   data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result));
130 }
131
132 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id,
133                                               Handle* handle) const {
134   for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin();
135        i != handle_to_data_.end(); ++i) {
136     if (i->second.id == watcher_id) {
137       *handle = i->second.handle;
138       return true;
139     }
140   }
141   return false;
142 }
143
144 void WatcherBackend::OnHandleReady(const Handle& handle) {
145   RemoveAndNotify(handle, MOJO_RESULT_OK);
146 }
147
148 void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) {
149   RemoveAndNotify(handle, result);
150 }
151
152 // WatcherThreadManager --------------------------------------------------------
153
154 // WatcherThreadManager manages the background thread that listens for handles
155 // to be ready. All requests are handled by WatcherBackend.
156 class WatcherThreadManager {
157  public:
158   ~WatcherThreadManager();
159
160   // Returns the shared instance.
161   static WatcherThreadManager* GetInstance();
162
163   // Starts watching the requested handle. Returns a unique ID that is used to
164   // stop watching the handle. When the handle is ready |callback| is notified
165   // on the thread StartWatching() was invoked on.
166   // This may be invoked on any thread.
167   WatcherID StartWatching(const Handle& handle,
168                           MojoHandleSignals handle_signals,
169                           base::TimeTicks deadline,
170                           const base::Callback<void(MojoResult)>& callback);
171
172   // Stops watching a handle.
173   // This may be invoked on any thread.
174   void StopWatching(WatcherID watcher_id);
175
176  private:
177   friend struct DefaultSingletonTraits<WatcherThreadManager>;
178   WatcherThreadManager();
179
180   base::Thread thread_;
181
182   base::AtomicSequenceNumber watcher_id_generator_;
183
184   WatcherBackend backend_;
185
186   DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager);
187 };
188
189 WatcherThreadManager::~WatcherThreadManager() {
190   thread_.Stop();
191 }
192
193 WatcherThreadManager* WatcherThreadManager::GetInstance() {
194   return Singleton<WatcherThreadManager>::get();
195 }
196
197 WatcherID WatcherThreadManager::StartWatching(
198     const Handle& handle,
199     MojoHandleSignals handle_signals,
200     base::TimeTicks deadline,
201     const base::Callback<void(MojoResult)>& callback) {
202   WatchData data;
203   data.id = watcher_id_generator_.GetNext();
204   data.handle = handle;
205   data.callback = callback;
206   data.handle_signals = handle_signals;
207   data.deadline = deadline;
208   data.message_loop = base::MessageLoopProxy::current();
209   DCHECK_NE(static_cast<base::MessageLoopProxy*>(NULL),
210             data.message_loop.get());
211   // We outlive |thread_|, so it's safe to use Unretained() here.
212   thread_.message_loop()->PostTask(
213       FROM_HERE,
214       base::Bind(&WatcherBackend::StartWatching,
215                  base::Unretained(&backend_),
216                  data));
217   return data.id;
218 }
219
220 void WatcherThreadManager::StopWatching(WatcherID watcher_id) {
221   // We outlive |thread_|, so it's safe to use Unretained() here.
222   thread_.message_loop()->PostTask(
223       FROM_HERE,
224       base::Bind(&WatcherBackend::StopWatching,
225                  base::Unretained(&backend_),
226                  watcher_id));
227 }
228
229 WatcherThreadManager::WatcherThreadManager()
230     : thread_(kWatcherThreadName) {
231   base::Thread::Options thread_options;
232   thread_options.message_pump_factory = base::Bind(&CreateMessagePumpMojo);
233   thread_.StartWithOptions(thread_options);
234 }
235
236 }  // namespace
237
238 // HandleWatcher::State --------------------------------------------------------
239
240 // Represents the state of the HandleWatcher. Owns the user's callback and
241 // monitors the current thread's MessageLoop to know when to force the callback
242 // to run (with an error) even though the pipe hasn't been signaled yet.
243 class HandleWatcher::State : public base::MessageLoop::DestructionObserver {
244  public:
245   State(HandleWatcher* watcher,
246         const Handle& handle,
247         MojoHandleSignals handle_signals,
248         MojoDeadline deadline,
249         const base::Callback<void(MojoResult)>& callback)
250       : watcher_(watcher),
251         callback_(callback),
252         weak_factory_(this) {
253     base::MessageLoop::current()->AddDestructionObserver(this);
254
255     watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching(
256         handle,
257         handle_signals,
258         MojoDeadlineToTimeTicks(deadline),
259         base::Bind(&State::OnHandleReady, weak_factory_.GetWeakPtr()));
260   }
261
262   virtual ~State() {
263     base::MessageLoop::current()->RemoveDestructionObserver(this);
264
265     WatcherThreadManager::GetInstance()->StopWatching(watcher_id_);
266   }
267
268  private:
269   virtual void WillDestroyCurrentMessageLoop() OVERRIDE {
270     // The current thread is exiting. Simulate a watch error.
271     OnHandleReady(MOJO_RESULT_ABORTED);
272   }
273
274   void OnHandleReady(MojoResult result) {
275     base::Callback<void(MojoResult)> callback = callback_;
276     watcher_->Stop();  // Destroys |this|.
277
278     callback.Run(result);
279   }
280
281   HandleWatcher* watcher_;
282   WatcherID watcher_id_;
283   base::Callback<void(MojoResult)> callback_;
284
285   // Used to weakly bind |this| to the WatcherThreadManager.
286   base::WeakPtrFactory<State> weak_factory_;
287 };
288
289 // HandleWatcher ---------------------------------------------------------------
290
291 HandleWatcher::HandleWatcher() {
292 }
293
294 HandleWatcher::~HandleWatcher() {
295 }
296
297 void HandleWatcher::Start(const Handle& handle,
298                           MojoHandleSignals handle_signals,
299                           MojoDeadline deadline,
300                           const base::Callback<void(MojoResult)>& callback) {
301   DCHECK(handle.is_valid());
302   DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals);
303
304   state_.reset(new State(this, handle, handle_signals, deadline, callback));
305 }
306
307 void HandleWatcher::Stop() {
308   state_.reset();
309 }
310
311 }  // namespace common
312 }  // namespace mojo