Fix a build error
[platform/upstream/chromium.git] / ipc / ipc_sync_channel.cc
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "ipc/ipc_sync_channel.h"
6
7 #include "base/lazy_instance.h"
8 #include "base/logging.h"
9 #include "base/threading/thread_local.h"
10 #include "base/synchronization/waitable_event.h"
11 #include "base/synchronization/waitable_event_watcher.h"
12 #include "ipc/ipc_sync_message.h"
13
14 using base::TimeDelta;
15 using base::TimeTicks;
16 using base::WaitableEvent;
17
18 namespace IPC {
19 // When we're blocked in a Send(), we need to process incoming synchronous
20 // messages right away because it could be blocking our reply (either
21 // directly from the same object we're calling, or indirectly through one or
22 // more other channels).  That means that in SyncContext's OnMessageReceived,
23 // we need to process sync message right away if we're blocked.  However a
24 // simple check isn't sufficient, because the listener thread can be in the
25 // process of calling Send.
26 // To work around this, when SyncChannel filters a sync message, it sets
27 // an event that the listener thread waits on during its Send() call.  This
28 // allows us to dispatch incoming sync messages when blocked.  The race
29 // condition is handled because if Send is in the process of being called, it
30 // will check the event.  In case the listener thread isn't sending a message,
31 // we queue a task on the listener thread to dispatch the received messages.
32 // The messages are stored in this queue object that's shared among all
33 // SyncChannel objects on the same thread (since one object can receive a
34 // sync message while another one is blocked).
35
36 class SyncChannel::ReceivedSyncMsgQueue :
37     public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> {
38  public:
39   // Returns the ReceivedSyncMsgQueue instance for this thread, creating one
40   // if necessary.  Call RemoveContext on the same thread when done.
41   static ReceivedSyncMsgQueue* AddContext() {
42     // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple
43     // SyncChannel objects can block the same thread).
44     ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get();
45     if (!rv) {
46       rv = new ReceivedSyncMsgQueue();
47       ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv);
48     }
49     rv->listener_count_++;
50     return rv;
51   }
52
53   // Called on IPC thread when a synchronous message or reply arrives.
54   void QueueMessage(const Message& msg, SyncChannel::SyncContext* context) {
55     bool was_task_pending;
56     {
57       base::AutoLock auto_lock(message_lock_);
58
59       was_task_pending = task_pending_;
60       task_pending_ = true;
61
62       // We set the event in case the listener thread is blocked (or is about
63       // to). In case it's not, the PostTask dispatches the messages.
64       message_queue_.push_back(QueuedMessage(new Message(msg), context));
65     }
66
67     dispatch_event_.Signal();
68     if (!was_task_pending) {
69       listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod(
70           this,
71           &ReceivedSyncMsgQueue::DispatchMessagesTask,
72           scoped_refptr<SyncContext>(context)));
73     }
74   }
75
76   void QueueReply(const Message &msg, SyncChannel::SyncContext* context) {
77     received_replies_.push_back(QueuedMessage(new Message(msg), context));
78   }
79
80   // Called on the listener's thread to process any queues synchronous
81   // messages.
82   void DispatchMessagesTask(SyncContext* context) {
83     {
84       base::AutoLock auto_lock(message_lock_);
85       task_pending_ = false;
86     }
87     context->DispatchMessages();
88   }
89
90   void DispatchMessages(SyncContext* dispatching_context) {
91     SyncMessageQueue delayed_queue;
92     while (true) {
93       Message* message;
94       scoped_refptr<SyncChannel::SyncContext> context;
95       {
96         base::AutoLock auto_lock(message_lock_);
97         if (message_queue_.empty()) {
98           message_queue_ = delayed_queue;
99           break;
100         }
101
102         message = message_queue_.front().message;
103         context = message_queue_.front().context;
104         message_queue_.pop_front();
105       }
106       if (context->restrict_dispatch() && context != dispatching_context) {
107         delayed_queue.push_back(QueuedMessage(message, context));
108       } else {
109         context->OnDispatchMessage(*message);
110         delete message;
111       }
112     }
113   }
114
115   // SyncChannel calls this in its destructor.
116   void RemoveContext(SyncContext* context) {
117     base::AutoLock auto_lock(message_lock_);
118
119     SyncMessageQueue::iterator iter = message_queue_.begin();
120     while (iter != message_queue_.end()) {
121       if (iter->context == context) {
122         delete iter->message;
123         iter = message_queue_.erase(iter);
124       } else {
125         iter++;
126       }
127     }
128
129     if (--listener_count_ == 0) {
130       DCHECK(lazy_tls_ptr_.Pointer()->Get());
131       lazy_tls_ptr_.Pointer()->Set(NULL);
132     }
133   }
134
135   WaitableEvent* dispatch_event() { return &dispatch_event_; }
136   base::MessageLoopProxy* listener_message_loop() {
137     return listener_message_loop_;
138   }
139
140   // Holds a pointer to the per-thread ReceivedSyncMsgQueue object.
141   static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> >
142       lazy_tls_ptr_;
143
144   // Called on the ipc thread to check if we can unblock any current Send()
145   // calls based on a queued reply.
146   void DispatchReplies() {
147     for (size_t i = 0; i < received_replies_.size(); ++i) {
148       Message* message = received_replies_[i].message;
149       if (received_replies_[i].context->TryToUnblockListener(message)) {
150         delete message;
151         received_replies_.erase(received_replies_.begin() + i);
152         return;
153       }
154     }
155   }
156
157   base::WaitableEventWatcher* top_send_done_watcher() {
158     return top_send_done_watcher_;
159   }
160
161   void set_top_send_done_watcher(base::WaitableEventWatcher* watcher) {
162     top_send_done_watcher_ = watcher;
163   }
164
165  private:
166   friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>;
167
168   // See the comment in SyncChannel::SyncChannel for why this event is created
169   // as manual reset.
170   ReceivedSyncMsgQueue() :
171       dispatch_event_(true, false),
172       listener_message_loop_(base::MessageLoopProxy::current()),
173       task_pending_(false),
174       listener_count_(0),
175       top_send_done_watcher_(NULL) {
176   }
177
178   ~ReceivedSyncMsgQueue() {}
179
180   // Holds information about a queued synchronous message or reply.
181   struct QueuedMessage {
182     QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { }
183     Message* message;
184     scoped_refptr<SyncChannel::SyncContext> context;
185   };
186
187   typedef std::deque<QueuedMessage> SyncMessageQueue;
188   SyncMessageQueue message_queue_;
189
190   std::vector<QueuedMessage> received_replies_;
191
192   // Set when we got a synchronous message that we must respond to as the
193   // sender needs its reply before it can reply to our original synchronous
194   // message.
195   WaitableEvent dispatch_event_;
196   scoped_refptr<base::MessageLoopProxy> listener_message_loop_;
197   base::Lock message_lock_;
198   bool task_pending_;
199   int listener_count_;
200
201   // The current send done event watcher for this thread. Used to maintain
202   // a local global stack of send done watchers to ensure that nested sync
203   // message loops complete correctly.
204   base::WaitableEventWatcher* top_send_done_watcher_;
205 };
206
207 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> >
208     SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_(base::LINKER_INITIALIZED);
209
210 SyncChannel::SyncContext::SyncContext(
211     Channel::Listener* listener,
212     base::MessageLoopProxy* ipc_thread,
213     WaitableEvent* shutdown_event)
214     : ChannelProxy::Context(listener, ipc_thread),
215       received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()),
216       shutdown_event_(shutdown_event),
217       restrict_dispatch_(false) {
218 }
219
220 SyncChannel::SyncContext::~SyncContext() {
221   while (!deserializers_.empty())
222     Pop();
223 }
224
225 // Adds information about an outgoing sync message to the context so that
226 // we know how to deserialize the reply.  Returns a handle that's set when
227 // the reply has arrived.
228 void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
229   // Create the tracking information for this message. This object is stored
230   // by value since all members are pointers that are cheap to copy. These
231   // pointers are cleaned up in the Pop() function.
232   //
233   // The event is created as manual reset because in between Signal and
234   // OnObjectSignalled, another Send can happen which would stop the watcher
235   // from being called.  The event would get watched later, when the nested
236   // Send completes, so the event will need to remain set.
237   PendingSyncMsg pending(SyncMessage::GetMessageId(*sync_msg),
238                          sync_msg->GetReplyDeserializer(),
239                          new WaitableEvent(true, false));
240   base::AutoLock auto_lock(deserializers_lock_);
241   deserializers_.push_back(pending);
242 }
243
244 bool SyncChannel::SyncContext::Pop() {
245   bool result;
246   {
247     base::AutoLock auto_lock(deserializers_lock_);
248     PendingSyncMsg msg = deserializers_.back();
249     delete msg.deserializer;
250     delete msg.done_event;
251     msg.done_event = NULL;
252     deserializers_.pop_back();
253     result = msg.send_result;
254   }
255
256   // We got a reply to a synchronous Send() call that's blocking the listener
257   // thread.  However, further down the call stack there could be another
258   // blocking Send() call, whose reply we received after we made this last
259   // Send() call.  So check if we have any queued replies available that
260   // can now unblock the listener thread.
261   ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
262       received_sync_msgs_.get(), &ReceivedSyncMsgQueue::DispatchReplies));
263
264   return result;
265 }
266
267 WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() {
268   base::AutoLock auto_lock(deserializers_lock_);
269   return deserializers_.back().done_event;
270 }
271
272 WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() {
273   return received_sync_msgs_->dispatch_event();
274 }
275
276 void SyncChannel::SyncContext::DispatchMessages() {
277   received_sync_msgs_->DispatchMessages(this);
278 }
279
280 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) {
281   base::AutoLock auto_lock(deserializers_lock_);
282   if (deserializers_.empty() ||
283       !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) {
284     return false;
285   }
286
287   if (!msg->is_reply_error()) {
288     deserializers_.back().send_result = deserializers_.back().deserializer->
289         SerializeOutputParameters(*msg);
290   }
291   deserializers_.back().done_event->Signal();
292
293   return true;
294 }
295
296 void SyncChannel::SyncContext::Clear() {
297   CancelPendingSends();
298   received_sync_msgs_->RemoveContext(this);
299   Context::Clear();
300 }
301
302 bool SyncChannel::SyncContext::OnMessageReceived(const Message& msg) {
303   // Give the filters a chance at processing this message.
304   if (TryFilters(msg))
305     return true;
306
307   if (TryToUnblockListener(&msg))
308     return true;
309
310   if (msg.should_unblock()) {
311     received_sync_msgs_->QueueMessage(msg, this);
312     return true;
313   }
314
315   if (msg.is_reply()) {
316     received_sync_msgs_->QueueReply(msg, this);
317     return true;
318   }
319
320   return Context::OnMessageReceivedNoFilter(msg);
321 }
322
323 void SyncChannel::SyncContext::OnChannelError() {
324   CancelPendingSends();
325   shutdown_watcher_.StopWatching();
326   Context::OnChannelError();
327 }
328
329 void SyncChannel::SyncContext::OnChannelOpened() {
330   shutdown_watcher_.StartWatching(shutdown_event_, this);
331   Context::OnChannelOpened();
332 }
333
334 void SyncChannel::SyncContext::OnChannelClosed() {
335   CancelPendingSends();
336   shutdown_watcher_.StopWatching();
337   Context::OnChannelClosed();
338 }
339
340 void SyncChannel::SyncContext::OnSendTimeout(int message_id) {
341   base::AutoLock auto_lock(deserializers_lock_);
342   PendingSyncMessageQueue::iterator iter;
343   for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) {
344     if (iter->id == message_id) {
345       iter->done_event->Signal();
346       break;
347     }
348   }
349 }
350
351 void SyncChannel::SyncContext::CancelPendingSends() {
352   base::AutoLock auto_lock(deserializers_lock_);
353   PendingSyncMessageQueue::iterator iter;
354   for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++)
355     iter->done_event->Signal();
356 }
357
358 void SyncChannel::SyncContext::OnWaitableEventSignaled(WaitableEvent* event) {
359   if (event == shutdown_event_) {
360     // Process shut down before we can get a reply to a synchronous message.
361     // Cancel pending Send calls, which will end up setting the send done event.
362     CancelPendingSends();
363   } else {
364     // We got the reply, timed out or the process shutdown.
365     DCHECK_EQ(GetSendDoneEvent(), event);
366     MessageLoop::current()->QuitNow();
367   }
368 }
369
370
371 SyncChannel::SyncChannel(
372     const IPC::ChannelHandle& channel_handle,
373     Channel::Mode mode,
374     Channel::Listener* listener,
375     base::MessageLoopProxy* ipc_message_loop,
376     bool create_pipe_now,
377     WaitableEvent* shutdown_event)
378     : ChannelProxy(
379           channel_handle, mode, ipc_message_loop,
380           new SyncContext(listener, ipc_message_loop, shutdown_event),
381           create_pipe_now),
382       sync_messages_with_no_timeout_allowed_(true) {
383   // Ideally we only want to watch this object when running a nested message
384   // loop.  However, we don't know when it exits if there's another nested
385   // message loop running under it or not, so we wouldn't know whether to
386   // stop or keep watching.  So we always watch it, and create the event as
387   // manual reset since the object watcher might otherwise reset the event
388   // when we're doing a WaitMany.
389   dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), this);
390 }
391
392 SyncChannel::~SyncChannel() {
393 }
394
395 void SyncChannel::SetRestrictDispatchToSameChannel(bool value) {
396   sync_context()->set_restrict_dispatch(value);
397 }
398
399 bool SyncChannel::Send(Message* message) {
400   return SendWithTimeout(message, base::kNoTimeout);
401 }
402
403 bool SyncChannel::SendWithTimeout(Message* message, int timeout_ms) {
404   if (!message->is_sync()) {
405     ChannelProxy::Send(message);
406     return true;
407   }
408
409   // *this* might get deleted in WaitForReply.
410   scoped_refptr<SyncContext> context(sync_context());
411   if (context->shutdown_event()->IsSignaled()) {
412     delete message;
413     return false;
414   }
415
416   DCHECK(sync_messages_with_no_timeout_allowed_ ||
417          timeout_ms != base::kNoTimeout);
418   SyncMessage* sync_msg = static_cast<SyncMessage*>(message);
419   context->Push(sync_msg);
420   int message_id = SyncMessage::GetMessageId(*sync_msg);
421   WaitableEvent* pump_messages_event = sync_msg->pump_messages_event();
422
423   ChannelProxy::Send(message);
424
425   if (timeout_ms != base::kNoTimeout) {
426     // We use the sync message id so that when a message times out, we don't
427     // confuse it with another send that is either above/below this Send in
428     // the call stack.
429     context->ipc_message_loop()->PostDelayedTask(FROM_HERE,
430         NewRunnableMethod(context.get(),
431             &SyncContext::OnSendTimeout, message_id), timeout_ms);
432   }
433
434   // Wait for reply, or for any other incoming synchronous messages.
435   // *this* might get deleted, so only call static functions at this point.
436   WaitForReply(context, pump_messages_event);
437
438   return context->Pop();
439 }
440
441 void SyncChannel::WaitForReply(
442     SyncContext* context, WaitableEvent* pump_messages_event) {
443   context->DispatchMessages();
444   while (true) {
445     WaitableEvent* objects[] = {
446       context->GetDispatchEvent(),
447       context->GetSendDoneEvent(),
448       pump_messages_event
449     };
450
451     unsigned count = pump_messages_event ? 3: 2;
452     size_t result = WaitableEvent::WaitMany(objects, count);
453     if (result == 0 /* dispatch event */) {
454       // We're waiting for a reply, but we received a blocking synchronous
455       // call.  We must process it or otherwise a deadlock might occur.
456       context->GetDispatchEvent()->Reset();
457       context->DispatchMessages();
458       continue;
459     }
460
461     if (result == 2 /* pump_messages_event */)
462       WaitForReplyWithNestedMessageLoop(context);  // Run a nested message loop.
463
464     break;
465   }
466 }
467
468 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) {
469   base::WaitableEventWatcher send_done_watcher;
470
471   ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs();
472   DCHECK(sync_msg_queue != NULL);
473
474   base::WaitableEventWatcher* old_send_done_event_watcher =
475       sync_msg_queue->top_send_done_watcher();
476
477   base::WaitableEventWatcher::Delegate* old_delegate = NULL;
478   base::WaitableEvent* old_event = NULL;
479
480   // Maintain a local global stack of send done delegates to ensure that
481   // nested sync calls complete in the correct sequence, i.e. the
482   // outermost call completes first, etc.
483   if (old_send_done_event_watcher) {
484     old_delegate = old_send_done_event_watcher->delegate();
485     old_event = old_send_done_event_watcher->GetWatchedEvent();
486     old_send_done_event_watcher->StopWatching();
487   }
488
489   sync_msg_queue->set_top_send_done_watcher(&send_done_watcher);
490
491   send_done_watcher.StartWatching(context->GetSendDoneEvent(), context);
492   bool old_state = MessageLoop::current()->NestableTasksAllowed();
493
494   MessageLoop::current()->SetNestableTasksAllowed(true);
495   MessageLoop::current()->Run();
496   MessageLoop::current()->SetNestableTasksAllowed(old_state);
497
498   sync_msg_queue->set_top_send_done_watcher(old_send_done_event_watcher);
499   if (old_send_done_event_watcher && old_event) {
500     old_send_done_event_watcher->StartWatching(old_event, old_delegate);
501   }
502 }
503
504 void SyncChannel::OnWaitableEventSignaled(WaitableEvent* event) {
505   DCHECK(event == sync_context()->GetDispatchEvent());
506   // The call to DispatchMessages might delete this object, so reregister
507   // the object watcher first.
508   event->Reset();
509   dispatch_watcher_.StartWatching(event, this);
510   sync_context()->DispatchMessages();
511 }
512
513 }  // namespace IPC