Upstream version 7.36.149.0
[platform/framework/web/crosswalk.git] / src / ipc / ipc_sync_channel.cc
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "ipc/ipc_sync_channel.h"
6
7 #include "base/bind.h"
8 #include "base/debug/trace_event.h"
9 #include "base/lazy_instance.h"
10 #include "base/location.h"
11 #include "base/logging.h"
12 #include "base/synchronization/waitable_event.h"
13 #include "base/synchronization/waitable_event_watcher.h"
14 #include "base/thread_task_runner_handle.h"
15 #include "base/threading/thread_local.h"
16 #include "ipc/ipc_logging.h"
17 #include "ipc/ipc_message_macros.h"
18 #include "ipc/ipc_sync_message.h"
19
20 using base::TimeDelta;
21 using base::TimeTicks;
22 using base::WaitableEvent;
23
24 namespace IPC {
25 // When we're blocked in a Send(), we need to process incoming synchronous
26 // messages right away because it could be blocking our reply (either
27 // directly from the same object we're calling, or indirectly through one or
28 // more other channels).  That means that in SyncContext's OnMessageReceived,
29 // we need to process sync message right away if we're blocked.  However a
30 // simple check isn't sufficient, because the listener thread can be in the
31 // process of calling Send.
32 // To work around this, when SyncChannel filters a sync message, it sets
33 // an event that the listener thread waits on during its Send() call.  This
34 // allows us to dispatch incoming sync messages when blocked.  The race
35 // condition is handled because if Send is in the process of being called, it
36 // will check the event.  In case the listener thread isn't sending a message,
37 // we queue a task on the listener thread to dispatch the received messages.
38 // The messages are stored in this queue object that's shared among all
39 // SyncChannel objects on the same thread (since one object can receive a
40 // sync message while another one is blocked).
41
42 class SyncChannel::ReceivedSyncMsgQueue :
43     public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> {
44  public:
45   // Returns the ReceivedSyncMsgQueue instance for this thread, creating one
46   // if necessary.  Call RemoveContext on the same thread when done.
47   static ReceivedSyncMsgQueue* AddContext() {
48     // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple
49     // SyncChannel objects can block the same thread).
50     ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get();
51     if (!rv) {
52       rv = new ReceivedSyncMsgQueue();
53       ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv);
54     }
55     rv->listener_count_++;
56     return rv;
57   }
58
59   // Called on IPC thread when a synchronous message or reply arrives.
60   void QueueMessage(const Message& msg, SyncChannel::SyncContext* context) {
61     bool was_task_pending;
62     {
63       base::AutoLock auto_lock(message_lock_);
64
65       was_task_pending = task_pending_;
66       task_pending_ = true;
67
68       // We set the event in case the listener thread is blocked (or is about
69       // to). In case it's not, the PostTask dispatches the messages.
70       message_queue_.push_back(QueuedMessage(new Message(msg), context));
71       message_queue_version_++;
72     }
73
74     dispatch_event_.Signal();
75     if (!was_task_pending) {
76       listener_task_runner_->PostTask(
77           FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchMessagesTask,
78                                 this, scoped_refptr<SyncContext>(context)));
79     }
80   }
81
82   void QueueReply(const Message &msg, SyncChannel::SyncContext* context) {
83     received_replies_.push_back(QueuedMessage(new Message(msg), context));
84   }
85
86   // Called on the listener's thread to process any queues synchronous
87   // messages.
88   void DispatchMessagesTask(SyncContext* context) {
89     {
90       base::AutoLock auto_lock(message_lock_);
91       task_pending_ = false;
92     }
93     context->DispatchMessages();
94   }
95
96   void DispatchMessages(SyncContext* dispatching_context) {
97     bool first_time = true;
98     uint32 expected_version = 0;
99     SyncMessageQueue::iterator it;
100     while (true) {
101       Message* message = NULL;
102       scoped_refptr<SyncChannel::SyncContext> context;
103       {
104         base::AutoLock auto_lock(message_lock_);
105         if (first_time || message_queue_version_ != expected_version) {
106           it = message_queue_.begin();
107           first_time = false;
108         }
109         for (; it != message_queue_.end(); it++) {
110           int message_group = it->context->restrict_dispatch_group();
111           if (message_group == kRestrictDispatchGroup_None ||
112               message_group == dispatching_context->restrict_dispatch_group()) {
113             message = it->message;
114             context = it->context;
115             it = message_queue_.erase(it);
116             message_queue_version_++;
117             expected_version = message_queue_version_;
118             break;
119           }
120         }
121       }
122
123       if (message == NULL)
124         break;
125       context->OnDispatchMessage(*message);
126       delete message;
127     }
128   }
129
130   // SyncChannel calls this in its destructor.
131   void RemoveContext(SyncContext* context) {
132     base::AutoLock auto_lock(message_lock_);
133
134     SyncMessageQueue::iterator iter = message_queue_.begin();
135     while (iter != message_queue_.end()) {
136       if (iter->context.get() == context) {
137         delete iter->message;
138         iter = message_queue_.erase(iter);
139         message_queue_version_++;
140       } else {
141         iter++;
142       }
143     }
144
145     if (--listener_count_ == 0) {
146       DCHECK(lazy_tls_ptr_.Pointer()->Get());
147       lazy_tls_ptr_.Pointer()->Set(NULL);
148     }
149   }
150
151   WaitableEvent* dispatch_event() { return &dispatch_event_; }
152   base::SingleThreadTaskRunner* listener_task_runner() {
153     return listener_task_runner_.get();
154   }
155
156   // Holds a pointer to the per-thread ReceivedSyncMsgQueue object.
157   static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> >
158       lazy_tls_ptr_;
159
160   // Called on the ipc thread to check if we can unblock any current Send()
161   // calls based on a queued reply.
162   void DispatchReplies() {
163     for (size_t i = 0; i < received_replies_.size(); ++i) {
164       Message* message = received_replies_[i].message;
165       if (received_replies_[i].context->TryToUnblockListener(message)) {
166         delete message;
167         received_replies_.erase(received_replies_.begin() + i);
168         return;
169       }
170     }
171   }
172
173   base::WaitableEventWatcher* top_send_done_watcher() {
174     return top_send_done_watcher_;
175   }
176
177   void set_top_send_done_watcher(base::WaitableEventWatcher* watcher) {
178     top_send_done_watcher_ = watcher;
179   }
180
181  private:
182   friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>;
183
184   // See the comment in SyncChannel::SyncChannel for why this event is created
185   // as manual reset.
186   ReceivedSyncMsgQueue() :
187       message_queue_version_(0),
188       dispatch_event_(true, false),
189       listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
190       task_pending_(false),
191       listener_count_(0),
192       top_send_done_watcher_(NULL) {
193   }
194
195   ~ReceivedSyncMsgQueue() {}
196
197   // Holds information about a queued synchronous message or reply.
198   struct QueuedMessage {
199     QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { }
200     Message* message;
201     scoped_refptr<SyncChannel::SyncContext> context;
202   };
203
204   typedef std::list<QueuedMessage> SyncMessageQueue;
205   SyncMessageQueue message_queue_;
206   uint32 message_queue_version_;  // Used to signal DispatchMessages to rescan
207
208   std::vector<QueuedMessage> received_replies_;
209
210   // Set when we got a synchronous message that we must respond to as the
211   // sender needs its reply before it can reply to our original synchronous
212   // message.
213   WaitableEvent dispatch_event_;
214   scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_;
215   base::Lock message_lock_;
216   bool task_pending_;
217   int listener_count_;
218
219   // The current send done event watcher for this thread. Used to maintain
220   // a local global stack of send done watchers to ensure that nested sync
221   // message loops complete correctly.
222   base::WaitableEventWatcher* top_send_done_watcher_;
223 };
224
225 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> >
226     SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ =
227         LAZY_INSTANCE_INITIALIZER;
228
229 SyncChannel::SyncContext::SyncContext(
230     Listener* listener,
231     base::SingleThreadTaskRunner* ipc_task_runner,
232     WaitableEvent* shutdown_event)
233     : ChannelProxy::Context(listener, ipc_task_runner),
234       received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()),
235       shutdown_event_(shutdown_event),
236       restrict_dispatch_group_(kRestrictDispatchGroup_None) {
237 }
238
239 SyncChannel::SyncContext::~SyncContext() {
240   while (!deserializers_.empty())
241     Pop();
242 }
243
244 // Adds information about an outgoing sync message to the context so that
245 // we know how to deserialize the reply.  Returns a handle that's set when
246 // the reply has arrived.
247 void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
248   // Create the tracking information for this message. This object is stored
249   // by value since all members are pointers that are cheap to copy. These
250   // pointers are cleaned up in the Pop() function.
251   //
252   // The event is created as manual reset because in between Signal and
253   // OnObjectSignalled, another Send can happen which would stop the watcher
254   // from being called.  The event would get watched later, when the nested
255   // Send completes, so the event will need to remain set.
256   PendingSyncMsg pending(SyncMessage::GetMessageId(*sync_msg),
257                          sync_msg->GetReplyDeserializer(),
258                          new WaitableEvent(true, false));
259   base::AutoLock auto_lock(deserializers_lock_);
260   deserializers_.push_back(pending);
261 }
262
263 bool SyncChannel::SyncContext::Pop() {
264   bool result;
265   {
266     base::AutoLock auto_lock(deserializers_lock_);
267     PendingSyncMsg msg = deserializers_.back();
268     delete msg.deserializer;
269     delete msg.done_event;
270     msg.done_event = NULL;
271     deserializers_.pop_back();
272     result = msg.send_result;
273   }
274
275   // We got a reply to a synchronous Send() call that's blocking the listener
276   // thread.  However, further down the call stack there could be another
277   // blocking Send() call, whose reply we received after we made this last
278   // Send() call.  So check if we have any queued replies available that
279   // can now unblock the listener thread.
280   ipc_task_runner()->PostTask(
281       FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies,
282                             received_sync_msgs_.get()));
283
284   return result;
285 }
286
287 WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() {
288   base::AutoLock auto_lock(deserializers_lock_);
289   return deserializers_.back().done_event;
290 }
291
292 WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() {
293   return received_sync_msgs_->dispatch_event();
294 }
295
296 void SyncChannel::SyncContext::DispatchMessages() {
297   received_sync_msgs_->DispatchMessages(this);
298 }
299
300 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) {
301   base::AutoLock auto_lock(deserializers_lock_);
302   if (deserializers_.empty() ||
303       !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) {
304     return false;
305   }
306
307   // TODO(bauerb): Remove logging once investigation of http://crbug.com/141055
308   // has finished.
309   if (!msg->is_reply_error()) {
310     bool send_result = deserializers_.back().deserializer->
311         SerializeOutputParameters(*msg);
312     deserializers_.back().send_result = send_result;
313     VLOG_IF(1, !send_result) << "Couldn't deserialize reply message";
314   } else {
315     VLOG(1) << "Received error reply";
316   }
317   deserializers_.back().done_event->Signal();
318
319   return true;
320 }
321
322 void SyncChannel::SyncContext::Clear() {
323   CancelPendingSends();
324   received_sync_msgs_->RemoveContext(this);
325   Context::Clear();
326 }
327
328 bool SyncChannel::SyncContext::OnMessageReceived(const Message& msg) {
329   // Give the filters a chance at processing this message.
330   if (TryFilters(msg))
331     return true;
332
333   if (TryToUnblockListener(&msg))
334     return true;
335
336   if (msg.is_reply()) {
337     received_sync_msgs_->QueueReply(msg, this);
338     return true;
339   }
340
341   if (msg.should_unblock()) {
342     received_sync_msgs_->QueueMessage(msg, this);
343     return true;
344   }
345
346   return Context::OnMessageReceivedNoFilter(msg);
347 }
348
349 void SyncChannel::SyncContext::OnChannelError() {
350   CancelPendingSends();
351   shutdown_watcher_.StopWatching();
352   Context::OnChannelError();
353 }
354
355 void SyncChannel::SyncContext::OnChannelOpened() {
356   shutdown_watcher_.StartWatching(
357       shutdown_event_,
358       base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled,
359                  base::Unretained(this)));
360   Context::OnChannelOpened();
361 }
362
363 void SyncChannel::SyncContext::OnChannelClosed() {
364   CancelPendingSends();
365   shutdown_watcher_.StopWatching();
366   Context::OnChannelClosed();
367 }
368
369 void SyncChannel::SyncContext::OnSendTimeout(int message_id) {
370   base::AutoLock auto_lock(deserializers_lock_);
371   PendingSyncMessageQueue::iterator iter;
372   VLOG(1) << "Send timeout";
373   for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) {
374     if (iter->id == message_id) {
375       iter->done_event->Signal();
376       break;
377     }
378   }
379 }
380
381 void SyncChannel::SyncContext::CancelPendingSends() {
382   base::AutoLock auto_lock(deserializers_lock_);
383   PendingSyncMessageQueue::iterator iter;
384   // TODO(bauerb): Remove once http://crbug/141055 is fixed.
385   VLOG(1) << "Canceling pending sends";
386   for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++)
387     iter->done_event->Signal();
388 }
389
390 void SyncChannel::SyncContext::OnWaitableEventSignaled(WaitableEvent* event) {
391   if (event == shutdown_event_) {
392     // Process shut down before we can get a reply to a synchronous message.
393     // Cancel pending Send calls, which will end up setting the send done event.
394     CancelPendingSends();
395   } else {
396     // We got the reply, timed out or the process shutdown.
397     DCHECK_EQ(GetSendDoneEvent(), event);
398     base::MessageLoop::current()->QuitNow();
399   }
400 }
401
402 base::WaitableEventWatcher::EventCallback
403     SyncChannel::SyncContext::MakeWaitableEventCallback() {
404   return base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled, this);
405 }
406
407 SyncChannel::SyncChannel(
408     const IPC::ChannelHandle& channel_handle,
409     Channel::Mode mode,
410     Listener* listener,
411     base::SingleThreadTaskRunner* ipc_task_runner,
412     bool create_pipe_now,
413     WaitableEvent* shutdown_event)
414     : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)) {
415   // The current (listener) thread must be distinct from the IPC thread, or else
416   // sending synchronous messages will deadlock.
417   DCHECK_NE(ipc_task_runner, base::ThreadTaskRunnerHandle::Get());
418   ChannelProxy::Init(channel_handle, mode, create_pipe_now);
419   StartWatching();
420 }
421
422 SyncChannel::SyncChannel(
423     Listener* listener,
424     base::SingleThreadTaskRunner* ipc_task_runner,
425     WaitableEvent* shutdown_event)
426     : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)) {
427   // The current (listener) thread must be distinct from the IPC thread, or else
428   // sending synchronous messages will deadlock.
429   DCHECK_NE(ipc_task_runner, base::ThreadTaskRunnerHandle::Get());
430   StartWatching();
431 }
432
433 SyncChannel::~SyncChannel() {
434 }
435
436 void SyncChannel::SetRestrictDispatchChannelGroup(int group) {
437   sync_context()->set_restrict_dispatch_group(group);
438 }
439
440 bool SyncChannel::Send(Message* message) {
441 #ifdef IPC_MESSAGE_LOG_ENABLED
442   Logging* logger = Logging::GetInstance();
443   std::string name;
444   logger->GetMessageText(message->type(), &name, message, NULL);
445   TRACE_EVENT1("ipc", "SyncChannel::Send", "name", name);
446 #else
447   TRACE_EVENT2("ipc", "SyncChannel::Send",
448                "class", IPC_MESSAGE_ID_CLASS(message->type()),
449                "line", IPC_MESSAGE_ID_LINE(message->type()));
450 #endif
451   if (!message->is_sync()) {
452     ChannelProxy::Send(message);
453     return true;
454   }
455
456   // *this* might get deleted in WaitForReply.
457   scoped_refptr<SyncContext> context(sync_context());
458   if (context->shutdown_event()->IsSignaled()) {
459     VLOG(1) << "shutdown event is signaled";
460     delete message;
461     return false;
462   }
463
464   SyncMessage* sync_msg = static_cast<SyncMessage*>(message);
465   context->Push(sync_msg);
466   WaitableEvent* pump_messages_event = sync_msg->pump_messages_event();
467
468   ChannelProxy::Send(message);
469
470   // Wait for reply, or for any other incoming synchronous messages.
471   // *this* might get deleted, so only call static functions at this point.
472   WaitForReply(context.get(), pump_messages_event);
473
474   return context->Pop();
475 }
476
477 void SyncChannel::WaitForReply(
478     SyncContext* context, WaitableEvent* pump_messages_event) {
479   context->DispatchMessages();
480   while (true) {
481     WaitableEvent* objects[] = {
482       context->GetDispatchEvent(),
483       context->GetSendDoneEvent(),
484       pump_messages_event
485     };
486
487     unsigned count = pump_messages_event ? 3: 2;
488     size_t result = WaitableEvent::WaitMany(objects, count);
489     if (result == 0 /* dispatch event */) {
490       // We're waiting for a reply, but we received a blocking synchronous
491       // call.  We must process it or otherwise a deadlock might occur.
492       context->GetDispatchEvent()->Reset();
493       context->DispatchMessages();
494       continue;
495     }
496
497     if (result == 2 /* pump_messages_event */)
498       WaitForReplyWithNestedMessageLoop(context);  // Run a nested message loop.
499
500     break;
501   }
502 }
503
504 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) {
505   base::WaitableEventWatcher send_done_watcher;
506
507   ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs();
508   DCHECK(sync_msg_queue != NULL);
509
510   base::WaitableEventWatcher* old_send_done_event_watcher =
511       sync_msg_queue->top_send_done_watcher();
512
513   base::WaitableEventWatcher::EventCallback old_callback;
514   base::WaitableEvent* old_event = NULL;
515
516   // Maintain a local global stack of send done delegates to ensure that
517   // nested sync calls complete in the correct sequence, i.e. the
518   // outermost call completes first, etc.
519   if (old_send_done_event_watcher) {
520     old_callback = old_send_done_event_watcher->callback();
521     old_event = old_send_done_event_watcher->GetWatchedEvent();
522     old_send_done_event_watcher->StopWatching();
523   }
524
525   sync_msg_queue->set_top_send_done_watcher(&send_done_watcher);
526
527   send_done_watcher.StartWatching(context->GetSendDoneEvent(),
528                                   context->MakeWaitableEventCallback());
529
530   {
531     base::MessageLoop::ScopedNestableTaskAllower allow(
532         base::MessageLoop::current());
533     base::MessageLoop::current()->Run();
534   }
535
536   sync_msg_queue->set_top_send_done_watcher(old_send_done_event_watcher);
537   if (old_send_done_event_watcher && old_event) {
538     old_send_done_event_watcher->StartWatching(old_event, old_callback);
539   }
540 }
541
542 void SyncChannel::OnWaitableEventSignaled(WaitableEvent* event) {
543   DCHECK(event == sync_context()->GetDispatchEvent());
544   // The call to DispatchMessages might delete this object, so reregister
545   // the object watcher first.
546   event->Reset();
547   dispatch_watcher_.StartWatching(event, dispatch_watcher_callback_);
548   sync_context()->DispatchMessages();
549 }
550
551 void SyncChannel::StartWatching() {
552   // Ideally we only want to watch this object when running a nested message
553   // loop.  However, we don't know when it exits if there's another nested
554   // message loop running under it or not, so we wouldn't know whether to
555   // stop or keep watching.  So we always watch it, and create the event as
556   // manual reset since the object watcher might otherwise reset the event
557   // when we're doing a WaitMany.
558   dispatch_watcher_callback_ =
559       base::Bind(&SyncChannel::OnWaitableEventSignaled,
560                   base::Unretained(this));
561   dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(),
562                                   dispatch_watcher_callback_);
563 }
564
565 }  // namespace IPC