Upstream version 7.36.149.0
[platform/framework/web/crosswalk.git] / src / ipc / ipc_channel_proxy.cc
index 18ed304..5c4d743 100644 (file)
@@ -2,52 +2,43 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
+#include "ipc/ipc_channel_proxy.h"
+
 #include "base/bind.h"
 #include "base/compiler_specific.h"
-#include "base/debug/trace_event.h"
 #include "base/location.h"
 #include "base/memory/ref_counted.h"
 #include "base/memory/scoped_ptr.h"
 #include "base/single_thread_task_runner.h"
 #include "base/thread_task_runner_handle.h"
-#include "ipc/ipc_channel_proxy.h"
 #include "ipc/ipc_listener.h"
 #include "ipc/ipc_logging.h"
 #include "ipc/ipc_message_macros.h"
-#include "ipc/ipc_message_utils.h"
+#include "ipc/message_filter.h"
+#include "ipc/message_filter_router.h"
 
 namespace IPC {
 
 //------------------------------------------------------------------------------
 
-ChannelProxy::MessageFilter::MessageFilter() {}
-
-void ChannelProxy::MessageFilter::OnFilterAdded(Channel* channel) {}
-
-void ChannelProxy::MessageFilter::OnFilterRemoved() {}
-
-void ChannelProxy::MessageFilter::OnChannelConnected(int32 peer_pid) {}
-
-void ChannelProxy::MessageFilter::OnChannelError() {}
-
-void ChannelProxy::MessageFilter::OnChannelClosing() {}
-
-bool ChannelProxy::MessageFilter::OnMessageReceived(const Message& message) {
-  return false;
-}
-
-ChannelProxy::MessageFilter::~MessageFilter() {}
-
-//------------------------------------------------------------------------------
-
 ChannelProxy::Context::Context(Listener* listener,
                                base::SingleThreadTaskRunner* ipc_task_runner)
     : listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
       listener_(listener),
       ipc_task_runner_(ipc_task_runner),
       channel_connected_called_(false),
+      message_filter_router_(new MessageFilterRouter()),
       peer_pid_(base::kNullProcessId) {
   DCHECK(ipc_task_runner_.get());
+  // The Listener thread where Messages are handled must be a separate thread
+  // to avoid oversubscribing the IO thread. If you trigger this error, you
+  // need to either:
+  // 1) Create the ChannelProxy on a different thread, or
+  // 2) Just use Channel
+  // Note, we currently make an exception for a NULL listener. That usage
+  // basically works, but is outside the intent of ChannelProxy. This support
+  // will disappear, so please don't rely on it. See crbug.com/364241
+  DCHECK(!listener || (ipc_task_runner_.get() != listener_task_runner_.get()));
 }
 
 ChannelProxy::Context::~Context() {
@@ -59,26 +50,25 @@ void ChannelProxy::Context::ClearIPCTaskRunner() {
 
 void ChannelProxy::Context::CreateChannel(const IPC::ChannelHandle& handle,
                                           const Channel::Mode& mode) {
-  DCHECK(channel_.get() == NULL);
+  DCHECK(!channel_);
   channel_id_ = handle.name;
   channel_.reset(new Channel(handle, mode, this));
 }
 
 bool ChannelProxy::Context::TryFilters(const Message& message) {
+  DCHECK(message_filter_router_);
 #ifdef IPC_MESSAGE_LOG_ENABLED
   Logging* logger = Logging::GetInstance();
   if (logger->Enabled())
     logger->OnPreDispatchMessage(message);
 #endif
 
-  for (size_t i = 0; i < filters_.size(); ++i) {
-    if (filters_[i]->OnMessageReceived(message)) {
+  if (message_filter_router_->TryFilters(message)) {
 #ifdef IPC_MESSAGE_LOG_ENABLED
-      if (logger->Enabled())
-        logger->OnPostDispatchMessage(message, channel_id_);
+    if (logger->Enabled())
+      logger->OnPostDispatchMessage(message, channel_id_);
 #endif
-      return true;
-    }
+    return true;
   }
   return false;
 }
@@ -93,10 +83,6 @@ bool ChannelProxy::Context::OnMessageReceived(const Message& message) {
 
 // Called on the IPC::Channel thread
 bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
-  // NOTE: This code relies on the listener's message loop not going away while
-  // this thread is active.  That should be a reasonable assumption, but it
-  // feels risky.  We may want to invent some more indirect way of referring to
-  // a MessageLoop if this becomes a problem.
   listener_task_runner_->PostTask(
       FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message));
   return true;
@@ -104,17 +90,15 @@ bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
 
 // Called on the IPC::Channel thread
 void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) {
+  // We cache off the peer_pid so it can be safely accessed from both threads.
+  peer_pid_ = channel_->peer_pid();
+
   // Add any pending filters.  This avoids a race condition where someone
   // creates a ChannelProxy, calls AddFilter, and then right after starts the
   // peer process.  The IO thread could receive a message before the task to add
   // the filter is run on the IO thread.
   OnAddFilter();
 
-  // We cache off the peer_pid so it can be safely accessed from both threads.
-  peer_pid_ = channel_->peer_pid();
-  for (size_t i = 0; i < filters_.size(); ++i)
-    filters_[i]->OnChannelConnected(peer_pid);
-
   // See above comment about using listener_task_runner_ here.
   listener_task_runner_->PostTask(
       FROM_HERE, base::Bind(&Context::OnDispatchConnected, this));
@@ -151,7 +135,7 @@ void ChannelProxy::Context::OnChannelOpened() {
 void ChannelProxy::Context::OnChannelClosed() {
   // It's okay for IPC::ChannelProxy::Close to be called more than once, which
   // would result in this branch being taken.
-  if (!channel_.get())
+  if (!channel_)
     return;
 
   for (size_t i = 0; i < filters_.size(); ++i) {
@@ -160,7 +144,11 @@ void ChannelProxy::Context::OnChannelClosed() {
   }
 
   // We don't need the filters anymore.
+  message_filter_router_->Clear();
   filters_.clear();
+  // We don't need the lock, because at this point, the listener thread can't
+  // access it any more.
+  pending_filters_.clear();
 
   channel_.reset();
 
@@ -175,16 +163,24 @@ void ChannelProxy::Context::Clear() {
 
 // Called on the IPC::Channel thread
 void ChannelProxy::Context::OnSendMessage(scoped_ptr<Message> message) {
-  if (!channel_.get()) {
+  if (!channel_) {
     OnChannelClosed();
     return;
   }
+
   if (!channel_->Send(message.release()))
     OnChannelError();
 }
 
 // Called on the IPC::Channel thread
 void ChannelProxy::Context::OnAddFilter() {
+  // Our OnChannelConnected method has not yet been called, so we can't be
+  // sure that channel_ is valid yet. When OnChannelConnected *is* called,
+  // it invokes OnAddFilter, so any pending filter(s) will be added at that
+  // time.
+  if (peer_pid_ == base::kNullProcessId)
+    return;
+
   std::vector<scoped_refptr<MessageFilter> > new_filters;
   {
     base::AutoLock auto_lock(pending_filters_lock_);
@@ -194,21 +190,34 @@ void ChannelProxy::Context::OnAddFilter() {
   for (size_t i = 0; i < new_filters.size(); ++i) {
     filters_.push_back(new_filters[i]);
 
-    // If the channel has already been created, then we need to send this
-    // message so that the filter gets access to the Channel.
-    if (channel_.get())
-      new_filters[i]->OnFilterAdded(channel_.get());
-    // Ditto for if the channel has been connected.
-    if (peer_pid_)
-      new_filters[i]->OnChannelConnected(peer_pid_);
+    message_filter_router_->AddFilter(new_filters[i].get());
+
+    // The channel has already been created and connected, so we need to
+    // inform the filters right now.
+    new_filters[i]->OnFilterAdded(channel_.get());
+    new_filters[i]->OnChannelConnected(peer_pid_);
   }
 }
 
 // Called on the IPC::Channel thread
 void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) {
-  if (!channel_.get())
+  if (peer_pid_ == base::kNullProcessId) {
+    // The channel is not yet connected, so any filters are still pending.
+    base::AutoLock auto_lock(pending_filters_lock_);
+    for (size_t i = 0; i < pending_filters_.size(); ++i) {
+      if (pending_filters_[i].get() == filter) {
+        filter->OnFilterRemoved();
+        pending_filters_.erase(pending_filters_.begin() + i);
+        return;
+      }
+    }
+    return;
+  }
+  if (!channel_)
     return;  // The filters have already been deleted.
 
+  message_filter_router_->RemoveFilter(filter);
+
   for (size_t i = 0; i < filters_.size(); ++i) {
     if (filters_[i].get() == filter) {
       filter->OnFilterRemoved();
@@ -234,10 +243,10 @@ void ChannelProxy::Context::OnDispatchMessage(const Message& message) {
   Logging* logger = Logging::GetInstance();
   std::string name;
   logger->GetMessageText(message.type(), &name, &message, NULL);
-  TRACE_EVENT1("task", "ChannelProxy::Context::OnDispatchMessage",
+  TRACE_EVENT1("ipc", "ChannelProxy::Context::OnDispatchMessage",
                "name", name);
 #else
-  TRACE_EVENT2("task", "ChannelProxy::Context::OnDispatchMessage",
+  TRACE_EVENT2("ipc", "ChannelProxy::Context::OnDispatchMessage",
                "class", IPC_MESSAGE_ID_CLASS(message.type()),
                "line", IPC_MESSAGE_ID_LINE(message.type()));
 #endif