1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
6 #include "base/compiler_specific.h"
7 #include "base/debug/trace_event.h"
8 #include "base/location.h"
9 #include "base/memory/ref_counted.h"
10 #include "base/memory/scoped_ptr.h"
11 #include "base/single_thread_task_runner.h"
12 #include "base/thread_task_runner_handle.h"
13 #include "ipc/ipc_channel_proxy.h"
14 #include "ipc/ipc_listener.h"
15 #include "ipc/ipc_logging.h"
16 #include "ipc/ipc_message_macros.h"
17 #include "ipc/ipc_message_utils.h"
21 //------------------------------------------------------------------------------
23 ChannelProxy::MessageFilter::MessageFilter() {}
25 void ChannelProxy::MessageFilter::OnFilterAdded(Channel* channel) {}
27 void ChannelProxy::MessageFilter::OnFilterRemoved() {}
29 void ChannelProxy::MessageFilter::OnChannelConnected(int32 peer_pid) {}
31 void ChannelProxy::MessageFilter::OnChannelError() {}
33 void ChannelProxy::MessageFilter::OnChannelClosing() {}
35 bool ChannelProxy::MessageFilter::OnMessageReceived(const Message& message) {
39 ChannelProxy::MessageFilter::~MessageFilter() {}
41 //------------------------------------------------------------------------------
43 ChannelProxy::Context::Context(Listener* listener,
44 base::SingleThreadTaskRunner* ipc_task_runner)
45 : listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
47 ipc_task_runner_(ipc_task_runner),
48 channel_connected_called_(false),
49 peer_pid_(base::kNullProcessId) {
50 DCHECK(ipc_task_runner_.get());
53 ChannelProxy::Context::~Context() {
56 void ChannelProxy::Context::ClearIPCTaskRunner() {
57 ipc_task_runner_ = NULL;
60 void ChannelProxy::Context::CreateChannel(const IPC::ChannelHandle& handle,
61 const Channel::Mode& mode) {
62 DCHECK(channel_.get() == NULL);
63 channel_id_ = handle.name;
64 channel_.reset(new Channel(handle, mode, this));
67 bool ChannelProxy::Context::TryFilters(const Message& message) {
68 #ifdef IPC_MESSAGE_LOG_ENABLED
69 Logging* logger = Logging::GetInstance();
70 if (logger->Enabled())
71 logger->OnPreDispatchMessage(message);
74 for (size_t i = 0; i < filters_.size(); ++i) {
75 if (filters_[i]->OnMessageReceived(message)) {
76 #ifdef IPC_MESSAGE_LOG_ENABLED
77 if (logger->Enabled())
78 logger->OnPostDispatchMessage(message, channel_id_);
86 // Called on the IPC::Channel thread
87 bool ChannelProxy::Context::OnMessageReceived(const Message& message) {
88 // First give a chance to the filters to process this message.
89 if (!TryFilters(message))
90 OnMessageReceivedNoFilter(message);
94 // Called on the IPC::Channel thread
95 bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
96 listener_task_runner_->PostTask(
97 FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message));
101 // Called on the IPC::Channel thread
102 void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) {
103 // Add any pending filters. This avoids a race condition where someone
104 // creates a ChannelProxy, calls AddFilter, and then right after starts the
105 // peer process. The IO thread could receive a message before the task to add
106 // the filter is run on the IO thread.
109 // We cache off the peer_pid so it can be safely accessed from both threads.
110 peer_pid_ = channel_->peer_pid();
111 for (size_t i = 0; i < filters_.size(); ++i)
112 filters_[i]->OnChannelConnected(peer_pid);
114 // See above comment about using listener_task_runner_ here.
115 listener_task_runner_->PostTask(
116 FROM_HERE, base::Bind(&Context::OnDispatchConnected, this));
119 // Called on the IPC::Channel thread
120 void ChannelProxy::Context::OnChannelError() {
121 for (size_t i = 0; i < filters_.size(); ++i)
122 filters_[i]->OnChannelError();
124 // See above comment about using listener_task_runner_ here.
125 listener_task_runner_->PostTask(
126 FROM_HERE, base::Bind(&Context::OnDispatchError, this));
129 // Called on the IPC::Channel thread
130 void ChannelProxy::Context::OnChannelOpened() {
131 DCHECK(channel_ != NULL);
133 // Assume a reference to ourselves on behalf of this thread. This reference
134 // will be released when we are closed.
137 if (!channel_->Connect()) {
142 for (size_t i = 0; i < filters_.size(); ++i)
143 filters_[i]->OnFilterAdded(channel_.get());
146 // Called on the IPC::Channel thread
147 void ChannelProxy::Context::OnChannelClosed() {
148 // It's okay for IPC::ChannelProxy::Close to be called more than once, which
149 // would result in this branch being taken.
153 for (size_t i = 0; i < filters_.size(); ++i) {
154 filters_[i]->OnChannelClosing();
155 filters_[i]->OnFilterRemoved();
158 // We don't need the filters anymore.
163 // Balance with the reference taken during startup. This may result in
168 void ChannelProxy::Context::Clear() {
172 // Called on the IPC::Channel thread
173 void ChannelProxy::Context::OnSendMessage(scoped_ptr<Message> message) {
174 if (!channel_.get()) {
178 if (!channel_->Send(message.release()))
182 // Called on the IPC::Channel thread
183 void ChannelProxy::Context::OnAddFilter() {
184 std::vector<scoped_refptr<MessageFilter> > new_filters;
186 base::AutoLock auto_lock(pending_filters_lock_);
187 new_filters.swap(pending_filters_);
190 for (size_t i = 0; i < new_filters.size(); ++i) {
191 filters_.push_back(new_filters[i]);
193 // If the channel has already been created, then we need to send this
194 // message so that the filter gets access to the Channel.
196 new_filters[i]->OnFilterAdded(channel_.get());
197 // Ditto for if the channel has been connected.
199 new_filters[i]->OnChannelConnected(peer_pid_);
203 // Called on the IPC::Channel thread
204 void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) {
206 return; // The filters have already been deleted.
208 for (size_t i = 0; i < filters_.size(); ++i) {
209 if (filters_[i].get() == filter) {
210 filter->OnFilterRemoved();
211 filters_.erase(filters_.begin() + i);
216 NOTREACHED() << "filter to be removed not found";
219 // Called on the listener's thread
220 void ChannelProxy::Context::AddFilter(MessageFilter* filter) {
221 base::AutoLock auto_lock(pending_filters_lock_);
222 pending_filters_.push_back(make_scoped_refptr(filter));
223 ipc_task_runner_->PostTask(
224 FROM_HERE, base::Bind(&Context::OnAddFilter, this));
227 // Called on the listener's thread
228 void ChannelProxy::Context::OnDispatchMessage(const Message& message) {
229 #ifdef IPC_MESSAGE_LOG_ENABLED
230 Logging* logger = Logging::GetInstance();
232 logger->GetMessageText(message.type(), &name, &message, NULL);
233 TRACE_EVENT1("toplevel", "ChannelProxy::Context::OnDispatchMessage",
236 TRACE_EVENT2("toplevel", "ChannelProxy::Context::OnDispatchMessage",
237 "class", IPC_MESSAGE_ID_CLASS(message.type()),
238 "line", IPC_MESSAGE_ID_LINE(message.type()));
244 OnDispatchConnected();
246 #ifdef IPC_MESSAGE_LOG_ENABLED
247 if (message.type() == IPC_LOGGING_ID) {
248 logger->OnReceivedLoggingMessage(message);
252 if (logger->Enabled())
253 logger->OnPreDispatchMessage(message);
256 listener_->OnMessageReceived(message);
258 #ifdef IPC_MESSAGE_LOG_ENABLED
259 if (logger->Enabled())
260 logger->OnPostDispatchMessage(message, channel_id_);
264 // Called on the listener's thread
265 void ChannelProxy::Context::OnDispatchConnected() {
266 if (channel_connected_called_)
269 channel_connected_called_ = true;
271 listener_->OnChannelConnected(peer_pid_);
274 // Called on the listener's thread
275 void ChannelProxy::Context::OnDispatchError() {
277 listener_->OnChannelError();
280 //-----------------------------------------------------------------------------
282 ChannelProxy::ChannelProxy(const IPC::ChannelHandle& channel_handle,
285 base::SingleThreadTaskRunner* ipc_task_runner)
286 : context_(new Context(listener, ipc_task_runner)),
288 Init(channel_handle, mode, true);
291 ChannelProxy::ChannelProxy(Context* context)
296 ChannelProxy::~ChannelProxy() {
297 DCHECK(CalledOnValidThread());
302 void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle,
304 bool create_pipe_now) {
305 DCHECK(CalledOnValidThread());
307 #if defined(OS_POSIX)
308 // When we are creating a server on POSIX, we need its file descriptor
309 // to be created immediately so that it can be accessed and passed
310 // to other processes. Forcing it to be created immediately avoids
311 // race conditions that may otherwise arise.
312 if (mode & Channel::MODE_SERVER_FLAG) {
313 create_pipe_now = true;
315 #endif // defined(OS_POSIX)
317 if (create_pipe_now) {
318 // Create the channel immediately. This effectively sets up the
319 // low-level pipe so that the client can connect. Without creating
320 // the pipe immediately, it is possible for a listener to attempt
321 // to connect and get an error since the pipe doesn't exist yet.
322 context_->CreateChannel(channel_handle, mode);
324 context_->ipc_task_runner()->PostTask(
325 FROM_HERE, base::Bind(&Context::CreateChannel, context_.get(),
326 channel_handle, mode));
329 // complete initialization on the background thread
330 context_->ipc_task_runner()->PostTask(
331 FROM_HERE, base::Bind(&Context::OnChannelOpened, context_.get()));
336 void ChannelProxy::Close() {
337 DCHECK(CalledOnValidThread());
339 // Clear the backpointer to the listener so that any pending calls to
340 // Context::OnDispatchMessage or OnDispatchError will be ignored. It is
341 // possible that the channel could be closed while it is receiving messages!
344 if (context_->ipc_task_runner()) {
345 context_->ipc_task_runner()->PostTask(
346 FROM_HERE, base::Bind(&Context::OnChannelClosed, context_.get()));
350 bool ChannelProxy::Send(Message* message) {
353 // TODO(alexeypa): add DCHECK(CalledOnValidThread()) here. Currently there are
354 // tests that call Send() from a wrong thread. See http://crbug.com/163523.
356 #ifdef IPC_MESSAGE_LOG_ENABLED
357 Logging::GetInstance()->OnSendMessage(message, context_->channel_id());
360 context_->ipc_task_runner()->PostTask(
362 base::Bind(&ChannelProxy::Context::OnSendMessage,
363 context_, base::Passed(scoped_ptr<Message>(message))));
367 void ChannelProxy::AddFilter(MessageFilter* filter) {
368 DCHECK(CalledOnValidThread());
370 context_->AddFilter(filter);
373 void ChannelProxy::RemoveFilter(MessageFilter* filter) {
374 DCHECK(CalledOnValidThread());
376 context_->ipc_task_runner()->PostTask(
377 FROM_HERE, base::Bind(&Context::OnRemoveFilter, context_.get(),
378 make_scoped_refptr(filter)));
381 void ChannelProxy::ClearIPCTaskRunner() {
382 DCHECK(CalledOnValidThread());
384 context()->ClearIPCTaskRunner();
387 #if defined(OS_POSIX) && !defined(OS_NACL)
388 // See the TODO regarding lazy initialization of the channel in
389 // ChannelProxy::Init().
390 int ChannelProxy::GetClientFileDescriptor() {
391 DCHECK(CalledOnValidThread());
393 Channel* channel = context_.get()->channel_.get();
394 // Channel must have been created first.
395 DCHECK(channel) << context_.get()->channel_id_;
396 return channel->GetClientFileDescriptor();
399 int ChannelProxy::TakeClientFileDescriptor() {
400 DCHECK(CalledOnValidThread());
402 Channel* channel = context_.get()->channel_.get();
403 // Channel must have been created first.
404 DCHECK(channel) << context_.get()->channel_id_;
405 return channel->TakeClientFileDescriptor();
408 bool ChannelProxy::GetPeerEuid(uid_t* peer_euid) const {
409 DCHECK(CalledOnValidThread());
411 Channel* channel = context_.get()->channel_.get();
412 // Channel must have been created first.
413 DCHECK(channel) << context_.get()->channel_id_;
414 return channel->GetPeerEuid(peer_euid);
418 //-----------------------------------------------------------------------------