// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
+#include "ipc/ipc_channel_proxy.h"
+
#include "base/bind.h"
#include "base/compiler_specific.h"
-#include "base/debug/trace_event.h"
#include "base/location.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "base/single_thread_task_runner.h"
#include "base/thread_task_runner_handle.h"
-#include "ipc/ipc_channel_proxy.h"
#include "ipc/ipc_listener.h"
#include "ipc/ipc_logging.h"
#include "ipc/ipc_message_macros.h"
-#include "ipc/ipc_message_utils.h"
+#include "ipc/message_filter.h"
+#include "ipc/message_filter_router.h"
namespace IPC {
//------------------------------------------------------------------------------
-ChannelProxy::MessageFilter::MessageFilter() {}
-
-void ChannelProxy::MessageFilter::OnFilterAdded(Channel* channel) {}
-
-void ChannelProxy::MessageFilter::OnFilterRemoved() {}
-
-void ChannelProxy::MessageFilter::OnChannelConnected(int32 peer_pid) {}
-
-void ChannelProxy::MessageFilter::OnChannelError() {}
-
-void ChannelProxy::MessageFilter::OnChannelClosing() {}
-
-bool ChannelProxy::MessageFilter::OnMessageReceived(const Message& message) {
- return false;
-}
-
-ChannelProxy::MessageFilter::~MessageFilter() {}
-
-//------------------------------------------------------------------------------
-
ChannelProxy::Context::Context(Listener* listener,
base::SingleThreadTaskRunner* ipc_task_runner)
: listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
listener_(listener),
ipc_task_runner_(ipc_task_runner),
channel_connected_called_(false),
+ message_filter_router_(new MessageFilterRouter()),
peer_pid_(base::kNullProcessId) {
DCHECK(ipc_task_runner_.get());
+ // The Listener thread where Messages are handled must be a separate thread
+ // to avoid oversubscribing the IO thread. If you trigger this error, you
+ // need to either:
+ // 1) Create the ChannelProxy on a different thread, or
+ // 2) Just use Channel
+ // Note, we currently make an exception for a NULL listener. That usage
+ // basically works, but is outside the intent of ChannelProxy. This support
+ // will disappear, so please don't rely on it. See crbug.com/364241
+ DCHECK(!listener || (ipc_task_runner_.get() != listener_task_runner_.get()));
}
ChannelProxy::Context::~Context() {
void ChannelProxy::Context::CreateChannel(const IPC::ChannelHandle& handle,
const Channel::Mode& mode) {
- DCHECK(channel_.get() == NULL);
+ DCHECK(!channel_);
channel_id_ = handle.name;
channel_.reset(new Channel(handle, mode, this));
}
bool ChannelProxy::Context::TryFilters(const Message& message) {
+ DCHECK(message_filter_router_);
#ifdef IPC_MESSAGE_LOG_ENABLED
Logging* logger = Logging::GetInstance();
if (logger->Enabled())
logger->OnPreDispatchMessage(message);
#endif
- for (size_t i = 0; i < filters_.size(); ++i) {
- if (filters_[i]->OnMessageReceived(message)) {
+ if (message_filter_router_->TryFilters(message)) {
#ifdef IPC_MESSAGE_LOG_ENABLED
- if (logger->Enabled())
- logger->OnPostDispatchMessage(message, channel_id_);
+ if (logger->Enabled())
+ logger->OnPostDispatchMessage(message, channel_id_);
#endif
- return true;
- }
+ return true;
}
return false;
}
// Called on the IPC::Channel thread
bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
- // NOTE: This code relies on the listener's message loop not going away while
- // this thread is active. That should be a reasonable assumption, but it
- // feels risky. We may want to invent some more indirect way of referring to
- // a MessageLoop if this becomes a problem.
listener_task_runner_->PostTask(
FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message));
return true;
// Called on the IPC::Channel thread
void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) {
+ // We cache off the peer_pid so it can be safely accessed from both threads.
+ peer_pid_ = channel_->peer_pid();
+
// Add any pending filters. This avoids a race condition where someone
// creates a ChannelProxy, calls AddFilter, and then right after starts the
// peer process. The IO thread could receive a message before the task to add
// the filter is run on the IO thread.
OnAddFilter();
- // We cache off the peer_pid so it can be safely accessed from both threads.
- peer_pid_ = channel_->peer_pid();
- for (size_t i = 0; i < filters_.size(); ++i)
- filters_[i]->OnChannelConnected(peer_pid);
-
// See above comment about using listener_task_runner_ here.
listener_task_runner_->PostTask(
FROM_HERE, base::Bind(&Context::OnDispatchConnected, this));
void ChannelProxy::Context::OnChannelClosed() {
// It's okay for IPC::ChannelProxy::Close to be called more than once, which
// would result in this branch being taken.
- if (!channel_.get())
+ if (!channel_)
return;
for (size_t i = 0; i < filters_.size(); ++i) {
}
// We don't need the filters anymore.
+ message_filter_router_->Clear();
filters_.clear();
+ // We don't need the lock, because at this point, the listener thread can't
+ // access it any more.
+ pending_filters_.clear();
channel_.reset();
// Called on the IPC::Channel thread
void ChannelProxy::Context::OnSendMessage(scoped_ptr<Message> message) {
- if (!channel_.get()) {
+ if (!channel_) {
OnChannelClosed();
return;
}
+
if (!channel_->Send(message.release()))
OnChannelError();
}
// Called on the IPC::Channel thread
void ChannelProxy::Context::OnAddFilter() {
+ // Our OnChannelConnected method has not yet been called, so we can't be
+ // sure that channel_ is valid yet. When OnChannelConnected *is* called,
+ // it invokes OnAddFilter, so any pending filter(s) will be added at that
+ // time.
+ if (peer_pid_ == base::kNullProcessId)
+ return;
+
std::vector<scoped_refptr<MessageFilter> > new_filters;
{
base::AutoLock auto_lock(pending_filters_lock_);
for (size_t i = 0; i < new_filters.size(); ++i) {
filters_.push_back(new_filters[i]);
- // If the channel has already been created, then we need to send this
- // message so that the filter gets access to the Channel.
- if (channel_.get())
- new_filters[i]->OnFilterAdded(channel_.get());
- // Ditto for if the channel has been connected.
- if (peer_pid_)
- new_filters[i]->OnChannelConnected(peer_pid_);
+ message_filter_router_->AddFilter(new_filters[i].get());
+
+ // The channel has already been created and connected, so we need to
+ // inform the filters right now.
+ new_filters[i]->OnFilterAdded(channel_.get());
+ new_filters[i]->OnChannelConnected(peer_pid_);
}
}
// Called on the IPC::Channel thread
void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) {
- if (!channel_.get())
+ if (peer_pid_ == base::kNullProcessId) {
+ // The channel is not yet connected, so any filters are still pending.
+ base::AutoLock auto_lock(pending_filters_lock_);
+ for (size_t i = 0; i < pending_filters_.size(); ++i) {
+ if (pending_filters_[i].get() == filter) {
+ filter->OnFilterRemoved();
+ pending_filters_.erase(pending_filters_.begin() + i);
+ return;
+ }
+ }
+ return;
+ }
+ if (!channel_)
return; // The filters have already been deleted.
+ message_filter_router_->RemoveFilter(filter);
+
for (size_t i = 0; i < filters_.size(); ++i) {
if (filters_[i].get() == filter) {
filter->OnFilterRemoved();
Logging* logger = Logging::GetInstance();
std::string name;
logger->GetMessageText(message.type(), &name, &message, NULL);
- TRACE_EVENT1("task", "ChannelProxy::Context::OnDispatchMessage",
+ TRACE_EVENT1("ipc", "ChannelProxy::Context::OnDispatchMessage",
"name", name);
#else
- TRACE_EVENT2("task", "ChannelProxy::Context::OnDispatchMessage",
+ TRACE_EVENT2("ipc", "ChannelProxy::Context::OnDispatchMessage",
"class", IPC_MESSAGE_ID_CLASS(message.type()),
"line", IPC_MESSAGE_ID_LINE(message.type()));
#endif